Acknowledgments are now returned, received, and the message (Plaintext object) updated

-> no logic to resend messages yet
This commit is contained in:
Christian Basler 2016-05-06 19:39:39 +02:00
parent de8f04e22a
commit 05d9ea93d2
12 changed files with 79 additions and 39 deletions

View File

@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -71,16 +69,10 @@ public class BitmessageContext {
private BitmessageContext(Builder builder) {
ctx = new InternalContext(builder);
labeler = builder.labeler;
networkListener = new DefaultMessageListener(ctx, labeler, builder.listener);
sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation;
new Timer().schedule(new TimerTask() {
@Override
public void run() {
ctx.getProofOfWorkService().doMissingProofOfWork();
}
}, 30_000); // After 30 seconds
networkListener = new DefaultMessageListener(ctx, labeler, builder.listener);
sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation;
}
public AddressRepository addresses() {

View File

@ -48,9 +48,12 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
}
@Override
@SuppressWarnings("ConstantConditions")
public void receive(ObjectMessage object) throws IOException {
ObjectPayload payload = object.getPayload();
if (payload.getType() == null) return;
if (payload.getType() == null && payload instanceof GenericPayload) {
receive((GenericPayload) payload);
}
switch (payload.getType()) {
case GET_PUBKEY: {
@ -125,7 +128,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
if (!object.isSignatureValid(plaintext.getFrom().getPubkey())) {
LOG.warn("Msg with IV " + object.getInventoryVector() + " was successfully decrypted, but signature check failed. Ignoring.");
} else {
receive(object.getInventoryVector(), msg.getPlaintext());
receive(object.getInventoryVector(), plaintext);
}
break;
} catch (DecryptionFailedException ignore) {
@ -133,6 +136,16 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
}
}
protected void receive(GenericPayload ack) {
if (ack.getData().length == Msg.ACK_LENGTH) {
Plaintext msg = ctx.getMessageRepository().getMessageForAck(ack.getData());
if (msg != null) {
ctx.getLabeler().markAsAcknowledged(msg);
ctx.getMessageRepository().save(msg);
}
}
}
protected void receive(ObjectMessage object, Broadcast broadcast) throws IOException {
byte[] tag = broadcast instanceof V5Broadcast ? ((V5Broadcast) broadcast).getTag() : null;
for (BitmessageAddress subscription : ctx.getAddressRepository().getSubscriptions(broadcast.getVersion())) {

View File

@ -12,6 +12,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES;
import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE;
@ -29,15 +31,22 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
private MessageRepository messageRepo;
public void doMissingProofOfWork() {
List<byte[]> items = powRepo.getItems();
final List<byte[]> items = powRepo.getItems();
if (items.isEmpty()) return;
// Wait for 30 seconds, to let the application start up before putting heavy load on the CPU
new Timer().schedule(new TimerTask() {
@Override
public void run() {
LOG.info("Doing POW for " + items.size() + " tasks.");
for (byte[] initialHash : items) {
Item item = powRepo.getItem(initialHash);
cryptography.doProofOfWork(item.object, item.nonceTrialsPerByte, item.extraBytes, this);
cryptography.doProofOfWork(item.object, item.nonceTrialsPerByte, item.extraBytes,
ProofOfWorkService.this);
}
}
}, 30_000);
}
public void doProofOfWork(ObjectMessage object) {
doProofOfWork(null, object);
@ -79,7 +88,6 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
messageRepo.save(plaintext);
}
ctx.getInventory().storeObject(object);
powRepo.removeObject(initialHash);
ctx.getNetworkHandler().offer(object.getInventoryVector());
} else {
item.message.getAckMessage().setNonce(nonce);
@ -96,6 +104,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
}
doProofOfWork(item.message.getTo(), object);
}
powRepo.removeObject(initialHash);
}
@Override

View File

@ -16,6 +16,7 @@
package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.entity.payload.Msg;
import ch.dissem.bitmessage.entity.payload.Pubkey;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.Label;
@ -511,7 +512,7 @@ public class Plaintext implements Streamable {
to = new BitmessageAddress(0, 0, destinationRipe);
}
if (type == Type.MSG && ackMessage == null && ackData == null) {
ackData = cryptography().randomBytes(32);
ackData = cryptography().randomBytes(Msg.ACK_LENGTH);
}
return new Plaintext(this);
}

View File

@ -53,6 +53,10 @@ public class GenericPayload extends ObjectPayload {
return stream;
}
public byte[] getData() {
return data;
}
@Override
public void write(OutputStream stream) throws IOException {
stream.write(data);

View File

@ -33,6 +33,7 @@ import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
*/
public class Msg extends ObjectPayload implements Encrypted, PlaintextHolder {
private static final long serialVersionUID = 4327495048296365733L;
public static final int ACK_LENGTH = 32;
private long stream;
private CryptoBox encrypted;

View File

@ -23,6 +23,8 @@ import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.payload.*;
import ch.dissem.bitmessage.entity.valueobject.PrivateKey;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.utils.TTL;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -210,6 +212,6 @@ public class Factory {
if (plaintext == null || plaintext.getAckData() == null)
return null;
GenericPayload ack = new GenericPayload(3, plaintext.getFrom().getStream(), plaintext.getAckData());
return new ObjectMessage.Builder().objectType(MSG).payload(ack).build();
return new ObjectMessage.Builder().objectType(MSG).payload(ack).expiresTime(UnixTime.now(+TTL.msg())).build();
}
}

View File

@ -34,6 +34,8 @@ public interface MessageRepository {
Plaintext getMessage(byte[] initialHash);
Plaintext getMessageForAck(byte[] ackData);
List<Plaintext> findMessages(Label label);
List<Plaintext> findMessages(Status status);

View File

@ -5,11 +5,13 @@ import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.networking.DefaultNetworkHandler;
import ch.dissem.bitmessage.ports.DefaultLabeler;
import ch.dissem.bitmessage.ports.Labeler;
import ch.dissem.bitmessage.repository.*;
import ch.dissem.bitmessage.utils.TTL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,6 +22,7 @@ import static ch.dissem.bitmessage.entity.payload.Pubkey.Feature.DOES_ACK;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
/**
* @author Christian Basler
@ -29,6 +32,7 @@ public class SystemTest {
private BitmessageContext alice;
private TestListener aliceListener = new TestListener();
private Labeler aliceLabeler = Mockito.spy(new DebugLabeler("Alice"));
private BitmessageAddress aliceIdentity;
private BitmessageContext bob;
@ -53,7 +57,7 @@ public class SystemTest {
.networkHandler(new DefaultNetworkHandler())
.cryptography(new BouncyCryptography())
.listener(aliceListener)
.labeler(new DebugLabeler("Alice"))
.labeler(aliceLabeler)
.build();
alice.startup();
aliceIdentity = alice.createIdentity(false, DOES_ACK);
@ -93,6 +97,9 @@ public class SystemTest {
assertThat(plaintext.getType(), equalTo(Plaintext.Type.MSG));
assertThat(plaintext.getText(), equalTo(originalMessage));
Mockito.verify(aliceLabeler, Mockito.timeout(TimeUnit.MINUTES.toMillis(15)).atLeastOnce())
.markAsAcknowledged(any());
}
@Test

View File

@ -18,6 +18,7 @@ package ch.dissem.bitmessage.repository;
import ch.dissem.bitmessage.entity.Streamable;
import ch.dissem.bitmessage.entity.payload.ObjectType;
import ch.dissem.bitmessage.exception.ApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,6 +26,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import static ch.dissem.bitmessage.utils.Strings.hex;
@ -85,4 +87,12 @@ public abstract class JdbcHelper {
ps.setBytes(parameterIndex, os.toByteArray());
}
}
protected <T> T single(Collection<T> collection) {
if (collection.size() > 1) {
throw new ApplicationException("This shouldn't happen, found " + collection.size() +
" messages, one or none was expected");
}
return collection.stream().findAny().orElse(null);
}
}

View File

@ -137,16 +137,12 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
@Override
public Plaintext getMessage(byte[] initialHash) {
List<Plaintext> plaintexts = find("initial_hash=X'" + Strings.hex(initialHash) + "'");
switch (plaintexts.size()) {
case 0:
return null;
case 1:
return plaintexts.get(0);
default:
throw new ApplicationException("This shouldn't happen, found " + plaintexts.size() +
" messages, one or none was expected");
return single(find("initial_hash=X'" + Strings.hex(initialHash) + "'"));
}
@Override
public Plaintext getMessageForAck(byte[] ackData) {
return single(find("ack_data=X'" + Strings.hex(ackData) + "' AND status='" + Plaintext.Status.SENT + "'"));
}
@Override
@ -174,7 +170,7 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
try (
Connection connection = config.getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT id, iv, type, sender, recipient, data, sent, received, status " +
ResultSet rs = stmt.executeQuery("SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, status " +
"FROM Message WHERE " + where)
) {
while (rs.next()) {
@ -187,6 +183,7 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
builder.IV(new InventoryVector(iv));
builder.from(ctx.getAddressRepository().getAddress(rs.getString("sender")));
builder.to(ctx.getAddressRepository().getAddress(rs.getString("recipient")));
builder.ackData(rs.getBytes("ack_data"));
builder.sent(rs.getLong("sent"));
builder.received(rs.getLong("received"));
builder.status(Plaintext.Status.valueOf(rs.getString("status")));
@ -268,8 +265,8 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
private void insert(Connection connection, Plaintext message) throws SQLException, IOException {
try (PreparedStatement ps = connection.prepareStatement(
"INSERT INTO Message (iv, type, sender, recipient, data, sent, received, status, initial_hash) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
"INSERT INTO Message (iv, type, sender, recipient, data, ack_data, sent, received, status, initial_hash) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
Statement.RETURN_GENERATED_KEYS)
) {
ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash());
@ -277,10 +274,11 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
ps.setString(3, message.getFrom().getAddress());
ps.setString(4, message.getTo() == null ? null : message.getTo().getAddress());
writeBlob(ps, 5, message);
ps.setLong(6, message.getSent());
ps.setLong(7, message.getReceived());
ps.setString(8, message.getStatus() == null ? null : message.getStatus().name());
ps.setBytes(9, message.getInitialHash());
ps.setBytes(6, message.getAckData());
ps.setLong(7, message.getSent());
ps.setLong(8, message.getReceived());
ps.setString(9, message.getStatus() == null ? null : message.getStatus().name());
ps.setBytes(10, message.getInitialHash());
ps.executeUpdate();
// get generated id

View File

@ -0,0 +1 @@
ALTER TABLE Message ADD COLUMN ack_data BINARY(32);