Major fixes and improvements to the network module, fixing problems where objects where requested multiple times or not at all in some situations.

This commit is contained in:
Christian Basler 2016-01-16 08:47:50 +01:00
parent 549c8854ed
commit 8764642878
6 changed files with 202 additions and 80 deletions

View File

@ -40,6 +40,7 @@ import static ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST;
import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG; import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
import static ch.dissem.bitmessage.utils.UnixTime.DAY; import static ch.dissem.bitmessage.utils.UnixTime.DAY;
import static ch.dissem.bitmessage.utils.UnixTime.HOUR; import static ch.dissem.bitmessage.utils.UnixTime.HOUR;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/** /**
* <p>Use this class if you want to create a Bitmessage client.</p> * <p>Use this class if you want to create a Bitmessage client.</p>
@ -183,6 +184,40 @@ public class BitmessageContext {
}); });
} }
public void send(final Plaintext msg) {
if (msg.getFrom() == null || msg.getFrom().getPrivateKey() == null) {
throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key.");
}
pool.submit(new Runnable() {
@Override
public void run() {
BitmessageAddress to = msg.getTo();
if (to.getPubkey() == null) {
tryToFindMatchingPubkey(to);
}
if (to.getPubkey() == null) {
LOG.info("Public key is missing from recipient. Requesting.");
requestPubkey(msg.getFrom(), to);
msg.setStatus(PUBKEY_REQUESTED);
ctx.getMessageRepository().save(msg);
} else {
LOG.info("Sending message.");
msg.setStatus(DOING_PROOF_OF_WORK);
ctx.getMessageRepository().save(msg);
ctx.send(
msg.getFrom(),
to,
new Msg(msg),
+2 * DAY
);
msg.setStatus(SENT);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT));
ctx.getMessageRepository().save(msg);
}
}
});
}
private void requestPubkey(BitmessageAddress requestingIdentity, BitmessageAddress address) { private void requestPubkey(BitmessageAddress requestingIdentity, BitmessageAddress address) {
ctx.send( ctx.send(
requestingIdentity, requestingIdentity,
@ -330,7 +365,7 @@ public class BitmessageContext {
CustomCommandHandler customCommandHandler; CustomCommandHandler customCommandHandler;
Listener listener; Listener listener;
int connectionLimit = 150; int connectionLimit = 150;
long connectionTTL = 12 * HOUR; long connectionTTL = 30 * MINUTE;
boolean sendPubkeyOnIdentityCreation = true; boolean sendPubkeyOnIdentityCreation = true;
long pubkeyTTL = 28; long pubkeyTTL = 28;
@ -420,9 +455,9 @@ public class BitmessageContext {
* Time to live in seconds for public keys the client sends. Defaults to the maximum of 28 days, * Time to live in seconds for public keys the client sends. Defaults to the maximum of 28 days,
* but on weak devices smaller values might be desirable. * but on weak devices smaller values might be desirable.
* <p> * <p>
* Please be aware that this might cause some problems where you can't receive a message (the * Please be aware that this might cause some problems where you can't receive a message (the
* sender can't receive your public key) in some special situations. Also note that it's probably * sender can't receive your public key) in some special situations. Also note that it's probably
* not a good idea to set it too low. * not a good idea to set it too low.
* </p> * </p>
*/ */
public Builder pubkeyTTL(long days) { public Builder pubkeyTTL(long days) {

View File

@ -44,10 +44,10 @@ public class GetData implements MessagePayload {
} }
@Override @Override
public void write(OutputStream stream) throws IOException { public void write(OutputStream out) throws IOException {
Encode.varInt(inventory.size(), stream); Encode.varInt(inventory.size(), out);
for (InventoryVector iv : inventory) { for (InventoryVector iv : inventory) {
iv.write(stream); iv.write(out);
} }
} }

View File

@ -16,6 +16,9 @@
package ch.dissem.bitmessage.utils; package ch.dissem.bitmessage.utils;
import java.util.Arrays;
import java.util.Objects;
/** /**
* Some property that has a name, a value and/or other properties. This can be used for any purpose, but is for now * Some property that has a name, a value and/or other properties. This can be used for any purpose, but is for now
* used to contain different status information. It is by default displayed in some JSON inspired human readable * used to contain different status information. It is by default displayed in some JSON inspired human readable
@ -43,12 +46,19 @@ public class Property {
return value; return value;
} }
public Property getProperty(String name) { /**
* Returns the property if available or <code>null</code> otherwise.
* Subproperties can be requested by submitting the sequence of properties.
*/
public Property getProperty(String... name) {
if (name == null || name.length == 0) return null;
for (Property p : properties) { for (Property p : properties) {
if (name == null) { if (Objects.equals(name[0], p.name)) {
if (p.name == null) return p; if (name.length == 1)
} else { return p;
if (name.equals(p.name)) return p; else
return p.getProperty(Arrays.copyOfRange(name, 1, name.length));
} }
} }
return null; return null;

View File

@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.Connection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.Connection.State.*; import static ch.dissem.bitmessage.networking.Connection.State.*;
import static ch.dissem.bitmessage.utils.Singleton.security; import static ch.dissem.bitmessage.utils.Singleton.security;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE; import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
@ -49,7 +50,7 @@ import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/** /**
* A connection to a specific node * A connection to a specific node
*/ */
public class Connection { class Connection {
public static final int READ_TIMEOUT = 2000; public static final int READ_TIMEOUT = 2000;
private static final Logger LOG = LoggerFactory.getLogger(Connection.class); private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
private static final int CONNECT_TIMEOUT = 5000; private static final int CONNECT_TIMEOUT = 5000;
@ -63,10 +64,12 @@ public class Connection {
private final NetworkAddress host; private final NetworkAddress host;
private final NetworkAddress node; private final NetworkAddress node;
private final Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>(); private final Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>();
private final Map<InventoryVector, Long> requestedObjects; private final Set<InventoryVector> commonRequestedObjects;
private final Set<InventoryVector> requestedObjects;
private final long syncTimeout; private final long syncTimeout;
private final ReaderRunnable reader = new ReaderRunnable(); private final ReaderRunnable reader = new ReaderRunnable();
private final WriterRunnable writer = new WriterRunnable(); private final WriterRunnable writer = new WriterRunnable();
private final DefaultNetworkHandler networkHandler;
private volatile State state; private volatile State state;
private InputStream in; private InputStream in;
@ -75,39 +78,45 @@ public class Connection {
private long[] streams; private long[] streams;
private int readTimeoutCounter; private int readTimeoutCounter;
private boolean socketInitialized; private boolean socketInitialized;
private long lastObjectTime;
public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener,
ConcurrentMap<InventoryVector, Long> requestedObjectsMap) throws IOException { Set<InventoryVector> requestedObjectsMap) throws IOException {
this(context, mode, listener, socket, requestedObjectsMap, this(context, mode, listener, socket, requestedObjectsMap,
Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)),
new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(),
0); 0);
} }
public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener, public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener,
ConcurrentMap<InventoryVector, Long> requestedObjectsMap) { Set<InventoryVector> requestedObjectsMap) {
this(context, mode, listener, new Socket(), requestedObjectsMap, this(context, mode, listener, new Socket(), requestedObjectsMap,
Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)),
node, 0); node, 0);
} }
private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket,
Map<InventoryVector, Long> requestedObjectsMap, NetworkAddress node, long syncTimeout) { Set<InventoryVector> commonRequestedObjects, Set<InventoryVector> requestedObjects, NetworkAddress node, long syncTimeout) {
this.startTime = UnixTime.now(); this.startTime = UnixTime.now();
this.ctx = context; this.ctx = context;
this.mode = mode; this.mode = mode;
this.state = CONNECTING; this.state = CONNECTING;
this.listener = listener; this.listener = listener;
this.socket = socket; this.socket = socket;
this.requestedObjects = requestedObjectsMap; this.commonRequestedObjects = commonRequestedObjects;
this.requestedObjects = requestedObjects;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = node; this.node = node;
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0);
this.ivCache = new ConcurrentHashMap<>(); this.ivCache = new ConcurrentHashMap<>();
this.networkHandler = (DefaultNetworkHandler) ctx.getNetworkHandler();
} }
public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener,
long timeoutInSeconds) throws IOException { long timeoutInSeconds) throws IOException {
return new Connection(ctx, Mode.CLIENT, listener, new Socket(address, port), return new Connection(ctx, Mode.SYNC, listener, new Socket(address, port),
new HashMap<InventoryVector, Long>(), new HashSet<InventoryVector>(),
new HashSet<InventoryVector>(),
new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), new NetworkAddress.Builder().ip(address).port(port).stream(1).build(),
timeoutInSeconds); timeoutInSeconds);
} }
@ -141,25 +150,26 @@ public class Connection {
return true; return true;
} }
if (msg == null) { if (msg == null) {
if (requestedObjects.isEmpty() && sendingQueue.isEmpty())
return true;
readTimeoutCounter++; readTimeoutCounter++;
return readTimeoutCounter > 1; return readTimeoutCounter > 1;
} else {
readTimeoutCounter = 0;
return false;
} }
readTimeoutCounter = 0;
if (!(msg.getPayload() instanceof Addr) && !(msg.getPayload() instanceof GetData)
&& requestedObjects.isEmpty() && sendingQueue.isEmpty()) {
LOG.info("Synchronisation completed");
return true;
}
return false;
} }
private void activateConnection() { private void activateConnection() {
LOG.info("Successfully established connection with node " + node); LOG.info("Successfully established connection with node " + node);
state = ACTIVE; state = ACTIVE;
sendAddresses(); if (mode != SYNC) {
sendAddresses();
ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node));
}
sendInventory(); sendInventory();
node.setTime(UnixTime.now()); node.setTime(UnixTime.now());
ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node));
} }
private void cleanupIvCache() { private void cleanupIvCache() {
@ -187,41 +197,15 @@ public class Connection {
} }
} }
private void updateRequestedObjects(List<InventoryVector> missing) {
Long now = UnixTime.now();
Long fiveMinutesAgo = now - 5 * MINUTE;
Long tenMinutesAgo = now - 10 * MINUTE;
List<InventoryVector> stillMissing = new LinkedList<>();
for (Map.Entry<InventoryVector, Long> entry : requestedObjects.entrySet()) {
if (entry.getValue() < fiveMinutesAgo) {
stillMissing.add(entry.getKey());
// If it's still not available after 10 minutes, we won't look for it
// any longer (except it's announced again)
if (entry.getValue() < tenMinutesAgo) {
requestedObjects.remove(entry.getKey());
}
}
}
for (InventoryVector iv : missing) {
requestedObjects.put(iv, now);
}
if (!stillMissing.isEmpty()) {
LOG.debug(stillMissing.size() + " items are still missing.");
missing.addAll(stillMissing);
}
}
private void receiveMessage(MessagePayload messagePayload) { private void receiveMessage(MessagePayload messagePayload) {
switch (messagePayload.getCommand()) { switch (messagePayload.getCommand()) {
case INV: case INV:
Inv inv = (Inv) messagePayload; Inv inv = (Inv) messagePayload;
updateIvCache(inv.getInventory()); updateIvCache(inv.getInventory());
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams); List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
missing.removeAll(requestedObjects.keySet()); missing.removeAll(commonRequestedObjects);
LOG.debug("Received inventory with " + inv.getInventory().size() + " elements, of which are " LOG.debug("Received inventory with " + inv.getInventory().size() + " elements, of which are "
+ missing.size() + " missing."); + missing.size() + " missing.");
updateRequestedObjects(missing);
send(new GetData.Builder().inventory(missing).build()); send(new GetData.Builder().inventory(missing).build());
break; break;
case GETDATA: case GETDATA:
@ -234,24 +218,33 @@ public class Connection {
case OBJECT: case OBJECT:
ObjectMessage objectMessage = (ObjectMessage) messagePayload; ObjectMessage objectMessage = (ObjectMessage) messagePayload;
try { try {
requestedObjects.remove(objectMessage.getInventoryVector());
if (ctx.getInventory().contains(objectMessage)) { if (ctx.getInventory().contains(objectMessage)) {
LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory"); LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory");
break; break;
} }
security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
listener.receive(objectMessage); listener.receive(objectMessage);
security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
ctx.getInventory().storeObject(objectMessage); ctx.getInventory().storeObject(objectMessage);
// offer object to some random nodes so it gets distributed throughout the network: // offer object to some random nodes so it gets distributed throughout the network:
// FIXME: don't do this while we catch up after initialising our first connection // FIXME: don't do this while we catch up after initialising our first connection
// (that might be a bit tricky to do) // (that might be a bit tricky to do)
ctx.getNetworkHandler().offer(objectMessage.getInventoryVector()); networkHandler.offer(objectMessage.getInventoryVector());
lastObjectTime = UnixTime.now();
} catch (InsufficientProofOfWorkException e) { } catch (InsufficientProofOfWorkException e) {
LOG.warn(e.getMessage()); LOG.warn(e.getMessage());
// DebugUtils.saveToFile(objectMessage); // this line must not be committed active // DebugUtils.saveToFile(objectMessage); // this line must not be committed active
} catch (IOException e) { } catch (IOException e) {
LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e);
} finally { } finally {
requestedObjects.remove(objectMessage.getInventoryVector()); if (commonRequestedObjects.remove(objectMessage.getInventoryVector())) {
LOG.debug("Received object that wasn't requested.");
// if (!requestedObjects.isEmpty()) {
// DebugUtils.saveToFile(objectMessage);
// LOG.debug(objectMessage.getInventoryVector() + " was not in "
// + requestedObjects.toString());
// }
}
} }
break; break;
case ADDR: case ADDR:
@ -282,10 +275,16 @@ public class Connection {
public void disconnect() { public void disconnect() {
state = DISCONNECTED; state = DISCONNECTED;
// Make sure objects that are still missing are requested from other nodes
networkHandler.request(requestedObjects);
} }
private void send(MessagePayload payload) { void send(MessagePayload payload) {
try { try {
if (payload instanceof GetData) {
requestedObjects.addAll(((GetData) payload).getInventory());
}
new NetworkMessage(payload).write(out); new NetworkMessage(payload).write(out);
} catch (IOException e) { } catch (IOException e) {
LOG.error(e.getMessage(), e); LOG.error(e.getMessage(), e);
@ -338,7 +337,7 @@ public class Connection {
return writer; return writer;
} }
public enum Mode {SERVER, CLIENT} public enum Mode {SERVER, CLIENT, SYNC}
public enum State {CONNECTING, ACTIVE, DISCONNECTED} public enum State {CONNECTING, ACTIVE, DISCONNECTED}
@ -347,11 +346,15 @@ public class Connection {
public void run() { public void run() {
try (Socket socket = Connection.this.socket) { try (Socket socket = Connection.this.socket) {
initSocket(socket); initSocket(socket);
if (mode == CLIENT) { if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
} }
while (state != DISCONNECTED) { while (state != DISCONNECTED) {
Thread.sleep(100); if (requestedObjects.isEmpty()) {
Thread.sleep(1000);
} else {
Thread.sleep(100);
}
try { try {
NetworkMessage msg = Factory.getNetworkMessage(version, in); NetworkMessage msg = Factory.getNetworkMessage(version, in);
if (msg == null) if (msg == null)
@ -377,6 +380,7 @@ public class Connection {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
break; break;
case CLIENT: case CLIENT:
case SYNC:
activateConnection(); activateConnection();
break; break;
} }
@ -391,6 +395,7 @@ public class Connection {
activateConnection(); activateConnection();
break; break;
case CLIENT: case CLIENT:
case SYNC:
// NO OP // NO OP
break; break;
} }
@ -407,7 +412,7 @@ public class Connection {
+ msg.getPayload().getCommand() + "'"); + msg.getPayload().getCommand() + "'");
} }
} }
if (socket.isClosed() || syncFinished(msg)) disconnect(); if (socket.isClosed() || syncFinished(msg) || checkOpenRequests()) disconnect();
} catch (SocketTimeoutException ignore) { } catch (SocketTimeoutException ignore) {
if (state == ACTIVE) { if (state == ACTIVE) {
if (syncFinished(null)) disconnect(); if (syncFinished(null)) disconnect();
@ -429,16 +434,20 @@ public class Connection {
} }
} }
private boolean checkOpenRequests() {
return !requestedObjects.isEmpty() && lastObjectTime > 0 && (UnixTime.now() - lastObjectTime) > 2 * MINUTE;
}
public class WriterRunnable implements Runnable { public class WriterRunnable implements Runnable {
@Override @Override
public void run() { public void run() {
try (Socket socket = Connection.this.socket) { try (Socket socket = Connection.this.socket) {
initSocket(socket); initSocket(socket);
while (state != DISCONNECTED) { while (state != DISCONNECTED) {
if (sendingQueue.size() > 0) { if (!sendingQueue.isEmpty()) {
send(sendingQueue.poll()); send(sendingQueue.poll());
} else { } else {
Thread.sleep(100); Thread.sleep(1000);
} }
} }
} catch (IOException | InterruptedException e) { } catch (IOException | InterruptedException e) {

View File

@ -19,6 +19,7 @@ package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.InternalContext.ContextHolder; import ch.dissem.bitmessage.InternalContext.ContextHolder;
import ch.dissem.bitmessage.entity.CustomMessage; import ch.dissem.bitmessage.entity.CustomMessage;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
@ -41,8 +42,9 @@ import java.util.concurrent.*;
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE; import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE;
import static ch.dissem.bitmessage.networking.Connection.State.DISCONNECTED;
import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.DebugUtils.inc;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
import static java.util.Collections.newSetFromMap;
/** /**
* Handles all the networky stuff. * Handles all the networky stuff.
@ -50,13 +52,14 @@ import static ch.dissem.bitmessage.utils.DebugUtils.inc;
public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
public final static int NETWORK_MAGIC_NUMBER = 8; public final static int NETWORK_MAGIC_NUMBER = 8;
private final static Logger LOG = LoggerFactory.getLogger(DefaultNetworkHandler.class); private final static Logger LOG = LoggerFactory.getLogger(DefaultNetworkHandler.class);
private final List<Connection> connections = new LinkedList<>(); private static final Random RANDOM = new Random();
private final Collection<Connection> connections = new ConcurrentLinkedQueue<>();
private final ExecutorService pool; private final ExecutorService pool;
private InternalContext ctx; private InternalContext ctx;
private ServerSocket serverSocket; private ServerSocket serverSocket;
private volatile boolean running; private volatile boolean running;
private ConcurrentMap<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>(); private Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(50_000));
public DefaultNetworkHandler() { public DefaultNetworkHandler() {
pool = Executors.newCachedThreadPool(new ThreadFactory() { pool = Executors.newCachedThreadPool(new ThreadFactory() {
@ -134,6 +137,8 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
} }
}); });
pool.execute(new Runnable() { pool.execute(new Runnable() {
public Connection initialConnection;
@Override @Override
public void run() { public void run() {
try { try {
@ -152,25 +157,38 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
} }
for (Iterator<Connection> iterator = connections.iterator(); iterator.hasNext(); ) { for (Iterator<Connection> iterator = connections.iterator(); iterator.hasNext(); ) {
Connection c = iterator.next(); Connection c = iterator.next();
if (now - c.getStartTime() > ctx.getConnectionTTL()) { // Just in case they were all created at the same time, don't disconnect
// all at once.
if (now - c.getStartTime() + RANDOM.nextInt(5 * MINUTE) > ctx.getConnectionTTL()) {
c.disconnect(); c.disconnect();
} }
if (c.getState() == DISCONNECTED) { switch (c.getState()) {
// Remove the current element from the iterator and the list. case DISCONNECTED:
iterator.remove(); iterator.remove();
} break;
if (c.getState() == ACTIVE) { case ACTIVE:
active++; active++;
break;
} }
} }
} }
if (active < NETWORK_MAGIC_NUMBER) { if (active < NETWORK_MAGIC_NUMBER) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses( List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(
NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
boolean first = active == 0 && initialConnection == null;
for (NetworkAddress address : addresses) { for (NetworkAddress address : addresses) {
startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects)); Connection c = new Connection(ctx, CLIENT, address, listener, requestedObjects);
if (first) {
initialConnection = c;
first = false;
}
startConnection(c);
} }
Thread.sleep(10000); Thread.sleep(10000);
} else if (initialConnection != null) {
initialConnection.disconnect();
initialConnection = null;
Thread.sleep(10000);
} else { } else {
Thread.sleep(30000); Thread.sleep(30000);
} }
@ -209,6 +227,7 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
c.disconnect(); c.disconnect();
} }
} }
requestedObjects.clear();
} }
private void startConnection(Connection c) { private void startConnection(Connection c) {
@ -272,7 +291,56 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
} }
return new Property("network", null, return new Property("network", null,
new Property("connectionManager", running ? "running" : "stopped"), new Property("connectionManager", running ? "running" : "stopped"),
new Property("connections", null, streamProperties) new Property("connections", null, streamProperties),
new Property("requestedObjects", requestedObjects.size())
); );
} }
void request(Set<InventoryVector> inventoryVectors) {
if (!running || inventoryVectors.isEmpty()) return;
synchronized (connections) {
Map<Connection, List<InventoryVector>> distribution = new HashMap<>();
for (Connection connection : connections) {
if (connection.getState() == ACTIVE) {
distribution.put(connection, new LinkedList<InventoryVector>());
}
}
Iterator<InventoryVector> iterator = inventoryVectors.iterator();
boolean firstRound = true;
InventoryVector next = iterator.next();
while (firstRound || iterator.hasNext()) {
if (!firstRound) {
next = iterator.next();
firstRound = true;
} else {
firstRound = false;
}
for (Connection connection : distribution.keySet()) {
if (connection.knowsOf(next)) {
List<InventoryVector> ivs = distribution.get(connection);
if (ivs.size() == 50_000) {
connection.send(new GetData.Builder().inventory(ivs).build());
ivs.clear();
}
ivs.add(next);
iterator.remove();
if (iterator.hasNext()) {
next = iterator.next();
firstRound = true;
} else {
firstRound = false;
break;
}
}
}
}
for (Connection connection : distribution.keySet()) {
List<InventoryVector> ivs = distribution.get(connection);
if (!ivs.isEmpty()) {
connection.send(new GetData.Builder().inventory(ivs).build());
}
}
}
}
} }

View File

@ -101,7 +101,7 @@ public class NetworkHandlerTest {
Property status; Property status;
do { do {
Thread.yield(); Thread.yield();
status = node.status().getProperty("network").getProperty("connections").getProperty("stream 0"); status = node.status().getProperty("network", "connections", "stream 0");
} while (status == null); } while (status == null);
assertEquals(1, status.getProperty("outgoing").getValue()); assertEquals(1, status.getProperty("outgoing").getValue());
} finally { } finally {
@ -109,7 +109,7 @@ public class NetworkHandlerTest {
} }
} }
@Test(timeout = 5_000) @Test(timeout = 10_000)
public void ensureObjectsAreSynchronizedIfBothHaveObjects() throws Exception { public void ensureObjectsAreSynchronizedIfBothHaveObjects() throws Exception {
peerInventory.init( peerInventory.init(
"V4Pubkey.payload", "V4Pubkey.payload",