diff --git a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index 23e8eff..2307958 100644 --- a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.util.List; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -71,16 +69,10 @@ public class BitmessageContext { private BitmessageContext(Builder builder) { ctx = new InternalContext(builder); labeler = builder.labeler; + ctx.getProofOfWorkService().doMissingProofOfWork(); + networkListener = new DefaultMessageListener(ctx, labeler, builder.listener); - sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation; - - new Timer().schedule(new TimerTask() { - @Override - public void run() { - ctx.getProofOfWorkService().doMissingProofOfWork(); - } - }, 30_000); // After 30 seconds } public AddressRepository addresses() { diff --git a/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java b/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java index eb27dd9..05961e4 100644 --- a/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java +++ b/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java @@ -48,9 +48,12 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { } @Override + @SuppressWarnings("ConstantConditions") public void receive(ObjectMessage object) throws IOException { ObjectPayload payload = object.getPayload(); - if (payload.getType() == null) return; + if (payload.getType() == null && payload instanceof GenericPayload) { + receive((GenericPayload) payload); + } switch (payload.getType()) { case GET_PUBKEY: { @@ -125,7 +128,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { if (!object.isSignatureValid(plaintext.getFrom().getPubkey())) { LOG.warn("Msg with IV " + object.getInventoryVector() + " was successfully decrypted, but signature check failed. Ignoring."); } else { - receive(object.getInventoryVector(), msg.getPlaintext()); + receive(object.getInventoryVector(), plaintext); } break; } catch (DecryptionFailedException ignore) { @@ -133,6 +136,16 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { } } + protected void receive(GenericPayload ack) { + if (ack.getData().length == Msg.ACK_LENGTH) { + Plaintext msg = ctx.getMessageRepository().getMessageForAck(ack.getData()); + if (msg != null) { + ctx.getLabeler().markAsAcknowledged(msg); + ctx.getMessageRepository().save(msg); + } + } + } + protected void receive(ObjectMessage object, Broadcast broadcast) throws IOException { byte[] tag = broadcast instanceof V5Broadcast ? ((V5Broadcast) broadcast).getTag() : null; for (BitmessageAddress subscription : ctx.getAddressRepository().getSubscriptions(broadcast.getVersion())) { diff --git a/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java b/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java index e886c22..d242b79 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java +++ b/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java @@ -12,6 +12,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES; import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE; @@ -29,14 +31,21 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC private MessageRepository messageRepo; public void doMissingProofOfWork() { - List items = powRepo.getItems(); + final List items = powRepo.getItems(); if (items.isEmpty()) return; - LOG.info("Doing POW for " + items.size() + " tasks."); - for (byte[] initialHash : items) { - Item item = powRepo.getItem(initialHash); - cryptography.doProofOfWork(item.object, item.nonceTrialsPerByte, item.extraBytes, this); - } + // Wait for 30 seconds, to let the application start up before putting heavy load on the CPU + new Timer().schedule(new TimerTask() { + @Override + public void run() { + LOG.info("Doing POW for " + items.size() + " tasks."); + for (byte[] initialHash : items) { + Item item = powRepo.getItem(initialHash); + cryptography.doProofOfWork(item.object, item.nonceTrialsPerByte, item.extraBytes, + ProofOfWorkService.this); + } + } + }, 30_000); } public void doProofOfWork(ObjectMessage object) { @@ -79,7 +88,6 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC messageRepo.save(plaintext); } ctx.getInventory().storeObject(object); - powRepo.removeObject(initialHash); ctx.getNetworkHandler().offer(object.getInventoryVector()); } else { item.message.getAckMessage().setNonce(nonce); @@ -96,6 +104,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC } doProofOfWork(item.message.getTo(), object); } + powRepo.removeObject(initialHash); } @Override diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/Plaintext.java b/core/src/main/java/ch/dissem/bitmessage/entity/Plaintext.java index a2b66f7..b90bf14 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/Plaintext.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/Plaintext.java @@ -16,6 +16,7 @@ package ch.dissem.bitmessage.entity; +import ch.dissem.bitmessage.entity.payload.Msg; import ch.dissem.bitmessage.entity.payload.Pubkey; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.Label; @@ -511,7 +512,7 @@ public class Plaintext implements Streamable { to = new BitmessageAddress(0, 0, destinationRipe); } if (type == Type.MSG && ackMessage == null && ackData == null) { - ackData = cryptography().randomBytes(32); + ackData = cryptography().randomBytes(Msg.ACK_LENGTH); } return new Plaintext(this); } diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/payload/GenericPayload.java b/core/src/main/java/ch/dissem/bitmessage/entity/payload/GenericPayload.java index 4ae4696..176d938 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/payload/GenericPayload.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/payload/GenericPayload.java @@ -53,6 +53,10 @@ public class GenericPayload extends ObjectPayload { return stream; } + public byte[] getData() { + return data; + } + @Override public void write(OutputStream stream) throws IOException { stream.write(data); diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/payload/Msg.java b/core/src/main/java/ch/dissem/bitmessage/entity/payload/Msg.java index edc4643..64a010c 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/payload/Msg.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/payload/Msg.java @@ -33,6 +33,7 @@ import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG; */ public class Msg extends ObjectPayload implements Encrypted, PlaintextHolder { private static final long serialVersionUID = 4327495048296365733L; + public static final int ACK_LENGTH = 32; private long stream; private CryptoBox encrypted; diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/Factory.java b/core/src/main/java/ch/dissem/bitmessage/factory/Factory.java index 49db9d6..53e1676 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/Factory.java +++ b/core/src/main/java/ch/dissem/bitmessage/factory/Factory.java @@ -23,6 +23,8 @@ import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.payload.*; import ch.dissem.bitmessage.entity.valueobject.PrivateKey; import ch.dissem.bitmessage.exception.NodeException; +import ch.dissem.bitmessage.utils.TTL; +import ch.dissem.bitmessage.utils.UnixTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,6 +212,6 @@ public class Factory { if (plaintext == null || plaintext.getAckData() == null) return null; GenericPayload ack = new GenericPayload(3, plaintext.getFrom().getStream(), plaintext.getAckData()); - return new ObjectMessage.Builder().objectType(MSG).payload(ack).build(); + return new ObjectMessage.Builder().objectType(MSG).payload(ack).expiresTime(UnixTime.now(+TTL.msg())).build(); } } diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java b/core/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java index c4cff5c..0c46499 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java +++ b/core/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java @@ -34,6 +34,8 @@ public interface MessageRepository { Plaintext getMessage(byte[] initialHash); + Plaintext getMessageForAck(byte[] ackData); + List findMessages(Label label); List<Plaintext> findMessages(Status status); diff --git a/demo/src/test/java/ch/dissem/bitmessage/SystemTest.java b/demo/src/test/java/ch/dissem/bitmessage/SystemTest.java index 6751819..03cbf5e 100644 --- a/demo/src/test/java/ch/dissem/bitmessage/SystemTest.java +++ b/demo/src/test/java/ch/dissem/bitmessage/SystemTest.java @@ -5,11 +5,13 @@ import ch.dissem.bitmessage.entity.BitmessageAddress; import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.networking.DefaultNetworkHandler; import ch.dissem.bitmessage.ports.DefaultLabeler; +import ch.dissem.bitmessage.ports.Labeler; import ch.dissem.bitmessage.repository.*; import ch.dissem.bitmessage.utils.TTL; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,6 +22,7 @@ import static ch.dissem.bitmessage.entity.payload.Pubkey.Feature.DOES_ACK; import static ch.dissem.bitmessage.utils.UnixTime.MINUTE; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; /** * @author Christian Basler @@ -29,6 +32,7 @@ public class SystemTest { private BitmessageContext alice; private TestListener aliceListener = new TestListener(); + private Labeler aliceLabeler = Mockito.spy(new DebugLabeler("Alice")); private BitmessageAddress aliceIdentity; private BitmessageContext bob; @@ -53,7 +57,7 @@ public class SystemTest { .networkHandler(new DefaultNetworkHandler()) .cryptography(new BouncyCryptography()) .listener(aliceListener) - .labeler(new DebugLabeler("Alice")) + .labeler(aliceLabeler) .build(); alice.startup(); aliceIdentity = alice.createIdentity(false, DOES_ACK); @@ -93,6 +97,9 @@ public class SystemTest { assertThat(plaintext.getType(), equalTo(Plaintext.Type.MSG)); assertThat(plaintext.getText(), equalTo(originalMessage)); + + Mockito.verify(aliceLabeler, Mockito.timeout(TimeUnit.MINUTES.toMillis(15)).atLeastOnce()) + .markAsAcknowledged(any()); } @Test diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java index 1df12f6..beb4c80 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java @@ -18,6 +18,7 @@ package ch.dissem.bitmessage.repository; import ch.dissem.bitmessage.entity.Streamable; import ch.dissem.bitmessage.entity.payload.ObjectType; +import ch.dissem.bitmessage.exception.ApplicationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Collection; import static ch.dissem.bitmessage.utils.Strings.hex; @@ -85,4 +87,12 @@ public abstract class JdbcHelper { ps.setBytes(parameterIndex, os.toByteArray()); } } + + protected <T> T single(Collection<T> collection) { + if (collection.size() > 1) { + throw new ApplicationException("This shouldn't happen, found " + collection.size() + + " messages, one or none was expected"); + } + return collection.stream().findAny().orElse(null); + } } diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java index ae1b496..ffc639f 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java @@ -137,16 +137,12 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito @Override public Plaintext getMessage(byte[] initialHash) { - List<Plaintext> plaintexts = find("initial_hash=X'" + Strings.hex(initialHash) + "'"); - switch (plaintexts.size()) { - case 0: - return null; - case 1: - return plaintexts.get(0); - default: - throw new ApplicationException("This shouldn't happen, found " + plaintexts.size() + - " messages, one or none was expected"); - } + return single(find("initial_hash=X'" + Strings.hex(initialHash) + "'")); + } + + @Override + public Plaintext getMessageForAck(byte[] ackData) { + return single(find("ack_data=X'" + Strings.hex(ackData) + "' AND status='" + Plaintext.Status.SENT + "'")); } @Override @@ -174,7 +170,7 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito try ( Connection connection = config.getConnection(); Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT id, iv, type, sender, recipient, data, sent, received, status " + + ResultSet rs = stmt.executeQuery("SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, status " + "FROM Message WHERE " + where) ) { while (rs.next()) { @@ -187,6 +183,7 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito builder.IV(new InventoryVector(iv)); builder.from(ctx.getAddressRepository().getAddress(rs.getString("sender"))); builder.to(ctx.getAddressRepository().getAddress(rs.getString("recipient"))); + builder.ackData(rs.getBytes("ack_data")); builder.sent(rs.getLong("sent")); builder.received(rs.getLong("received")); builder.status(Plaintext.Status.valueOf(rs.getString("status"))); @@ -268,8 +265,8 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito private void insert(Connection connection, Plaintext message) throws SQLException, IOException { try (PreparedStatement ps = connection.prepareStatement( - "INSERT INTO Message (iv, type, sender, recipient, data, sent, received, status, initial_hash) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO Message (iv, type, sender, recipient, data, ack_data, sent, received, status, initial_hash) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", Statement.RETURN_GENERATED_KEYS) ) { ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash()); @@ -277,10 +274,11 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito ps.setString(3, message.getFrom().getAddress()); ps.setString(4, message.getTo() == null ? null : message.getTo().getAddress()); writeBlob(ps, 5, message); - ps.setLong(6, message.getSent()); - ps.setLong(7, message.getReceived()); - ps.setString(8, message.getStatus() == null ? null : message.getStatus().name()); - ps.setBytes(9, message.getInitialHash()); + ps.setBytes(6, message.getAckData()); + ps.setLong(7, message.getSent()); + ps.setLong(8, message.getReceived()); + ps.setString(9, message.getStatus() == null ? null : message.getStatus().name()); + ps.setBytes(10, message.getInitialHash()); ps.executeUpdate(); // get generated id diff --git a/repositories/src/main/resources/db/migration/V3.2__Update_table_message.sql b/repositories/src/main/resources/db/migration/V3.2__Update_table_message.sql new file mode 100644 index 0000000..8b43b0a --- /dev/null +++ b/repositories/src/main/resources/db/migration/V3.2__Update_table_message.sql @@ -0,0 +1 @@ +ALTER TABLE Message ADD COLUMN ack_data BINARY(32);