diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java index 1497425..0e03880 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java @@ -43,11 +43,10 @@ public class Application { ctx = new BitmessageContext.Builder() .addressRepo(new JdbcAddressRepository(jdbcConfig)) .inventory(new JdbcInventory(jdbcConfig)) - .nodeRegistry(new JdbcNodeRegistry(jdbcConfig)) + .nodeRegistry(new MemoryNodeRegistry()) .messageRepo(new JdbcMessageRepository(jdbcConfig)) .networkHandler(new NetworkNode()) .port(48444) - .streams(1) .build(); ctx.startup(new BitmessageContext.Listener() { diff --git a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index 5414836..7445cd4 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -267,7 +267,6 @@ public class BitmessageContext { AddressRepository addressRepo; MessageRepository messageRepo; ProofOfWorkEngine proofOfWorkEngine; - TreeSet streams; public Builder() { } @@ -307,28 +306,12 @@ public class BitmessageContext { return this; } - public Builder streams(Collection streams) { - this.streams = new TreeSet<>(streams); - return this; - } - - public Builder streams(long... streams) { - this.streams = new TreeSet<>(); - for (long stream : streams) { - this.streams.add(stream); - } - return this; - } - public BitmessageContext build() { nonNull("inventory", inventory); nonNull("nodeRegistry", nodeRegistry); nonNull("networkHandler", networkHandler); nonNull("addressRepo", addressRepo); nonNull("messageRepo", messageRepo); - if (streams == null) { - streams(1); - } if (proofOfWorkEngine == null) { proofOfWorkEngine = new MultiThreadedPOWEngine(); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java b/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java index e7f56f5..04e9398 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java +++ b/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java @@ -49,7 +49,7 @@ public class InternalContext { private final MessageRepository messageRepository; private final ProofOfWorkEngine proofOfWorkEngine; - private final TreeSet streams; + private final TreeSet streams = new TreeSet<>(); private final int port; private long networkNonceTrialsPerByte = 1000; private long networkExtraBytes = 1000; @@ -65,7 +65,7 @@ public class InternalContext { this.clientNonce = Security.randomNonce(); port = builder.port; - streams = builder.streams; + streams.add(1L); // FIXME init(inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java index fc7ee52..ed2f38a 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java @@ -16,14 +16,11 @@ package ch.dissem.bitmessage.ports; -import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.entity.ObjectMessage; -import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.utils.Property; import java.io.IOException; -import java.util.Map; /** * Handles incoming messages diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Collections.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Collections.java index 32ef5eb..7eda028 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/Collections.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Collections.java @@ -57,4 +57,15 @@ public class Collections { } return result; } + + public static T selectRandom(Collection collection) { + int index = RANDOM.nextInt(collection.size()); + for (T item : collection) { + if (index == 0) { + return item; + } + index--; + } + throw new IllegalArgumentException("Empty collection? Size: " + collection.size()); + } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java b/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java index ffd4df8..0d0d991 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java @@ -20,10 +20,14 @@ package ch.dissem.bitmessage.utils; * A simple utility class that simplifies using the second based time used in Bitmessage. */ public class UnixTime { + /** + * Length of a minute in seconds, intended for use with {@link #now(long)}. + */ + public static final int MINUTE = 60; /** * Length of an hour in seconds, intended for use with {@link #now(long)}. */ - public static final long HOUR = 60 * 60; + public static final long HOUR = 60 * MINUTE; /** * Length of a day in seconds, intended for use with {@link #now(long)}. */ diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index 91bbc61..7dc07b2 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -37,14 +37,14 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.Queue; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentMap; import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.Connection.State.*; +import static ch.dissem.bitmessage.utils.UnixTime.MINUTE; /** * A connection to a specific node @@ -53,41 +53,44 @@ public class Connection implements Runnable { public static final int READ_TIMEOUT = 2000; private final static Logger LOG = LoggerFactory.getLogger(Connection.class); private static final int CONNECT_TIMEOUT = 5000; + private final ConcurrentMap ivCache; private InternalContext ctx; - private Mode mode; private State state; private Socket socket; private InputStream in; private OutputStream out; private MessageListener listener; - private int version; private long[] streams; - private NetworkAddress host; private NetworkAddress node; - private Queue sendingQueue = new ConcurrentLinkedDeque<>(); + private ConcurrentMap requestedObjects; - public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener) throws IOException { - this.ctx = context; - this.mode = mode; - this.state = CONNECTING; + public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, + ConcurrentMap requestedObjectsMap) throws IOException { + this(context, mode, listener, requestedObjectsMap); this.socket = socket; - this.listener = listener; - this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(); } - public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener) { + public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener, + ConcurrentMap requestedObjectsMap) { + this(context, mode, listener, requestedObjectsMap); + this.socket = new Socket(); + this.node = node; + } + + private Connection(InternalContext context, Mode mode, MessageListener listener, + ConcurrentMap requestedObjectsMap) { this.ctx = context; this.mode = mode; this.state = CONNECTING; - this.socket = new Socket(); - this.node = node; this.listener = listener; + this.requestedObjects = requestedObjectsMap; this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); + ivCache = new ConcurrentHashMap<>(); } public Mode getMode() { @@ -199,13 +202,66 @@ public class Connection implements Runnable { } } + private void cleanupIvCache() { + Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE); + for (Map.Entry entry : ivCache.entrySet()) { + if (entry.getValue() < fiveMinutesAgo) { + ivCache.remove(entry.getKey()); + } + } + } + + private void updateIvCache(InventoryVector... inventory) { + cleanupIvCache(); + Long now = UnixTime.now(); + for (InventoryVector iv : inventory) { + ivCache.put(iv, now); + } + } + + private void updateIvCache(List inventory) { + cleanupIvCache(); + Long now = UnixTime.now(); + for (InventoryVector iv : inventory) { + ivCache.put(iv, now); + } + } + + private void updateRequestedObjects(List missing) { + Long now = UnixTime.now(); + Long fiveMinutesAgo = now - 5 * MINUTE; + Long tenMinutesAgo = now - 10 * MINUTE; + List stillMissing = new LinkedList<>(); + for (Map.Entry entry : requestedObjects.entrySet()) { + if (entry.getValue() < fiveMinutesAgo) { + stillMissing.add(entry.getKey()); + // If it's still not available after 10 minutes, we won't look for it + // any longer (except it's announced again) + if (entry.getValue() < tenMinutesAgo) { + requestedObjects.remove(entry.getKey()); + } + } + } + + for (InventoryVector iv : missing) { + requestedObjects.put(iv, now); + } + if (!stillMissing.isEmpty()) { + LOG.debug(stillMissing.size() + " items are still missing."); + missing.addAll(stillMissing); + } + } + private void receiveMessage(MessagePayload messagePayload) { switch (messagePayload.getCommand()) { case INV: Inv inv = (Inv) messagePayload; + updateIvCache(inv.getInventory()); List missing = ctx.getInventory().getMissing(inv.getInventory(), streams); + missing.removeAll(requestedObjects.keySet()); LOG.debug("Received inventory with " + inv.getInventory().size() + " elements, of which are " + missing.size() + " missing."); + updateRequestedObjects(missing); send(new GetData.Builder().inventory(missing).build()); break; case GETDATA: @@ -276,6 +332,11 @@ public class Connection implements Runnable { sendingQueue.offer(new Inv.Builder() .addInventoryVector(iv) .build()); + updateIvCache(iv); + } + + public boolean knowsOf(InventoryVector iv) { + return ivCache.containsKey(iv); } @Override @@ -291,6 +352,13 @@ public class Connection implements Runnable { return Objects.hash(node); } + public void request(InventoryVector key) { + sendingQueue.offer(new GetData.Builder() + .addInventoryVector(key) + .build() + ); + } + public enum Mode {SERVER, CLIENT} public enum State {CONNECTING, ACTIVE, DISCONNECTED} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java index 47c01f6..f27f7ff 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java @@ -30,6 +30,8 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -43,13 +45,17 @@ import static ch.dissem.bitmessage.utils.DebugUtils.inc; * Handles all the networky stuff. */ public class NetworkNode implements NetworkHandler, ContextHolder { + public final static int NETWORK_MAGIC_NUMBER = 8; private final static Logger LOG = LoggerFactory.getLogger(NetworkNode.class); private final ExecutorService pool; private final List connections = new LinkedList<>(); private InternalContext ctx; private ServerSocket serverSocket; + private Thread serverThread; private Thread connectionManager; + private ConcurrentMap requestedObjects = new ConcurrentHashMap<>(); + public NetworkNode() { pool = Executors.newCachedThreadPool(); } @@ -66,18 +72,21 @@ public class NetworkNode implements NetworkHandler, ContextHolder { } try { serverSocket = new ServerSocket(ctx.getPort()); - pool.execute(new Runnable() { + serverThread = new Thread(new Runnable() { @Override public void run() { - try { - Socket socket = serverSocket.accept(); - socket.setSoTimeout(10000); - startConnection(new Connection(ctx, SERVER, socket, listener)); - } catch (IOException e) { - LOG.debug(e.getMessage(), e); + while (!serverSocket.isClosed()) { + try { + Socket socket = serverSocket.accept(); + socket.setSoTimeout(Connection.READ_TIMEOUT); + startConnection(new Connection(ctx, SERVER, socket, listener, requestedObjects)); + } catch (IOException e) { + LOG.debug(e.getMessage(), e); + } } } - }); + }, "server"); + serverThread.start(); connectionManager = new Thread(new Runnable() { @Override public void run() { @@ -96,10 +105,11 @@ public class NetworkNode implements NetworkHandler, ContextHolder { } } } - if (active < 8) { - List addresses = ctx.getNodeRegistry().getKnownAddresses(8 - active, ctx.getStreams()); + if (active < NETWORK_MAGIC_NUMBER) { + List addresses = ctx.getNodeRegistry().getKnownAddresses( + NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); for (NetworkAddress address : addresses) { - startConnection(new Connection(ctx, CLIENT, address, listener)); + startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects)); } } Thread.sleep(30000); @@ -126,12 +136,12 @@ public class NetworkNode implements NetworkHandler, ContextHolder { } catch (IOException e) { LOG.debug(e.getMessage(), e); } + pool.shutdown(); synchronized (connections) { for (Connection c : connections) { c.disconnect(); } } - pool.shutdown(); } private void startConnection(Connection c) { @@ -147,17 +157,17 @@ public class NetworkNode implements NetworkHandler, ContextHolder { @Override public void offer(final InventoryVector iv) { - List active = new LinkedList<>(); + List target = new LinkedList<>(); synchronized (connections) { for (Connection connection : connections) { - if (connection.getState() == ACTIVE) { - active.add(connection); + if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) { + target.add(connection); } } } - LOG.debug(active.size() + " connections available to offer " + iv); - List random8 = Collections.selectRandom(8, active); - for (Connection connection : random8) { + LOG.debug(target.size() + " connections available to offer " + iv); + List randomSubset = Collections.selectRandom(NETWORK_MAGIC_NUMBER, target); + for (Connection connection : randomSubset) { connection.offer(iv); } } diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java deleted file mode 100644 index 4b32946..0000000 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.repository; - -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.ports.NodeRegistry; -import ch.dissem.bitmessage.utils.UnixTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.sql.*; -import java.util.LinkedList; -import java.util.List; -import java.util.Scanner; - -import static ch.dissem.bitmessage.utils.UnixTime.HOUR; - -public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry { - private static final Logger LOG = LoggerFactory.getLogger(JdbcNodeRegistry.class); - - public JdbcNodeRegistry(JdbcConfig config) { - super(config); - } - - @Override - public List getKnownAddresses(int limit, long... streams) { - List result = doGetKnownNodes(limit, streams); - if (result.isEmpty()) { - try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) { - Scanner scanner = new Scanner(in); - long stream = 1; - while (scanner.hasNext()) { - try { - String line = scanner.nextLine().trim(); - if (line.startsWith("#") || line.isEmpty()) { - // Ignore - continue; - } - if (line.startsWith("[stream")) { - stream = Long.parseLong(line.substring(8, line.lastIndexOf(']'))); - } else { - int portIndex = line.lastIndexOf(':'); - InetAddress inetAddress = InetAddress.getByName(line.substring(0, portIndex)); - int port = Integer.valueOf(line.substring(portIndex + 1)); - result.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build()); - } - } catch (IOException e) { - LOG.warn(e.getMessage(), e); - } - } - offerAddresses(result); - return doGetKnownNodes(limit, streams); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } - return result; - } - - private List doGetKnownNodes(int limit, long... streams) { - List result = new LinkedList<>(); - try (Connection connection = config.getConnection()) { - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT * FROM Node WHERE stream IN (" + join(streams) + ") ORDER BY RANDOM() LIMIT " + limit); - while (rs.next()) { - result.add(new NetworkAddress.Builder() - .ipv6(rs.getBytes("ip")) - .port(rs.getInt("port")) - .services(rs.getLong("services")) - .stream(rs.getLong("stream")) - .time(rs.getLong("time")) - .build()); - } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - return result; - } - - @Override - public void offerAddresses(List addresses) { - try (Connection connection = config.getConnection()) { - PreparedStatement exists = connection.prepareStatement("SELECT time FROM Node WHERE ip = ? AND port = ? AND stream = ?"); - PreparedStatement insert = connection.prepareStatement( - "INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)"); - PreparedStatement update = connection.prepareStatement( - "UPDATE Node SET services = ?, time = ? WHERE ip = ? AND port = ? AND stream = ?"); - connection.setAutoCommit(false); - for (NetworkAddress node : addresses) { - exists.setBytes(1, node.getIPv6()); - exists.setInt(2, node.getPort()); - exists.setLong(3, node.getStream()); - ResultSet lastConnectionTime = exists.executeQuery(); - if (lastConnectionTime.next()) { - long time = lastConnectionTime.getLong("time"); - if (time < node.getTime() && node.getTime() <= UnixTime.now()) { - time = node.getTime(); - update.setLong(1, node.getServices()); - update.setLong(2, time); - - update.setBytes(3, node.getIPv6()); - update.setInt(4, node.getPort()); - update.setLong(5, node.getStream()); - update.executeUpdate(); - } - } else if (node.getTime() <= UnixTime.now()) { - insert.setBytes(1, node.getIPv6()); - insert.setInt(2, node.getPort()); - insert.setLong(3, node.getServices()); - insert.setLong(4, node.getStream()); - insert.setLong(5, node.getTime()); - insert.executeUpdate(); - } - connection.commit(); - } - if (addresses.size() > 100) { - // Let's clean up after we received an update from another node. This way, we shouldn't end up with an - // empty node registry. - PreparedStatement cleanup = connection.prepareStatement("DELETE FROM Node WHERE time < ?"); - cleanup.setLong(1, UnixTime.now(-3 * HOUR)); - cleanup.execute(); - } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - } -} diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/MemoryNodeRegistry.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/MemoryNodeRegistry.java new file mode 100644 index 0000000..15a4134 --- /dev/null +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/MemoryNodeRegistry.java @@ -0,0 +1,115 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.repository; + +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.ports.NodeRegistry; +import ch.dissem.bitmessage.utils.UnixTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import static ch.dissem.bitmessage.utils.Collections.selectRandom; +import static ch.dissem.bitmessage.utils.UnixTime.HOUR; +import static java.util.Collections.newSetFromMap; + +public class MemoryNodeRegistry implements NodeRegistry { + private static final Logger LOG = LoggerFactory.getLogger(MemoryNodeRegistry.class); + + private final Map> stableNodes = new HashMap<>(); + private final Map> knownNodes = new ConcurrentHashMap<>(); + + public MemoryNodeRegistry() { + try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) { + Scanner scanner = new Scanner(in); + long stream = 0; + Set streamSet = null; + while (scanner.hasNext()) { + try { + String line = scanner.nextLine().trim(); + if (line.startsWith("#") || line.isEmpty()) { + // Ignore + continue; + } + if (line.startsWith("[stream")) { + stream = Long.parseLong(line.substring(8, line.lastIndexOf(']'))); + streamSet = new HashSet<>(); + stableNodes.put(stream, streamSet); + } else if (streamSet != null) { + int portIndex = line.lastIndexOf(':'); + InetAddress inetAddress = InetAddress.getByName(line.substring(0, portIndex)); + int port = Integer.valueOf(line.substring(portIndex + 1)); + streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build()); + } + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public List getKnownAddresses(int limit, long... streams) { + List result = new LinkedList<>(); + for (long stream : streams) { + Set known = knownNodes.get(stream); + if (known != null && !known.isEmpty()) { + for (NetworkAddress node : known) { + if (node.getTime() > UnixTime.now(-3 * HOUR)) { + result.add(node); + } else { + known.remove(node); + } + } + } else if (stableNodes.containsKey(stream)) { + // To reduce load on stable nodes, only return one + result.add(selectRandom(stableNodes.get(stream))); + } + } + return selectRandom(limit, result); + } + + @Override + public void offerAddresses(List addresses) { + for (NetworkAddress node : addresses) { + if (node.getTime() <= UnixTime.now()) { + if (!knownNodes.containsKey(node.getStream())) { + synchronized (knownNodes) { + if (!knownNodes.containsKey(node.getStream())) { + knownNodes.put( + node.getStream(), + newSetFromMap(new ConcurrentHashMap()) + ); + } + } + } + if (node.getTime() <= UnixTime.now()) { + // TODO: This isn't quite correct + // If the node is already known, the one with the more recent time should be used + knownNodes.get(node.getStream()).add(node); + } + } + } + } +} diff --git a/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcNodeRegistryTest.java b/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcNodeRegistryTest.java index 2792cb0..d668822 100644 --- a/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcNodeRegistryTest.java +++ b/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcNodeRegistryTest.java @@ -35,7 +35,7 @@ public class JdbcNodeRegistryTest { public void setUp() throws Exception { config = new TestJdbcConfig(); config.reset(); - registry = new JdbcNodeRegistry(config); + registry = new MemoryNodeRegistry(); registry.offerAddresses(Arrays.asList( createAddress(1, 8444, 1, now()),