Some code for sending acknowledgements

- some of it isn't tested
- somehow the ack part seems to be empty, even though the flag should be set
This commit is contained in:
Christian Basler 2015-11-08 19:29:26 +01:00
parent 1f05a52f05
commit 2fae90c433
9 changed files with 181 additions and 46 deletions

View File

@ -156,6 +156,7 @@ public class BitmessageContext {
} else {
LOG.info("Sending message.");
msg.setStatus(DOING_PROOF_OF_WORK);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX));
ctx.getMessageRepository().save(msg);
ctx.send(
from,
@ -165,9 +166,6 @@ public class BitmessageContext {
ctx.getNonceTrialsPerByte(to),
ctx.getExtraBytes(to)
);
msg.setStatus(SENT);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT));
ctx.getMessageRepository().save(msg);
}
}
});

View File

@ -118,15 +118,24 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
for (BitmessageAddress identity : ctx.getAddressRepo().getIdentities()) {
try {
msg.decrypt(identity.getPrivateKey().getPrivateEncryptionKey());
msg.getPlaintext().setTo(identity);
if (!object.isSignatureValid(msg.getPlaintext().getFrom().getPubkey())) {
Plaintext plaintext = msg.getPlaintext();
plaintext.setTo(identity);
if (!object.isSignatureValid(plaintext.getFrom().getPubkey())) {
LOG.warn("Msg with IV " + object.getInventoryVector() + " was successfully decrypted, but signature check failed. Ignoring.");
} else {
msg.getPlaintext().setStatus(RECEIVED);
msg.getPlaintext().addLabels(ctx.getMessageRepository().getLabels(Label.Type.INBOX, Label.Type.UNREAD));
msg.getPlaintext().setInventoryVector(object.getInventoryVector());
ctx.getMessageRepository().save(msg.getPlaintext());
listener.receive(msg.getPlaintext());
plaintext.setStatus(RECEIVED);
plaintext.addLabels(ctx.getMessageRepository().getLabels(Label.Type.INBOX, Label.Type.UNREAD));
plaintext.setInventoryVector(object.getInventoryVector());
ctx.getMessageRepository().save(plaintext);
listener.receive(plaintext);
if (identity.has(Pubkey.Feature.DOES_ACK)) {
ObjectMessage ack = plaintext.getAckMessage();
if (ack != null) {
ctx.getInventory().storeObject(ack);
ctx.getNetworkHandler().offer(ack.getInventoryVector());
}
}
}
break;
} catch (DecryptionFailedException ignore) {

View File

@ -17,9 +17,8 @@
package ch.dissem.bitmessage;
import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.payload.Broadcast;
import ch.dissem.bitmessage.entity.payload.GetPubkey;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.payload.*;
import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.ports.*;
import ch.dissem.bitmessage.utils.Singleton;
import ch.dissem.bitmessage.utils.UnixTime;
@ -29,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.TreeSet;
import static ch.dissem.bitmessage.entity.Plaintext.Status.SENT;
import static ch.dissem.bitmessage.utils.UnixTime.DAY;
/**
@ -162,39 +162,35 @@ public class InternalContext {
public void send(final BitmessageAddress from, BitmessageAddress to, final ObjectPayload payload,
final long timeToLive, final long nonceTrialsPerByte, final long extraBytes) {
try {
if (to == null) to = from;
final BitmessageAddress recipient = (to != null ? to : from);
long expires = UnixTime.now(+timeToLive);
LOG.info("Expires at " + expires);
final ObjectMessage object = new ObjectMessage.Builder()
.stream(to.getStream())
.stream(recipient.getStream())
.expiresTime(expires)
.payload(payload)
.build();
if (object.isSigned()) {
object.sign(from.getPrivateKey());
}
if (payload instanceof Broadcast) {
((Broadcast) payload).encrypt();
} else if (payload instanceof Encrypted) {
object.encrypt(to.getPubkey());
if (payload instanceof Msg && recipient.has(Pubkey.Feature.DOES_ACK)) {
ObjectMessage ackMessage = ((Msg) payload).getPlaintext().getAckMessage();
messageCallback.proofOfWorkStarted(payload);
security.doProofOfWork(ackMessage, networkNonceTrialsPerByte, networkExtraBytes, new ProofOfWorkEngine.Callback() {
@Override
public void onNonceCalculated(byte[] nonce) {
object.encrypt(recipient.getPubkey());
security.doProofOfWork(object, nonceTrialsPerByte, extraBytes, new ProofOfWorkCallback(object, payload));
}
});
} else {
if (payload instanceof Broadcast) {
((Broadcast) payload).encrypt();
} else if (payload instanceof Encrypted) {
object.encrypt(recipient.getPubkey());
}
security.doProofOfWork(object, nonceTrialsPerByte, extraBytes, new ProofOfWorkCallback(object, payload));
}
messageCallback.proofOfWorkStarted(payload);
security.doProofOfWork(object, nonceTrialsPerByte, extraBytes,
new ProofOfWorkEngine.Callback() {
@Override
public void onNonceCalculated(byte[] nonce) {
object.setNonce(nonce);
messageCallback.proofOfWorkCompleted(payload);
if (payload instanceof PlaintextHolder) {
Plaintext plaintext = ((PlaintextHolder) payload).getPlaintext();
plaintext.setInventoryVector(object.getInventoryVector());
messageRepository.save(plaintext);
}
inventory.storeObject(object);
networkHandler.offer(object.getInventoryVector());
messageCallback.messageOffered(payload, object.getInventoryVector());
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -266,4 +262,31 @@ public class InternalContext {
public interface ContextHolder {
void setContext(InternalContext context);
}
private class ProofOfWorkCallback implements ProofOfWorkEngine.Callback {
private final ObjectMessage object;
private final ObjectPayload payload;
private ProofOfWorkCallback(ObjectMessage object, ObjectPayload payload) {
this.object = object;
this.payload = payload;
}
@Override
public void onNonceCalculated(byte[] nonce) {
object.setNonce(nonce);
messageCallback.proofOfWorkCompleted(payload);
if (payload instanceof PlaintextHolder) {
Plaintext plaintext = ((PlaintextHolder) payload).getPlaintext();
plaintext.setInventoryVector(object.getInventoryVector());
plaintext.setStatus(SENT);
plaintext.removeLabel(Label.Type.OUTBOX);
plaintext.addLabels(messageRepository.getLabels(Label.Type.SENT));
messageRepository.save(plaintext);
}
inventory.storeObject(object);
networkHandler.offer(object.getInventoryVector());
messageCallback.messageOffered(payload, object.getInventoryVector());
}
}
}

View File

@ -17,6 +17,7 @@
package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.entity.payload.Pubkey;
import ch.dissem.bitmessage.entity.payload.Pubkey.Feature;
import ch.dissem.bitmessage.entity.payload.V4Pubkey;
import ch.dissem.bitmessage.entity.valueobject.PrivateKey;
import ch.dissem.bitmessage.utils.AccessCounter;
@ -220,4 +221,11 @@ public class BitmessageAddress implements Serializable {
public void setSubscribed(boolean subscribed) {
this.subscribed = subscribed;
}
public boolean has(Feature feature) {
if (pubkey == null || feature == null) {
return false;
}
return feature.isActive(pubkey.getBehaviorBitfield());
}
}

View File

@ -16,6 +16,7 @@
package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.entity.payload.Pubkey;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.factory.Factory;
@ -26,6 +27,8 @@ import ch.dissem.bitmessage.utils.UnixTime;
import java.io.*;
import java.util.*;
import static ch.dissem.bitmessage.utils.Singleton.security;
/**
* The unencrypted message to be sent by 'msg' or 'broadcast'.
*/
@ -34,7 +37,8 @@ public class Plaintext implements Streamable {
private final BitmessageAddress from;
private final long encoding;
private final byte[] message;
private final byte[] ack;
private final byte[] ackData;
private ObjectMessage ackMessage;
private Object id;
private InventoryVector inventoryVector;
private BitmessageAddress to;
@ -53,7 +57,13 @@ public class Plaintext implements Streamable {
to = builder.to;
encoding = builder.encoding;
message = builder.message;
ack = builder.ack;
ackData = builder.ackData;
if (builder.ackMessage != null) {
ackMessage = Factory.getObjectMessage(
3,
new ByteArrayInputStream(builder.ackMessage),
builder.ackMessage.length);
}
signature = builder.signature;
status = builder.status;
sent = builder.sent;
@ -159,8 +169,15 @@ public class Plaintext implements Streamable {
Encode.varInt(message.length, out);
out.write(message);
if (type == Type.MSG) {
Encode.varInt(ack.length, out);
out.write(ack);
if (to.has(Pubkey.Feature.DOES_ACK)) {
ByteArrayOutputStream ack = new ByteArrayOutputStream();
ackMessage.write(ack);
byte[] data = ack.toByteArray();
Encode.varInt(data.length, out);
out.write(data);
} else {
Encode.varInt(0, out);
}
}
if (includeSignature) {
if (signature == null) {
@ -234,7 +251,7 @@ public class Plaintext implements Streamable {
return Objects.equals(encoding, plaintext.encoding) &&
Objects.equals(from, plaintext.from) &&
Arrays.equals(message, plaintext.message) &&
Arrays.equals(ack, plaintext.ack) &&
Arrays.equals(ackData, plaintext.ackData) &&
Arrays.equals(to.getRipe(), plaintext.to.getRipe()) &&
Arrays.equals(signature, plaintext.signature) &&
Objects.equals(status, plaintext.status) &&
@ -245,21 +262,46 @@ public class Plaintext implements Streamable {
@Override
public int hashCode() {
return Objects.hash(from, encoding, message, ack, to, signature, status, sent, received, labels);
return Objects.hash(from, encoding, message, ackData, to, signature, status, sent, received, labels);
}
public void addLabels(Label... labels) {
if (labels != null) {
Collections.addAll(this.labels, labels);
for (Label label : labels) {
this.labels.add(label);
}
}
}
public void addLabels(Collection<Label> labels) {
if (labels != null) {
this.labels.addAll(labels);
for (Label label : labels) {
this.labels.add(label);
}
}
}
public void removeLabel(Label.Type type) {
Iterator<Label> iterator = labels.iterator();
while (iterator.hasNext()) {
Label label = iterator.next();
if (label.getType() == type) {
iterator.remove();
}
}
}
public byte[] getAckData() {
return ackData;
}
public ObjectMessage getAckMessage() {
if (ackMessage == null) {
ackMessage = Factory.createAck(this);
}
return ackMessage;
}
public enum Encoding {
IGNORE(0), TRIVIAL(1), SIMPLE(2);
@ -304,7 +346,8 @@ public class Plaintext implements Streamable {
private byte[] destinationRipe;
private long encoding;
private byte[] message = new byte[0];
private byte[] ack = new byte[0];
private byte[] ackData;
private byte[] ackMessage;
private byte[] signature;
private long sent;
private long received;
@ -405,7 +448,13 @@ public class Plaintext implements Streamable {
public Builder ack(byte[] ack) {
if (type != Type.MSG && ack != null) throw new IllegalArgumentException("ack only allowed for msg");
this.ack = ack;
this.ackMessage = ack;
return this;
}
public Builder ackData(byte[] ackData) {
if (type != Type.MSG && ackData != null) throw new IllegalArgumentException("ack only allowed for msg");
this.ackData = ackData;
return this;
}
@ -449,6 +498,9 @@ public class Plaintext implements Streamable {
if (to == null && type != Type.BROADCAST) {
to = new BitmessageAddress(0, 0, destinationRipe);
}
if (type == Type.MSG && ackMessage == null && ackData == null) {
ackData = security().randomBytes(32);
}
return new Plaintext(this);
}
}

View File

@ -0,0 +1,33 @@
package ch.dissem.bitmessage.entity.payload;
import java.io.IOException;
import java.io.OutputStream;
/**
* Created by chrigu on 06.11.15.
*/
public class Ack extends ObjectPayload {
private final long stream;
private final byte[] data;
public Ack(long version, long stream, byte[] data) {
super(version);
this.stream = stream;
this.data = data;
}
@Override
public ObjectType getType() {
return ObjectType.MSG;
}
@Override
public long getStream() {
return stream;
}
@Override
public void write(OutputStream out) throws IOException {
out.write(data);
}
}

View File

@ -103,5 +103,9 @@ public abstract class Pubkey extends ObjectPayload {
}
return features.toArray(new Feature[features.size()]);
}
public boolean isActive(int bitfield) {
return (bitfield & bit) != 0;
}
}
}

View File

@ -79,6 +79,7 @@ public class Label implements Serializable {
INBOX,
BROADCAST,
DRAFT,
OUTBOX,
SENT,
UNREAD,
TRASH

View File

@ -203,4 +203,11 @@ public class Factory {
return new V5Broadcast(sendingAddress, plaintext);
}
}
public static ObjectMessage createAck(Plaintext plaintext) {
if (plaintext == null || plaintext.getAckData() == null)
return null;
Ack ack = new Ack(3, plaintext.getFrom().getStream(), plaintext.getAckData());
return new ObjectMessage.Builder().payload(ack).build();
}
}