This breaks a lot of things, but it seems necessary. Implemented the resending mechanism and fixed many problems on the way, but tests and triggers are still to do.

This commit is contained in:
Christian Basler 2016-05-20 07:32:41 +02:00
parent e44dd967d0
commit 43f42dd400
17 changed files with 202 additions and 52 deletions

View File

@ -167,13 +167,13 @@ public class BitmessageContext {
LOG.info("Sending message."); LOG.info("Sending message.");
ctx.getMessageRepository().save(msg); ctx.getMessageRepository().save(msg);
if (msg.getType() == MSG) { if (msg.getType() == MSG) {
ctx.send(msg, TTL.msg()); ctx.send(msg);
} else { } else {
ctx.send( ctx.send(
msg.getFrom(), msg.getFrom(),
to, to,
Factory.getBroadcast(msg), Factory.getBroadcast(msg),
TTL.msg() msg.getTTL()
); );
} }
} }

View File

@ -51,9 +51,12 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
@SuppressWarnings("ConstantConditions") @SuppressWarnings("ConstantConditions")
public void receive(ObjectMessage object) throws IOException { public void receive(ObjectMessage object) throws IOException {
ObjectPayload payload = object.getPayload(); ObjectPayload payload = object.getPayload();
if (payload.getType() == null && payload instanceof GenericPayload) { if (payload.getType() == null) {
if (payload instanceof GenericPayload) {
receive((GenericPayload) payload); receive((GenericPayload) payload);
} }
return;
}
switch (payload.getType()) { switch (payload.getType()) {
case GET_PUBKEY: { case GET_PUBKEY: {
@ -115,7 +118,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
for (Plaintext msg : messages) { for (Plaintext msg : messages) {
ctx.getLabeler().markAsSending(msg); ctx.getLabeler().markAsSending(msg);
ctx.getMessageRepository().save(msg); ctx.getMessageRepository().save(msg);
ctx.send(msg, TTL.msg()); ctx.send(msg);
} }
} }

View File

@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.TreeSet; import java.util.TreeSet;
/** /**
@ -160,12 +161,13 @@ public class InternalContext {
return port; return port;
} }
public void send(final Plaintext plaintext, final long timeToLive) { public void send(final Plaintext plaintext) {
if (plaintext.getAckMessage() != null) { if (plaintext.getAckMessage() != null) {
long expires = UnixTime.now(+timeToLive); long expires = UnixTime.now(+plaintext.getTTL());
LOG.info("Expires at " + expires);
proofOfWorkService.doProofOfWorkWithAck(plaintext, expires); proofOfWorkService.doProofOfWorkWithAck(plaintext, expires);
} else { } else {
send(plaintext.getFrom(), plaintext.getTo(), new Msg(plaintext), timeToLive); send(plaintext.getFrom(), plaintext.getTo(), new Msg(plaintext), plaintext.getTTL());
} }
} }
@ -278,6 +280,14 @@ public class InternalContext {
} }
} }
public void resendUnacknowledged() {
List<Plaintext> messages = messageRepository.findMessagesToResend();
for (Plaintext message : messages) {
send(message);
messageRepository.save(message);
}
}
public long getClientNonce() { public long getClientNonce() {
return clientNonce; return clientNonce;
} }

View File

@ -69,6 +69,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
public void doProofOfWorkWithAck(Plaintext plaintext, long expirationTime) { public void doProofOfWorkWithAck(Plaintext plaintext, long expirationTime) {
final ObjectMessage ack = plaintext.getAckMessage(); final ObjectMessage ack = plaintext.getAckMessage();
messageRepo.save(plaintext);
Item item = new Item(ack, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES, Item item = new Item(ack, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES,
expirationTime, plaintext); expirationTime, plaintext);
powRepo.putObject(item); powRepo.putObject(item);
@ -84,6 +85,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
Plaintext plaintext = messageRepo.getMessage(initialHash); Plaintext plaintext = messageRepo.getMessage(initialHash);
if (plaintext != null) { if (plaintext != null) {
plaintext.setInventoryVector(object.getInventoryVector()); plaintext.setInventoryVector(object.getInventoryVector());
plaintext.updateNextTry();
ctx.getLabeler().markAsSent(plaintext); ctx.getLabeler().markAsSent(plaintext);
messageRepo.save(plaintext); messageRepo.save(plaintext);
} }

View File

@ -17,13 +17,14 @@
package ch.dissem.bitmessage.entity; package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.entity.payload.Msg; import ch.dissem.bitmessage.entity.payload.Msg;
import ch.dissem.bitmessage.entity.payload.Pubkey; import ch.dissem.bitmessage.entity.payload.Pubkey.Feature;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.Label; import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.utils.Decode; import ch.dissem.bitmessage.utils.Decode;
import ch.dissem.bitmessage.utils.Encode; import ch.dissem.bitmessage.utils.Encode;
import ch.dissem.bitmessage.utils.TTL;
import ch.dissem.bitmessage.utils.UnixTime; import ch.dissem.bitmessage.utils.UnixTime;
import java.io.*; import java.io.*;
@ -54,6 +55,10 @@ public class Plaintext implements Streamable {
private Set<Label> labels; private Set<Label> labels;
private byte[] initialHash; private byte[] initialHash;
private long ttl;
private int retries;
private Long nextTry;
private Plaintext(Builder builder) { private Plaintext(Builder builder) {
id = builder.id; id = builder.id;
inventoryVector = builder.inventoryVector; inventoryVector = builder.inventoryVector;
@ -74,6 +79,9 @@ public class Plaintext implements Streamable {
sent = builder.sent; sent = builder.sent;
received = builder.received; received = builder.received;
labels = builder.labels; labels = builder.labels;
ttl = builder.ttl;
retries = builder.retries;
nextTry = builder.nextTry;
} }
public static Plaintext read(Type type, InputStream in) throws IOException { public static Plaintext read(Type type, InputStream in) throws IOException {
@ -96,7 +104,7 @@ public class Plaintext implements Streamable {
.destinationRipe(type == Type.MSG ? Decode.bytes(in, 20) : null) .destinationRipe(type == Type.MSG ? Decode.bytes(in, 20) : null)
.encoding(Decode.varInt(in)) .encoding(Decode.varInt(in))
.message(Decode.varBytes(in)) .message(Decode.varBytes(in))
.ack(type == Type.MSG ? Decode.varBytes(in) : null); .ackMessage(type == Type.MSG ? Decode.varBytes(in) : null);
} }
public InventoryVector getInventoryVector() { public InventoryVector getInventoryVector() {
@ -174,12 +182,10 @@ public class Plaintext implements Streamable {
Encode.varInt(message.length, out); Encode.varInt(message.length, out);
out.write(message); out.write(message);
if (type == Type.MSG) { if (type == Type.MSG) {
if (to.has(Pubkey.Feature.DOES_ACK) && getAckMessage() != null) { if (to.has(Feature.DOES_ACK) && getAckMessage() != null) {
ByteArrayOutputStream ack = new ByteArrayOutputStream(); ByteArrayOutputStream ack = new ByteArrayOutputStream();
getAckMessage().write(ack); getAckMessage().write(ack);
byte[] data = ack.toByteArray(); Encode.varBytes(ack.toByteArray(), out);
Encode.varInt(data.length, out);
out.write(data);
} else { } else {
Encode.varInt(0, out); Encode.varInt(0, out);
} }
@ -224,6 +230,30 @@ public class Plaintext implements Streamable {
this.status = status; this.status = status;
} }
public long getTTL() {
return ttl;
}
public int getRetries() {
return retries;
}
public Long getNextTry() {
return nextTry;
}
public void updateNextTry() {
if (nextTry == null) {
if (sent != null && to.has(Feature.DOES_ACK)) {
nextTry = sent + ttl;
retries++;
}
} else {
nextTry = nextTry + (1 << retries) * ttl;
retries++;
}
}
public String getSubject() { public String getSubject() {
Scanner s = new Scanner(new ByteArrayInputStream(message), "UTF-8"); Scanner s = new Scanner(new ByteArrayInputStream(message), "UTF-8");
String firstLine = s.nextLine(); String firstLine = s.nextLine();
@ -272,9 +302,7 @@ public class Plaintext implements Streamable {
public void addLabels(Label... labels) { public void addLabels(Label... labels) {
if (labels != null) { if (labels != null) {
for (Label label : labels) { Collections.addAll(this.labels, labels);
this.labels.add(label);
}
} }
} }
@ -366,6 +394,9 @@ public class Plaintext implements Streamable {
private long received; private long received;
private Status status; private Status status;
private Set<Label> labels = new HashSet<>(); private Set<Label> labels = new HashSet<>();
private long ttl;
private int retries;
private Long nextTry;
public Builder(Type type) { public Builder(Type type) {
this.type = type; this.type = type;
@ -459,14 +490,15 @@ public class Plaintext implements Streamable {
return this; return this;
} }
public Builder ack(byte[] ack) { public Builder ackMessage(byte[] ack) {
if (type != Type.MSG && ack != null) throw new IllegalArgumentException("ack only allowed for msg"); if (type != Type.MSG && ack != null) throw new IllegalArgumentException("ackMessage only allowed for msg");
this.ackMessage = ack; this.ackMessage = ack;
return this; return this;
} }
public Builder ackData(byte[] ackData) { public Builder ackData(byte[] ackData) {
if (type != Type.MSG && ackData != null) throw new IllegalArgumentException("ack only allowed for msg"); if (type != Type.MSG && ackData != null)
throw new IllegalArgumentException("ackMessage only allowed for msg");
this.ackData = ackData; this.ackData = ackData;
return this; return this;
} }
@ -496,6 +528,21 @@ public class Plaintext implements Streamable {
return this; return this;
} }
public Builder ttl(long ttl) {
this.ttl = ttl;
return this;
}
public Builder retries(int retries) {
this.retries = retries;
return this;
}
public Builder nextTry(Long nextTry) {
this.nextTry = nextTry;
return this;
}
public Plaintext build() { public Plaintext build() {
if (from == null) { if (from == null) {
from = new BitmessageAddress(Factory.createPubkey( from = new BitmessageAddress(Factory.createPubkey(
@ -514,6 +561,9 @@ public class Plaintext implements Streamable {
if (type == Type.MSG && ackMessage == null && ackData == null) { if (type == Type.MSG && ackMessage == null && ackData == null) {
ackData = cryptography().randomBytes(Msg.ACK_LENGTH); ackData = cryptography().randomBytes(Msg.ACK_LENGTH);
} }
if (ttl <= 0) {
ttl = TTL.msg();
}
return new Plaintext(this); return new Plaintext(this);
} }
} }

View File

@ -145,13 +145,13 @@ public class Version implements MessagePayload {
private String userAgent; private String userAgent;
private long[] streamNumbers; private long[] streamNumbers;
public Builder defaults() { public Builder defaults(long clientNonce) {
version = BitmessageContext.CURRENT_VERSION; version = BitmessageContext.CURRENT_VERSION;
services = 1; services = 1;
timestamp = UnixTime.now(); timestamp = UnixTime.now();
nonce = new Random().nextInt();
userAgent = "/Jabit:0.0.1/"; userAgent = "/Jabit:0.0.1/";
streamNumbers = new long[]{1}; streamNumbers = new long[]{1};
nonce = clientNonce;
return this; return this;
} }

View File

@ -212,6 +212,6 @@ public class Factory {
if (plaintext == null || plaintext.getAckData() == null) if (plaintext == null || plaintext.getAckData() == null)
return null; return null;
GenericPayload ack = new GenericPayload(3, plaintext.getFrom().getStream(), plaintext.getAckData()); GenericPayload ack = new GenericPayload(3, plaintext.getFrom().getStream(), plaintext.getAckData());
return new ObjectMessage.Builder().objectType(MSG).payload(ack).expiresTime(UnixTime.now(+TTL.msg())).build(); return new ObjectMessage.Builder().objectType(MSG).payload(ack).expiresTime(UnixTime.now(plaintext.getTTL())).build();
} }
} }

View File

@ -44,6 +44,8 @@ public interface MessageRepository {
List<Plaintext> findMessages(BitmessageAddress sender); List<Plaintext> findMessages(BitmessageAddress sender);
List<Plaintext> findMessagesToResend();
void save(Plaintext message); void save(Plaintext message);
void remove(Plaintext message); void remove(Plaintext message);

View File

@ -82,7 +82,7 @@ public class SerializationTest extends TestBase {
.from(TestUtils.loadIdentity("BM-2cSqjfJ8xK6UUn5Rw3RpdGQ9RsDkBhWnS8")) .from(TestUtils.loadIdentity("BM-2cSqjfJ8xK6UUn5Rw3RpdGQ9RsDkBhWnS8"))
.to(TestUtils.loadContact()) .to(TestUtils.loadContact())
.message("Subject", "Message") .message("Subject", "Message")
.ackData("ack".getBytes()) .ackData("ackMessage".getBytes())
.signature(new byte[0]) .signature(new byte[0])
.build(); .build();
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
@ -98,6 +98,32 @@ public class SerializationTest extends TestBase {
assertEquals(p1, p2); assertEquals(p1, p2);
} }
@Test
public void ensurePlaintextWithAckMessageIsSerializedAndDeserializedCorrectly() throws Exception {
Plaintext p1 = new Plaintext.Builder(MSG)
.from(TestUtils.loadIdentity("BM-2cSqjfJ8xK6UUn5Rw3RpdGQ9RsDkBhWnS8"))
.to(TestUtils.loadContact())
.message("Subject", "Message")
.ackData("ackMessage".getBytes())
.signature(new byte[0])
.build();
ObjectMessage ackMessage1 = p1.getAckMessage();
assertNotNull(ackMessage1);
ByteArrayOutputStream out = new ByteArrayOutputStream();
p1.write(out);
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
Plaintext p2 = Plaintext.read(MSG, in);
// Received is automatically set on deserialization, so we'll need to set it to 0
Field received = Plaintext.class.getDeclaredField("received");
received.setAccessible(true);
received.set(p2, 0L);
assertEquals(p1, p2);
assertEquals(ackMessage1, p2.getAckMessage());
}
@Test @Test
public void ensureNetworkMessageIsSerializedAndDeserializedCorrectly() throws Exception { public void ensureNetworkMessageIsSerializedAndDeserializedCorrectly() throws Exception {
ArrayList<InventoryVector> ivs = new ArrayList<>(50000); ArrayList<InventoryVector> ivs = new ArrayList<>(50000);

View File

@ -34,6 +34,10 @@ public class NodeRegistryTest {
assertThat(registry.getKnownAddresses(10), empty()); assertThat(registry.getKnownAddresses(10), empty());
} }
/**
* Please note that this test fails if there is no internet connection,
* as the initial nodes' IP addresses are determined by DNS lookup.
*/
@Test @Test
public void ensureGetKnownNodesForStream1YieldsResult() { public void ensureGetKnownNodesForStream1YieldsResult() {
assertThat(registry.getKnownAddresses(10, 1), hasSize(1)); assertThat(registry.getKnownAddresses(10, 1), hasSize(1));

View File

@ -72,6 +72,7 @@ class Connection {
private final ReaderRunnable reader = new ReaderRunnable(); private final ReaderRunnable reader = new ReaderRunnable();
private final WriterRunnable writer = new WriterRunnable(); private final WriterRunnable writer = new WriterRunnable();
private final DefaultNetworkHandler networkHandler; private final DefaultNetworkHandler networkHandler;
private final long clientNonce;
private volatile State state; private volatile State state;
private InputStream in; private InputStream in;
@ -83,22 +84,23 @@ class Connection {
private long lastObjectTime; private long lastObjectTime;
public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener,
Set<InventoryVector> requestedObjectsMap) throws IOException { Set<InventoryVector> requestedObjectsMap, long clientNonce) throws IOException {
this(context, mode, listener, socket, requestedObjectsMap, this(context, mode, listener, socket, requestedObjectsMap,
Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)), Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)),
new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(),
0); 0, clientNonce);
} }
public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener, public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener,
Set<InventoryVector> requestedObjectsMap) { Set<InventoryVector> requestedObjectsMap, long clientNonce) {
this(context, mode, listener, new Socket(), requestedObjectsMap, this(context, mode, listener, new Socket(), requestedObjectsMap,
Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)), Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)),
node, 0); node, 0, clientNonce);
} }
private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket,
Set<InventoryVector> commonRequestedObjects, Set<InventoryVector> requestedObjects, NetworkAddress node, long syncTimeout) { Set<InventoryVector> commonRequestedObjects, Set<InventoryVector> requestedObjects,
NetworkAddress node, long syncTimeout, long clientNonce) {
this.startTime = UnixTime.now(); this.startTime = UnixTime.now();
this.ctx = context; this.ctx = context;
this.mode = mode; this.mode = mode;
@ -112,6 +114,7 @@ class Connection {
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0);
this.ivCache = new ConcurrentHashMap<>(); this.ivCache = new ConcurrentHashMap<>();
this.networkHandler = (DefaultNetworkHandler) ctx.getNetworkHandler(); this.networkHandler = (DefaultNetworkHandler) ctx.getNetworkHandler();
this.clientNonce = clientNonce;
} }
public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener,
@ -120,7 +123,7 @@ class Connection {
new HashSet<InventoryVector>(), new HashSet<InventoryVector>(),
new HashSet<InventoryVector>(), new HashSet<InventoryVector>(),
new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), new NetworkAddress.Builder().ip(address).port(port).stream(1).build(),
timeoutInSeconds); timeoutInSeconds, cryptography().randomNonce());
} }
public long getStartTime() { public long getStartTime() {
@ -362,7 +365,7 @@ class Connection {
try (Socket socket = Connection.this.socket) { try (Socket socket = Connection.this.socket) {
initSocket(socket); initSocket(socket);
if (mode == CLIENT || mode == SYNC) { if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); send(new Version.Builder().defaults(clientNonce).addrFrom(host).addrRecv(node).build());
} }
while (state != DISCONNECTED) { while (state != DISCONNECTED) {
if (mode != SYNC) { if (mode != SYNC) {
@ -446,7 +449,7 @@ class Connection {
send(new VerAck()); send(new VerAck());
switch (mode) { switch (mode) {
case SERVER: case SERVER:
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); send(new Version.Builder().defaults(clientNonce).addrFrom(host).addrRecv(node).build());
break; break;
case CLIENT: case CLIENT:
case SYNC: case SYNC:

View File

@ -38,15 +38,17 @@ public class ConnectionOrganizer implements Runnable {
private final InternalContext ctx; private final InternalContext ctx;
private final DefaultNetworkHandler networkHandler; private final DefaultNetworkHandler networkHandler;
private final NetworkHandler.MessageListener listener; private final NetworkHandler.MessageListener listener;
private final long clientNonce;
private Connection initialConnection; private Connection initialConnection;
public ConnectionOrganizer(InternalContext ctx, public ConnectionOrganizer(InternalContext ctx,
DefaultNetworkHandler networkHandler, DefaultNetworkHandler networkHandler,
NetworkHandler.MessageListener listener) { NetworkHandler.MessageListener listener, long clientNonce) {
this.ctx = ctx; this.ctx = ctx;
this.networkHandler = networkHandler; this.networkHandler = networkHandler;
this.listener = listener; this.listener = listener;
this.clientNonce = clientNonce;
} }
@Override @Override
@ -91,7 +93,8 @@ public class ConnectionOrganizer implements Runnable {
NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
boolean first = active == 0 && initialConnection == null; boolean first = active == 0 && initialConnection == null;
for (NetworkAddress address : addresses) { for (NetworkAddress address : addresses) {
Connection c = new Connection(ctx, CLIENT, address, listener, networkHandler.requestedObjects); Connection c = new Connection(ctx, CLIENT, address, listener,
networkHandler.requestedObjects, clientNonce);
if (first) { if (first) {
initialConnection = c; initialConnection = c;
first = false; first = false;

View File

@ -28,7 +28,6 @@ import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.Collections; import ch.dissem.bitmessage.utils.Collections;
import ch.dissem.bitmessage.utils.Property; import ch.dissem.bitmessage.utils.Property;
import ch.dissem.bitmessage.utils.ThreadFactoryBuilder;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
@ -109,9 +108,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
try { try {
running = true; running = true;
connections.clear(); connections.clear();
server = new ServerRunnable(ctx, this, listener); server = new ServerRunnable(ctx, this, listener, ctx.getClientNonce());
pool.execute(server); pool.execute(server);
pool.execute(new ConnectionOrganizer(ctx, this, listener)); pool.execute(new ConnectionOrganizer(ctx, this, listener, ctx.getClientNonce()));
} catch (IOException e) { } catch (IOException e) {
throw new ApplicationException(e); throw new ApplicationException(e);
} }

View File

@ -37,12 +37,15 @@ public class ServerRunnable implements Runnable, Closeable {
private final ServerSocket serverSocket; private final ServerSocket serverSocket;
private final DefaultNetworkHandler networkHandler; private final DefaultNetworkHandler networkHandler;
private final NetworkHandler.MessageListener listener; private final NetworkHandler.MessageListener listener;
private final long clientNonce;
public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler, NetworkHandler.MessageListener listener) throws IOException { public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler,
NetworkHandler.MessageListener listener, long clientNonce) throws IOException {
this.ctx = ctx; this.ctx = ctx;
this.networkHandler = networkHandler; this.networkHandler = networkHandler;
this.listener = listener; this.listener = listener;
this.serverSocket = new ServerSocket(ctx.getPort()); this.serverSocket = new ServerSocket(ctx.getPort());
this.clientNonce = clientNonce;
} }
@Override @Override
@ -52,7 +55,7 @@ public class ServerRunnable implements Runnable, Closeable {
Socket socket = serverSocket.accept(); Socket socket = serverSocket.accept();
socket.setSoTimeout(Connection.READ_TIMEOUT); socket.setSoTimeout(Connection.READ_TIMEOUT);
networkHandler.startConnection(new Connection(ctx, SERVER, socket, listener, networkHandler.startConnection(new Connection(ctx, SERVER, socket, listener,
networkHandler.requestedObjects)); networkHandler.requestedObjects, clientNonce));
} catch (IOException e) { } catch (IOException e) {
LOG.debug(e.getMessage(), e); LOG.debug(e.getMessage(), e);
} }

View File

@ -24,6 +24,8 @@ import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.ports.MessageRepository; import ch.dissem.bitmessage.ports.MessageRepository;
import ch.dissem.bitmessage.utils.Strings; import ch.dissem.bitmessage.utils.Strings;
import ch.dissem.bitmessage.utils.TTL;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -165,12 +167,19 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
return find("sender='" + sender.getAddress() + "'"); return find("sender='" + sender.getAddress() + "'");
} }
@Override
public List<Plaintext> findMessagesToResend() {
return find("status='" + Plaintext.Status.SENT.name() + "'" +
" AND next_try < " + UnixTime.now());
}
private List<Plaintext> find(String where) { private List<Plaintext> find(String where) {
List<Plaintext> result = new LinkedList<>(); List<Plaintext> result = new LinkedList<>();
try ( try (
Connection connection = config.getConnection(); Connection connection = config.getConnection();
Statement stmt = connection.createStatement(); Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, status " + ResultSet rs = stmt.executeQuery(
"SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, initial_hash, status, ttl, retries, next_try " +
"FROM Message WHERE " + where) "FROM Message WHERE " + where)
) { ) {
while (rs.next()) { while (rs.next()) {
@ -187,8 +196,13 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
builder.sent(rs.getLong("sent")); builder.sent(rs.getLong("sent"));
builder.received(rs.getLong("received")); builder.received(rs.getLong("received"));
builder.status(Plaintext.Status.valueOf(rs.getString("status"))); builder.status(Plaintext.Status.valueOf(rs.getString("status")));
builder.ttl(rs.getLong("ttl"));
builder.retries(rs.getInt("retries"));
builder.nextTry(rs.getLong("next_try"));
builder.labels(findLabels(connection, id)); builder.labels(findLabels(connection, id));
result.add(builder.build()); Plaintext message = builder.build();
message.setInitialHash(rs.getBytes("initial_hash"));
result.add(message);
} }
} catch (IOException | SQLException e) { } catch (IOException | SQLException e) {
LOG.error(e.getMessage(), e); LOG.error(e.getMessage(), e);
@ -265,8 +279,9 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
private void insert(Connection connection, Plaintext message) throws SQLException, IOException { private void insert(Connection connection, Plaintext message) throws SQLException, IOException {
try (PreparedStatement ps = connection.prepareStatement( try (PreparedStatement ps = connection.prepareStatement(
"INSERT INTO Message (iv, type, sender, recipient, data, ack_data, sent, received, status, initial_hash) " + "INSERT INTO Message (iv, type, sender, recipient, data, ack_data, sent, received, " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "status, initial_hash, ttl, retries, next_try) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
Statement.RETURN_GENERATED_KEYS) Statement.RETURN_GENERATED_KEYS)
) { ) {
ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash()); ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash());
@ -279,6 +294,9 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
ps.setLong(8, message.getReceived()); ps.setLong(8, message.getReceived());
ps.setString(9, message.getStatus() == null ? null : message.getStatus().name()); ps.setString(9, message.getStatus() == null ? null : message.getStatus().name());
ps.setBytes(10, message.getInitialHash()); ps.setBytes(10, message.getInitialHash());
ps.setLong(11, message.getTTL());
ps.setInt(12, message.getRetries());
ps.setObject(13, message.getNextTry());
ps.executeUpdate(); ps.executeUpdate();
// get generated id // get generated id
@ -291,13 +309,23 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
private void update(Connection connection, Plaintext message) throws SQLException, IOException { private void update(Connection connection, Plaintext message) throws SQLException, IOException {
try (PreparedStatement ps = connection.prepareStatement( try (PreparedStatement ps = connection.prepareStatement(
"UPDATE Message SET iv=?, sent=?, received=?, status=?, initial_hash=? WHERE id=?")) { "UPDATE Message SET iv=?, type=?, sender=?, recipient=?, data=?, ack_data=?, sent=?, received=?, " +
"status=?, initial_hash=?, ttl=?, retries=?, next_try=? " +
"WHERE id=?")) {
ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash()); ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash());
ps.setLong(2, message.getSent()); ps.setString(2, message.getType().name());
ps.setLong(3, message.getReceived()); ps.setString(3, message.getFrom().getAddress());
ps.setString(4, message.getStatus() == null ? null : message.getStatus().name()); ps.setString(4, message.getTo() == null ? null : message.getTo().getAddress());
ps.setBytes(5, message.getInitialHash()); writeBlob(ps, 5, message);
ps.setLong(6, (Long) message.getId()); ps.setBytes(6, message.getAckData());
ps.setLong(7, message.getSent());
ps.setLong(8, message.getReceived());
ps.setString(9, message.getStatus() == null ? null : message.getStatus().name());
ps.setBytes(10, message.getInitialHash());
ps.setLong(11, message.getTTL());
ps.setInt(12, message.getRetries());
ps.setObject(13, message.getNextTry());
ps.setLong(14, (Long) message.getId());
ps.executeUpdate(); ps.executeUpdate();
} }
} }

View File

@ -1 +1,4 @@
ALTER TABLE Message ADD COLUMN ack_data BINARY(32); ALTER TABLE Message ADD COLUMN ack_data BINARY(32);
ALTER TABLE Message ADD COLUMN ttl BIGINT NOT NULL;
ALTER TABLE Message ADD COLUMN retries INT NOT NULL DEFAULT 0;
ALTER TABLE Message ADD COLUMN next_try BIGINT;

View File

@ -19,6 +19,7 @@ package ch.dissem.bitmessage.repository;
import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.BitmessageContext;
import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.BitmessageAddress; import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.ObjectMessage;
import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.Label; import ch.dissem.bitmessage.entity.valueobject.Label;
@ -32,6 +33,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG; import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
import static ch.dissem.bitmessage.entity.payload.Pubkey.Feature.DOES_ACK;
import static ch.dissem.bitmessage.utils.Singleton.cryptography; import static ch.dissem.bitmessage.utils.Singleton.cryptography;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -59,14 +61,14 @@ public class JdbcMessageRepositoryTest extends TestBase {
.messageRepo(repo) .messageRepo(repo)
); );
BitmessageAddress tmp = new BitmessageAddress(new PrivateKey(false, 1, 1000, 1000)); BitmessageAddress tmp = new BitmessageAddress(new PrivateKey(false, 1, 1000, 1000, DOES_ACK));
contactA = new BitmessageAddress(tmp.getAddress()); contactA = new BitmessageAddress(tmp.getAddress());
contactA.setPubkey(tmp.getPubkey()); contactA.setPubkey(tmp.getPubkey());
addressRepo.save(contactA); addressRepo.save(contactA);
contactB = new BitmessageAddress("BM-2cTtkBnb4BUYDndTKun6D9PjtueP2h1bQj"); contactB = new BitmessageAddress("BM-2cTtkBnb4BUYDndTKun6D9PjtueP2h1bQj");
addressRepo.save(contactB); addressRepo.save(contactB);
identity = new BitmessageAddress(new PrivateKey(false, 1, 1000, 1000)); identity = new BitmessageAddress(new PrivateKey(false, 1, 1000, 1000, DOES_ACK));
addressRepo.save(identity); addressRepo.save(identity);
inbox = repo.getLabels(Label.Type.INBOX).get(0); inbox = repo.getLabels(Label.Type.INBOX).get(0);
@ -123,6 +125,18 @@ public class JdbcMessageRepositoryTest extends TestBase {
assertThat(other, is(message)); assertThat(other, is(message));
} }
@Test
public void ensureAckMessageCanBeUpdatedAndRetrieved() {
byte[] initialHash = new byte[64];
Plaintext message = repo.findMessages(contactA).get(0);
message.setInitialHash(initialHash);
ObjectMessage ackMessage = message.getAckMessage();
repo.save(message);
Plaintext other = repo.getMessage(initialHash);
assertThat(other, is(message));
assertThat(other.getAckMessage(), is(ackMessage));
}
@Test @Test
public void testFindMessagesByStatus() throws Exception { public void testFindMessagesByStatus() throws Exception {
List<Plaintext> messages = repo.findMessages(Plaintext.Status.RECEIVED); List<Plaintext> messages = repo.findMessages(Plaintext.Status.RECEIVED);