From dad05d835b8cab9fdbebb60c58c546df809f7d8b Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Thu, 1 Sep 2016 07:35:46 +0200 Subject: [PATCH] Created an improved JdbcNodeRegistry and removed MemoryNodeRegistry, as it doesn't properly work with the way nodes are handled and disseminated in the new PyBitmessage client. The new one should work a lot more stable. --- .../bitmessage/ports/MemoryNodeRegistry.java | 125 ------------- .../bitmessage/ports/NodeRegistryHelper.java | 54 ++++++ .../bitmessage/ports/NodeRegistryTest.java | 99 ----------- .../java/ch/dissem/bitmessage/demo/Main.java | 3 +- .../networking/nio/ConnectionInfo.java | 2 +- .../networking/nio/NioNetworkHandler.java | 8 +- .../repository/JdbcNodeRegistry.java | 167 ++++++++++++++++++ .../db/migration/V3.3__Create_table_node.sql | 9 + .../repository/JdbcNodeRegistryTest.java | 48 +++-- 9 files changed, 267 insertions(+), 248 deletions(-) delete mode 100644 core/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java create mode 100644 core/src/main/java/ch/dissem/bitmessage/ports/NodeRegistryHelper.java delete mode 100644 core/src/test/java/ch/dissem/bitmessage/ports/NodeRegistryTest.java create mode 100644 repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java create mode 100644 repositories/src/main/resources/db/migration/V3.3__Create_table_node.sql diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java b/core/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java deleted file mode 100644 index a56b4ff..0000000 --- a/core/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.ports; - -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.exception.ApplicationException; -import ch.dissem.bitmessage.utils.UnixTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -import static ch.dissem.bitmessage.utils.Collections.selectRandom; -import static ch.dissem.bitmessage.utils.UnixTime.HOUR; -import static java.util.Collections.newSetFromMap; - -public class MemoryNodeRegistry implements NodeRegistry { - private static final Logger LOG = LoggerFactory.getLogger(MemoryNodeRegistry.class); - - private final Map> stableNodes = new ConcurrentHashMap<>(); - private final Map> knownNodes = new ConcurrentHashMap<>(); - - private void loadStableNodes() { - try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) { - Scanner scanner = new Scanner(in); - long stream = 0; - Set streamSet = null; - while (scanner.hasNext()) { - try { - String line = scanner.nextLine().trim(); - if (line.startsWith("[stream")) { - stream = Long.parseLong(line.substring(8, line.lastIndexOf(']'))); - streamSet = new HashSet<>(); - stableNodes.put(stream, streamSet); - } else if (streamSet != null && !line.isEmpty() && !line.startsWith("#")) { - int portIndex = line.lastIndexOf(':'); - InetAddress[] inetAddresses = InetAddress.getAllByName(line.substring(0, portIndex)); - int port = Integer.valueOf(line.substring(portIndex + 1)); - for (InetAddress inetAddress : inetAddresses) { - streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build()); - } - } - } catch (IOException e) { - LOG.warn(e.getMessage(), e); - } - } - if (LOG.isDebugEnabled()) { - for (Map.Entry> e : stableNodes.entrySet()) { - LOG.debug("Stream " + e.getKey() + ": loaded " + e.getValue().size() + " bootstrap nodes."); - } - } - } catch (IOException e) { - throw new ApplicationException(e); - } - } - - @Override - public List getKnownAddresses(int limit, long... streams) { - List result = new LinkedList<>(); - for (long stream : streams) { - Set known = knownNodes.get(stream); - if (known != null && !known.isEmpty()) { - for (NetworkAddress node : known) { - if (node.getTime() > UnixTime.now(-3 * HOUR)) { - result.add(node); - } else { - known.remove(node); - } - } - } - if (result.isEmpty()) { - if (stableNodes.isEmpty() || stableNodes.get(stream).isEmpty()) { - loadStableNodes(); - } - Set nodes = stableNodes.get(stream); - if (nodes != null && !nodes.isEmpty()) { - // To reduce load on stable nodes, only return one - result.add(selectRandom(nodes)); - } - } - } - return selectRandom(limit, result); - } - - @Override - public void offerAddresses(List addresses) { - for (NetworkAddress node : addresses) { - if (node.getTime() <= UnixTime.now()) { - if (!knownNodes.containsKey(node.getStream())) { - synchronized (knownNodes) { - if (!knownNodes.containsKey(node.getStream())) { - knownNodes.put( - node.getStream(), - newSetFromMap(new ConcurrentHashMap()) - ); - } - } - } - if (node.getTime() <= UnixTime.now()) { - // TODO: This isn't quite correct - // If the node is already known, the one with the more recent time should be used - knownNodes.get(node.getStream()).add(node); - } - } - } - } -} diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/NodeRegistryHelper.java b/core/src/main/java/ch/dissem/bitmessage/ports/NodeRegistryHelper.java new file mode 100644 index 0000000..63d70eb --- /dev/null +++ b/core/src/main/java/ch/dissem/bitmessage/ports/NodeRegistryHelper.java @@ -0,0 +1,54 @@ +package ch.dissem.bitmessage.ports; + +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.exception.ApplicationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.util.*; + +/** + * Helper class to kick start node registries. + */ +public class NodeRegistryHelper { + private static final Logger LOG = LoggerFactory.getLogger(NodeRegistryHelper.class); + + public static Map> loadStableNodes() { + try (InputStream in = NodeRegistryHelper.class.getClassLoader().getResourceAsStream("nodes.txt")) { + Scanner scanner = new Scanner(in); + long stream = 0; + Map> result = new HashMap<>(); + Set streamSet = null; + while (scanner.hasNext()) { + try { + String line = scanner.nextLine().trim(); + if (line.startsWith("[stream")) { + stream = Long.parseLong(line.substring(8, line.lastIndexOf(']'))); + streamSet = new HashSet<>(); + result.put(stream, streamSet); + } else if (streamSet != null && !line.isEmpty() && !line.startsWith("#")) { + int portIndex = line.lastIndexOf(':'); + InetAddress[] inetAddresses = InetAddress.getAllByName(line.substring(0, portIndex)); + int port = Integer.valueOf(line.substring(portIndex + 1)); + for (InetAddress inetAddress : inetAddresses) { + streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build()); + } + } + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + } + } + if (LOG.isDebugEnabled()) { + for (Map.Entry> e : result.entrySet()) { + LOG.debug("Stream " + e.getKey() + ": loaded " + e.getValue().size() + " bootstrap nodes."); + } + } + return result; + } catch (IOException e) { + throw new ApplicationException(e); + } + } +} diff --git a/core/src/test/java/ch/dissem/bitmessage/ports/NodeRegistryTest.java b/core/src/test/java/ch/dissem/bitmessage/ports/NodeRegistryTest.java deleted file mode 100644 index 993e3a9..0000000 --- a/core/src/test/java/ch/dissem/bitmessage/ports/NodeRegistryTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2016 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.ports; - -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.utils.UnixTime; -import org.junit.Test; - -import java.util.Arrays; - -import static ch.dissem.bitmessage.utils.UnixTime.HOUR; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.assertThat; - -public class NodeRegistryTest { - private NodeRegistry registry = new MemoryNodeRegistry(); - - @Test - public void ensureGetKnownNodesWithoutStreamsYieldsEmpty() { - assertThat(registry.getKnownAddresses(10), empty()); - } - - /** - * Please note that this test fails if there is no internet connection, - * as the initial nodes' IP addresses are determined by DNS lookup. - */ - @Test - public void ensureGetKnownNodesForStream1YieldsResult() { - assertThat(registry.getKnownAddresses(10, 1), hasSize(1)); - } - - @Test - public void ensureNodeIsStored() { - registry.offerAddresses(Arrays.asList( - new NetworkAddress.Builder() - .ipv4(127, 0, 0, 1) - .port(42) - .stream(1) - .time(UnixTime.now()) - .build(), - new NetworkAddress.Builder() - .ipv4(127, 0, 0, 2) - .port(42) - .stream(1) - .time(UnixTime.now()) - .build(), - new NetworkAddress.Builder() - .ipv4(127, 0, 0, 2) - .port(42) - .stream(2) - .time(UnixTime.now()) - .build() - )); - assertThat(registry.getKnownAddresses(10, 1).size(), is(2)); - assertThat(registry.getKnownAddresses(10, 2).size(), is(1)); - assertThat(registry.getKnownAddresses(10, 1, 2).size(), is(3)); - } - - @Test - public void ensureOldNodesAreRemoved() { - registry.offerAddresses(Arrays.asList( - new NetworkAddress.Builder() - .ipv4(127, 0, 0, 1) - .port(42) - .stream(1) - .time(UnixTime.now()) - .build(), - new NetworkAddress.Builder() - .ipv4(127, 0, 0, 2) - .port(42) - .stream(1) - .time(UnixTime.now(-4 * HOUR)) - .build(), - new NetworkAddress.Builder() - .ipv4(127, 0, 0, 2) - .port(42) - .stream(2) - .time(UnixTime.now()) - .build() - )); - assertThat(registry.getKnownAddresses(10, 1).size(), is(1)); - assertThat(registry.getKnownAddresses(10, 2).size(), is(1)); - assertThat(registry.getKnownAddresses(10, 1, 2).size(), is(2)); - } -} diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java index 84e8ce0..c2cc7a5 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java @@ -20,7 +20,6 @@ import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.networking.nio.NioNetworkHandler; -import ch.dissem.bitmessage.ports.MemoryNodeRegistry; import ch.dissem.bitmessage.ports.NodeRegistry; import ch.dissem.bitmessage.repository.*; import ch.dissem.bitmessage.wif.WifExporter; @@ -82,7 +81,7 @@ public class Main { } }); } else { - ctxBuilder.nodeRegistry(new MemoryNodeRegistry()); + ctxBuilder.nodeRegistry(new JdbcNodeRegistry(jdbcConfig)); } if (options.exportWIF != null || options.importWIF != null) { diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java index fb4cf58..8ba5b66 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java @@ -129,7 +129,7 @@ public class ConnectionInfo extends AbstractConnection { } @Override - public synchronized void disconnect() { + public void disconnect() { super.disconnect(); if (reader != null) { reader.cleanup(); diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index 7f04aa1..0cb0af9 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -26,7 +26,7 @@ import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.factory.V3MessageReader; import ch.dissem.bitmessage.ports.NetworkHandler; -import ch.dissem.bitmessage.utils.Property; +import ch.dissem.bitmessage.utils.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +37,7 @@ import java.net.NoRouteToHostException; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; +import java.util.Collections; import java.util.concurrent.*; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.*; @@ -193,7 +194,8 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } } if (missing > 0) { - List addresses = ctx.getNodeRegistry().getKnownAddresses(missing, ctx.getStreams()); + List addresses = ctx.getNodeRegistry().getKnownAddresses(100, ctx.getStreams()); + addresses = selectRandom(missing, addresses); for (NetworkAddress address : addresses) { if (isConnectedTo(address)) { continue; @@ -389,7 +391,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex ConnectionInfo previous = null; do { for (ConnectionInfo connection : distribution.keySet()) { - if (connection == previous) { + if (connection == previous || previous == null) { next = iterator.next(); } if (connection.knowsOf(next)) { diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java new file mode 100644 index 0000000..7374447 --- /dev/null +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java @@ -0,0 +1,167 @@ +package ch.dissem.bitmessage.repository; + +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.exception.ApplicationException; +import ch.dissem.bitmessage.ports.NodeRegistry; +import ch.dissem.bitmessage.utils.Collections; +import ch.dissem.bitmessage.utils.SqlStrings; +import ch.dissem.bitmessage.utils.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static ch.dissem.bitmessage.ports.NodeRegistryHelper.loadStableNodes; +import static ch.dissem.bitmessage.utils.UnixTime.*; + +public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry { + private static final Logger LOG = LoggerFactory.getLogger(JdbcNodeRegistry.class); + private Map> stableNodes; + + public JdbcNodeRegistry(JdbcConfig config) { + super(config); + cleanUp(); + } + + private void cleanUp() { + try ( + Connection connection = config.getConnection(); + PreparedStatement ps = connection.prepareStatement( + "DELETE FROM Node WHERE time getKnownAddresses(int limit, long... streams) { + List result = new LinkedList<>(); + String query = + "SELECT stream, address, port, services, time" + + " FROM Node WHERE stream IN (" + SqlStrings.join(streams) + ")" + + " ORDER BY TIME DESC" + + " LIMIT " + limit; + try ( + Connection connection = config.getConnection(); + Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(query) + ) { + while (rs.next()) { + result.add( + new NetworkAddress.Builder() + .stream(rs.getLong("stream")) + .ipv6(rs.getBytes("address")) + .port(rs.getInt("port")) + .services(rs.getLong("services")) + .time(rs.getLong("time")) + .build() + ); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new ApplicationException(e); + } + if (result.isEmpty()) { + synchronized (this) { + if (stableNodes == null) { + stableNodes = loadStableNodes(); + } + } + for (long stream : streams) { + Set nodes = stableNodes.get(stream); + if (nodes != null) { + result.add(Collections.selectRandom(nodes)); + } + } + } + return result; + } + + @Override + public void offerAddresses(List nodes) { + cleanUp(); + nodes.stream() + .filter(node -> node.getTime() < now(+24 * HOUR) && node.getTime() > now(-28 * DAY)) + .forEach(node -> { + synchronized (this) { + NetworkAddress existing = loadExisting(node); + if (existing == null) { + insert(node); + } else if (node.getTime() > existing.getTime()) { + update(node); + } + } + }); + } + + private void insert(NetworkAddress node) { + try ( + Connection connection = config.getConnection(); + PreparedStatement ps = connection.prepareStatement( + "INSERT INTO Node (stream, address, port, services, time) " + + "VALUES (?, ?, ?, ?, ?)") + ) { + ps.setLong(1, node.getStream()); + ps.setBytes(2, node.getIPv6()); + ps.setInt(3, node.getPort()); + ps.setLong(4, node.getServices()); + ps.setLong(5, node.getTime()); + ps.executeUpdate(); + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + } + + private void update(NetworkAddress node) { + try ( + Connection connection = config.getConnection(); + PreparedStatement ps = connection.prepareStatement( + "UPDATE Node SET services=?, time=? WHERE stream=? AND address=? AND port=?") + ) { + ps.setLong(1, node.getServices()); + ps.setLong(2, node.getTime()); + ps.setLong(3, node.getStream()); + ps.setBytes(4, node.getIPv6()); + ps.setInt(5, node.getPort()); + ps.executeUpdate(); + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + } +} diff --git a/repositories/src/main/resources/db/migration/V3.3__Create_table_node.sql b/repositories/src/main/resources/db/migration/V3.3__Create_table_node.sql new file mode 100644 index 0000000..5d03bb5 --- /dev/null +++ b/repositories/src/main/resources/db/migration/V3.3__Create_table_node.sql @@ -0,0 +1,9 @@ +CREATE TABLE Node ( + stream BIGINT NOT NULL, + address BINARY(32) NOT NULL, + port INT NOT NULL, + services BIGINT NOT NULL, + time BIGINT NOT NULL, + PRIMARY KEY (stream, address, port) +); +CREATE INDEX idx_time on Node(time); diff --git a/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcNodeRegistryTest.java b/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcNodeRegistryTest.java index f56d973..48ae664 100644 --- a/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcNodeRegistryTest.java +++ b/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcNodeRegistryTest.java @@ -17,8 +17,8 @@ package ch.dissem.bitmessage.repository; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.ports.MemoryNodeRegistry; import ch.dissem.bitmessage.ports.NodeRegistry; +import ch.dissem.bitmessage.utils.UnixTime; import org.junit.Before; import org.junit.Test; @@ -27,8 +27,15 @@ import java.util.Collections; import java.util.List; import static ch.dissem.bitmessage.utils.UnixTime.now; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +/** + * Please note that some tests fail if there is no internet connection, + * as the initial nodes' IP addresses are determined by DNS lookup. + */ public class JdbcNodeRegistryTest extends TestBase { private TestJdbcConfig config; private NodeRegistry registry; @@ -37,21 +44,26 @@ public class JdbcNodeRegistryTest extends TestBase { public void setUp() throws Exception { config = new TestJdbcConfig(); config.reset(); - registry = new MemoryNodeRegistry(); + registry = new JdbcNodeRegistry(config); registry.offerAddresses(Arrays.asList( - createAddress(1, 8444, 1, now()), - createAddress(2, 8444, 1, now()), - createAddress(3, 8444, 1, now()), - createAddress(4, 8444, 2, now()) + createAddress(1, 8444, 1, now()), + createAddress(2, 8444, 1, now()), + createAddress(3, 8444, 1, now()), + createAddress(4, 8444, 2, now()) )); } @Test - public void testInitNodes() throws Exception { + public void ensureGetKnownNodesWithoutStreamsYieldsEmpty() { + assertThat(registry.getKnownAddresses(10), empty()); + } + + @Test + public void ensurePredefinedNodeIsReturnedWhenDatabaseIsEmpty() throws Exception { config.reset(); List knownAddresses = registry.getKnownAddresses(2, 1); - assertEquals(2, knownAddresses.size()); + assertEquals(1, knownAddresses.size()); } @Test @@ -66,16 +78,16 @@ public class JdbcNodeRegistryTest extends TestBase { @Test public void testOfferAddresses() throws Exception { registry.offerAddresses(Arrays.asList( - createAddress(1, 8444, 1, now()), - createAddress(10, 8444, 1, now()), - createAddress(11, 8444, 1, now()) + createAddress(1, 8444, 1, now()), + createAddress(10, 8444, 1, now()), + createAddress(11, 8444, 1, now()) )); List knownAddresses = registry.getKnownAddresses(1000, 1); assertEquals(5, knownAddresses.size()); registry.offerAddresses(Collections.singletonList( - createAddress(1, 8445, 1, now()) + createAddress(1, 8445, 1, now()) )); knownAddresses = registry.getKnownAddresses(1000, 1); @@ -84,10 +96,10 @@ public class JdbcNodeRegistryTest extends TestBase { private NetworkAddress createAddress(int lastByte, int port, long stream, long time) { return new NetworkAddress.Builder() - .ipv6(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, lastByte) - .port(port) - .stream(stream) - .time(time) - .build(); + .ipv6(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, lastByte) + .port(port) + .stream(stream) + .time(time) + .build(); } -} \ No newline at end of file +}