diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java index ac80aff..f7d6fc3 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java @@ -82,13 +82,12 @@ public abstract class AbstractConnection { this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); if (threadsafe) { this.ivCache = new ConcurrentHashMap<>(); - this.sendingQueue = new ConcurrentLinkedDeque<>(); this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap(10_000)); } else { this.ivCache = new HashMap<>(); - this.sendingQueue = new LinkedList<>(); this.requestedObjects = new HashSet<>(); } + this.sendingQueue = new ConcurrentLinkedDeque<>(); this.state = CONNECTING; this.commonRequestedObjects = commonRequestedObjects; } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index 5e6f056..096c229 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -37,17 +37,21 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; +import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.*; import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED; +import static ch.dissem.bitmessage.utils.Collections.selectRandom; import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; +import static java.util.Collections.newSetFromMap; +import static java.util.Collections.synchronizedSet; /** * Network handler using java.nio, resulting in less threads. @@ -64,6 +68,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex private InternalContext ctx; private Selector selector; private ServerSocketChannel serverChannel; + private Set connections = synchronizedSet(newSetFromMap(new WeakHashMap())); @Override public Future synchronize(final InetAddress server, final int port, final MessageListener listener, final long timeoutInSeconds) { @@ -77,6 +82,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), listener, new HashSet(), timeoutInSeconds); + connections.add(connection); while (channel.isConnected() && !connection.isSyncFinished()) { write(requestedObjects, channel, connection); read(channel, connection); @@ -121,8 +127,12 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) { return (CustomMessage) networkMessage.getPayload(); } else { - throw new NodeException("Unexpected response from node " + - server + ": " + networkMessage.getPayload().getCommand()); + if (networkMessage == null || networkMessage.getPayload() == null) { + throw new NodeException("Empty response from node " + server); + } else { + throw new NodeException("Unexpected response from node " + server + ": " + + networkMessage.getPayload().getClass()); + } } } catch (IOException e) { throw new ApplicationException(e); @@ -153,12 +163,13 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex try { SocketChannel accepted = serverChannel.accept(); accepted.configureBlocking(false); - accepted.register(selector, OP_READ | OP_WRITE, - new ConnectionInfo(ctx, SERVER, - new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(), - listener, - requestedObjects, 0 - )); + ConnectionInfo connection = new ConnectionInfo(ctx, SERVER, + new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(), + listener, + requestedObjects, 0 + ); + accepted.register(selector, OP_READ | OP_WRITE, connection); + connections.add(connection); } catch (AsynchronousCloseException ignore) { LOG.trace(ignore.getMessage()); } catch (IOException e) { @@ -186,12 +197,13 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex SocketChannel channel = SocketChannel.open( new InetSocketAddress(address.toInetAddress(), address.getPort())); channel.configureBlocking(false); - channel.register(selector, OP_READ | OP_WRITE, - new ConnectionInfo(ctx, CLIENT, - address, - listener, - requestedObjects, 0 - )); + ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT, + address, + listener, + requestedObjects, 0 + ); + channel.register(selector, OP_READ | OP_WRITE, connection); + connections.add(connection); } catch (AsynchronousCloseException ignore) { } catch (IOException e) { LOG.error(e.getMessage(), e); @@ -235,6 +247,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } else { key.interestOps(OP_READ | OP_WRITE); } + if (connection.getState() == DISCONNECTED) { + connections.remove(connection); + } } keyIterator.remove(); } @@ -292,9 +307,8 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex public void stop() { try { serverChannel.socket().close(); - Iterator iterator = selector.keys().iterator(); - while (iterator.hasNext()) { - iterator.next().channel().close(); + for (SelectionKey selectionKey : selector.keys()) { + selectionKey.channel().close(); } selector.close(); } catch (IOException e) { @@ -304,12 +318,64 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex @Override public void offer(InventoryVector iv) { - // TODO + List target = new LinkedList<>(); + for (ConnectionInfo connection : connections) { + if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) { + target.add(connection); + } + } + List randomSubset = selectRandom(NETWORK_MAGIC_NUMBER, target); + for (ConnectionInfo connection : randomSubset) { + connection.offer(iv); + } } @Override public void request(Collection inventoryVectors) { - // TODO + if (!isRunning()) return; + Iterator iterator = inventoryVectors.iterator(); + if (!iterator.hasNext()) { + return; + } + + Map> distribution = new HashMap<>(); + for (ConnectionInfo connection : connections) { + if (connection.getState() == ACTIVE) { + distribution.put(connection, new LinkedList()); + } + } + InventoryVector next = iterator.next(); + ConnectionInfo previous = null; + do { + for (ConnectionInfo connection : distribution.keySet()) { + if (connection == previous) { + next = iterator.next(); + } + if (connection.knowsOf(next)) { + List ivs = distribution.get(connection); + if (ivs.size() == GetData.MAX_INVENTORY_SIZE) { + connection.send(new GetData.Builder().inventory(ivs).build()); + ivs.clear(); + } + ivs.add(next); + iterator.remove(); + + if (iterator.hasNext()) { + next = iterator.next(); + previous = connection; + } else { + break; + } + } + } + } while (iterator.hasNext()); + + for (ConnectionInfo connection : distribution.keySet()) { + List ivs = distribution.get(connection); + if (!ivs.isEmpty()) { + connection.send(new GetData.Builder().inventory(ivs).build()); + } + } } @Override @@ -318,17 +384,14 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex TreeMap incomingConnections = new TreeMap<>(); TreeMap outgoingConnections = new TreeMap<>(); - for (SelectionKey key : selector.keys()) { - if (key.attachment() instanceof ConnectionInfo) { - ConnectionInfo connection = (ConnectionInfo) key.attachment(); - if (connection.getState() == ACTIVE) { - long stream = connection.getNode().getStream(); - streams.add(stream); - if (connection.getMode() == SERVER) { - inc(incomingConnections, stream); - } else { - inc(outgoingConnections, stream); - } + for (ConnectionInfo connection : connections) { + if (connection.getState() == ACTIVE) { + long stream = connection.getNode().getStream(); + streams.add(stream); + if (connection.getMode() == SERVER) { + inc(incomingConnections, stream); + } else { + inc(outgoingConnections, stream); } } } diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java index 24cedf7..85dec9e 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java @@ -66,7 +66,7 @@ public class NetworkHandlerTest { private final NetworkHandler nodeNetworkHandler; @Rule - public final TestRule timeout = new DisableOnDebug(Timeout.seconds(5)); + public final TestRule timeout = new DisableOnDebug(Timeout.seconds(10)); public NetworkHandlerTest(NetworkHandler peer, NetworkHandler node) { this.peerNetworkHandler = peer; diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java index 403754a..e2e247a 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java @@ -121,7 +121,7 @@ public class JdbcMessageRepository extends AbstractMessageRepository implements builder.retries(rs.getInt("retries")); builder.nextTry(rs.getLong("next_try")); builder.labels(findLabels(connection, - "WHERE id IN (SELECT label_id FROM Message_Label WHERE message_id=" + id + ") ORDER BY ord")); + "id IN (SELECT label_id FROM Message_Label WHERE message_id=" + id + ") ORDER BY ord")); Plaintext message = builder.build(); message.setInitialHash(rs.getBytes("initial_hash")); result.add(message);