diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java index 73dce63..4ad759a 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java @@ -54,7 +54,7 @@ public abstract class AbstractConnection { protected final NetworkHandler.MessageListener listener; protected final Map ivCache; protected final Deque sendingQueue; - protected final Set commonRequestedObjects; + protected final Map commonRequestedObjects; protected final Set requestedObjects; protected volatile State state; @@ -71,7 +71,7 @@ public abstract class AbstractConnection { public AbstractConnection(InternalContext context, Mode mode, NetworkAddress node, - Set commonRequestedObjects, + Map commonRequestedObjects, long syncTimeout) { this.ctx = context; this.mode = mode; @@ -143,7 +143,7 @@ public abstract class AbstractConnection { int originalSize = inv.getInventory().size(); updateIvCache(inv.getInventory()); List missing = ctx.getInventory().getMissing(inv.getInventory(), streams); - missing.removeAll(commonRequestedObjects); + missing.removeAll(commonRequestedObjects.keySet()); LOG.trace("Received inventory with " + originalSize + " elements, of which are " + missing.size() + " missing."); send(new GetData.Builder().inventory(missing).build()); @@ -175,7 +175,7 @@ public abstract class AbstractConnection { } catch (IOException e) { LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); } finally { - if (!commonRequestedObjects.remove(objectMessage.getInventoryVector())) { + if (commonRequestedObjects.remove(objectMessage.getInventoryVector()) == null) { LOG.debug("Received object that wasn't requested."); } } @@ -205,6 +205,10 @@ public abstract class AbstractConnection { return ivCache.containsKey(iv); } + public boolean requested(InventoryVector iv) { + return requestedObjects.contains(iv); + } + private void cleanupIvCache() { Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE); for (Map.Entry entry : ivCache.entrySet()) { diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index 64772e5..4dcf29c 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -36,9 +36,9 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; -import java.util.HashSet; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; -import java.util.Set; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; @@ -64,20 +64,20 @@ class Connection extends AbstractConnection { private boolean socketInitialized; public Connection(InternalContext context, Mode mode, Socket socket, - Set requestedObjectsMap) throws IOException { + Map requestedObjectsMap) throws IOException { this(context, mode, socket, requestedObjectsMap, - new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), - 0); + new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), + 0); } public Connection(InternalContext context, Mode mode, NetworkAddress node, - Set requestedObjectsMap) { + Map requestedObjectsMap) { this(context, mode, new Socket(), requestedObjectsMap, - node, 0); + node, 0); } private Connection(InternalContext context, Mode mode, Socket socket, - Set commonRequestedObjects, NetworkAddress node, long syncTimeout) { + Map commonRequestedObjects, NetworkAddress node, long syncTimeout) { super(context, mode, node, commonRequestedObjects, syncTimeout); this.startTime = UnixTime.now(); this.socket = socket; @@ -86,9 +86,9 @@ class Connection extends AbstractConnection { public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, long timeoutInSeconds) throws IOException { return new Connection(ctx, SYNC, new Socket(address, port), - new HashSet(), - new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), - timeoutInSeconds); + new HashMap(), + new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), + timeoutInSeconds); } public long getStartTime() { diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java index 42021f6..6d7ae04 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java @@ -18,7 +18,6 @@ package ch.dissem.bitmessage.networking; import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.utils.UnixTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,12 +31,13 @@ import static ch.dissem.bitmessage.networking.DefaultNetworkHandler.NETWORK_MAGI /** * @author Christian Basler */ +@Deprecated +@SuppressWarnings("deprecation") public class ConnectionOrganizer implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ConnectionOrganizer.class); private final InternalContext ctx; private final DefaultNetworkHandler networkHandler; - private final NetworkHandler.MessageListener listener; private Connection initialConnection; @@ -45,7 +45,6 @@ public class ConnectionOrganizer implements Runnable { DefaultNetworkHandler networkHandler) { this.ctx = ctx; this.networkHandler = networkHandler; - this.listener = ctx.getNetworkListener(); } @Override @@ -87,7 +86,7 @@ public class ConnectionOrganizer implements Runnable { if (active < NETWORK_MAGIC_NUMBER) { List addresses = ctx.getNodeRegistry().getKnownAddresses( - NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); + NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); boolean first = active == 0 && initialConnection == null; for (NetworkAddress address : addresses) { Connection c = new Connection(ctx, CLIENT, address, networkHandler.requestedObjects); diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index 1af62b5..b0966d4 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -39,7 +39,6 @@ import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; -import static java.util.Collections.newSetFromMap; /** * Handles all the networky stuff. @@ -59,7 +58,7 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { private ServerRunnable server; private volatile boolean running; - final Set requestedObjects = newSetFromMap(new ConcurrentHashMap(50_000)); + final Map requestedObjects = new ConcurrentHashMap<>(50_000); @Override public void setContext(InternalContext context) { diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java index 3c67b95..0a4e7a2 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java @@ -31,6 +31,7 @@ import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; /** * @author Christian Basler */ +@Deprecated public class ServerRunnable implements Runnable, Closeable { private static final Logger LOG = LoggerFactory.getLogger(ServerRunnable.class); private final InternalContext ctx; diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java index 2e24883..012b43f 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java @@ -26,11 +26,10 @@ import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.factory.V3MessageReader; import ch.dissem.bitmessage.networking.AbstractConnection; +import ch.dissem.bitmessage.utils.UnixTime; import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; +import java.util.*; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; @@ -46,7 +45,7 @@ public class ConnectionInfo extends AbstractConnection { private long lastUpdate = System.currentTimeMillis(); public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node, - Set commonRequestedObjects, long syncTimeout) { + Map commonRequestedObjects, long syncTimeout) { super(context, mode, node, commonRequestedObjects, syncTimeout); headerOut.flip(); if (mode == CLIENT || mode == SYNC) { @@ -147,8 +146,12 @@ public class ConnectionInfo extends AbstractConnection { protected void send(MessagePayload payload) { sendingQueue.add(payload); if (payload instanceof GetData) { - requestedObjects.addAll(((GetData) payload).getInventory()); - commonRequestedObjects.addAll(((GetData) payload).getInventory()); + Long now = UnixTime.now(); + List inventory = ((GetData) payload).getInventory(); + requestedObjects.addAll(inventory); + for (InventoryVector iv : inventory) { + commonRequestedObjects.put(iv, now); + } } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index f9fa06a..b79174b 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -46,14 +46,14 @@ import static ch.dissem.bitmessage.utils.Collections.selectRandom; import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static java.nio.channels.SelectionKey.*; -import static java.util.Collections.newSetFromMap; /** * Network handler using java.nio, resulting in less threads. */ public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder { private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class); - private static final long REQUESTED_OBJECTS_MAX_TIME = 30 * 60_000; // 30 minutes + private static final long REQUESTED_OBJECTS_MAX_TIME = 2 * 60_000; // 2 minutes + private static final Long DELAYED = Long.MIN_VALUE; private final ExecutorService threadPool = Executors.newCachedThreadPool( pool("network") @@ -66,8 +66,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex private ServerSocketChannel serverChannel; private Queue connectionQueue = new ConcurrentLinkedQueue<>(); private Map connections = new ConcurrentHashMap<>(); - private final Set requestedObjects = newSetFromMap(new ConcurrentHashMap(10_000)); - private long requestedObjectsTimeout = 0; + private final Map requestedObjects = new ConcurrentHashMap<>(10_000); private Thread starter; @@ -80,7 +79,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex channel.configureBlocking(false); ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), - new HashSet(), timeoutInSeconds); + new HashMap(), timeoutInSeconds); while (channel.isConnected() && !connection.isSyncFinished()) { write(channel, connection); read(channel, connection); @@ -147,7 +146,6 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } catch (IOException e) { throw new ApplicationException(e); } - requestedObjectsTimeout = System.currentTimeMillis() + REQUESTED_OBJECTS_MAX_TIME; requestedObjects.clear(); starter = thread("connection manager", new Runnable() { @@ -189,15 +187,22 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex // The list 'requested objects' helps to prevent downloading an object // twice. From time to time there is an error though, and an object is // never downloaded. To prevent a large list of failed objects and give - // them a chance to get downloaded again, let's clear the list from time - // to time. The timeout should be such that most of the initial object - // sync should be done by then, but small enough to prevent objects with - // a normal time out from not being downloaded at all. - long now = System.currentTimeMillis(); - if (now > requestedObjectsTimeout) { - requestedObjectsTimeout = now + REQUESTED_OBJECTS_MAX_TIME; - requestedObjects.clear(); + // them a chance to get downloaded again, we will attempt to download an + // object from another node after some time out. + long timedOut = System.currentTimeMillis() - REQUESTED_OBJECTS_MAX_TIME; + List delayed = new LinkedList<>(); + Iterator> iterator = requestedObjects.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry e = iterator.next(); + //noinspection NumberEquality + if (e.getValue() == DELAYED) { + iterator.remove(); + } else if (e.getValue() < timedOut) { + delayed.add(e.getKey()); + e.setValue(DELAYED); + } } + request(delayed); try { Thread.sleep(30_000); @@ -422,7 +427,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex break; } } - if (connection.knowsOf(next)) { + if (connection.knowsOf(next) && !connection.requested(next)) { List ivs = distribution.get(connection); if (ivs.size() == GetData.MAX_INVENTORY_SIZE) { connection.send(new GetData.Builder().inventory(ivs).build()); @@ -442,7 +447,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } while (iterator.hasNext()); // remove objects nobody knows of - requestedObjects.removeAll(inventoryVectors); + for (InventoryVector iv : inventoryVectors) { + requestedObjects.remove(iv); + } for (ConnectionInfo connection : distribution.keySet()) { List ivs = distribution.get(connection); diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java index d64a855..940f632 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java @@ -76,6 +76,7 @@ public class NetworkHandlerTest { } @Parameterized.Parameters + @SuppressWarnings("deprecation") public static List parameters() { return Arrays.asList(new Object[][]{ {new DefaultNetworkHandler(), new DefaultNetworkHandler()},