From abc2f63aa6abcae08c3b371f8e62f1afe03884dd Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Mon, 20 Jun 2016 16:33:47 +0200 Subject: [PATCH] Some further fixes and improvements, not all tests working yet --- .../java/ch/dissem/bitmessage/SystemTest.java | 26 ++++++++++++- .../networking/AbstractConnection.java | 5 ++- .../networking/nio/ConnectionInfo.java | 6 +++ .../networking/nio/NioNetworkHandler.java | 38 +++++++++++-------- .../networking/NetworkHandlerTest.java | 2 +- 5 files changed, 58 insertions(+), 19 deletions(-) diff --git a/demo/src/test/java/ch/dissem/bitmessage/SystemTest.java b/demo/src/test/java/ch/dissem/bitmessage/SystemTest.java index f2f02f2..dac6f74 100644 --- a/demo/src/test/java/ch/dissem/bitmessage/SystemTest.java +++ b/demo/src/test/java/ch/dissem/bitmessage/SystemTest.java @@ -4,17 +4,23 @@ import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography; import ch.dissem.bitmessage.entity.BitmessageAddress; import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.networking.DefaultNetworkHandler; +import ch.dissem.bitmessage.networking.nio.NioNetworkHandler; import ch.dissem.bitmessage.ports.DefaultLabeler; import ch.dissem.bitmessage.ports.Labeler; +import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.repository.*; import ch.dissem.bitmessage.utils.TTL; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -27,8 +33,11 @@ import static org.mockito.Matchers.any; /** * @author Christian Basler */ +@RunWith(Parameterized.class) public class SystemTest { private static int port = 6000; + private final NetworkHandler aliceNetworkHandler; + private final NetworkHandler bobNetworkHandler; private BitmessageContext alice; private TestListener aliceListener = new TestListener(); @@ -39,6 +48,19 @@ public class SystemTest { private TestListener bobListener = new TestListener(); private BitmessageAddress bobIdentity; + public SystemTest(NetworkHandler peer, NetworkHandler node) { + this.aliceNetworkHandler = peer; + this.bobNetworkHandler = node; + } + + @Parameterized.Parameters + public static List parameters() { + return Arrays.asList(new Object[][]{ + {new NioNetworkHandler(), new DefaultNetworkHandler()}, + {new NioNetworkHandler(), new NioNetworkHandler()} + }); + } + @Before public void setUp() { int alicePort = port++; @@ -54,7 +76,7 @@ public class SystemTest { .powRepo(new JdbcProofOfWorkRepository(aliceDB)) .port(alicePort) .nodeRegistry(new TestNodeRegistry(bobPort)) - .networkHandler(new DefaultNetworkHandler()) + .networkHandler(aliceNetworkHandler) .cryptography(new BouncyCryptography()) .listener(aliceListener) .labeler(aliceLabeler) @@ -70,7 +92,7 @@ public class SystemTest { .powRepo(new JdbcProofOfWorkRepository(bobDB)) .port(bobPort) .nodeRegistry(new TestNodeRegistry(alicePort)) - .networkHandler(new DefaultNetworkHandler()) + .networkHandler(bobNetworkHandler) .cryptography(new BouncyCryptography()) .listener(bobListener) .labeler(new DebugLabeler("Bob")) diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java index 7f7b6ec..ac80aff 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java @@ -305,8 +305,11 @@ public abstract class AbstractConnection { LOG.info("Synchronization timed out"); return true; } + if (!sendingQueue.isEmpty()) { + return false; + } if (msg == null) { - if (requestedObjects.isEmpty() && sendingQueue.isEmpty()) + if (requestedObjects.isEmpty()) return true; readTimeoutCounter++; diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java index 1626c87..afe3d14 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java @@ -88,6 +88,12 @@ public class ConnectionInfo extends AbstractConnection { } } + public void updateSyncStatus() { + if (!syncFinished) { + syncFinished = reader.getMessages().isEmpty() && syncFinished(null); + } + } + public boolean isSyncFinished() { return syncFinished; } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index c7b4412..5e6f056 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -77,12 +77,12 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), listener, new HashSet(), timeoutInSeconds); - while (channel.isConnected() && - (connection.getState() != ACTIVE || connection.isSyncFinished())) { + while (channel.isConnected() && !connection.isSyncFinished()) { write(requestedObjects, channel, connection); read(channel, connection); Thread.sleep(10); } + LOG.info("Synchronization finished"); } return null; } @@ -95,28 +95,34 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex channel.configureBlocking(true); ByteBuffer buffer = ByteBuffer.allocate(MAX_MESSAGE_SIZE); new NetworkMessage(request).write(buffer); + buffer.flip(); while (buffer.hasRemaining()) { channel.write(buffer); } buffer.clear(); V3MessageReader reader = new V3MessageReader(); - while (reader.getMessages().isEmpty()) { - channel.read(buffer); - buffer.flip(); - reader.update(buffer); + while (channel.isConnected() && reader.getMessages().isEmpty()) { + if (channel.read(buffer) > 0) { + buffer.flip(); + reader.update(buffer); + buffer.compact(); + } else { + throw new NodeException("No response from node " + server); + } + } + NetworkMessage networkMessage; + if (reader.getMessages().isEmpty()) { + throw new NodeException("No response from node " + server); + } else { + networkMessage = reader.getMessages().get(0); } - NetworkMessage networkMessage = reader.getMessages().get(0); if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) { return (CustomMessage) networkMessage.getPayload(); } else { - if (networkMessage == null) { - throw new NodeException("No response from node " + server); - } else { - throw new NodeException("Unexpected response from node " + - server + ": " + networkMessage.getPayload().getCommand()); - } + throw new NodeException("Unexpected response from node " + + server + ": " + networkMessage.getPayload().getCommand()); } } catch (IOException e) { throw new ApplicationException(e); @@ -272,6 +278,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex connection.updateReader(); buffer.compact(); } + connection.updateSyncStatus(); } private void thread(String threadName, Runnable runnable) { @@ -285,8 +292,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex public void stop() { try { serverChannel.socket().close(); - for (SelectionKey key : selector.keys()) { - key.channel().close(); + Iterator iterator = selector.keys().iterator(); + while (iterator.hasNext()) { + iterator.next().channel().close(); } selector.close(); } catch (IOException e) { diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java index cc0fc04..24cedf7 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java @@ -254,7 +254,7 @@ public class NetworkHandlerTest { Future future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), mock(NetworkHandler.MessageListener.class), - 10); + 100); future.get(); assertInventorySize(1, nodeInventory); assertInventorySize(1, peerInventory);