diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java index 54897cb..7f7b6ec 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java @@ -60,6 +60,9 @@ public abstract class AbstractConnection { protected volatile State state; protected long lastObjectTime; + private final long syncTimeout; + private int readTimeoutCounter; + protected long peerNonce; protected int version; protected long[] streams; @@ -70,12 +73,13 @@ public abstract class AbstractConnection { NetworkAddress node, NetworkHandler.MessageListener listener, Set commonRequestedObjects, - boolean threadsafe) { + long syncTimeout, boolean threadsafe) { this.ctx = context; this.mode = mode; this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); this.node = node; this.listener = listener; + this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); if (threadsafe) { this.ivCache = new ConcurrentHashMap<>(); this.sendingQueue = new ConcurrentLinkedDeque<>(); @@ -107,6 +111,9 @@ public abstract class AbstractConnection { receiveMessage(payload); break; + case DISCONNECTED: + break; + default: handleCommand(payload); break; @@ -283,6 +290,33 @@ public abstract class AbstractConnection { } } + @SuppressWarnings("RedundantIfStatement") + protected boolean syncFinished(NetworkMessage msg) { + if (mode != SYNC) { + return false; + } + if (Thread.interrupted()) { + return true; + } + if (state != ACTIVE) { + return false; + } + if (syncTimeout < UnixTime.now()) { + LOG.info("Synchronization timed out"); + return true; + } + if (msg == null) { + if (requestedObjects.isEmpty() && sendingQueue.isEmpty()) + return true; + + readTimeoutCounter++; + return readTimeoutCounter > 1; + } else { + readTimeoutCounter = 0; + return false; + } + } + public void disconnect() { state = DISCONNECTED; diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index 3fe1b62..d9cb2cc 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -56,13 +56,11 @@ class Connection extends AbstractConnection { private final long startTime; private final Socket socket; - private final long syncTimeout; private final ReaderRunnable reader = new ReaderRunnable(); private final WriterRunnable writer = new WriterRunnable(); private InputStream in; private OutputStream out; - private int readTimeoutCounter; private boolean socketInitialized; public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, @@ -80,10 +78,9 @@ class Connection extends AbstractConnection { private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, Set commonRequestedObjects, NetworkAddress node, long syncTimeout) { - super(context, mode, node, listener, commonRequestedObjects, true); + super(context, mode, node, listener, commonRequestedObjects, syncTimeout, true); this.startTime = UnixTime.now(); this.socket = socket; - this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); } public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, @@ -110,33 +107,6 @@ class Connection extends AbstractConnection { return node; } - @SuppressWarnings("RedundantIfStatement") - private boolean syncFinished(NetworkMessage msg) { - if (mode != SYNC) { - return false; - } - if (Thread.interrupted()) { - return true; - } - if (state != ACTIVE) { - return false; - } - if (syncTimeout < UnixTime.now()) { - LOG.info("Synchronization timed out"); - return true; - } - if (msg == null) { - if (requestedObjects.isEmpty() && sendingQueue.isEmpty()) - return true; - - readTimeoutCounter++; - return readTimeoutCounter > 1; - } else { - readTimeoutCounter = 0; - return false; - } - } - @Override protected void send(MessagePayload payload) { try { diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java index 36bb463..1626c87 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java @@ -42,11 +42,12 @@ public class ConnectionInfo extends AbstractConnection { private ByteBuffer in = ByteBuffer.allocate(MAX_MESSAGE_SIZE); private ByteBuffer out = ByteBuffer.allocate(MAX_MESSAGE_SIZE); private V3MessageReader reader = new V3MessageReader(); + private boolean syncFinished; public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node, NetworkHandler.MessageListener listener, - Set commonRequestedObjects) { - super(context, mode, node, listener, commonRequestedObjects, false); + Set commonRequestedObjects, long syncTimeout) { + super(context, mode, node, listener, commonRequestedObjects, syncTimeout, false); out.flip(); if (mode == CLIENT || mode == SYNC) { send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build()); @@ -77,14 +78,20 @@ public class ConnectionInfo extends AbstractConnection { reader.update(in); if (!reader.getMessages().isEmpty()) { Iterator iterator = reader.getMessages().iterator(); + NetworkMessage msg = null; while (iterator.hasNext()) { - NetworkMessage msg = iterator.next(); + msg = iterator.next(); handleMessage(msg.getPayload()); iterator.remove(); } + syncFinished = syncFinished(msg); } } + public boolean isSyncFinished() { + return syncFinished; + } + @Override protected void send(MessagePayload payload) { sendingQueue.add(payload); diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index cfb6c2e..c7b4412 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -43,6 +43,7 @@ import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; +import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED; import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static java.nio.channels.SelectionKey.OP_READ; @@ -65,7 +66,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex private ServerSocketChannel serverChannel; @Override - public Future synchronize(final InetAddress server, final int port, final MessageListener listener, long timeoutInSeconds) { + public Future synchronize(final InetAddress server, final int port, final MessageListener listener, final long timeoutInSeconds) { return pool.submit(new Callable() { @Override public Void call() throws Exception { @@ -75,11 +76,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex channel.configureBlocking(false); ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), - listener, new HashSet()); + listener, new HashSet(), timeoutInSeconds); while (channel.isConnected() && - (connection.getState() != ACTIVE - || connection.getSendingQueue().isEmpty() - || requestedObjects.isEmpty())) { + (connection.getState() != ACTIVE || connection.isSyncFinished())) { write(requestedObjects, channel, connection); read(channel, connection); Thread.sleep(10); @@ -138,7 +137,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex throw new ApplicationException(e); } final Set requestedObjects = new HashSet<>(); - start("connection listener", new Runnable() { + thread("connection listener", new Runnable() { @Override public void run() { try { @@ -152,9 +151,10 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex new ConnectionInfo(ctx, SERVER, new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(), listener, - requestedObjects + requestedObjects, 0 )); } catch (AsynchronousCloseException ignore) { + LOG.trace(ignore.getMessage()); } catch (IOException e) { LOG.error(e.getMessage(), e); } @@ -169,7 +169,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } }); - start("connection starter", new Runnable() { + thread("connection starter", new Runnable() { @Override public void run() { while (selector.isOpen()) { @@ -184,7 +184,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex new ConnectionInfo(ctx, CLIENT, address, listener, - requestedObjects + requestedObjects, 0 )); } catch (AsynchronousCloseException ignore) { } catch (IOException e) { @@ -200,7 +200,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } }); - start("processor", new Runnable() { + thread("processor", new Runnable() { @Override public void run() { try { @@ -220,7 +220,12 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex read(channel, connection); } if (connection.getSendingQueue().isEmpty()) { - key.interestOps(OP_READ); + if (connection.getState() == DISCONNECTED) { + key.interestOps(0); + key.channel().close(); + } else { + key.interestOps(OP_READ); + } } else { key.interestOps(OP_READ | OP_WRITE); } @@ -269,7 +274,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } } - private void start(String threadName, Runnable runnable) { + private void thread(String threadName, Runnable runnable) { Thread thread = new Thread(runnable, threadName); thread.setDaemon(true); thread.setPriority(Thread.MIN_PRIORITY); diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java index 1b7994a..cc0fc04 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java @@ -194,12 +194,11 @@ public class NetworkHandlerTest { assertThat(response.getData(), is(data)); } - @Test(timeout = 5_000, expected = NodeException.class) + @Test(expected = NodeException.class) public void ensureCustomMessageWithoutResponseYieldsException() throws Exception { byte[] data = cryptography().randomBytes(8); data[0] = (byte) 0; CustomMessage request = new CustomMessage("test request", data); - node.startup(); CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request);