diff --git a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java
index 1f34c9d..46d76d7 100644
--- a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java
+++ b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java
@@ -40,6 +40,7 @@ import static ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST;
import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
import static ch.dissem.bitmessage.utils.UnixTime.DAY;
import static ch.dissem.bitmessage.utils.UnixTime.HOUR;
+import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/**
*
Use this class if you want to create a Bitmessage client.
@@ -183,6 +184,40 @@ public class BitmessageContext {
});
}
+ public void send(final Plaintext msg) {
+ if (msg.getFrom() == null || msg.getFrom().getPrivateKey() == null) {
+ throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key.");
+ }
+ pool.submit(new Runnable() {
+ @Override
+ public void run() {
+ BitmessageAddress to = msg.getTo();
+ if (to.getPubkey() == null) {
+ tryToFindMatchingPubkey(to);
+ }
+ if (to.getPubkey() == null) {
+ LOG.info("Public key is missing from recipient. Requesting.");
+ requestPubkey(msg.getFrom(), to);
+ msg.setStatus(PUBKEY_REQUESTED);
+ ctx.getMessageRepository().save(msg);
+ } else {
+ LOG.info("Sending message.");
+ msg.setStatus(DOING_PROOF_OF_WORK);
+ ctx.getMessageRepository().save(msg);
+ ctx.send(
+ msg.getFrom(),
+ to,
+ new Msg(msg),
+ +2 * DAY
+ );
+ msg.setStatus(SENT);
+ msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT));
+ ctx.getMessageRepository().save(msg);
+ }
+ }
+ });
+ }
+
private void requestPubkey(BitmessageAddress requestingIdentity, BitmessageAddress address) {
ctx.send(
requestingIdentity,
@@ -330,7 +365,7 @@ public class BitmessageContext {
CustomCommandHandler customCommandHandler;
Listener listener;
int connectionLimit = 150;
- long connectionTTL = 12 * HOUR;
+ long connectionTTL = 30 * MINUTE;
boolean sendPubkeyOnIdentityCreation = true;
long pubkeyTTL = 28;
@@ -420,9 +455,9 @@ public class BitmessageContext {
* Time to live in seconds for public keys the client sends. Defaults to the maximum of 28 days,
* but on weak devices smaller values might be desirable.
*
- * Please be aware that this might cause some problems where you can't receive a message (the
- * sender can't receive your public key) in some special situations. Also note that it's probably
- * not a good idea to set it too low.
+ * Please be aware that this might cause some problems where you can't receive a message (the
+ * sender can't receive your public key) in some special situations. Also note that it's probably
+ * not a good idea to set it too low.
*
*/
public Builder pubkeyTTL(long days) {
diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/GetData.java b/domain/src/main/java/ch/dissem/bitmessage/entity/GetData.java
index b62e6e5..e272bbc 100644
--- a/domain/src/main/java/ch/dissem/bitmessage/entity/GetData.java
+++ b/domain/src/main/java/ch/dissem/bitmessage/entity/GetData.java
@@ -44,10 +44,10 @@ public class GetData implements MessagePayload {
}
@Override
- public void write(OutputStream stream) throws IOException {
- Encode.varInt(inventory.size(), stream);
+ public void write(OutputStream out) throws IOException {
+ Encode.varInt(inventory.size(), out);
for (InventoryVector iv : inventory) {
- iv.write(stream);
+ iv.write(out);
}
}
diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java
index e00d193..b823eb5 100644
--- a/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java
+++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Property.java
@@ -16,6 +16,9 @@
package ch.dissem.bitmessage.utils;
+import java.util.Arrays;
+import java.util.Objects;
+
/**
* Some property that has a name, a value and/or other properties. This can be used for any purpose, but is for now
* used to contain different status information. It is by default displayed in some JSON inspired human readable
@@ -43,12 +46,19 @@ public class Property {
return value;
}
- public Property getProperty(String name) {
+ /**
+ * Returns the property if available or null
otherwise.
+ * Subproperties can be requested by submitting the sequence of properties.
+ */
+ public Property getProperty(String... name) {
+ if (name == null || name.length == 0) return null;
+
for (Property p : properties) {
- if (name == null) {
- if (p.name == null) return p;
- } else {
- if (name.equals(p.name)) return p;
+ if (Objects.equals(name[0], p.name)) {
+ if (name.length == 1)
+ return p;
+ else
+ return p.getProperty(Arrays.copyOfRange(name, 1, name.length));
}
}
return null;
diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java
index 4d14fe2..3155645 100644
--- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java
+++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
+import static ch.dissem.bitmessage.networking.Connection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.Connection.State.*;
import static ch.dissem.bitmessage.utils.Singleton.security;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
@@ -49,7 +50,7 @@ import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/**
* A connection to a specific node
*/
-public class Connection {
+class Connection {
public static final int READ_TIMEOUT = 2000;
private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
private static final int CONNECT_TIMEOUT = 5000;
@@ -63,10 +64,12 @@ public class Connection {
private final NetworkAddress host;
private final NetworkAddress node;
private final Queue sendingQueue = new ConcurrentLinkedDeque<>();
- private final Map requestedObjects;
+ private final Set commonRequestedObjects;
+ private final Set requestedObjects;
private final long syncTimeout;
private final ReaderRunnable reader = new ReaderRunnable();
private final WriterRunnable writer = new WriterRunnable();
+ private final DefaultNetworkHandler networkHandler;
private volatile State state;
private InputStream in;
@@ -75,39 +78,45 @@ public class Connection {
private long[] streams;
private int readTimeoutCounter;
private boolean socketInitialized;
+ private long lastObjectTime;
public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener,
- ConcurrentMap requestedObjectsMap) throws IOException {
+ Set requestedObjectsMap) throws IOException {
this(context, mode, listener, socket, requestedObjectsMap,
+ Collections.newSetFromMap(new ConcurrentHashMap(10_000)),
new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(),
0);
}
public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener,
- ConcurrentMap requestedObjectsMap) {
+ Set requestedObjectsMap) {
this(context, mode, listener, new Socket(), requestedObjectsMap,
+ Collections.newSetFromMap(new ConcurrentHashMap(10_000)),
node, 0);
}
private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket,
- Map requestedObjectsMap, NetworkAddress node, long syncTimeout) {
+ Set commonRequestedObjects, Set requestedObjects, NetworkAddress node, long syncTimeout) {
this.startTime = UnixTime.now();
this.ctx = context;
this.mode = mode;
this.state = CONNECTING;
this.listener = listener;
this.socket = socket;
- this.requestedObjects = requestedObjectsMap;
+ this.commonRequestedObjects = commonRequestedObjects;
+ this.requestedObjects = requestedObjects;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = node;
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0);
this.ivCache = new ConcurrentHashMap<>();
+ this.networkHandler = (DefaultNetworkHandler) ctx.getNetworkHandler();
}
public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener,
long timeoutInSeconds) throws IOException {
- return new Connection(ctx, Mode.CLIENT, listener, new Socket(address, port),
- new HashMap(),
+ return new Connection(ctx, Mode.SYNC, listener, new Socket(address, port),
+ new HashSet(),
+ new HashSet(),
new NetworkAddress.Builder().ip(address).port(port).stream(1).build(),
timeoutInSeconds);
}
@@ -141,25 +150,26 @@ public class Connection {
return true;
}
if (msg == null) {
+ if (requestedObjects.isEmpty() && sendingQueue.isEmpty())
+ return true;
+
readTimeoutCounter++;
return readTimeoutCounter > 1;
+ } else {
+ readTimeoutCounter = 0;
+ return false;
}
- readTimeoutCounter = 0;
- if (!(msg.getPayload() instanceof Addr) && !(msg.getPayload() instanceof GetData)
- && requestedObjects.isEmpty() && sendingQueue.isEmpty()) {
- LOG.info("Synchronisation completed");
- return true;
- }
- return false;
}
private void activateConnection() {
LOG.info("Successfully established connection with node " + node);
state = ACTIVE;
- sendAddresses();
+ if (mode != SYNC) {
+ sendAddresses();
+ ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node));
+ }
sendInventory();
node.setTime(UnixTime.now());
- ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node));
}
private void cleanupIvCache() {
@@ -187,41 +197,15 @@ public class Connection {
}
}
- private void updateRequestedObjects(List missing) {
- Long now = UnixTime.now();
- Long fiveMinutesAgo = now - 5 * MINUTE;
- Long tenMinutesAgo = now - 10 * MINUTE;
- List stillMissing = new LinkedList<>();
- for (Map.Entry entry : requestedObjects.entrySet()) {
- if (entry.getValue() < fiveMinutesAgo) {
- stillMissing.add(entry.getKey());
- // If it's still not available after 10 minutes, we won't look for it
- // any longer (except it's announced again)
- if (entry.getValue() < tenMinutesAgo) {
- requestedObjects.remove(entry.getKey());
- }
- }
- }
-
- for (InventoryVector iv : missing) {
- requestedObjects.put(iv, now);
- }
- if (!stillMissing.isEmpty()) {
- LOG.debug(stillMissing.size() + " items are still missing.");
- missing.addAll(stillMissing);
- }
- }
-
private void receiveMessage(MessagePayload messagePayload) {
switch (messagePayload.getCommand()) {
case INV:
Inv inv = (Inv) messagePayload;
updateIvCache(inv.getInventory());
List missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
- missing.removeAll(requestedObjects.keySet());
+ missing.removeAll(commonRequestedObjects);
LOG.debug("Received inventory with " + inv.getInventory().size() + " elements, of which are "
+ missing.size() + " missing.");
- updateRequestedObjects(missing);
send(new GetData.Builder().inventory(missing).build());
break;
case GETDATA:
@@ -234,24 +218,33 @@ public class Connection {
case OBJECT:
ObjectMessage objectMessage = (ObjectMessage) messagePayload;
try {
+ requestedObjects.remove(objectMessage.getInventoryVector());
if (ctx.getInventory().contains(objectMessage)) {
LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory");
break;
}
- security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
listener.receive(objectMessage);
+ security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
ctx.getInventory().storeObject(objectMessage);
// offer object to some random nodes so it gets distributed throughout the network:
// FIXME: don't do this while we catch up after initialising our first connection
// (that might be a bit tricky to do)
- ctx.getNetworkHandler().offer(objectMessage.getInventoryVector());
+ networkHandler.offer(objectMessage.getInventoryVector());
+ lastObjectTime = UnixTime.now();
} catch (InsufficientProofOfWorkException e) {
LOG.warn(e.getMessage());
// DebugUtils.saveToFile(objectMessage); // this line must not be committed active
} catch (IOException e) {
LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e);
} finally {
- requestedObjects.remove(objectMessage.getInventoryVector());
+ if (commonRequestedObjects.remove(objectMessage.getInventoryVector())) {
+ LOG.debug("Received object that wasn't requested.");
+// if (!requestedObjects.isEmpty()) {
+// DebugUtils.saveToFile(objectMessage);
+// LOG.debug(objectMessage.getInventoryVector() + " was not in "
+// + requestedObjects.toString());
+// }
+ }
}
break;
case ADDR:
@@ -282,10 +275,16 @@ public class Connection {
public void disconnect() {
state = DISCONNECTED;
+
+ // Make sure objects that are still missing are requested from other nodes
+ networkHandler.request(requestedObjects);
}
- private void send(MessagePayload payload) {
+ void send(MessagePayload payload) {
try {
+ if (payload instanceof GetData) {
+ requestedObjects.addAll(((GetData) payload).getInventory());
+ }
new NetworkMessage(payload).write(out);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
@@ -338,7 +337,7 @@ public class Connection {
return writer;
}
- public enum Mode {SERVER, CLIENT}
+ public enum Mode {SERVER, CLIENT, SYNC}
public enum State {CONNECTING, ACTIVE, DISCONNECTED}
@@ -347,11 +346,15 @@ public class Connection {
public void run() {
try (Socket socket = Connection.this.socket) {
initSocket(socket);
- if (mode == CLIENT) {
+ if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
}
while (state != DISCONNECTED) {
- Thread.sleep(100);
+ if (requestedObjects.isEmpty()) {
+ Thread.sleep(1000);
+ } else {
+ Thread.sleep(100);
+ }
try {
NetworkMessage msg = Factory.getNetworkMessage(version, in);
if (msg == null)
@@ -377,6 +380,7 @@ public class Connection {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
break;
case CLIENT:
+ case SYNC:
activateConnection();
break;
}
@@ -391,6 +395,7 @@ public class Connection {
activateConnection();
break;
case CLIENT:
+ case SYNC:
// NO OP
break;
}
@@ -407,7 +412,7 @@ public class Connection {
+ msg.getPayload().getCommand() + "'");
}
}
- if (socket.isClosed() || syncFinished(msg)) disconnect();
+ if (socket.isClosed() || syncFinished(msg) || checkOpenRequests()) disconnect();
} catch (SocketTimeoutException ignore) {
if (state == ACTIVE) {
if (syncFinished(null)) disconnect();
@@ -429,16 +434,20 @@ public class Connection {
}
}
+ private boolean checkOpenRequests() {
+ return !requestedObjects.isEmpty() && lastObjectTime > 0 && (UnixTime.now() - lastObjectTime) > 2 * MINUTE;
+ }
+
public class WriterRunnable implements Runnable {
@Override
public void run() {
try (Socket socket = Connection.this.socket) {
initSocket(socket);
while (state != DISCONNECTED) {
- if (sendingQueue.size() > 0) {
+ if (!sendingQueue.isEmpty()) {
send(sendingQueue.poll());
} else {
- Thread.sleep(100);
+ Thread.sleep(1000);
}
}
} catch (IOException | InterruptedException e) {
diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java
index e934bdf..bd58d8e 100644
--- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java
+++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java
@@ -19,6 +19,7 @@ package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.InternalContext.ContextHolder;
import ch.dissem.bitmessage.entity.CustomMessage;
+import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
@@ -41,8 +42,9 @@ import java.util.concurrent.*;
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE;
-import static ch.dissem.bitmessage.networking.Connection.State.DISCONNECTED;
import static ch.dissem.bitmessage.utils.DebugUtils.inc;
+import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
+import static java.util.Collections.newSetFromMap;
/**
* Handles all the networky stuff.
@@ -50,13 +52,14 @@ import static ch.dissem.bitmessage.utils.DebugUtils.inc;
public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
public final static int NETWORK_MAGIC_NUMBER = 8;
private final static Logger LOG = LoggerFactory.getLogger(DefaultNetworkHandler.class);
- private final List connections = new LinkedList<>();
+ private static final Random RANDOM = new Random();
+ private final Collection connections = new ConcurrentLinkedQueue<>();
private final ExecutorService pool;
private InternalContext ctx;
private ServerSocket serverSocket;
private volatile boolean running;
- private ConcurrentMap requestedObjects = new ConcurrentHashMap<>();
+ private Set requestedObjects = newSetFromMap(new ConcurrentHashMap(50_000));
public DefaultNetworkHandler() {
pool = Executors.newCachedThreadPool(new ThreadFactory() {
@@ -134,6 +137,8 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
}
});
pool.execute(new Runnable() {
+ public Connection initialConnection;
+
@Override
public void run() {
try {
@@ -152,25 +157,38 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
}
for (Iterator iterator = connections.iterator(); iterator.hasNext(); ) {
Connection c = iterator.next();
- if (now - c.getStartTime() > ctx.getConnectionTTL()) {
+ // Just in case they were all created at the same time, don't disconnect
+ // all at once.
+ if (now - c.getStartTime() + RANDOM.nextInt(5 * MINUTE) > ctx.getConnectionTTL()) {
c.disconnect();
}
- if (c.getState() == DISCONNECTED) {
- // Remove the current element from the iterator and the list.
- iterator.remove();
- }
- if (c.getState() == ACTIVE) {
- active++;
+ switch (c.getState()) {
+ case DISCONNECTED:
+ iterator.remove();
+ break;
+ case ACTIVE:
+ active++;
+ break;
}
}
}
if (active < NETWORK_MAGIC_NUMBER) {
List addresses = ctx.getNodeRegistry().getKnownAddresses(
NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
+ boolean first = active == 0 && initialConnection == null;
for (NetworkAddress address : addresses) {
- startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects));
+ Connection c = new Connection(ctx, CLIENT, address, listener, requestedObjects);
+ if (first) {
+ initialConnection = c;
+ first = false;
+ }
+ startConnection(c);
}
Thread.sleep(10000);
+ } else if (initialConnection != null) {
+ initialConnection.disconnect();
+ initialConnection = null;
+ Thread.sleep(10000);
} else {
Thread.sleep(30000);
}
@@ -209,6 +227,7 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
c.disconnect();
}
}
+ requestedObjects.clear();
}
private void startConnection(Connection c) {
@@ -272,7 +291,56 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
}
return new Property("network", null,
new Property("connectionManager", running ? "running" : "stopped"),
- new Property("connections", null, streamProperties)
+ new Property("connections", null, streamProperties),
+ new Property("requestedObjects", requestedObjects.size())
);
}
+
+ void request(Set inventoryVectors) {
+ if (!running || inventoryVectors.isEmpty()) return;
+ synchronized (connections) {
+ Map> distribution = new HashMap<>();
+ for (Connection connection : connections) {
+ if (connection.getState() == ACTIVE) {
+ distribution.put(connection, new LinkedList());
+ }
+ }
+ Iterator iterator = inventoryVectors.iterator();
+ boolean firstRound = true;
+ InventoryVector next = iterator.next();
+ while (firstRound || iterator.hasNext()) {
+ if (!firstRound) {
+ next = iterator.next();
+ firstRound = true;
+ } else {
+ firstRound = false;
+ }
+ for (Connection connection : distribution.keySet()) {
+ if (connection.knowsOf(next)) {
+ List ivs = distribution.get(connection);
+ if (ivs.size() == 50_000) {
+ connection.send(new GetData.Builder().inventory(ivs).build());
+ ivs.clear();
+ }
+ ivs.add(next);
+ iterator.remove();
+
+ if (iterator.hasNext()) {
+ next = iterator.next();
+ firstRound = true;
+ } else {
+ firstRound = false;
+ break;
+ }
+ }
+ }
+ }
+ for (Connection connection : distribution.keySet()) {
+ List ivs = distribution.get(connection);
+ if (!ivs.isEmpty()) {
+ connection.send(new GetData.Builder().inventory(ivs).build());
+ }
+ }
+ }
+ }
}
diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java
index 499e377..e850ffb 100644
--- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java
+++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java
@@ -101,7 +101,7 @@ public class NetworkHandlerTest {
Property status;
do {
Thread.yield();
- status = node.status().getProperty("network").getProperty("connections").getProperty("stream 0");
+ status = node.status().getProperty("network", "connections", "stream 0");
} while (status == null);
assertEquals(1, status.getProperty("outgoing").getValue());
} finally {
@@ -109,7 +109,7 @@ public class NetworkHandlerTest {
}
}
- @Test(timeout = 5_000)
+ @Test(timeout = 10_000)
public void ensureObjectsAreSynchronizedIfBothHaveObjects() throws Exception {
peerInventory.init(
"V4Pubkey.payload",