diff --git a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index aef6a67..ad21e72 100644 --- a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -167,13 +167,13 @@ public class BitmessageContext { LOG.info("Sending message."); ctx.getMessageRepository().save(msg); if (msg.getType() == MSG) { - ctx.send(msg, TTL.msg()); + ctx.send(msg); } else { ctx.send( msg.getFrom(), to, Factory.getBroadcast(msg), - TTL.msg() + msg.getTTL() ); } } diff --git a/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java b/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java index 05961e4..b5d70d1 100644 --- a/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java +++ b/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java @@ -51,8 +51,11 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { @SuppressWarnings("ConstantConditions") public void receive(ObjectMessage object) throws IOException { ObjectPayload payload = object.getPayload(); - if (payload.getType() == null && payload instanceof GenericPayload) { - receive((GenericPayload) payload); + if (payload.getType() == null) { + if (payload instanceof GenericPayload) { + receive((GenericPayload) payload); + } + return; } switch (payload.getType()) { @@ -115,7 +118,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { for (Plaintext msg : messages) { ctx.getLabeler().markAsSending(msg); ctx.getMessageRepository().save(msg); - ctx.send(msg, TTL.msg()); + ctx.send(msg); } } diff --git a/core/src/main/java/ch/dissem/bitmessage/InternalContext.java b/core/src/main/java/ch/dissem/bitmessage/InternalContext.java index 884dd16..057bfef 100644 --- a/core/src/main/java/ch/dissem/bitmessage/InternalContext.java +++ b/core/src/main/java/ch/dissem/bitmessage/InternalContext.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.TreeSet; /** @@ -160,12 +161,13 @@ public class InternalContext { return port; } - public void send(final Plaintext plaintext, final long timeToLive) { + public void send(final Plaintext plaintext) { if (plaintext.getAckMessage() != null) { - long expires = UnixTime.now(+timeToLive); + long expires = UnixTime.now(+plaintext.getTTL()); + LOG.info("Expires at " + expires); proofOfWorkService.doProofOfWorkWithAck(plaintext, expires); } else { - send(plaintext.getFrom(), plaintext.getTo(), new Msg(plaintext), timeToLive); + send(plaintext.getFrom(), plaintext.getTo(), new Msg(plaintext), plaintext.getTTL()); } } @@ -278,6 +280,14 @@ public class InternalContext { } } + public void resendUnacknowledged() { + List messages = messageRepository.findMessagesToResend(); + for (Plaintext message : messages) { + send(message); + messageRepository.save(message); + } + } + public long getClientNonce() { return clientNonce; } diff --git a/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java b/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java index 04016b5..e9a27db 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java +++ b/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java @@ -69,6 +69,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC public void doProofOfWorkWithAck(Plaintext plaintext, long expirationTime) { final ObjectMessage ack = plaintext.getAckMessage(); + messageRepo.save(plaintext); Item item = new Item(ack, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES, expirationTime, plaintext); powRepo.putObject(item); @@ -84,6 +85,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC Plaintext plaintext = messageRepo.getMessage(initialHash); if (plaintext != null) { plaintext.setInventoryVector(object.getInventoryVector()); + plaintext.updateNextTry(); ctx.getLabeler().markAsSent(plaintext); messageRepo.save(plaintext); } diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/Plaintext.java b/core/src/main/java/ch/dissem/bitmessage/entity/Plaintext.java index b90bf14..90c3c50 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/Plaintext.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/Plaintext.java @@ -17,13 +17,14 @@ package ch.dissem.bitmessage.entity; import ch.dissem.bitmessage.entity.payload.Msg; -import ch.dissem.bitmessage.entity.payload.Pubkey; +import ch.dissem.bitmessage.entity.payload.Pubkey.Feature; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.Label; import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.utils.Decode; import ch.dissem.bitmessage.utils.Encode; +import ch.dissem.bitmessage.utils.TTL; import ch.dissem.bitmessage.utils.UnixTime; import java.io.*; @@ -54,6 +55,10 @@ public class Plaintext implements Streamable { private Set<Label> labels; private byte[] initialHash; + private long ttl; + private int retries; + private Long nextTry; + private Plaintext(Builder builder) { id = builder.id; inventoryVector = builder.inventoryVector; @@ -74,6 +79,9 @@ public class Plaintext implements Streamable { sent = builder.sent; received = builder.received; labels = builder.labels; + ttl = builder.ttl; + retries = builder.retries; + nextTry = builder.nextTry; } public static Plaintext read(Type type, InputStream in) throws IOException { @@ -96,7 +104,7 @@ public class Plaintext implements Streamable { .destinationRipe(type == Type.MSG ? Decode.bytes(in, 20) : null) .encoding(Decode.varInt(in)) .message(Decode.varBytes(in)) - .ack(type == Type.MSG ? Decode.varBytes(in) : null); + .ackMessage(type == Type.MSG ? Decode.varBytes(in) : null); } public InventoryVector getInventoryVector() { @@ -174,12 +182,10 @@ public class Plaintext implements Streamable { Encode.varInt(message.length, out); out.write(message); if (type == Type.MSG) { - if (to.has(Pubkey.Feature.DOES_ACK) && getAckMessage() != null) { + if (to.has(Feature.DOES_ACK) && getAckMessage() != null) { ByteArrayOutputStream ack = new ByteArrayOutputStream(); getAckMessage().write(ack); - byte[] data = ack.toByteArray(); - Encode.varInt(data.length, out); - out.write(data); + Encode.varBytes(ack.toByteArray(), out); } else { Encode.varInt(0, out); } @@ -224,6 +230,30 @@ public class Plaintext implements Streamable { this.status = status; } + public long getTTL() { + return ttl; + } + + public int getRetries() { + return retries; + } + + public Long getNextTry() { + return nextTry; + } + + public void updateNextTry() { + if (nextTry == null) { + if (sent != null && to.has(Feature.DOES_ACK)) { + nextTry = sent + ttl; + retries++; + } + } else { + nextTry = nextTry + (1 << retries) * ttl; + retries++; + } + } + public String getSubject() { Scanner s = new Scanner(new ByteArrayInputStream(message), "UTF-8"); String firstLine = s.nextLine(); @@ -272,9 +302,7 @@ public class Plaintext implements Streamable { public void addLabels(Label... labels) { if (labels != null) { - for (Label label : labels) { - this.labels.add(label); - } + Collections.addAll(this.labels, labels); } } @@ -366,6 +394,9 @@ public class Plaintext implements Streamable { private long received; private Status status; private Set<Label> labels = new HashSet<>(); + private long ttl; + private int retries; + private Long nextTry; public Builder(Type type) { this.type = type; @@ -459,14 +490,15 @@ public class Plaintext implements Streamable { return this; } - public Builder ack(byte[] ack) { - if (type != Type.MSG && ack != null) throw new IllegalArgumentException("ack only allowed for msg"); + public Builder ackMessage(byte[] ack) { + if (type != Type.MSG && ack != null) throw new IllegalArgumentException("ackMessage only allowed for msg"); this.ackMessage = ack; return this; } public Builder ackData(byte[] ackData) { - if (type != Type.MSG && ackData != null) throw new IllegalArgumentException("ack only allowed for msg"); + if (type != Type.MSG && ackData != null) + throw new IllegalArgumentException("ackMessage only allowed for msg"); this.ackData = ackData; return this; } @@ -496,6 +528,21 @@ public class Plaintext implements Streamable { return this; } + public Builder ttl(long ttl) { + this.ttl = ttl; + return this; + } + + public Builder retries(int retries) { + this.retries = retries; + return this; + } + + public Builder nextTry(Long nextTry) { + this.nextTry = nextTry; + return this; + } + public Plaintext build() { if (from == null) { from = new BitmessageAddress(Factory.createPubkey( @@ -514,6 +561,9 @@ public class Plaintext implements Streamable { if (type == Type.MSG && ackMessage == null && ackData == null) { ackData = cryptography().randomBytes(Msg.ACK_LENGTH); } + if (ttl <= 0) { + ttl = TTL.msg(); + } return new Plaintext(this); } } diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/Version.java b/core/src/main/java/ch/dissem/bitmessage/entity/Version.java index 3722528..8539be1 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/Version.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/Version.java @@ -145,13 +145,13 @@ public class Version implements MessagePayload { private String userAgent; private long[] streamNumbers; - public Builder defaults() { + public Builder defaults(long clientNonce) { version = BitmessageContext.CURRENT_VERSION; services = 1; timestamp = UnixTime.now(); - nonce = new Random().nextInt(); userAgent = "/Jabit:0.0.1/"; streamNumbers = new long[]{1}; + nonce = clientNonce; return this; } diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/Factory.java b/core/src/main/java/ch/dissem/bitmessage/factory/Factory.java index 53e1676..0597f6a 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/Factory.java +++ b/core/src/main/java/ch/dissem/bitmessage/factory/Factory.java @@ -212,6 +212,6 @@ public class Factory { if (plaintext == null || plaintext.getAckData() == null) return null; GenericPayload ack = new GenericPayload(3, plaintext.getFrom().getStream(), plaintext.getAckData()); - return new ObjectMessage.Builder().objectType(MSG).payload(ack).expiresTime(UnixTime.now(+TTL.msg())).build(); + return new ObjectMessage.Builder().objectType(MSG).payload(ack).expiresTime(UnixTime.now(plaintext.getTTL())).build(); } } diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java b/core/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java index 0c46499..aca16b9 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java +++ b/core/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java @@ -44,6 +44,8 @@ public interface MessageRepository { List<Plaintext> findMessages(BitmessageAddress sender); + List<Plaintext> findMessagesToResend(); + void save(Plaintext message); void remove(Plaintext message); diff --git a/core/src/test/java/ch/dissem/bitmessage/entity/SerializationTest.java b/core/src/test/java/ch/dissem/bitmessage/entity/SerializationTest.java index 26d8a35..5405f23 100644 --- a/core/src/test/java/ch/dissem/bitmessage/entity/SerializationTest.java +++ b/core/src/test/java/ch/dissem/bitmessage/entity/SerializationTest.java @@ -82,7 +82,7 @@ public class SerializationTest extends TestBase { .from(TestUtils.loadIdentity("BM-2cSqjfJ8xK6UUn5Rw3RpdGQ9RsDkBhWnS8")) .to(TestUtils.loadContact()) .message("Subject", "Message") - .ackData("ack".getBytes()) + .ackData("ackMessage".getBytes()) .signature(new byte[0]) .build(); ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -98,6 +98,32 @@ public class SerializationTest extends TestBase { assertEquals(p1, p2); } + @Test + public void ensurePlaintextWithAckMessageIsSerializedAndDeserializedCorrectly() throws Exception { + Plaintext p1 = new Plaintext.Builder(MSG) + .from(TestUtils.loadIdentity("BM-2cSqjfJ8xK6UUn5Rw3RpdGQ9RsDkBhWnS8")) + .to(TestUtils.loadContact()) + .message("Subject", "Message") + .ackData("ackMessage".getBytes()) + .signature(new byte[0]) + .build(); + ObjectMessage ackMessage1 = p1.getAckMessage(); + assertNotNull(ackMessage1); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + p1.write(out); + ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + Plaintext p2 = Plaintext.read(MSG, in); + + // Received is automatically set on deserialization, so we'll need to set it to 0 + Field received = Plaintext.class.getDeclaredField("received"); + received.setAccessible(true); + received.set(p2, 0L); + + assertEquals(p1, p2); + assertEquals(ackMessage1, p2.getAckMessage()); + } + @Test public void ensureNetworkMessageIsSerializedAndDeserializedCorrectly() throws Exception { ArrayList<InventoryVector> ivs = new ArrayList<>(50000); diff --git a/core/src/test/java/ch/dissem/bitmessage/ports/NodeRegistryTest.java b/core/src/test/java/ch/dissem/bitmessage/ports/NodeRegistryTest.java index 0e54446..993e3a9 100644 --- a/core/src/test/java/ch/dissem/bitmessage/ports/NodeRegistryTest.java +++ b/core/src/test/java/ch/dissem/bitmessage/ports/NodeRegistryTest.java @@ -34,6 +34,10 @@ public class NodeRegistryTest { assertThat(registry.getKnownAddresses(10), empty()); } + /** + * Please note that this test fails if there is no internet connection, + * as the initial nodes' IP addresses are determined by DNS lookup. + */ @Test public void ensureGetKnownNodesForStream1YieldsResult() { assertThat(registry.getKnownAddresses(10, 1), hasSize(1)); diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index 4717675..9ed19bc 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -72,6 +72,7 @@ class Connection { private final ReaderRunnable reader = new ReaderRunnable(); private final WriterRunnable writer = new WriterRunnable(); private final DefaultNetworkHandler networkHandler; + private final long clientNonce; private volatile State state; private InputStream in; @@ -83,22 +84,23 @@ class Connection { private long lastObjectTime; public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, - Set<InventoryVector> requestedObjectsMap) throws IOException { + Set<InventoryVector> requestedObjectsMap, long clientNonce) throws IOException { this(context, mode, listener, socket, requestedObjectsMap, Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)), new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), - 0); + 0, clientNonce); } public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener, - Set<InventoryVector> requestedObjectsMap) { + Set<InventoryVector> requestedObjectsMap, long clientNonce) { this(context, mode, listener, new Socket(), requestedObjectsMap, Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)), - node, 0); + node, 0, clientNonce); } private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, - Set<InventoryVector> commonRequestedObjects, Set<InventoryVector> requestedObjects, NetworkAddress node, long syncTimeout) { + Set<InventoryVector> commonRequestedObjects, Set<InventoryVector> requestedObjects, + NetworkAddress node, long syncTimeout, long clientNonce) { this.startTime = UnixTime.now(); this.ctx = context; this.mode = mode; @@ -112,6 +114,7 @@ class Connection { this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); this.ivCache = new ConcurrentHashMap<>(); this.networkHandler = (DefaultNetworkHandler) ctx.getNetworkHandler(); + this.clientNonce = clientNonce; } public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, @@ -120,7 +123,7 @@ class Connection { new HashSet<InventoryVector>(), new HashSet<InventoryVector>(), new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), - timeoutInSeconds); + timeoutInSeconds, cryptography().randomNonce()); } public long getStartTime() { @@ -362,7 +365,7 @@ class Connection { try (Socket socket = Connection.this.socket) { initSocket(socket); if (mode == CLIENT || mode == SYNC) { - send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); + send(new Version.Builder().defaults(clientNonce).addrFrom(host).addrRecv(node).build()); } while (state != DISCONNECTED) { if (mode != SYNC) { @@ -446,7 +449,7 @@ class Connection { send(new VerAck()); switch (mode) { case SERVER: - send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); + send(new Version.Builder().defaults(clientNonce).addrFrom(host).addrRecv(node).build()); break; case CLIENT: case SYNC: diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java index 1163d1c..c25a31c 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java @@ -38,15 +38,17 @@ public class ConnectionOrganizer implements Runnable { private final InternalContext ctx; private final DefaultNetworkHandler networkHandler; private final NetworkHandler.MessageListener listener; + private final long clientNonce; private Connection initialConnection; public ConnectionOrganizer(InternalContext ctx, DefaultNetworkHandler networkHandler, - NetworkHandler.MessageListener listener) { + NetworkHandler.MessageListener listener, long clientNonce) { this.ctx = ctx; this.networkHandler = networkHandler; this.listener = listener; + this.clientNonce = clientNonce; } @Override @@ -91,7 +93,8 @@ public class ConnectionOrganizer implements Runnable { NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); boolean first = active == 0 && initialConnection == null; for (NetworkAddress address : addresses) { - Connection c = new Connection(ctx, CLIENT, address, listener, networkHandler.requestedObjects); + Connection c = new Connection(ctx, CLIENT, address, listener, + networkHandler.requestedObjects, clientNonce); if (first) { initialConnection = c; first = false; diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index 289aa79..635aed2 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -28,7 +28,6 @@ import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.utils.Collections; import ch.dissem.bitmessage.utils.Property; -import ch.dissem.bitmessage.utils.ThreadFactoryBuilder; import java.io.IOException; import java.net.InetAddress; @@ -109,9 +108,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { try { running = true; connections.clear(); - server = new ServerRunnable(ctx, this, listener); + server = new ServerRunnable(ctx, this, listener, ctx.getClientNonce()); pool.execute(server); - pool.execute(new ConnectionOrganizer(ctx, this, listener)); + pool.execute(new ConnectionOrganizer(ctx, this, listener, ctx.getClientNonce())); } catch (IOException e) { throw new ApplicationException(e); } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java index bf6d6f6..5a866b9 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java @@ -37,12 +37,15 @@ public class ServerRunnable implements Runnable, Closeable { private final ServerSocket serverSocket; private final DefaultNetworkHandler networkHandler; private final NetworkHandler.MessageListener listener; + private final long clientNonce; - public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler, NetworkHandler.MessageListener listener) throws IOException { + public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler, + NetworkHandler.MessageListener listener, long clientNonce) throws IOException { this.ctx = ctx; this.networkHandler = networkHandler; this.listener = listener; this.serverSocket = new ServerSocket(ctx.getPort()); + this.clientNonce = clientNonce; } @Override @@ -52,7 +55,7 @@ public class ServerRunnable implements Runnable, Closeable { Socket socket = serverSocket.accept(); socket.setSoTimeout(Connection.READ_TIMEOUT); networkHandler.startConnection(new Connection(ctx, SERVER, socket, listener, - networkHandler.requestedObjects)); + networkHandler.requestedObjects, clientNonce)); } catch (IOException e) { LOG.debug(e.getMessage(), e); } diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java index ffc639f..7ac42f9 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java @@ -24,6 +24,8 @@ import ch.dissem.bitmessage.entity.valueobject.Label; import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.ports.MessageRepository; import ch.dissem.bitmessage.utils.Strings; +import ch.dissem.bitmessage.utils.TTL; +import ch.dissem.bitmessage.utils.UnixTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -165,13 +167,20 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito return find("sender='" + sender.getAddress() + "'"); } + @Override + public List<Plaintext> findMessagesToResend() { + return find("status='" + Plaintext.Status.SENT.name() + "'" + + " AND next_try < " + UnixTime.now()); + } + private List<Plaintext> find(String where) { List<Plaintext> result = new LinkedList<>(); try ( Connection connection = config.getConnection(); Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, status " + - "FROM Message WHERE " + where) + ResultSet rs = stmt.executeQuery( + "SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, initial_hash, status, ttl, retries, next_try " + + "FROM Message WHERE " + where) ) { while (rs.next()) { byte[] iv = rs.getBytes("iv"); @@ -187,8 +196,13 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito builder.sent(rs.getLong("sent")); builder.received(rs.getLong("received")); builder.status(Plaintext.Status.valueOf(rs.getString("status"))); + builder.ttl(rs.getLong("ttl")); + builder.retries(rs.getInt("retries")); + builder.nextTry(rs.getLong("next_try")); builder.labels(findLabels(connection, id)); - result.add(builder.build()); + Plaintext message = builder.build(); + message.setInitialHash(rs.getBytes("initial_hash")); + result.add(message); } } catch (IOException | SQLException e) { LOG.error(e.getMessage(), e); @@ -265,8 +279,9 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito private void insert(Connection connection, Plaintext message) throws SQLException, IOException { try (PreparedStatement ps = connection.prepareStatement( - "INSERT INTO Message (iv, type, sender, recipient, data, ack_data, sent, received, status, initial_hash) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO Message (iv, type, sender, recipient, data, ack_data, sent, received, " + + "status, initial_hash, ttl, retries, next_try) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", Statement.RETURN_GENERATED_KEYS) ) { ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash()); @@ -279,6 +294,9 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito ps.setLong(8, message.getReceived()); ps.setString(9, message.getStatus() == null ? null : message.getStatus().name()); ps.setBytes(10, message.getInitialHash()); + ps.setLong(11, message.getTTL()); + ps.setInt(12, message.getRetries()); + ps.setObject(13, message.getNextTry()); ps.executeUpdate(); // get generated id @@ -291,13 +309,23 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito private void update(Connection connection, Plaintext message) throws SQLException, IOException { try (PreparedStatement ps = connection.prepareStatement( - "UPDATE Message SET iv=?, sent=?, received=?, status=?, initial_hash=? WHERE id=?")) { + "UPDATE Message SET iv=?, type=?, sender=?, recipient=?, data=?, ack_data=?, sent=?, received=?, " + + "status=?, initial_hash=?, ttl=?, retries=?, next_try=? " + + "WHERE id=?")) { ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash()); - ps.setLong(2, message.getSent()); - ps.setLong(3, message.getReceived()); - ps.setString(4, message.getStatus() == null ? null : message.getStatus().name()); - ps.setBytes(5, message.getInitialHash()); - ps.setLong(6, (Long) message.getId()); + ps.setString(2, message.getType().name()); + ps.setString(3, message.getFrom().getAddress()); + ps.setString(4, message.getTo() == null ? null : message.getTo().getAddress()); + writeBlob(ps, 5, message); + ps.setBytes(6, message.getAckData()); + ps.setLong(7, message.getSent()); + ps.setLong(8, message.getReceived()); + ps.setString(9, message.getStatus() == null ? null : message.getStatus().name()); + ps.setBytes(10, message.getInitialHash()); + ps.setLong(11, message.getTTL()); + ps.setInt(12, message.getRetries()); + ps.setObject(13, message.getNextTry()); + ps.setLong(14, (Long) message.getId()); ps.executeUpdate(); } } diff --git a/repositories/src/main/resources/db/migration/V3.2__Update_table_message.sql b/repositories/src/main/resources/db/migration/V3.2__Update_table_message.sql index 8b43b0a..38847cf 100644 --- a/repositories/src/main/resources/db/migration/V3.2__Update_table_message.sql +++ b/repositories/src/main/resources/db/migration/V3.2__Update_table_message.sql @@ -1 +1,4 @@ ALTER TABLE Message ADD COLUMN ack_data BINARY(32); +ALTER TABLE Message ADD COLUMN ttl BIGINT NOT NULL; +ALTER TABLE Message ADD COLUMN retries INT NOT NULL DEFAULT 0; +ALTER TABLE Message ADD COLUMN next_try BIGINT; diff --git a/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcMessageRepositoryTest.java b/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcMessageRepositoryTest.java index 81051de..df8a42e 100644 --- a/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcMessageRepositoryTest.java +++ b/repositories/src/test/java/ch/dissem/bitmessage/repository/JdbcMessageRepositoryTest.java @@ -19,6 +19,7 @@ package ch.dissem.bitmessage.repository; import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.entity.BitmessageAddress; +import ch.dissem.bitmessage.entity.ObjectMessage; import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.Label; @@ -32,6 +33,7 @@ import java.util.Arrays; import java.util.List; import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG; +import static ch.dissem.bitmessage.entity.payload.Pubkey.Feature.DOES_ACK; import static ch.dissem.bitmessage.utils.Singleton.cryptography; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.*; @@ -59,14 +61,14 @@ public class JdbcMessageRepositoryTest extends TestBase { .messageRepo(repo) ); - BitmessageAddress tmp = new BitmessageAddress(new PrivateKey(false, 1, 1000, 1000)); + BitmessageAddress tmp = new BitmessageAddress(new PrivateKey(false, 1, 1000, 1000, DOES_ACK)); contactA = new BitmessageAddress(tmp.getAddress()); contactA.setPubkey(tmp.getPubkey()); addressRepo.save(contactA); contactB = new BitmessageAddress("BM-2cTtkBnb4BUYDndTKun6D9PjtueP2h1bQj"); addressRepo.save(contactB); - identity = new BitmessageAddress(new PrivateKey(false, 1, 1000, 1000)); + identity = new BitmessageAddress(new PrivateKey(false, 1, 1000, 1000, DOES_ACK)); addressRepo.save(identity); inbox = repo.getLabels(Label.Type.INBOX).get(0); @@ -123,6 +125,18 @@ public class JdbcMessageRepositoryTest extends TestBase { assertThat(other, is(message)); } + @Test + public void ensureAckMessageCanBeUpdatedAndRetrieved() { + byte[] initialHash = new byte[64]; + Plaintext message = repo.findMessages(contactA).get(0); + message.setInitialHash(initialHash); + ObjectMessage ackMessage = message.getAckMessage(); + repo.save(message); + Plaintext other = repo.getMessage(initialHash); + assertThat(other, is(message)); + assertThat(other.getAckMessage(), is(ackMessage)); + } + @Test public void testFindMessagesByStatus() throws Exception { List<Plaintext> messages = repo.findMessages(Plaintext.Status.RECEIVED);