From bd5bf769048a1cca1fa6107e47a3f24a78c234a0 Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Fri, 12 Jun 2015 06:57:20 +0200 Subject: [PATCH] Improved connection management, preventing multiple connections to the same node, and improved broadcast handling. --- .../bitmessage/DefaultMessageListener.java | 14 +++-- .../bitmessage/ports/AddressRepository.java | 3 + .../dissem/bitmessage/utils/Collections.java | 57 +++++++++++++++++++ .../bitmessage/utils/CollectionsTest.java | 35 ++++++++++++ .../bitmessage/networking/Connection.java | 20 ++++++- .../bitmessage/networking/NetworkNode.java | 16 +++--- .../repository/JdbcAddressRepository.java | 20 +++++-- .../migration/V1.2__Create_address_table.sql | 1 + .../repository/JdbcAddressRepositoryTest.java | 26 ++++++++- 9 files changed, 172 insertions(+), 20 deletions(-) create mode 100644 domain/src/main/java/ch/dissem/bitmessage/utils/Collections.java create mode 100644 domain/src/test/java/ch/dissem/bitmessage/utils/CollectionsTest.java diff --git a/domain/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java b/domain/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java index b9576e5..d034a79 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java +++ b/domain/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; import java.util.List; import static ch.dissem.bitmessage.entity.Plaintext.Status.*; @@ -109,7 +110,6 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { ctx.getAddressRepo().save(address); } } catch (DecryptionFailedException ignore) { - LOG.debug(ignore.getMessage(), ignore); } } @@ -129,21 +129,25 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { } break; } catch (DecryptionFailedException ignore) { - LOG.trace(ignore.getMessage(), ignore); } } } protected void receive(ObjectMessage object, Broadcast broadcast) throws IOException { - // TODO this should work fine as-is, but checking the tag might be more efficient -// V5Broadcast v5 = broadcast instanceof V5Broadcast ? (V5Broadcast) broadcast : null; - for (BitmessageAddress subscription : ctx.getAddressRepo().getSubscriptions()) { + byte[] tag = broadcast instanceof V5Broadcast ? ((V5Broadcast) broadcast).getTag() : null; + for (BitmessageAddress subscription : ctx.getAddressRepo().getSubscriptions(broadcast.getVersion())) { + if (tag != null && !Arrays.equals(tag, subscription.getTag())) { + continue; + } try { broadcast.decrypt(subscription.getPublicDecryptionKey()); if (!object.isSignatureValid(broadcast.getPlaintext().getFrom().getPubkey())) { LOG.warn("Broadcast with IV " + object.getInventoryVector() + " was successfully decrypted, but signature check failed. Ignoring."); } else { + broadcast.getPlaintext().setStatus(RECEIVED); + broadcast.getPlaintext().addLabels(ctx.getMessageRepository().getLabels(Label.Type.INBOX, Label.Type.UNREAD)); broadcast.getPlaintext().setInventoryVector(object.getInventoryVector()); + ctx.getMessageRepository().save(broadcast.getPlaintext()); listener.receive(broadcast.getPlaintext()); } } catch (DecryptionFailedException ignore) { diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/AddressRepository.java b/domain/src/main/java/ch/dissem/bitmessage/ports/AddressRepository.java index cb01028..7cd6be8 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/AddressRepository.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/AddressRepository.java @@ -31,6 +31,9 @@ public interface AddressRepository { List getIdentities(); List getSubscriptions(); + + List getSubscriptions(long broadcastVersion); + /** * Returns all Bitmessage addresses that have no private key. */ diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Collections.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Collections.java new file mode 100644 index 0000000..1feb8a4 --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Collections.java @@ -0,0 +1,57 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.utils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +public class Collections { + private final static Random RANDOM = new Random(); + + /** + * Returns a random subset of the given collection, or a copy of the collection if it's not larger than count. + * The randomness + */ + public static List selectRandom(int count, Collection collection) { + ArrayList result = new ArrayList<>(count); + if (collection.size() <= count) { + result.addAll(collection); + } else { + double collectionRest = collection.size(); + double resultRest = count; + int skipMax = (int) Math.ceil(collectionRest / resultRest); + int skip = RANDOM.nextInt(skipMax); + for (T item : collection) { + collectionRest--; + if (skip > 0) { + skip--; + } else { + result.add(item); + resultRest--; + if (resultRest == 0){ + break; + } + skipMax = (int) Math.ceil(collectionRest / resultRest); + skip = RANDOM.nextInt(skipMax); + } + } + } + return result; + } +} diff --git a/domain/src/test/java/ch/dissem/bitmessage/utils/CollectionsTest.java b/domain/src/test/java/ch/dissem/bitmessage/utils/CollectionsTest.java new file mode 100644 index 0000000..91e42b5 --- /dev/null +++ b/domain/src/test/java/ch/dissem/bitmessage/utils/CollectionsTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.utils; + +import org.junit.Test; + +import java.util.LinkedList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class CollectionsTest { + @Test + public void ensureSelectRandomReturnsMaximumPossibleItems() throws Exception { + List list = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + list.add(i); + } + assertEquals(9, Collections.selectRandom(9, list).size()); + } +} \ No newline at end of file diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index bbec1d9..9b191dc 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -29,10 +29,13 @@ import ch.dissem.bitmessage.utils.Security; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.List; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; @@ -161,7 +164,7 @@ public class Connection implements Runnable { listener.receive(objectMessage); ctx.getInventory().storeObject(objectMessage); } catch (InsufficientProofOfWorkException e) { -// DebugUtils.saveToFile(objectMessage); + LOG.warn(e.getMessage()); } catch (IOException e) { LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); DebugUtils.saveToFile(objectMessage); @@ -217,5 +220,18 @@ public class Connection implements Runnable { .build()); } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Connection that = (Connection) o; + return Objects.equals(node, that.node); + } + + @Override + public int hashCode() { + return Objects.hash(node); + } + public enum State {SERVER, CLIENT, ACTIVE, DISCONNECTED} } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java index b379a2c..93fc12f 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java @@ -21,6 +21,7 @@ import ch.dissem.bitmessage.InternalContext.ContextHolder; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.ports.NetworkHandler; +import ch.dissem.bitmessage.utils.Collections; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,8 +88,8 @@ public class NetworkNode implements NetworkHandler, ContextHolder { } } } - if (connections.size() < 1) { - List addresses = ctx.getNodeRegistry().getKnownAddresses(8, ctx.getStreams()); + if (connections.size() < 8) { + List addresses = ctx.getNodeRegistry().getKnownAddresses(8 - connections.size(), ctx.getStreams()); for (NetworkAddress address : addresses) { try { startConnection(new Connection(ctx, CLIENT, new Socket(address.toInetAddress(), address.getPort()), listener)); @@ -96,7 +97,6 @@ public class NetworkNode implements NetworkHandler, ContextHolder { LOG.debug(e.getMessage(), e); } } - // FIXME: prevent connecting twice to the same node } try { Thread.sleep(30000); @@ -131,6 +131,10 @@ public class NetworkNode implements NetworkHandler, ContextHolder { private void startConnection(Connection c) { synchronized (connections) { + // prevent connecting twice to the same node + if (connections.contains(c)) { + return; + } connections.add(c); } pool.execute(c); @@ -138,12 +142,10 @@ public class NetworkNode implements NetworkHandler, ContextHolder { @Override public void offer(final InventoryVector iv) { - // TODO: - // - should offer to (random) 8 nodes during 8 seconds (if possible) - // - should probably offer later if no connection available at the moment? synchronized (connections) { LOG.debug(connections.size() + " connections available to offer " + iv); - for (Connection connection : connections) { + List random8 = Collections.selectRandom(8, this.connections); + for (Connection connection : random8) { connection.offer(iv); } } diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java index eb0117d..f1e55d6 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java @@ -75,6 +75,15 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito return find("subscribed = '1'"); } + @Override + public List getSubscriptions(long broadcastVersion) { + if (broadcastVersion > 4) { + return find("subscribed = '1' AND version > 3"); + } else { + return find("subscribed = '1' AND version <= 3"); + } + } + @Override public List getContacts() { return find("private_key IS NULL"); @@ -155,12 +164,13 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito private void insert(BitmessageAddress address) throws IOException, SQLException { try (Connection connection = config.getConnection()) { PreparedStatement ps = connection.prepareStatement( - "INSERT INTO Address (address, alias, public_key, private_key, subscribed) VALUES (?, ?, ?, ?, ?)"); + "INSERT INTO Address (address, version, alias, public_key, private_key, subscribed) VALUES (?, ?, ?, ?, ?, ?)"); ps.setString(1, address.getAddress()); - ps.setString(2, address.getAlias()); - writePubkey(ps, 3, address.getPubkey()); - writeBlob(ps, 4, address.getPrivateKey()); - ps.setBoolean(5, address.isSubscribed()); + ps.setLong(2, address.getVersion()); + ps.setString(3, address.getAlias()); + writePubkey(ps, 4, address.getPubkey()); + writeBlob(ps, 5, address.getPrivateKey()); + ps.setBoolean(6, address.isSubscribed()); ps.executeUpdate(); } } diff --git a/repositories/src/main/resources/db/migration/V1.2__Create_address_table.sql b/repositories/src/main/resources/db/migration/V1.2__Create_address_table.sql index 821c8be..e8d40ed 100644 --- a/repositories/src/main/resources/db/migration/V1.2__Create_address_table.sql +++ b/repositories/src/main/resources/db/migration/V1.2__Create_address_table.sql @@ -1,5 +1,6 @@ CREATE TABLE Address ( address VARCHAR(40) NOT NULL PRIMARY KEY, + version BIGINT NOT NULL, alias VARCHAR(255), public_key BLOB, private_key BLOB, diff --git a/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcAddressRepositoryTest.java b/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcAddressRepositoryTest.java index 3d06a6b..80740ca 100644 --- a/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcAddressRepositoryTest.java +++ b/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcAddressRepositoryTest.java @@ -81,8 +81,26 @@ public class JdbcAddressRepositoryTest { @Test public void testGetSubscriptions() throws Exception { + addSubscription("BM-2cXxfcSetKnbHJX2Y85rSkaVpsdNUZ5q9h"); + addSubscription("BM-2D9Vc5rFxxR5vTi53T9gkLfemViHRMVLQZ"); + addSubscription("BM-2D9QKN4teYRvoq2fyzpiftPh9WP9qggtzh"); List subscriptions = repo.getSubscriptions(); - assertEquals(0, subscriptions.size()); + assertEquals(3, subscriptions.size()); + } + + @Test + public void testGetSubscriptionsForVersion() throws Exception { + addSubscription("BM-2cXxfcSetKnbHJX2Y85rSkaVpsdNUZ5q9h"); + addSubscription("BM-2D9Vc5rFxxR5vTi53T9gkLfemViHRMVLQZ"); + addSubscription("BM-2D9QKN4teYRvoq2fyzpiftPh9WP9qggtzh"); + + List subscriptions; + + subscriptions = repo.getSubscriptions(5); + assertEquals(1, subscriptions.size()); + + subscriptions = repo.getSubscriptions(4); + assertEquals(2, subscriptions.size()); } @Test @@ -123,4 +141,10 @@ public class JdbcAddressRepositoryTest { assertNotNull(address); assertNotNull(address.getPrivateKey()); } + + private void addSubscription(String address) { + BitmessageAddress subscription = new BitmessageAddress(address); + subscription.setSubscribed(true); + repo.save(subscription); + } } \ No newline at end of file