diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java index cd48f21..1497425 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java @@ -71,6 +71,7 @@ public class Application { System.out.println("c) contacts"); System.out.println("s) subscriptions"); System.out.println("m) messages"); + System.out.println("?) info"); System.out.println("e) exit"); command = nextCommand(); @@ -89,6 +90,9 @@ public class Application { case "m": messages(); break; + case "?": + info(); + break; case "e": break; default: @@ -102,6 +106,11 @@ public class Application { ctx.shutdown(); } + private void info() { + System.out.println(); + System.out.println(ctx.status()); + } + private String nextCommand() { return scanner.nextLine().trim().toLowerCase(); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index 54efdf8..bcfb463 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -25,6 +25,7 @@ import ch.dissem.bitmessage.entity.valueobject.PrivateKey; import ch.dissem.bitmessage.exception.DecryptionFailedException; import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.ports.*; +import ch.dissem.bitmessage.utils.Property; import ch.dissem.bitmessage.utils.Security; import ch.dissem.bitmessage.utils.UnixTime; import org.slf4j.Logger; @@ -245,6 +246,12 @@ public class BitmessageContext { } } + public Property status() { + return new Property("status", null, + ctx.getNetworkHandler().getNetworkStatus() + ); + } + public interface Listener { void receive(Plaintext plaintext); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java b/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java index 562f6e5..8412c84 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java @@ -121,6 +121,10 @@ public class NetworkAddress implements Streamable { Encode.int16(port, stream); } + public void setTime(long time) { + this.time = time; + } + public static final class Builder { private long time; private long stream; diff --git a/domain/src/main/java/ch/dissem/bitmessage/exception/NodeException.java b/domain/src/main/java/ch/dissem/bitmessage/exception/NodeException.java new file mode 100644 index 0000000..9ab2b7f --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/exception/NodeException.java @@ -0,0 +1,32 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.exception; + +/** + * An exception on the node that's severe enough to cause the client to disconnect this node. + * + * @author Ch. Basler + */ +public class NodeException extends RuntimeException { + public NodeException(String message) { + super(message); + } + + public NodeException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java b/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java index 85878a9..e4cbf80 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java @@ -22,11 +22,13 @@ import ch.dissem.bitmessage.entity.ObjectMessage; import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.payload.*; import ch.dissem.bitmessage.entity.valueobject.PrivateKey; +import ch.dissem.bitmessage.exception.NodeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; +import java.net.SocketException; import java.net.SocketTimeoutException; /** @@ -38,8 +40,10 @@ public class Factory { public static NetworkMessage getNetworkMessage(int version, InputStream stream) throws SocketTimeoutException { try { return V3MessageFactory.read(stream); - } catch (SocketTimeoutException e) { + } catch (SocketTimeoutException | NodeException e) { throw e; + } catch (SocketException e) { + throw new NodeException(e.getMessage(), e); } catch (Exception e) { LOG.error(e.getMessage(), e); return null; diff --git a/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java b/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java index 2ba0f9d..57d240d 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java @@ -21,6 +21,7 @@ import ch.dissem.bitmessage.entity.payload.GenericPayload; import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.utils.AccessCounter; import ch.dissem.bitmessage.utils.Decode; import ch.dissem.bitmessage.utils.Security; @@ -211,6 +212,6 @@ class V3MessageFactory { } pos++; } - throw new IOException("Failed to fine MAGIC bytes in stream"); + throw new NodeException("Failed to find MAGIC bytes in stream"); } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java index 1d37df9..fc7ee52 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java @@ -20,8 +20,10 @@ import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.entity.ObjectMessage; import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; +import ch.dissem.bitmessage.utils.Property; import java.io.IOException; +import java.util.Map; /** * Handles incoming messages @@ -33,6 +35,8 @@ public interface NetworkHandler { void offer(InventoryVector iv); + Property getNetworkStatus(); + interface MessageListener { void receive(ObjectMessage object) throws IOException; } diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/DebugUtils.java b/domain/src/main/java/ch/dissem/bitmessage/utils/DebugUtils.java index e651980..e2090b9 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/DebugUtils.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/DebugUtils.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Map; public class DebugUtils { private final static Logger LOG = LoggerFactory.getLogger(DebugUtils.class); @@ -36,4 +37,12 @@ public class DebugUtils { LOG.debug(e.getMessage(), e); } } + + public static void inc(Map map, K key) { + if (map.containsKey(key)) { + map.put(key, map.get(key) + 1); + } else { + map.put(key, 1); + } + } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java new file mode 100644 index 0000000..1030513 --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java @@ -0,0 +1,53 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.utils; + +/** + * Created by chris on 14.06.15. + */ +public class Property { + private String name; + private Object value; + private Property[] properties; + + public Property(String name, Object value, Property... properties) { + this.name = name; + this.value = value; + this.properties = properties; + } + + @Override + public String toString() { + return toString(""); + } + + private String toString(String indentation) { + StringBuilder result = new StringBuilder(); + result.append(indentation).append(name).append(": "); + if (value != null || properties.length == 0) { + result.append(value); + } + if (properties.length > 0) { + result.append("{\n"); + for (Property property : properties) { + result.append(property.toString(indentation + " ")).append('\n'); + } + result.append(indentation).append("}"); + } + return result.toString(); + } +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java b/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java index cb21701..f5d8052 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java @@ -20,10 +20,14 @@ package ch.dissem.bitmessage.utils; * A simple utility class that simplifies using the second based time used in Bitmessage. */ public class UnixTime { + /** + * Length of an hour in seconds, intended for use with {@link #now(long)}. + */ + public static final long HOUR = 60 * 60; /** * Length of a day in seconds, intended for use with {@link #now(long)}. */ - public static final long DAY = 60 * 60 * 24; + public static final long DAY = 24 * HOUR; /** * Returns the time in second based Unix time ({@link System#currentTimeMillis()}/1000) diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index fcf7eee..1b9345b 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -22,23 +22,28 @@ import ch.dissem.bitmessage.entity.*; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException; +import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.ports.NetworkHandler.MessageListener; import ch.dissem.bitmessage.utils.DebugUtils; import ch.dissem.bitmessage.utils.Security; +import ch.dissem.bitmessage.utils.UnixTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; +import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.Connection.State.*; /** @@ -46,9 +51,11 @@ import static ch.dissem.bitmessage.networking.Connection.State.*; */ public class Connection implements Runnable { private final static Logger LOG = LoggerFactory.getLogger(Connection.class); + private static final int CONNECT_TIMEOUT = 10000; private InternalContext ctx; + private Mode mode; private State state; private Socket socket; private InputStream in; @@ -63,17 +70,30 @@ public class Connection implements Runnable { private Queue sendingQueue = new ConcurrentLinkedDeque<>(); - public Connection(InternalContext context, State state, Socket socket, MessageListener listener) throws IOException { + public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener) throws IOException { this.ctx = context; - this.state = state; + this.mode = mode; + this.state = CONNECTING; this.socket = socket; - this.in = socket.getInputStream(); - this.out = socket.getOutputStream(); this.listener = listener; this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(); } + public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener) { + this.ctx = context; + this.mode = mode; + this.state = CONNECTING; + this.socket = new Socket(); + this.node = node; + this.listener = listener; + this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); + } + + public Mode getMode() { + return mode; + } + public State getState() { return state; } @@ -84,58 +104,86 @@ public class Connection implements Runnable { @Override public void run() { - if (state == CLIENT) { - send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); - } - while (state != DISCONNECTED) { - try { - NetworkMessage msg = Factory.getNetworkMessage(version, in); - if (msg == null) - continue; - switch (state) { - case ACTIVE: - receiveMessage(msg.getPayload()); - sendQueue(); - break; + try (Socket socket = this.socket) { + if (!socket.isConnected()) { + socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT); + } + this.in = socket.getInputStream(); + this.out = socket.getOutputStream(); + if (mode == CLIENT) { + send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); + } + while (state != DISCONNECTED) { + try { + NetworkMessage msg = Factory.getNetworkMessage(version, in); + if (msg == null) + continue; + switch (state) { + case ACTIVE: + receiveMessage(msg.getPayload()); + sendQueue(); + break; - default: - switch (msg.getPayload().getCommand()) { - case VERSION: - Version payload = (Version) msg.getPayload(); - if (payload.getNonce() == ctx.getClientNonce()) { - LOG.info("Tried to connect to self, disconnecting."); - disconnect(); - } else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) { - this.version = payload.getVersion(); - this.streams = payload.getStreams(); - send(new VerAck()); - state = ACTIVE; - sendAddresses(); - sendInventory(); - } else { - LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting."); - disconnect(); - } - break; - case VERACK: - if (state == SERVER) { - send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); - } - break; - default: - throw new RuntimeException("Command 'version' or 'verack' expected, but was '" - + msg.getPayload().getCommand() + "'"); - } - } - if (socket.isClosed()) state = DISCONNECTED; - } catch (SocketTimeoutException ignore) { - if (state == ACTIVE) { - sendQueue(); + default: + switch (msg.getPayload().getCommand()) { + case VERSION: + Version payload = (Version) msg.getPayload(); + if (payload.getNonce() == ctx.getClientNonce()) { + LOG.info("Tried to connect to self, disconnecting."); + disconnect(); + } else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) { + this.version = payload.getVersion(); + this.streams = payload.getStreams(); + send(new VerAck()); + switch (mode) { + case SERVER: + send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); + break; + case CLIENT: + activateConnection(); + break; + } + } else { + LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting."); + disconnect(); + } + break; + case VERACK: + switch (mode) { + case SERVER: + activateConnection(); + break; + case CLIENT: + // NO OP + break; + } + break; + default: + throw new RuntimeException("Command 'version' or 'verack' expected, but was '" + + msg.getPayload().getCommand() + "'"); + } + } + if (socket.isClosed()) state = DISCONNECTED; + } catch (SocketTimeoutException ignore) { + if (state == ACTIVE) { + sendQueue(); + } } } + } catch (IOException | NodeException e) { + LOG.debug("disconnection from node " + node + ": " + e.getMessage(), e); + disconnect(); } } + private void activateConnection() { + state = ACTIVE; + sendAddresses(); + sendInventory(); + node.setTime(UnixTime.now()); + ctx.getNodeRegistry().offerAddresses(Arrays.asList(node)); + } + private void sendQueue() { LOG.debug("Sending " + sendingQueue.size() + " messages to node " + node); for (MessagePayload msg = sendingQueue.poll(); msg != null; msg = sendingQueue.poll()) { @@ -166,6 +214,8 @@ public class Connection implements Runnable { Security.checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); listener.receive(objectMessage); ctx.getInventory().storeObject(objectMessage); + // offer object to some random nodes so it gets distributed throughout the network: + ctx.getNetworkHandler().offer(objectMessage.getInventoryVector()); } catch (InsufficientProofOfWorkException e) { LOG.warn(e.getMessage()); } catch (IOException e) { @@ -199,12 +249,7 @@ public class Connection implements Runnable { } public void disconnect() { - try { - state = DISCONNECTED; - socket.close(); - } catch (IOException e) { - LOG.debug(e.getMessage(), e); - } + state = DISCONNECTED; } private void send(MessagePayload payload) { @@ -236,5 +281,7 @@ public class Connection implements Runnable { return Objects.hash(node); } - public enum State {SERVER, CLIENT, ACTIVE, DISCONNECTED} + public enum Mode {SERVER, CLIENT} + + public enum State {CONNECTING, ACTIVE, DISCONNECTED} } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java index 93fc12f..84b010e 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java @@ -22,19 +22,23 @@ import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.utils.Collections; +import ch.dissem.bitmessage.utils.DebugUtils; +import ch.dissem.bitmessage.utils.Property; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static ch.dissem.bitmessage.networking.Connection.State.*; +import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; +import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; +import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE; +import static ch.dissem.bitmessage.networking.Connection.State.DISCONNECTED; +import static ch.dissem.bitmessage.utils.DebugUtils.inc; /** * Handles all the networky stuff. @@ -91,11 +95,7 @@ public class NetworkNode implements NetworkHandler, ContextHolder { if (connections.size() < 8) { List addresses = ctx.getNodeRegistry().getKnownAddresses(8 - connections.size(), ctx.getStreams()); for (NetworkAddress address : addresses) { - try { - startConnection(new Connection(ctx, CLIENT, new Socket(address.toInetAddress(), address.getPort()), listener)); - } catch (IOException e) { - LOG.debug(e.getMessage(), e); - } + startConnection(new Connection(ctx, CLIENT, address, listener)); } } try { @@ -142,12 +142,55 @@ public class NetworkNode implements NetworkHandler, ContextHolder { @Override public void offer(final InventoryVector iv) { + List active = new LinkedList<>(); synchronized (connections) { - LOG.debug(connections.size() + " connections available to offer " + iv); - List random8 = Collections.selectRandom(8, this.connections); - for (Connection connection : random8) { - connection.offer(iv); + for (Connection connection : connections) { + if (connection.getState() == ACTIVE) { + active.add(connection); + } } } + LOG.debug(active.size() + " connections available to offer " + iv); + List random8 = Collections.selectRandom(8, active); + for (Connection connection : random8) { + connection.offer(iv); + } + } + + @Override + public Property getNetworkStatus() { + TreeSet streams = new TreeSet<>(); + TreeMap incomingConnections = new TreeMap<>(); + TreeMap outgoingConnections = new TreeMap<>(); + + synchronized (connections) { + for (Connection connection : connections) { + if (connection.getState() == ACTIVE) { + long stream = connection.getNode().getStream(); + streams.add(stream); + if (connection.getMode() == SERVER) { + inc(incomingConnections, stream); + } else { + inc(outgoingConnections, stream); + } + } + } + } + Property[] streamProperties = new Property[streams.size()]; + int i = 0; + for (Long stream : streams) { + int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0; + int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0; + streamProperties[i] = new Property("stream " + stream, + null, new Property("nodes", incoming + outgoing), + new Property("incoming", incoming), + new Property("outgoing", outgoing) + ); + i++; + } + return new Property("network", null, + new Property("connectionManager", connectionManager.isAlive() ? "running" : "stopped"), + new Property("connections", null, streamProperties) + ); } } diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcConfig.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcConfig.java index a379708..c991bb7 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcConfig.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcConfig.java @@ -42,7 +42,7 @@ public class JdbcConfig { } public JdbcConfig() { - this("jdbc:h2:~/jabit", "sa", null); + this("jdbc:h2:~/jabit;AUTO_SERVER=TRUE", "sa", null); } public Connection getConnection() { diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java index 43b29a9..df209eb 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java @@ -18,6 +18,7 @@ package ch.dissem.bitmessage.repository; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.ports.NodeRegistry; +import ch.dissem.bitmessage.utils.UnixTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,8 @@ import java.sql.*; import java.util.LinkedList; import java.util.List; +import static ch.dissem.bitmessage.utils.UnixTime.HOUR; + public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry { private static final Logger LOG = LoggerFactory.getLogger(JdbcNodeRegistry.class); @@ -60,24 +63,30 @@ public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry { @Override public void offerAddresses(List addresses) { try (Connection connection = config.getConnection()) { - PreparedStatement exists = connection.prepareStatement("SELECT port FROM Node WHERE ip = ? AND port = ? AND stream = ?"); + PreparedStatement exists = connection.prepareStatement("SELECT time FROM Node WHERE ip = ? AND port = ? AND stream = ?"); PreparedStatement insert = connection.prepareStatement( "INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)"); PreparedStatement update = connection.prepareStatement( "UPDATE Node SET services = ?, time = ? WHERE ip = ? AND port = ? AND stream = ?"); + connection.setAutoCommit(false); for (NetworkAddress node : addresses) { exists.setBytes(1, node.getIPv6()); exists.setInt(2, node.getPort()); exists.setLong(3, node.getStream()); - if (exists.executeQuery().next()) { - update.setLong(1, node.getServices()); - update.setLong(2, node.getTime()); + ResultSet lastConnectionTime = exists.executeQuery(); + if (lastConnectionTime.next()) { + long time = lastConnectionTime.getLong("time"); + if (time < node.getTime() && node.getTime() < UnixTime.now()) { + time = node.getTime(); + update.setLong(1, node.getServices()); + update.setLong(2, time); - update.setBytes(3, node.getIPv6()); - update.setInt(4, node.getPort()); - update.setLong(5, node.getStream()); - update.executeUpdate(); - } else { + update.setBytes(3, node.getIPv6()); + update.setInt(4, node.getPort()); + update.setLong(5, node.getStream()); + update.executeUpdate(); + } + } else if (node.getTime() < UnixTime.now()) { insert.setBytes(1, node.getIPv6()); insert.setInt(2, node.getPort()); insert.setLong(3, node.getServices()); @@ -85,6 +94,14 @@ public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry { insert.setLong(5, node.getTime()); insert.executeUpdate(); } + connection.commit(); + } + if (addresses.size() > 100) { + // Let's clean up after we received an update from another node. This way, we shouldn't end up with an + // empty node registry. + PreparedStatement cleanup = connection.prepareStatement("DELETE FROM Node WHERE time < ?"); + cleanup.setLong(1, UnixTime.now(-3 * HOUR)); + cleanup.execute(); } } catch (SQLException e) { LOG.error(e.getMessage(), e);