From d67c932fb211c555971ba9be6283f87834e7c0d8 Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Thu, 24 Sep 2015 08:09:20 +0200 Subject: [PATCH] Added synchronization code and unit test. Synchronisation fails if the trusted host has no new messages - this needs to be fixed (but shouldn't be an issue for real world applications) --- .../dissem/bitmessage/BitmessageContext.java | 40 +++-- .../ch/dissem/bitmessage/InternalContext.java | 31 ++-- .../ch/dissem/bitmessage/MessageCallback.java | 52 +++++++ .../bitmessage/ports/NetworkHandler.java | 16 +- .../ch/dissem/bitmessage/utils/Property.java | 19 +++ networking/build.gradle | 4 +- .../bitmessage/networking/Connection.java | 65 +++++--- .../networking/DefaultNetworkHandler.java | 16 +- .../networking/DefaultNetworkHandlerTest.java | 64 -------- .../networking/NetworkHandlerTest.java | 147 ++++++++++++++++++ .../bitmessage/networking/TestInventory.java | 76 +++++++++ .../networking/TestNodeRegistry.java | 44 ++++++ networking/src/test/resources/V1Msg.payload | Bin 0 -> 444 bytes .../src/test/resources/V4Pubkey.payload | Bin 0 -> 396 bytes .../src/test/resources/V5Broadcast.payload | Bin 0 -> 428 bytes 15 files changed, 457 insertions(+), 117 deletions(-) create mode 100644 domain/src/main/java/ch/dissem/bitmessage/MessageCallback.java delete mode 100644 networking/src/test/java/ch/dissem/bitmessage/networking/DefaultNetworkHandlerTest.java create mode 100644 networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java create mode 100644 networking/src/test/java/ch/dissem/bitmessage/networking/TestInventory.java create mode 100644 networking/src/test/java/ch/dissem/bitmessage/networking/TestNodeRegistry.java create mode 100644 networking/src/test/resources/V1Msg.payload create mode 100644 networking/src/test/resources/V4Pubkey.payload create mode 100644 networking/src/test/resources/V5Broadcast.payload diff --git a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index d7c04b5..2cd851e 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -21,13 +21,13 @@ import ch.dissem.bitmessage.entity.ObjectMessage; import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.payload.*; import ch.dissem.bitmessage.entity.payload.Pubkey.Feature; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.Label; import ch.dissem.bitmessage.entity.valueobject.PrivateKey; import ch.dissem.bitmessage.exception.DecryptionFailedException; import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.ports.*; import ch.dissem.bitmessage.utils.Property; -import ch.dissem.bitmessage.utils.UnixTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -179,19 +179,6 @@ public class BitmessageContext { ); } - private void send(long stream, ObjectPayload payload, long timeToLive) { - long expires = UnixTime.now(+timeToLive); - LOG.info("Expires at " + expires); - ObjectMessage object = new ObjectMessage.Builder() - .stream(stream) - .expiresTime(expires) - .payload(payload) - .build(); - ctx.getSecurity().doProofOfWork(object, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); - ctx.getInventory().storeObject(object); - ctx.getNetworkHandler().offer(object.getInventoryVector()); - } - public void startup(Listener listener) { this.listener = listener; ctx.getNetworkHandler().start(new DefaultMessageListener(ctx, listener)); @@ -280,6 +267,7 @@ public class BitmessageContext { MessageRepository messageRepo; ProofOfWorkEngine proofOfWorkEngine; Security security; + MessageCallback messageCallback; public Builder() { } @@ -319,6 +307,11 @@ public class BitmessageContext { return this; } + public Builder messageCallback(MessageCallback callback) { + this.messageCallback = callback; + return this; + } + public Builder proofOfWorkEngine(ProofOfWorkEngine proofOfWorkEngine) { this.proofOfWorkEngine = proofOfWorkEngine; return this; @@ -333,6 +326,25 @@ public class BitmessageContext { if (proofOfWorkEngine == null) { proofOfWorkEngine = new MultiThreadedPOWEngine(); } + if (messageCallback == null) { + messageCallback = new MessageCallback() { + @Override + public void proofOfWorkStarted(ObjectPayload message) { + } + + @Override + public void proofOfWorkCompleted(ObjectPayload message) { + } + + @Override + public void messageOffered(ObjectPayload message, InventoryVector iv) { + } + + @Override + public void messageAcknowledged(InventoryVector iv) { + } + }; + } return new BitmessageContext(this); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java b/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java index 05ccd51..403e9c4 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java +++ b/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java @@ -49,12 +49,13 @@ public class InternalContext { private final AddressRepository addressRepository; private final MessageRepository messageRepository; private final ProofOfWorkEngine proofOfWorkEngine; + private final MessageCallback messageCallback; private final TreeSet streams = new TreeSet<>(); private final int port; - private long networkNonceTrialsPerByte = 1000; - private long networkExtraBytes = 1000; - private long clientNonce; + private final long clientNonce; + private final long networkNonceTrialsPerByte = 1000; + private final long networkExtraBytes = 1000; public InternalContext(BitmessageContext.Builder builder) { this.security = builder.security; @@ -65,11 +66,11 @@ public class InternalContext { this.messageRepository = builder.messageRepo; this.proofOfWorkEngine = builder.proofOfWorkEngine; this.clientNonce = security.randomNonce(); + this.messageCallback = builder.messageCallback; + this.port = builder.port; Singleton.initialize(security); - port = builder.port; - // TODO: streams of new identities and subscriptions should also be added. This works only after a restart. for (BitmessageAddress address : addressRepository.getIdentities()) { streams.add(address.getStream()); @@ -81,7 +82,10 @@ public class InternalContext { streams.add(1L); } - init(inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine, security); + init(inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine); + for (BitmessageAddress identity : addressRepository.getIdentities()) { + streams.add(identity.getStream()); + } } private void init(Object... objects) { @@ -169,7 +173,9 @@ public class InternalContext { } else if (payload instanceof Encrypted) { object.encrypt(to.getPubkey()); } + messageCallback.proofOfWorkStarted(payload); security.doProofOfWork(object, nonceTrialsPerByte, extraBytes); + messageCallback.proofOfWorkCompleted(payload); if (payload instanceof PlaintextHolder) { Plaintext plaintext = ((PlaintextHolder) payload).getPlaintext(); plaintext.setInventoryVector(object.getInventoryVector()); @@ -177,6 +183,7 @@ public class InternalContext { } inventory.storeObject(object); networkHandler.offer(object.getInventoryVector()); + messageCallback.messageOffered(payload, object.getInventoryVector()); } catch (IOException e) { throw new RuntimeException(e); } @@ -193,16 +200,13 @@ public class InternalContext { .build(); response.sign(identity.getPrivateKey()); response.encrypt(security.createPublicKey(identity.getPublicDecryptionKey())); + messageCallback.proofOfWorkStarted(identity.getPubkey()); security.doProofOfWork(response, networkNonceTrialsPerByte, networkExtraBytes); - if (response.isSigned()) { - response.sign(identity.getPrivateKey()); - } - if (response instanceof Encrypted) { - response.encrypt(security.createPublicKey(identity.getPublicDecryptionKey())); - } + messageCallback.proofOfWorkCompleted(identity.getPubkey()); inventory.storeObject(response); networkHandler.offer(response.getInventoryVector()); // TODO: save that the pubkey was just sent, and on which stream! + messageCallback.messageOffered(identity.getPubkey(), response.getInventoryVector()); } catch (IOException e) { throw new RuntimeException(e); } @@ -216,9 +220,12 @@ public class InternalContext { .expiresTime(expires) .payload(new GetPubkey(contact)) .build(); + messageCallback.proofOfWorkStarted(response.getPayload()); security.doProofOfWork(response, networkNonceTrialsPerByte, networkExtraBytes); + messageCallback.proofOfWorkCompleted(response.getPayload()); inventory.storeObject(response); networkHandler.offer(response.getInventoryVector()); + messageCallback.messageOffered(response.getPayload(), response.getInventoryVector()); } public long getClientNonce() { diff --git a/domain/src/main/java/ch/dissem/bitmessage/MessageCallback.java b/domain/src/main/java/ch/dissem/bitmessage/MessageCallback.java new file mode 100644 index 0000000..d09ff97 --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/MessageCallback.java @@ -0,0 +1,52 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage; + +import ch.dissem.bitmessage.entity.payload.ObjectPayload; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; + +/** + * Callback for message sending events, mostly so the user can be notified when POW is done. + */ +public interface MessageCallback { + /** + * Called before calculation of proof of work begins. + */ + void proofOfWorkStarted(ObjectPayload message); + + /** + * Called after calculation of proof of work finished. + */ + void proofOfWorkCompleted(ObjectPayload message); + + /** + * Called once the message is offered to the network. Please note that this doesn't mean the message was sent, + * if the client is not connected to the network it's just stored in the inventory. + *

+ * Also, please note that this is where the original payload as well as the {@link InventoryVector} of the sent + * message is available. If the callback needs the IV for some reason, it should be retrieved here. (Plaintext + * and Broadcast messages will have their IV property set automatically though.) + *

+ */ + void messageOffered(ObjectPayload message, InventoryVector iv); + + /** + * This isn't called yet, as ACK messages aren't being processed yet. Also, this is only relevant for Plaintext + * messages. + */ + void messageAcknowledged(InventoryVector iv); +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java index e07bd79..fd44358 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java @@ -27,12 +27,24 @@ import java.net.InetAddress; * Handles incoming messages */ public interface NetworkHandler { + /** + * Connects to the trusted host, fetches and offers new messages and disconnects afterwards. + */ + Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds); + + /** + * Start a full network node, accepting incoming connections and relaying objects. + */ void start(MessageListener listener); + /** + * Stop the full network node. + */ void stop(); - void synchronize(InetAddress trustedHost, int port, MessageListener listener) throws IOException; - + /** + * Offer new objects to up to 8 random nodes. + */ void offer(InventoryVector iv); Property getNetworkStatus(); diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java index 6fa0ec4..72abe58 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java @@ -35,6 +35,25 @@ public class Property { this.properties = properties; } + public String getName() { + return name; + } + + public Object getValue() { + return value; + } + + public Property getProperty(String name) { + for (Property p : properties) { + if (name == null) { + if (p.name == null) return p; + } else { + if (name.equals(p.name)) return p; + } + } + return null; + } + @Override public String toString() { return toString(""); diff --git a/networking/build.gradle b/networking/build.gradle index 07d268e..6e9366c 100644 --- a/networking/build.gradle +++ b/networking/build.gradle @@ -12,6 +12,8 @@ uploadArchives { dependencies { compile project(':domain') - testCompile 'org.slf4j:slf4j-simple:1.7.12' testCompile 'junit:junit:4.11' + testCompile 'org.slf4j:slf4j-simple:1.7.12' + testCompile 'org.mockito:mockito-core:1.10.19' + testCompile project(':security-bc') } \ No newline at end of file diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index d8054ff..bb8e62c 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; @@ -54,43 +55,55 @@ public class Connection implements Runnable { private final static Logger LOG = LoggerFactory.getLogger(Connection.class); private static final int CONNECT_TIMEOUT = 5000; private final ConcurrentMap ivCache; - private InternalContext ctx; - private Mode mode; + private final InternalContext ctx; + private final Mode mode; + private final Socket socket; + private final MessageListener listener; + private final NetworkAddress host; + private final NetworkAddress node; + private final Queue sendingQueue = new ConcurrentLinkedDeque<>(); + private final Map requestedObjects; + private final long syncTimeout; + private State state; - private Socket socket; private InputStream in; private OutputStream out; - private MessageListener listener; private int version; private long[] streams; - private NetworkAddress host; - private NetworkAddress node; - private Queue sendingQueue = new ConcurrentLinkedDeque<>(); - private ConcurrentMap requestedObjects; public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, ConcurrentMap requestedObjectsMap) throws IOException { - this(context, mode, listener, requestedObjectsMap); - this.socket = socket; - this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(); + this(context, mode, listener, socket, requestedObjectsMap, + new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), + 0); } public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener, ConcurrentMap requestedObjectsMap) { - this(context, mode, listener, requestedObjectsMap); - this.socket = new Socket(); - this.node = node; + this(context, mode, listener, new Socket(), requestedObjectsMap, + node, 0); } - private Connection(InternalContext context, Mode mode, MessageListener listener, - ConcurrentMap requestedObjectsMap) { + private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, + Map requestedObjectsMap, NetworkAddress node, long syncTimeout) { this.ctx = context; this.mode = mode; this.state = CONNECTING; this.listener = listener; + this.socket = socket; this.requestedObjects = requestedObjectsMap; this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); - ivCache = new ConcurrentHashMap<>(); + this.node = node; + this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); + this.ivCache = new ConcurrentHashMap<>(); + } + + public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, + long timeoutInSeconds) throws IOException { + return new Connection(ctx, Mode.CLIENT, listener, new Socket(address, port), + new HashMap(), + new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), + timeoutInSeconds); } public Mode getMode() { @@ -168,7 +181,7 @@ public class Connection implements Runnable { + msg.getPayload().getCommand() + "'"); } } - if (socket.isClosed()) state = DISCONNECTED; + if (socket.isClosed() || syncFinished(msg)) disconnect(); } catch (SocketTimeoutException ignore) { if (state == ACTIVE) { sendQueue(); @@ -184,13 +197,27 @@ public class Connection implements Runnable { } } + @SuppressWarnings("RedundantIfStatement") + private boolean syncFinished(NetworkMessage msg) { + if (syncTimeout == 0 || state != ACTIVE) { + return false; + } + if (syncTimeout < UnixTime.now()) { + return true; + } + if (!(msg.getPayload() instanceof Addr) && requestedObjects.isEmpty() && sendingQueue.isEmpty()) { + return true; + } + return false; + } + private void activateConnection() { LOG.info("Successfully established connection with node " + node); state = ACTIVE; sendAddresses(); sendInventory(); node.setTime(UnixTime.now()); - ctx.getNodeRegistry().offerAddresses(Arrays.asList(node)); + ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node)); } private void sendQueue() { diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index e73e8ed..aead8e8 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -66,6 +66,17 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { this.ctx = context; } + @Override + public Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds) { + try { + Thread t = new Thread(Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds)); + t.start(); + return t; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override public void start(final MessageListener listener) { if (listener == null) { @@ -150,11 +161,6 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { } } - @Override - public void synchronize(InetAddress trustedHost, int port, MessageListener listener) throws IOException { - startConnection(new Connection(ctx, CLIENT, new Socket(trustedHost, port), listener, requestedObjects)); - } - private void startConnection(Connection c) { synchronized (connections) { // prevent connecting twice to the same node diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/DefaultNetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/DefaultNetworkHandlerTest.java deleted file mode 100644 index 3bec993..0000000 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/DefaultNetworkHandlerTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.networking; - -import ch.dissem.bitmessage.entity.NetworkMessage; -import ch.dissem.bitmessage.entity.Version; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.utils.UnixTime; -import org.junit.Ignore; -import org.junit.Test; - -/** - * FIXME: there really should be sensible tests for the network handler - */ -public class DefaultNetworkHandlerTest { - private NetworkAddress localhost = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build(); - - // void start(MessageListener listener); - // void stop(); - // void offer(InventoryVector iv); - // Property getNetworkStatus(); - - @Ignore - @Test(expected = InterruptedException.class) - public void testSendMessage() throws Exception { - final Thread baseThread = Thread.currentThread(); - DefaultNetworkHandler net = new DefaultNetworkHandler(); -// net.setListener(localhost, new NetworkHandler.MessageListener() { -// @Override -// public void receive(ObjectPayload payload) { -// System.out.println(payload); -// baseThread.interrupt(); -// } -// }); - NetworkMessage ver = new NetworkMessage( - new Version.Builder() - .version(3) - .services(1) - .timestamp(UnixTime.now()) - .addrFrom(localhost) - .addrRecv(localhost) - .nonce(-1) - .userAgent("Test") - .streams(1, 2) - .build() - ); -// net.send(localhost, ver); - Thread.sleep(20000); - } -} diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java new file mode 100644 index 0000000..cd8a743 --- /dev/null +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java @@ -0,0 +1,147 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking; + +import ch.dissem.bitmessage.BitmessageContext; +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.ports.AddressRepository; +import ch.dissem.bitmessage.ports.MessageRepository; +import ch.dissem.bitmessage.ports.NetworkHandler; +import ch.dissem.bitmessage.security.bc.BouncySecurity; +import ch.dissem.bitmessage.utils.Property; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.net.InetAddress; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +/** + * FIXME: there really should be sensible tests for the network handler + */ +public class NetworkHandlerTest { + private static NetworkAddress localhost = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(6001).build(); + + private static TestInventory peerInventory; + private static TestInventory nodeInventory; + + private static BitmessageContext node; + private static NetworkHandler networkHandler; + + @BeforeClass + public static void setUp() { + peerInventory = new TestInventory(); + BitmessageContext peer = new BitmessageContext.Builder() + .addressRepo(Mockito.mock(AddressRepository.class)) + .inventory(peerInventory) + .messageRepo(Mockito.mock(MessageRepository.class)) + .port(6001) + .nodeRegistry(new TestNodeRegistry()) + .networkHandler(new DefaultNetworkHandler()) + .security(new BouncySecurity()) + .build(); + peer.startup(Mockito.mock(BitmessageContext.Listener.class)); + + nodeInventory = new TestInventory(); + networkHandler = new DefaultNetworkHandler(); + node = new BitmessageContext.Builder() + .addressRepo(Mockito.mock(AddressRepository.class)) + .inventory(nodeInventory) + .messageRepo(Mockito.mock(MessageRepository.class)) + .port(6002) + .nodeRegistry(new TestNodeRegistry(localhost)) + .networkHandler(networkHandler) + .security(new BouncySecurity()) + .build(); + } + + @Test(timeout = 20_000) + public void ensureNodesAreConnecting() { + try { + node.startup(Mockito.mock(BitmessageContext.Listener.class)); + Property status; + do { + Thread.yield(); + status = node.status().getProperty("network").getProperty("connections").getProperty("stream 0"); + } while (status == null); + assertEquals(1, status.getProperty("outgoing").getValue()); + } finally { + shutdown(node); + } + } + + @Test(timeout = 5_000) + public void ensureObjectsAreSynchronizedIfBothHaveObjects() throws Exception { + peerInventory.init( + "V4Pubkey.payload", + "V5Broadcast.payload" + ); + + nodeInventory.init( + "V1Msg.payload" + ); + + Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, + mock(NetworkHandler.MessageListener.class), + 10); + t.join(); + assertEquals(3, nodeInventory.getInventory().size()); + assertEquals(3, peerInventory.getInventory().size()); + } + + @Test(timeout = 5_000) + public void ensureObjectsAreSynchronizedIfOnlyPeerHasObjects() throws Exception { + peerInventory.init( + "V4Pubkey.payload", + "V5Broadcast.payload" + ); + + nodeInventory.init(); + + Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, + mock(NetworkHandler.MessageListener.class), + 10); + t.join(); + assertEquals(2, nodeInventory.getInventory().size()); + assertEquals(2, peerInventory.getInventory().size()); + } + + @Test(timeout = 5_000) + public void ensureObjectsAreSynchronizedIfOnlyNodeHasObjects() throws Exception { + peerInventory.init(); + + nodeInventory.init( + "V1Msg.payload" + ); + + Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, + mock(NetworkHandler.MessageListener.class), + 10); + t.join(); + assertEquals(1, nodeInventory.getInventory().size()); + assertEquals(1, peerInventory.getInventory().size()); + } + + private void shutdown(BitmessageContext node) { + node.shutdown(); + do { + Thread.yield(); + } while (node.isRunning()); + } +} diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/TestInventory.java b/networking/src/test/java/ch/dissem/bitmessage/networking/TestInventory.java new file mode 100644 index 0000000..040e730 --- /dev/null +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/TestInventory.java @@ -0,0 +1,76 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking; + +import ch.dissem.bitmessage.entity.ObjectMessage; +import ch.dissem.bitmessage.entity.payload.ObjectType; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; +import ch.dissem.bitmessage.ports.Inventory; +import ch.dissem.bitmessage.utils.TestUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestInventory implements Inventory { + private final Map inventory; + + public TestInventory() { + this.inventory = new HashMap<>(); + } + + @Override + public List getInventory(long... streams) { + return new ArrayList<>(inventory.keySet()); + } + + @Override + public List getMissing(List offer, long... streams) { + return offer; + } + + @Override + public ObjectMessage getObject(InventoryVector vector) { + return inventory.get(vector); + } + + @Override + public List getObjects(long stream, long version, ObjectType... types) { + return new ArrayList<>(inventory.values()); + } + + @Override + public void storeObject(ObjectMessage object) { + inventory.put(object.getInventoryVector(), object); + } + + @Override + public void cleanup() { + + } + + public void init(String... resources) throws IOException { + inventory.clear(); + for (String resource : resources) { + int version = Integer.parseInt(resource.substring(1, 2)); + ObjectMessage obj = TestUtils.loadObjectMessage(version, resource); + inventory.put(obj.getInventoryVector(), obj); + } + } +} diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/TestNodeRegistry.java b/networking/src/test/java/ch/dissem/bitmessage/networking/TestNodeRegistry.java new file mode 100644 index 0000000..c3abd58 --- /dev/null +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/TestNodeRegistry.java @@ -0,0 +1,44 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking; + +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.ports.NodeRegistry; + +import java.util.Arrays; +import java.util.List; + +/** + * Empty {@link NodeRegistry} that doesn't do anything, but shouldn't break things either. + */ +class TestNodeRegistry implements NodeRegistry { + private List nodes; + + public TestNodeRegistry(NetworkAddress... nodes) { + this.nodes = Arrays.asList(nodes); + } + + @Override + public List getKnownAddresses(int limit, long... streams) { + return nodes; + } + + @Override + public void offerAddresses(List addresses) { + // Ignore + } +} diff --git a/networking/src/test/resources/V1Msg.payload b/networking/src/test/resources/V1Msg.payload new file mode 100644 index 0000000000000000000000000000000000000000..ddf21cda61a888ea022b4b8424eb96ee1db39ffe GIT binary patch literal 444 zcmV;t0Ym-(00008xvxtA00010gc-#E000630k}!J&+7f>uEI8@F)kNn;2iFat;p{^0*meHzJ*{^Fy|8tEC&7fCXYCRC6p;- zrwSGz0Ho|#|o zEVfmjrCC-W%N^r5)7#?V5j#CZa%OaP0g&+7Iut0XVpU6GrzwGJMd0*0#;F)6j2?3m zpz&%fvq&-208*9RwkBU10Rk0w2I?uhUA$B$(U5H_f%duH^Cj_3xbpYV?c%~y>8-%+dazRVTC$s5mxXUXECZIKhWTo mu6-%(T=mLftk~cJZQV_Vr9WEM=TwI`1+jj(Azs6mYB0@}ZPQNx literal 0 HcmV?d00001 diff --git a/networking/src/test/resources/V4Pubkey.payload b/networking/src/test/resources/V4Pubkey.payload new file mode 100644 index 0000000000000000000000000000000000000000..a5e4c5c40944e8ffc38b369e40840c06a416c5aa GIT binary patch literal 396 zcmV;70dxKU00000xAyk{00010UeGiE000350Z5r#d*j`5Pg%2ba1!PSq98oPb(^t% z;KgzK_V$mQZm9inp(ffPq;aqrDrn3?C<4j=Akdap1po|b4j_f0notM>d(L9Ml{C2@ z+T1h%bp#E0>i{5sJWl0^@?*BHMYKflgKudScHd!TX)VzSU6-cwk+pgTmYfiz1uQCj zSs4!l9plyrnujt2%tbA6X)Hz_q)G?EPXO!Uy0#j8MQVhpFP4*Qu>!6gP@)wYpVarh zgpUg62HESXiAQQMO_}jTWWG}erMJvUwXEpY1uLsPa(u$}|D#H;5*xal`le2&=b|6X zXDuFEZ4@AwqBs+8&pZ<&P7N~hBU_TcS#YQ9yYuq}#O&r(M5b62y}r$6<(Ba$=f(7# zTcA@q?ut~DeC+hIc@%8p@Hu+Tckgr{-rc17p<4cAWEv#WU`aSrVyLq*B74o@+$o~b q!`TVqn-`}!5^p-`%#4j-0g7a3xT-Zo<53UgE9%*VnnKWkIVoM&N4W9; literal 0 HcmV?d00001 diff --git a/networking/src/test/resources/V5Broadcast.payload b/networking/src/test/resources/V5Broadcast.payload new file mode 100644 index 0000000000000000000000000000000000000000..87c8c047cf284e7fe9a9f47dbbfa49c3817bbd67 GIT binary patch literal 428 zcmV;d0aN|}0000137ZE100010mbs?@000980Z5r#d*j`5Pg%2ba1!PSq98oPb(^t% z;KgzK_V$mQZr&z5B~$A7VIv`j|E{Z8{Q}AWAl#f{6t$GZv!n%i@Sw{$TJpn{a!1eo z=z%8?zJ0DxK>#4bE!e}!EtnzSQlhiRA31jD zOO*qpGJH6?%S3d~7}HaC-c+n%AQ()G1O?D561V0JBm9DO>u)Qr=g+aPQVh?!%*W*81aLJ~yN;T{l*>cnUb z*B%Nj&XS3pk4Wlw@7$u`pga`dcDrVb$+tBC{(ijJ-tx`97<96j=y~}-_OHR%gtS~T6Mu$Ih4{viO7qx3xfPsp%4y? WTaKw&M!dRV|MQaOU7}MPXm%~gpv-;% literal 0 HcmV?d00001