diff --git a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index 46d76d7..28e8483 100644 --- a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -157,6 +157,7 @@ public class BitmessageContext { .from(from) .to(to) .message(subject, message) + .labels(messages().getLabels(Label.Type.SENT)) .build(); if (to.getPubkey() == null) { tryToFindMatchingPubkey(to); diff --git a/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java b/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java index eb22f03..b2366e4 100644 --- a/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java +++ b/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java @@ -90,28 +90,32 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { address = ctx.getAddressRepository().findContact(pubkey.getRipe()); } if (address != null) { - address.setPubkey(pubkey); - LOG.info("Got pubkey for contact " + address); - ctx.getAddressRepository().save(address); - List messages = ctx.getMessageRepository().findMessages(Plaintext.Status.PUBKEY_REQUESTED, address); - LOG.info("Sending " + messages.size() + " messages for contact " + address); - for (Plaintext msg : messages) { - msg.setStatus(DOING_PROOF_OF_WORK); - ctx.getMessageRepository().save(msg); - ctx.send( - msg.getFrom(), - msg.getTo(), - new Msg(msg), - +2 * DAY - ); - msg.setStatus(SENT); - ctx.getMessageRepository().save(msg); - } + updatePubkey(address, pubkey); } } catch (DecryptionFailedException ignore) { } } + private void updatePubkey(BitmessageAddress address, Pubkey pubkey){ + address.setPubkey(pubkey); + LOG.info("Got pubkey for contact " + address); + ctx.getAddressRepository().save(address); + List<Plaintext> messages = ctx.getMessageRepository().findMessages(Plaintext.Status.PUBKEY_REQUESTED, address); + LOG.info("Sending " + messages.size() + " messages for contact " + address); + for (Plaintext msg : messages) { + msg.setStatus(DOING_PROOF_OF_WORK); + ctx.getMessageRepository().save(msg); + ctx.send( + msg.getFrom(), + msg.getTo(), + new Msg(msg), + +2 * DAY + ); + msg.setStatus(SENT); + ctx.getMessageRepository().save(msg); + } + } + protected void receive(ObjectMessage object, Msg msg) throws IOException { for (BitmessageAddress identity : ctx.getAddressRepository().getIdentities()) { try { @@ -125,6 +129,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { msg.getPlaintext().setInventoryVector(object.getInventoryVector()); ctx.getMessageRepository().save(msg.getPlaintext()); listener.receive(msg.getPlaintext()); + updatePubkey(msg.getPlaintext().getFrom(), msg.getPlaintext().getFrom().getPubkey()); } break; } catch (DecryptionFailedException ignore) { @@ -148,6 +153,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { broadcast.getPlaintext().setInventoryVector(object.getInventoryVector()); ctx.getMessageRepository().save(broadcast.getPlaintext()); listener.receive(broadcast.getPlaintext()); + updatePubkey(broadcast.getPlaintext().getFrom(), broadcast.getPlaintext().getFrom().getPubkey()); } } catch (DecryptionFailedException ignore) { } diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/Inv.java b/core/src/main/java/ch/dissem/bitmessage/entity/Inv.java index df1b380..0135ec0 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/Inv.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/Inv.java @@ -44,10 +44,10 @@ public class Inv implements MessagePayload { } @Override - public void write(OutputStream stream) throws IOException { - Encode.varInt(inventory.size(), stream); + public void write(OutputStream out) throws IOException { + Encode.varInt(inventory.size(), out); for (InventoryVector iv : inventory) { - iv.write(stream); + iv.write(out); } } diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/InventoryVector.java b/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/InventoryVector.java index f87dd13..fc67422 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/InventoryVector.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/InventoryVector.java @@ -38,7 +38,6 @@ public class InventoryVector implements Streamable, Serializable { InventoryVector that = (InventoryVector) o; return Arrays.equals(hash, that.hash); - } @Override diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java index 9e15b3d..d13e73e 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java +++ b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java @@ -44,6 +44,9 @@ class V3MessageFactory { findMagic(in); String command = getCommand(in); int length = (int) Decode.uint32(in); + if (length > 1600003) { + throw new NodeException("Payload of " + length + " bytes received, no more than 1600003 was expected."); + } byte[] checksum = Decode.bytes(in, 4); byte[] payloadBytes = Decode.bytes(in, length); @@ -191,10 +194,10 @@ class V3MessageFactory { private static String getCommand(InputStream stream) throws IOException { byte[] bytes = new byte[12]; - int end = -1; + int end = bytes.length; for (int i = 0; i < bytes.length; i++) { bytes[i] = (byte) stream.read(); - if (end == -1) { + if (end == bytes.length) { if (bytes[i] == 0) end = i; } else { if (bytes[i] != 0) throw new IOException("'\\0' padding expected for command"); diff --git a/core/src/test/java/ch/dissem/bitmessage/entity/SerializationTest.java b/core/src/test/java/ch/dissem/bitmessage/entity/SerializationTest.java index 03b7bc5..1bcb8e7 100644 --- a/core/src/test/java/ch/dissem/bitmessage/entity/SerializationTest.java +++ b/core/src/test/java/ch/dissem/bitmessage/entity/SerializationTest.java @@ -17,6 +17,7 @@ package ch.dissem.bitmessage.entity; import ch.dissem.bitmessage.entity.payload.*; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.Label; import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.utils.TestBase; @@ -25,9 +26,11 @@ import org.junit.Test; import java.io.*; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Collections; import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG; +import static ch.dissem.bitmessage.utils.Singleton.security; import static org.junit.Assert.*; public class SerializationTest extends TestBase { @@ -95,6 +98,23 @@ public class SerializationTest extends TestBase { assertEquals(p1, p2); } + @Test + public void ensureNetworkMessageIsSerializedAndDeserializedCorrectly() throws Exception { + ArrayList<InventoryVector> ivs = new ArrayList<>(50000); + for (int i = 0; i < 50000; i++) { + ivs.add(new InventoryVector(security().randomBytes(32))); + } + + Inv inv = new Inv.Builder().inventory(ivs).build(); + NetworkMessage before = new NetworkMessage(inv); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + before.write(out); + + NetworkMessage after = Factory.getNetworkMessage(3, new ByteArrayInputStream(out.toByteArray())); + Inv invAfter = (Inv) after.getPayload(); + assertEquals(ivs, invAfter.getInventory()); + } + private void doTest(String resourceName, int version, Class<?> expectedPayloadType) throws IOException { byte[] data = TestUtils.getBytes(resourceName); InputStream in = new ByteArrayInputStream(data); diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java index f35bcc5..26d6652 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; +import java.net.InetAddress; import java.util.List; import java.util.Scanner; @@ -40,7 +41,7 @@ public class Application { private BitmessageContext ctx; - public Application() { + public Application(String syncServer, int syncPort) { JdbcConfig jdbcConfig = new JdbcConfig(); ctx = new BitmessageContext.Builder() .addressRepo(new JdbcAddressRepository(jdbcConfig)) @@ -63,7 +64,9 @@ public class Application { }) .build(); - ctx.startup(); + if (syncServer == null) { + ctx.startup(); + } scanner = new Scanner(System.in); @@ -75,6 +78,9 @@ public class Application { System.out.println("c) contacts"); System.out.println("s) subscriptions"); System.out.println("m) messages"); + if (syncServer != null) { + System.out.println("y) sync"); + } System.out.println("?) info"); System.out.println("e) exit"); @@ -99,6 +105,9 @@ public class Application { break; case "e": break; + case "y": + ctx.synchronize(InetAddress.getByName(syncServer), syncPort, 120, true); + break; default: System.out.println("Unknown command. Please try again."); } diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java index a7b63d1..b0e114b 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java @@ -64,7 +64,7 @@ public class Main { new WifImporter(ctx, options.importWIF).importAll(); } } else { - new Application(); + new Application(options.syncServer, options.syncPort); } } @@ -74,5 +74,11 @@ public class Main { @Option(name = "-export", usage = "Export to WIF file.") private File exportWIF; + + @Option(name = "-syncServer", usage = "Use manual synchronization with the given server instead of starting a full node.") + private String syncServer; + + @Option(name = "-syncPort", usage = "Port to use for synchronisation") + private int syncPort = 8444; } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index 76094ff..54bfee4 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -139,13 +139,13 @@ class Connection { @SuppressWarnings("RedundantIfStatement") private boolean syncFinished(NetworkMessage msg) { - if (mode != SYNC){ + if (mode != SYNC) { return false; } if (Thread.interrupted()) { return true; } - if (syncTimeout == 0 || state != ACTIVE) { + if (state != ACTIVE) { return false; } if (syncTimeout < UnixTime.now()) { @@ -204,10 +204,11 @@ class Connection { switch (messagePayload.getCommand()) { case INV: Inv inv = (Inv) messagePayload; + int originalSize = inv.getInventory().size(); updateIvCache(inv.getInventory()); List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams); missing.removeAll(commonRequestedObjects); - LOG.debug("Received inventory with " + inv.getInventory().size() + " elements, of which are " + LOG.debug("Received inventory with " + originalSize + " elements, of which are " + missing.size() + " missing."); send(new GetData.Builder().inventory(missing).build()); break; @@ -230,8 +231,6 @@ class Connection { security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); ctx.getInventory().storeObject(objectMessage); // offer object to some random nodes so it gets distributed throughout the network: - // FIXME: don't do this while we catch up after initialising our first connection - // (that might be a bit tricky to do) networkHandler.offer(objectMessage.getInventoryVector()); lastObjectTime = UnixTime.now(); } catch (InsufficientProofOfWorkException e) { @@ -283,7 +282,9 @@ class Connection { if (payload instanceof GetData) { requestedObjects.addAll(((GetData) payload).getInventory()); } - new NetworkMessage(payload).write(out); + synchronized (this) { + new NetworkMessage(payload).write(out); + } } catch (IOException e) { LOG.error(e.getMessage(), e); disconnect(); @@ -342,16 +343,19 @@ class Connection { public class ReaderRunnable implements Runnable { @Override public void run() { + lastObjectTime = 0; try (Socket socket = Connection.this.socket) { initSocket(socket); if (mode == CLIENT || mode == SYNC) { send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); } while (state != DISCONNECTED) { - if (mode != SYNC && state == ACTIVE && requestedObjects.isEmpty()) { - Thread.sleep(1000); - } else { - Thread.sleep(100); + if (mode != SYNC) { + if (state == ACTIVE && requestedObjects.isEmpty() && sendingQueue.isEmpty()) { + Thread.sleep(1000); + } else { + Thread.sleep(100); + } } try { NetworkMessage msg = Factory.getNetworkMessage(version, in); diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java index 77cad46..a45ec47 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java @@ -117,7 +117,8 @@ public class NetworkHandlerTest { ); nodeInventory.init( - "V1Msg.payload" + "V1Msg.payload", + "V4Pubkey.payload" ); Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java index 48b8df5..69890c3 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java @@ -256,12 +256,13 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito private void update(Connection connection, Plaintext message) throws SQLException, IOException { PreparedStatement ps = connection.prepareStatement( - "UPDATE Message SET iv=?, sent=?, received=?, status=? WHERE id=?"); + "UPDATE Message SET iv=?, sent=?, received=?, status=?, initial_hash=? WHERE id=?"); ps.setBytes(1, message.getInventoryVector() != null ? message.getInventoryVector().getHash() : null); ps.setLong(2, message.getSent()); ps.setLong(3, message.getReceived()); ps.setString(4, message.getStatus() != null ? message.getStatus().name() : null); - ps.setLong(5, (Long) message.getId()); + ps.setBytes(5, message.getInitialHash()); + ps.setLong(6, (Long) message.getId()); ps.executeUpdate(); }