Merge branch 'feature/ACK' into develop
This commit is contained in:
commit
3d2cea91ce
1
.gitignore
vendored
1
.gitignore
vendored
@ -5,6 +5,7 @@
|
||||
### Gradle ###
|
||||
.gradle
|
||||
build/
|
||||
classes/
|
||||
|
||||
# Ignore Gradle GUI config
|
||||
gradle-app.setting
|
||||
|
51
README.md
51
README.md
@ -5,10 +5,11 @@ Jabit
|
||||
[](https://raw.githubusercontent.com/Dissem/Jabit/master/LICENSE)
|
||||
[](https://kiwiirc.com/client/irc.freenode.net/#jabit)
|
||||
|
||||
A Java implementation for the Bitmessage protocol. To build, use command `gradle build` or `./gradlew build`.
|
||||
A Java implementation for the Bitmessage protocol. To build, use command `./gradlew build`.
|
||||
|
||||
Please note that it still has its limitations, but the API should now be stable. Jabit uses Semantic Versioning, meaning
|
||||
as long as the major version doesn't change, nothing should break if you update.
|
||||
Please note that it still has its limitations, but the API should now be stable. Jabit uses Semantic Versioning, meaning as long as the major version doesn't change, nothing should break if you update.
|
||||
|
||||
Be aware though that this doesn't necessarily applies for SNAPSHOT builds and the development branch, notably when it comes to database updates. _In other words, they may break your installation!_
|
||||
|
||||
#### Master
|
||||
[](https://travis-ci.org/Dissem/Jabit)
|
||||
@ -23,9 +24,7 @@ as long as the major version doesn't change, nothing should break if you update.
|
||||
Security
|
||||
--------
|
||||
|
||||
There are most probably some security issues, me programming this thing all by myself. Jabit doesn't do anything against
|
||||
timing attacks yet, for example. Please feel free to use the library, report bugs and maybe even help out. I hope the
|
||||
code is easy to understand and work with.
|
||||
There are most probably some security issues, me programming this thing all by myself. Jabit doesn't do anything against timing attacks yet, for example. Please feel free to use the library, report bugs and maybe even help out. I hope the code is easy to understand and work with.
|
||||
|
||||
Project Status
|
||||
--------------
|
||||
@ -74,17 +73,22 @@ BitmessageContext ctx = new BitmessageContext.Builder()
|
||||
.nodeRegistry(new MemoryNodeRegistry())
|
||||
.networkHandler(new NetworkNode())
|
||||
.cryptography(new BouncyCryptography())
|
||||
.listener(System.out::println)
|
||||
.build();
|
||||
```
|
||||
This creates a simple context using a H2 database that will be created in the user's home directory. Next you'll need to
|
||||
start the context and decide what happens if a message arrives:
|
||||
This creates a simple context using a H2 database that will be created in the user's home directory. In the listener you decide what happens when a message arrives. If you can't use lambdas, you may instead write
|
||||
```Java
|
||||
ctx.startup(new BitmessageContext.Listener() {
|
||||
@Override
|
||||
public void receive(Plaintext plaintext) {
|
||||
// TODO: Notify the user
|
||||
}
|
||||
});
|
||||
.listener(new BitmessageContext.Listener() {
|
||||
@Override
|
||||
public void receive(Plaintext plaintext) {
|
||||
// TODO: Notify the user
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
Next you'll need to start the context:
|
||||
```Java
|
||||
ctx.startup()
|
||||
```
|
||||
Then you might want to create an identity
|
||||
```Java
|
||||
@ -100,3 +104,22 @@ to which you can send some messages
|
||||
```Java
|
||||
ctx.send(identity, contact, "Test", "Hello Chris, this is a message.");
|
||||
```
|
||||
|
||||
### Housekeeping
|
||||
|
||||
As Bitmessage stores all currently valid messages, we'll need to delete expired objects from time to time:
|
||||
```Java
|
||||
ctx.cleanup();
|
||||
```
|
||||
If the client runs all the time, it might be a good idea to do this daily or at least weekly. Otherwise, you might just want to clean up on shutdown.
|
||||
|
||||
Also, if some messages weren't acknowledged when it expired, they can be resent:
|
||||
```Java
|
||||
ctx.resendUnacknowledgedMessages();
|
||||
```
|
||||
This could be triggered periodically, or manually by the user. Please be aware that _if_ there is a message to resend, proof of work needs to be calculated, so to not annoy your users you might not want to trigger it on shutdown. As the client might have been offline for some time, it might as well be wise to wait until it caught up downloading new messages before resending those messages, after all they might be acknowledged by now.
|
||||
|
||||
There probably won't happen extremely bad things if you don't - at least not more than otherwise - but you can properly shutdown the network connection by calling
|
||||
```Java
|
||||
ctx.shutdown();
|
||||
```
|
||||
|
@ -26,6 +26,7 @@ artifacts {
|
||||
dependencies {
|
||||
compile 'org.slf4j:slf4j-api:1.7.12'
|
||||
testCompile 'junit:junit:4.11'
|
||||
testCompile 'org.hamcrest:hamcrest-library:1.3'
|
||||
testCompile 'org.mockito:mockito-core:1.10.19'
|
||||
testCompile project(':cryptography-bc')
|
||||
}
|
||||
|
@ -1,47 +0,0 @@
|
||||
/*
|
||||
* Copyright 2016 Christian Basler
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package ch.dissem.bitmessage;
|
||||
|
||||
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
|
||||
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
|
||||
|
||||
/**
|
||||
* Default implementation that doesn't do anything.
|
||||
*
|
||||
* @author Christian Basler
|
||||
*/
|
||||
public class BaseMessageCallback implements MessageCallback {
|
||||
@Override
|
||||
public void proofOfWorkStarted(ObjectPayload message) {
|
||||
// No op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void proofOfWorkCompleted(ObjectPayload message) {
|
||||
// No op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageOffered(ObjectPayload message, InventoryVector iv) {
|
||||
// No op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageAcknowledged(InventoryVector iv) {
|
||||
// No op
|
||||
}
|
||||
}
|
@ -18,13 +18,9 @@ package ch.dissem.bitmessage;
|
||||
|
||||
import ch.dissem.bitmessage.entity.*;
|
||||
import ch.dissem.bitmessage.entity.payload.Broadcast;
|
||||
import ch.dissem.bitmessage.entity.payload.Msg;
|
||||
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
|
||||
import ch.dissem.bitmessage.entity.payload.ObjectType;
|
||||
import ch.dissem.bitmessage.entity.payload.Pubkey.Feature;
|
||||
import ch.dissem.bitmessage.entity.valueobject.Label;
|
||||
import ch.dissem.bitmessage.entity.valueobject.PrivateKey;
|
||||
import ch.dissem.bitmessage.exception.ApplicationException;
|
||||
import ch.dissem.bitmessage.exception.DecryptionFailedException;
|
||||
import ch.dissem.bitmessage.factory.Factory;
|
||||
import ch.dissem.bitmessage.ports.*;
|
||||
@ -35,15 +31,12 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.List;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES;
|
||||
import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE;
|
||||
import static ch.dissem.bitmessage.entity.Plaintext.Status.*;
|
||||
import static ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST;
|
||||
import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
|
||||
import static ch.dissem.bitmessage.utils.UnixTime.*;
|
||||
@ -76,16 +69,10 @@ public class BitmessageContext {
|
||||
private BitmessageContext(Builder builder) {
|
||||
ctx = new InternalContext(builder);
|
||||
labeler = builder.labeler;
|
||||
ctx.getProofOfWorkService().doMissingProofOfWork(30_000); // TODO: this should be configurable
|
||||
|
||||
networkListener = new DefaultMessageListener(ctx, labeler, builder.listener);
|
||||
|
||||
sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation;
|
||||
|
||||
new Timer().schedule(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
ctx.getProofOfWorkService().doMissingProofOfWork();
|
||||
}
|
||||
}, 30_000); // After 30 seconds
|
||||
}
|
||||
|
||||
public AddressRepository addresses() {
|
||||
@ -157,7 +144,6 @@ public class BitmessageContext {
|
||||
.from(from)
|
||||
.to(to)
|
||||
.message(subject, message)
|
||||
.labels(messages().getLabels(Label.Type.SENT))
|
||||
.build();
|
||||
send(msg);
|
||||
}
|
||||
@ -166,6 +152,7 @@ public class BitmessageContext {
|
||||
if (msg.getFrom() == null || msg.getFrom().getPrivateKey() == null) {
|
||||
throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key.");
|
||||
}
|
||||
labeler().markAsSending(msg);
|
||||
BitmessageAddress to = msg.getTo();
|
||||
if (to != null) {
|
||||
if (to.getPubkey() == null) {
|
||||
@ -173,35 +160,22 @@ public class BitmessageContext {
|
||||
ctx.requestPubkey(to);
|
||||
}
|
||||
if (to.getPubkey() == null) {
|
||||
msg.setStatus(PUBKEY_REQUESTED);
|
||||
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX));
|
||||
ctx.getMessageRepository().save(msg);
|
||||
}
|
||||
}
|
||||
if (to == null || to.getPubkey() != null) {
|
||||
LOG.info("Sending message.");
|
||||
msg.setStatus(DOING_PROOF_OF_WORK);
|
||||
ctx.getMessageRepository().save(msg);
|
||||
ctx.send(
|
||||
msg.getFrom(),
|
||||
to,
|
||||
wrapInObjectPayload(msg),
|
||||
TTL.msg()
|
||||
);
|
||||
msg.setStatus(SENT);
|
||||
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT));
|
||||
ctx.getMessageRepository().save(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private ObjectPayload wrapInObjectPayload(Plaintext msg) {
|
||||
switch (msg.getType()) {
|
||||
case MSG:
|
||||
return new Msg(msg);
|
||||
case BROADCAST:
|
||||
return Factory.getBroadcast(msg);
|
||||
default:
|
||||
throw new ApplicationException("Unknown message type " + msg.getType());
|
||||
if (msg.getType() == MSG) {
|
||||
ctx.send(msg);
|
||||
} else {
|
||||
ctx.send(
|
||||
msg.getFrom(),
|
||||
to,
|
||||
Factory.getBroadcast(msg),
|
||||
msg.getTTL()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,10 +221,32 @@ public class BitmessageContext {
|
||||
return ctx.getNetworkHandler().send(server, port, request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes expired objects from the inventory. You should call this method regularly,
|
||||
* e.g. daily and on each shutdown.
|
||||
*/
|
||||
public void cleanup() {
|
||||
ctx.getInventory().cleanup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends messages again whose time to live expired without being acknowledged. (And whose
|
||||
* recipient is expected to send acknowledgements.
|
||||
* <p>
|
||||
* You should call this method regularly, but be aware of the following:
|
||||
* <ul>
|
||||
* <li>As messages might be sent, POW will be done. It is therefore not advised to
|
||||
* call it on shutdown.</li>
|
||||
* <li>It shouldn't be called right after startup, as it's possible the missing
|
||||
* acknowledgement was sent while the client was offline.</li>
|
||||
* <li>Other than that, the call isn't expensive as long as there is no message
|
||||
* to send, so it might be a good idea to just call it every few minutes.</li>
|
||||
* </ul>
|
||||
*/
|
||||
public void resendUnacknowledgedMessages() {
|
||||
ctx.resendUnacknowledged();
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return ctx.getNetworkHandler().isRunning();
|
||||
}
|
||||
@ -285,7 +281,8 @@ public class BitmessageContext {
|
||||
|
||||
public Property status() {
|
||||
return new Property("status", null,
|
||||
ctx.getNetworkHandler().getNetworkStatus()
|
||||
ctx.getNetworkHandler().getNetworkStatus(),
|
||||
new Property("unacknowledged", ctx.getMessageRepository().findMessagesToResend().size())
|
||||
);
|
||||
}
|
||||
|
||||
@ -311,7 +308,6 @@ public class BitmessageContext {
|
||||
ProofOfWorkRepository proofOfWorkRepository;
|
||||
ProofOfWorkEngine proofOfWorkEngine;
|
||||
Cryptography cryptography;
|
||||
MessageCallback messageCallback;
|
||||
CustomCommandHandler customCommandHandler;
|
||||
Labeler labeler;
|
||||
Listener listener;
|
||||
@ -359,11 +355,6 @@ public class BitmessageContext {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder messageCallback(MessageCallback callback) {
|
||||
this.messageCallback = callback;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder customCommandHandler(CustomCommandHandler handler) {
|
||||
this.customCommandHandler = handler;
|
||||
return this;
|
||||
@ -430,9 +421,6 @@ public class BitmessageContext {
|
||||
if (proofOfWorkEngine == null) {
|
||||
proofOfWorkEngine = new MultiThreadedPOWEngine();
|
||||
}
|
||||
if (messageCallback == null) {
|
||||
messageCallback = new BaseMessageCallback();
|
||||
}
|
||||
if (labeler == null) {
|
||||
labeler = new DefaultLabeler();
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
|
||||
import ch.dissem.bitmessage.exception.DecryptionFailedException;
|
||||
import ch.dissem.bitmessage.ports.Labeler;
|
||||
import ch.dissem.bitmessage.ports.NetworkHandler;
|
||||
import ch.dissem.bitmessage.utils.TTL;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -47,9 +48,15 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("ConstantConditions")
|
||||
public void receive(ObjectMessage object) throws IOException {
|
||||
ObjectPayload payload = object.getPayload();
|
||||
if (payload.getType() == null) return;
|
||||
if (payload.getType() == null) {
|
||||
if (payload instanceof GenericPayload) {
|
||||
receive((GenericPayload) payload);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
switch (payload.getType()) {
|
||||
case GET_PUBKEY: {
|
||||
@ -109,16 +116,9 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
|
||||
List<Plaintext> messages = ctx.getMessageRepository().findMessages(PUBKEY_REQUESTED, address);
|
||||
LOG.info("Sending " + messages.size() + " messages for contact " + address);
|
||||
for (Plaintext msg : messages) {
|
||||
msg.setStatus(DOING_PROOF_OF_WORK);
|
||||
ctx.getMessageRepository().save(msg);
|
||||
ctx.send(
|
||||
msg.getFrom(),
|
||||
msg.getTo(),
|
||||
new Msg(msg),
|
||||
+2 * DAY
|
||||
);
|
||||
msg.setStatus(SENT);
|
||||
ctx.getLabeler().markAsSending(msg);
|
||||
ctx.getMessageRepository().save(msg);
|
||||
ctx.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,11 +126,12 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
|
||||
for (BitmessageAddress identity : ctx.getAddressRepository().getIdentities()) {
|
||||
try {
|
||||
msg.decrypt(identity.getPrivateKey().getPrivateEncryptionKey());
|
||||
msg.getPlaintext().setTo(identity);
|
||||
if (!object.isSignatureValid(msg.getPlaintext().getFrom().getPubkey())) {
|
||||
Plaintext plaintext = msg.getPlaintext();
|
||||
plaintext.setTo(identity);
|
||||
if (!object.isSignatureValid(plaintext.getFrom().getPubkey())) {
|
||||
LOG.warn("Msg with IV " + object.getInventoryVector() + " was successfully decrypted, but signature check failed. Ignoring.");
|
||||
} else {
|
||||
receive(object.getInventoryVector(), msg.getPlaintext());
|
||||
receive(object.getInventoryVector(), plaintext);
|
||||
}
|
||||
break;
|
||||
} catch (DecryptionFailedException ignore) {
|
||||
@ -138,6 +139,16 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
|
||||
}
|
||||
}
|
||||
|
||||
protected void receive(GenericPayload ack) {
|
||||
if (ack.getData().length == Msg.ACK_LENGTH) {
|
||||
Plaintext msg = ctx.getMessageRepository().getMessageForAck(ack.getData());
|
||||
if (msg != null) {
|
||||
ctx.getLabeler().markAsAcknowledged(msg);
|
||||
ctx.getMessageRepository().save(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void receive(ObjectMessage object, Broadcast broadcast) throws IOException {
|
||||
byte[] tag = broadcast instanceof V5Broadcast ? ((V5Broadcast) broadcast).getTag() : null;
|
||||
for (BitmessageAddress subscription : ctx.getAddressRepository().getSubscriptions(broadcast.getVersion())) {
|
||||
@ -157,11 +168,18 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
|
||||
}
|
||||
|
||||
protected void receive(InventoryVector iv, Plaintext msg) {
|
||||
msg.setStatus(RECEIVED);
|
||||
msg.setInventoryVector(iv);
|
||||
labeler.setLabels(msg);
|
||||
ctx.getMessageRepository().save(msg);
|
||||
listener.receive(msg);
|
||||
updatePubkey(msg.getFrom(), msg.getFrom().getPubkey());
|
||||
|
||||
if (msg.getType() == Plaintext.Type.MSG && msg.getTo().has(Pubkey.Feature.DOES_ACK)) {
|
||||
ObjectMessage ack = msg.getAckMessage();
|
||||
if (ack != null) {
|
||||
ctx.getInventory().storeObject(ack);
|
||||
ctx.getNetworkHandler().offer(ack.getInventoryVector());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,9 +16,7 @@
|
||||
|
||||
package ch.dissem.bitmessage;
|
||||
|
||||
import ch.dissem.bitmessage.entity.BitmessageAddress;
|
||||
import ch.dissem.bitmessage.entity.Encrypted;
|
||||
import ch.dissem.bitmessage.entity.ObjectMessage;
|
||||
import ch.dissem.bitmessage.entity.*;
|
||||
import ch.dissem.bitmessage.entity.payload.*;
|
||||
import ch.dissem.bitmessage.exception.ApplicationException;
|
||||
import ch.dissem.bitmessage.ports.*;
|
||||
@ -30,6 +28,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.TreeSet;
|
||||
|
||||
/**
|
||||
@ -54,9 +53,9 @@ public class InternalContext {
|
||||
private final MessageRepository messageRepository;
|
||||
private final ProofOfWorkRepository proofOfWorkRepository;
|
||||
private final ProofOfWorkEngine proofOfWorkEngine;
|
||||
private final MessageCallback messageCallback;
|
||||
private final CustomCommandHandler customCommandHandler;
|
||||
private final ProofOfWorkService proofOfWorkService;
|
||||
private final Labeler labeler;
|
||||
|
||||
private final TreeSet<Long> streams = new TreeSet<>();
|
||||
private final int port;
|
||||
@ -75,11 +74,11 @@ public class InternalContext {
|
||||
this.proofOfWorkService = new ProofOfWorkService();
|
||||
this.proofOfWorkEngine = builder.proofOfWorkEngine;
|
||||
this.clientNonce = cryptography.randomNonce();
|
||||
this.messageCallback = builder.messageCallback;
|
||||
this.customCommandHandler = builder.customCommandHandler;
|
||||
this.port = builder.port;
|
||||
this.connectionLimit = builder.connectionLimit;
|
||||
this.connectionTTL = builder.connectionTTL;
|
||||
this.labeler = builder.labeler;
|
||||
|
||||
Singleton.initialize(cryptography);
|
||||
|
||||
@ -95,8 +94,7 @@ public class InternalContext {
|
||||
}
|
||||
|
||||
init(cryptography, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository,
|
||||
proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine,
|
||||
messageCallback, customCommandHandler, builder.labeler);
|
||||
proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine, customCommandHandler, builder.labeler);
|
||||
for (BitmessageAddress identity : addressRepository.getIdentities()) {
|
||||
streams.add(identity.getStream());
|
||||
}
|
||||
@ -146,6 +144,10 @@ public class InternalContext {
|
||||
return proofOfWorkService;
|
||||
}
|
||||
|
||||
public Labeler getLabeler() {
|
||||
return labeler;
|
||||
}
|
||||
|
||||
public long[] getStreams() {
|
||||
long[] result = new long[streams.size()];
|
||||
int i = 0;
|
||||
@ -159,14 +161,24 @@ public class InternalContext {
|
||||
return port;
|
||||
}
|
||||
|
||||
public void send(final Plaintext plaintext) {
|
||||
if (plaintext.getAckMessage() != null) {
|
||||
long expires = UnixTime.now(+plaintext.getTTL());
|
||||
LOG.info("Expires at " + expires);
|
||||
proofOfWorkService.doProofOfWorkWithAck(plaintext, expires);
|
||||
} else {
|
||||
send(plaintext.getFrom(), plaintext.getTo(), new Msg(plaintext), plaintext.getTTL());
|
||||
}
|
||||
}
|
||||
|
||||
public void send(final BitmessageAddress from, BitmessageAddress to, final ObjectPayload payload,
|
||||
final long timeToLive) {
|
||||
try {
|
||||
if (to == null) to = from;
|
||||
final BitmessageAddress recipient = (to != null ? to : from);
|
||||
long expires = UnixTime.now(+timeToLive);
|
||||
LOG.info("Expires at " + expires);
|
||||
final ObjectMessage object = new ObjectMessage.Builder()
|
||||
.stream(to.getStream())
|
||||
.stream(recipient.getStream())
|
||||
.expiresTime(expires)
|
||||
.payload(payload)
|
||||
.build();
|
||||
@ -176,9 +188,8 @@ public class InternalContext {
|
||||
if (payload instanceof Broadcast) {
|
||||
((Broadcast) payload).encrypt();
|
||||
} else if (payload instanceof Encrypted) {
|
||||
object.encrypt(to.getPubkey());
|
||||
object.encrypt(recipient.getPubkey());
|
||||
}
|
||||
messageCallback.proofOfWorkStarted(payload);
|
||||
proofOfWorkService.doProofOfWork(to, object);
|
||||
} catch (IOException e) {
|
||||
throw new ApplicationException(e);
|
||||
@ -196,7 +207,6 @@ public class InternalContext {
|
||||
.build();
|
||||
response.sign(identity.getPrivateKey());
|
||||
response.encrypt(cryptography.createPublicKey(identity.getPublicDecryptionKey()));
|
||||
messageCallback.proofOfWorkStarted(identity.getPubkey());
|
||||
// TODO: remember that the pubkey is just about to be sent, and on which stream!
|
||||
proofOfWorkService.doProofOfWork(response);
|
||||
} catch (IOException e) {
|
||||
@ -233,7 +243,6 @@ public class InternalContext {
|
||||
.expiresTime(expires)
|
||||
.payload(new GetPubkey(contact))
|
||||
.build();
|
||||
messageCallback.proofOfWorkStarted(request.getPayload());
|
||||
proofOfWorkService.doProofOfWork(request);
|
||||
}
|
||||
|
||||
@ -271,6 +280,14 @@ public class InternalContext {
|
||||
}
|
||||
}
|
||||
|
||||
public void resendUnacknowledged() {
|
||||
List<Plaintext> messages = messageRepository.findMessagesToResend();
|
||||
for (Plaintext message : messages) {
|
||||
send(message);
|
||||
messageRepository.save(message);
|
||||
}
|
||||
}
|
||||
|
||||
public long getClientNonce() {
|
||||
return clientNonce;
|
||||
}
|
||||
|
@ -1,52 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Christian Basler
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package ch.dissem.bitmessage;
|
||||
|
||||
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
|
||||
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
|
||||
|
||||
/**
|
||||
* Callback for message sending events, mostly so the user can be notified when POW is done.
|
||||
*/
|
||||
public interface MessageCallback {
|
||||
/**
|
||||
* Called before calculation of proof of work begins.
|
||||
*/
|
||||
void proofOfWorkStarted(ObjectPayload message);
|
||||
|
||||
/**
|
||||
* Called after calculation of proof of work finished.
|
||||
*/
|
||||
void proofOfWorkCompleted(ObjectPayload message);
|
||||
|
||||
/**
|
||||
* Called once the message is offered to the network. Please note that this doesn't mean the message was sent,
|
||||
* if the client is not connected to the network it's just stored in the inventory.
|
||||
* <p>
|
||||
* Also, please note that this is where the original payload as well as the {@link InventoryVector} of the sent
|
||||
* message is available. If the callback needs the IV for some reason, it should be retrieved here. (Plaintext
|
||||
* and Broadcast messages will have their IV property set automatically though.)
|
||||
* </p>
|
||||
*/
|
||||
void messageOffered(ObjectPayload message, InventoryVector iv);
|
||||
|
||||
/**
|
||||
* This isn't called yet, as ACK messages aren't being processed yet. Also, this is only relevant for Plaintext
|
||||
* messages.
|
||||
*/
|
||||
void messageAcknowledged(InventoryVector iv);
|
||||
}
|
@ -1,22 +1,23 @@
|
||||
package ch.dissem.bitmessage;
|
||||
|
||||
import ch.dissem.bitmessage.entity.BitmessageAddress;
|
||||
import ch.dissem.bitmessage.entity.ObjectMessage;
|
||||
import ch.dissem.bitmessage.entity.Plaintext;
|
||||
import ch.dissem.bitmessage.entity.PlaintextHolder;
|
||||
import ch.dissem.bitmessage.entity.*;
|
||||
import ch.dissem.bitmessage.entity.payload.Msg;
|
||||
import ch.dissem.bitmessage.entity.payload.Pubkey;
|
||||
import ch.dissem.bitmessage.ports.Cryptography;
|
||||
import ch.dissem.bitmessage.ports.MessageRepository;
|
||||
import ch.dissem.bitmessage.ports.ProofOfWorkEngine;
|
||||
import ch.dissem.bitmessage.ports.ProofOfWorkRepository;
|
||||
import ch.dissem.bitmessage.ports.ProofOfWorkRepository.Item;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES;
|
||||
import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE;
|
||||
import static ch.dissem.bitmessage.utils.Singleton.security;
|
||||
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
|
||||
|
||||
/**
|
||||
* @author Christian Basler
|
||||
@ -29,15 +30,22 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
|
||||
private ProofOfWorkRepository powRepo;
|
||||
private MessageRepository messageRepo;
|
||||
|
||||
public void doMissingProofOfWork() {
|
||||
List<byte[]> items = powRepo.getItems();
|
||||
public void doMissingProofOfWork(long delayInMilliseconds) {
|
||||
final List<byte[]> items = powRepo.getItems();
|
||||
if (items.isEmpty()) return;
|
||||
|
||||
LOG.info("Doing POW for " + items.size() + " tasks.");
|
||||
for (byte[] initialHash : items) {
|
||||
ProofOfWorkRepository.Item item = powRepo.getItem(initialHash);
|
||||
cryptography.doProofOfWork(item.object, item.nonceTrialsPerByte, item.extraBytes, this);
|
||||
}
|
||||
// Wait for 30 seconds, to let the application start up before putting heavy load on the CPU
|
||||
new Timer().schedule(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Doing POW for " + items.size() + " tasks.");
|
||||
for (byte[] initialHash : items) {
|
||||
Item item = powRepo.getItem(initialHash);
|
||||
cryptography.doProofOfWork(item.object, item.nonceTrialsPerByte, item.extraBytes,
|
||||
ProofOfWorkService.this);
|
||||
}
|
||||
}
|
||||
}, delayInMilliseconds);
|
||||
}
|
||||
|
||||
public void doProofOfWork(ObjectMessage object) {
|
||||
@ -59,24 +67,52 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
|
||||
cryptography.doProofOfWork(object, nonceTrialsPerByte, extraBytes, this);
|
||||
}
|
||||
|
||||
public void doProofOfWorkWithAck(Plaintext plaintext, long expirationTime) {
|
||||
final ObjectMessage ack = plaintext.getAckMessage();
|
||||
messageRepo.save(plaintext);
|
||||
Item item = new Item(ack, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES,
|
||||
expirationTime, plaintext);
|
||||
powRepo.putObject(item);
|
||||
cryptography.doProofOfWork(ack, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNonceCalculated(byte[] initialHash, byte[] nonce) {
|
||||
ObjectMessage object = powRepo.getItem(initialHash).object;
|
||||
object.setNonce(nonce);
|
||||
Plaintext plaintext = messageRepo.getMessage(initialHash);
|
||||
if (plaintext != null) {
|
||||
plaintext.setInventoryVector(object.getInventoryVector());
|
||||
messageRepo.save(plaintext);
|
||||
Item item = powRepo.getItem(initialHash);
|
||||
if (item.message == null) {
|
||||
ObjectMessage object = powRepo.getItem(initialHash).object;
|
||||
object.setNonce(nonce);
|
||||
Plaintext plaintext = messageRepo.getMessage(initialHash);
|
||||
if (plaintext != null) {
|
||||
plaintext.setInventoryVector(object.getInventoryVector());
|
||||
plaintext.updateNextTry();
|
||||
ctx.getLabeler().markAsSent(plaintext);
|
||||
messageRepo.save(plaintext);
|
||||
}
|
||||
ctx.getInventory().storeObject(object);
|
||||
ctx.getNetworkHandler().offer(object.getInventoryVector());
|
||||
} else {
|
||||
item.message.getAckMessage().setNonce(nonce);
|
||||
final ObjectMessage object = new ObjectMessage.Builder()
|
||||
.stream(item.message.getStream())
|
||||
.expiresTime(item.expirationTime)
|
||||
.payload(new Msg(item.message))
|
||||
.build();
|
||||
if (object.isSigned()) {
|
||||
object.sign(item.message.getFrom().getPrivateKey());
|
||||
}
|
||||
if (object.getPayload() instanceof Encrypted) {
|
||||
object.encrypt(item.message.getTo().getPubkey());
|
||||
}
|
||||
doProofOfWork(item.message.getTo(), object);
|
||||
}
|
||||
ctx.getInventory().storeObject(object);
|
||||
powRepo.removeObject(initialHash);
|
||||
ctx.getNetworkHandler().offer(object.getInventoryVector());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setContext(InternalContext ctx) {
|
||||
this.ctx = ctx;
|
||||
this.cryptography = security();
|
||||
this.cryptography = cryptography();
|
||||
this.powRepo = ctx.getProofOfWorkRepository();
|
||||
this.messageRepo = ctx.getMessageRepository();
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package ch.dissem.bitmessage.entity;
|
||||
|
||||
import ch.dissem.bitmessage.entity.payload.Pubkey;
|
||||
import ch.dissem.bitmessage.entity.payload.Pubkey.Feature;
|
||||
import ch.dissem.bitmessage.entity.payload.V4Pubkey;
|
||||
import ch.dissem.bitmessage.entity.valueobject.PrivateKey;
|
||||
import ch.dissem.bitmessage.exception.ApplicationException;
|
||||
@ -36,7 +37,7 @@ import java.util.Objects;
|
||||
|
||||
import static ch.dissem.bitmessage.utils.Decode.bytes;
|
||||
import static ch.dissem.bitmessage.utils.Decode.varInt;
|
||||
import static ch.dissem.bitmessage.utils.Singleton.security;
|
||||
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
|
||||
|
||||
/**
|
||||
* A Bitmessage address. Can be a user's private address, an address string without public keys or a recipient's address
|
||||
@ -73,19 +74,19 @@ public class BitmessageAddress implements Serializable {
|
||||
Encode.varInt(version, os);
|
||||
Encode.varInt(stream, os);
|
||||
if (version < 4) {
|
||||
byte[] checksum = security().sha512(os.toByteArray(), ripe);
|
||||
byte[] checksum = cryptography().sha512(os.toByteArray(), ripe);
|
||||
this.tag = null;
|
||||
this.publicDecryptionKey = Arrays.copyOfRange(checksum, 0, 32);
|
||||
} else {
|
||||
// for tag and decryption key, the checksum has to be created with 0x00 padding
|
||||
byte[] checksum = security().doubleSha512(os.toByteArray(), ripe);
|
||||
byte[] checksum = cryptography().doubleSha512(os.toByteArray(), ripe);
|
||||
this.tag = Arrays.copyOfRange(checksum, 32, 64);
|
||||
this.publicDecryptionKey = Arrays.copyOfRange(checksum, 0, 32);
|
||||
}
|
||||
// but for the address and its checksum they need to be stripped
|
||||
int offset = Bytes.numberOfLeadingZeros(ripe);
|
||||
os.write(ripe, offset, ripe.length - offset);
|
||||
byte[] checksum = security().doubleSha512(os.toByteArray());
|
||||
byte[] checksum = cryptography().doubleSha512(os.toByteArray());
|
||||
os.write(checksum, 0, 4);
|
||||
this.address = "BM-" + Base58.encode(os.toByteArray());
|
||||
} catch (IOException e) {
|
||||
@ -146,18 +147,18 @@ public class BitmessageAddress implements Serializable {
|
||||
this.ripe = Bytes.expand(bytes(in, bytes.length - counter.length() - 4), 20);
|
||||
|
||||
// test checksum
|
||||
byte[] checksum = security().doubleSha512(bytes, bytes.length - 4);
|
||||
byte[] checksum = cryptography().doubleSha512(bytes, bytes.length - 4);
|
||||
byte[] expectedChecksum = bytes(in, 4);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
if (expectedChecksum[i] != checksum[i])
|
||||
throw new IllegalArgumentException("Checksum of address failed");
|
||||
}
|
||||
if (version < 4) {
|
||||
checksum = security().sha512(Arrays.copyOfRange(bytes, 0, counter.length()), ripe);
|
||||
checksum = cryptography().sha512(Arrays.copyOfRange(bytes, 0, counter.length()), ripe);
|
||||
this.tag = null;
|
||||
this.publicDecryptionKey = Arrays.copyOfRange(checksum, 0, 32);
|
||||
} else {
|
||||
checksum = security().doubleSha512(Arrays.copyOfRange(bytes, 0, counter.length()), ripe);
|
||||
checksum = cryptography().doubleSha512(Arrays.copyOfRange(bytes, 0, counter.length()), ripe);
|
||||
this.tag = Arrays.copyOfRange(checksum, 32, 64);
|
||||
this.publicDecryptionKey = Arrays.copyOfRange(checksum, 0, 32);
|
||||
}
|
||||
@ -172,7 +173,7 @@ public class BitmessageAddress implements Serializable {
|
||||
Encode.varInt(version, out);
|
||||
Encode.varInt(stream, out);
|
||||
out.write(ripe);
|
||||
return Arrays.copyOfRange(security().doubleSha512(out.toByteArray()), 32, 64);
|
||||
return Arrays.copyOfRange(cryptography().doubleSha512(out.toByteArray()), 32, 64);
|
||||
} catch (IOException e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
@ -266,4 +267,11 @@ public class BitmessageAddress implements Serializable {
|
||||
public void setChan(boolean chan) {
|
||||
this.chan = chan;
|
||||
}
|
||||
|
||||
public boolean has(Feature feature) {
|
||||
if (pubkey == null || feature == null) {
|
||||
return false;
|
||||
}
|
||||
return feature.isActive(pubkey.getBehaviorBitfield());
|
||||
}
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ import java.security.GeneralSecurityException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.NoSuchProviderException;
|
||||
|
||||
import static ch.dissem.bitmessage.utils.Singleton.security;
|
||||
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
|
||||
|
||||
/**
|
||||
* A network message is exchanged between two nodes.
|
||||
@ -51,7 +51,7 @@ public class NetworkMessage implements Streamable {
|
||||
* First 4 bytes of sha512(payload)
|
||||
*/
|
||||
private byte[] getChecksum(byte[] bytes) throws NoSuchProviderException, NoSuchAlgorithmException {
|
||||
byte[] d = security().sha512(bytes);
|
||||
byte[] d = cryptography().sha512(bytes);
|
||||
return new byte[]{d[0], d[1], d[2], d[3]};
|
||||
}
|
||||
|
||||
|
@ -29,8 +29,10 @@ import ch.dissem.bitmessage.utils.Encode;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
|
||||
import static ch.dissem.bitmessage.utils.Singleton.security;
|
||||
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
|
||||
|
||||
/**
|
||||
* The 'object' command sends an object that is shared throughout the network.
|
||||
@ -55,7 +57,7 @@ public class ObjectMessage implements MessagePayload {
|
||||
expiresTime = builder.expiresTime;
|
||||
objectType = builder.objectType;
|
||||
version = builder.payload.getVersion();
|
||||
stream = builder.streamNumber;
|
||||
stream = builder.streamNumber > 0 ? builder.streamNumber : builder.payload.getStream();
|
||||
payload = builder.payload;
|
||||
}
|
||||
|
||||
@ -94,7 +96,7 @@ public class ObjectMessage implements MessagePayload {
|
||||
|
||||
public InventoryVector getInventoryVector() {
|
||||
return new InventoryVector(
|
||||
Bytes.truncate(security().doubleSha512(nonce, getPayloadBytesWithoutNonce()), 32)
|
||||
Bytes.truncate(cryptography().doubleSha512(nonce, getPayloadBytesWithoutNonce()), 32)
|
||||
);
|
||||
}
|
||||
|
||||
@ -119,7 +121,7 @@ public class ObjectMessage implements MessagePayload {
|
||||
|
||||
public void sign(PrivateKey key) {
|
||||
if (payload.isSigned()) {
|
||||
payload.setSignature(security().getSignature(getBytesToSign(), key));
|
||||
payload.setSignature(cryptography().getSignature(getBytesToSign(), key));
|
||||
}
|
||||
}
|
||||
|
||||
@ -153,7 +155,7 @@ public class ObjectMessage implements MessagePayload {
|
||||
|
||||
public boolean isSignatureValid(Pubkey pubkey) throws IOException {
|
||||
if (isEncrypted()) throw new IllegalStateException("Payload must be decrypted first");
|
||||
return security().isSignatureValid(getBytesToSign(), payload.getSignature(), pubkey);
|
||||
return cryptography().isSignatureValid(getBytesToSign(), payload.getSignature(), pubkey);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -230,4 +232,29 @@ public class ObjectMessage implements MessagePayload {
|
||||
return new ObjectMessage(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
ObjectMessage that = (ObjectMessage) o;
|
||||
|
||||
return expiresTime == that.expiresTime &&
|
||||
objectType == that.objectType &&
|
||||
version == that.version &&
|
||||
stream == that.stream &&
|
||||
Objects.equals(payload, that.payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = Arrays.hashCode(nonce);
|
||||
result = 31 * result + (int) (expiresTime ^ (expiresTime >>> 32));
|
||||
result = 31 * result + (int) (objectType ^ (objectType >>> 32));
|
||||
result = 31 * result + (int) (version ^ (version >>> 32));
|
||||
result = 31 * result + (int) (stream ^ (stream >>> 32));
|
||||
result = 31 * result + (payload != null ? payload.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -16,16 +16,19 @@
|
||||
|
||||
package ch.dissem.bitmessage.entity;
|
||||
|
||||
import ch.dissem.bitmessage.entity.payload.Msg;
|
||||
import ch.dissem.bitmessage.entity.payload.Pubkey.Feature;
|
||||
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
|
||||
import ch.dissem.bitmessage.entity.valueobject.Label;
|
||||
import ch.dissem.bitmessage.exception.ApplicationException;
|
||||
import ch.dissem.bitmessage.factory.Factory;
|
||||
import ch.dissem.bitmessage.utils.Decode;
|
||||
import ch.dissem.bitmessage.utils.Encode;
|
||||
import ch.dissem.bitmessage.utils.UnixTime;
|
||||
import ch.dissem.bitmessage.utils.*;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.*;
|
||||
import java.util.Collections;
|
||||
|
||||
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
|
||||
|
||||
/**
|
||||
* The unencrypted message to be sent by 'msg' or 'broadcast'.
|
||||
@ -37,7 +40,8 @@ public class Plaintext implements Streamable {
|
||||
private final BitmessageAddress from;
|
||||
private final long encoding;
|
||||
private final byte[] message;
|
||||
private final byte[] ack;
|
||||
private final byte[] ackData;
|
||||
private ObjectMessage ackMessage;
|
||||
private Object id;
|
||||
private InventoryVector inventoryVector;
|
||||
private BitmessageAddress to;
|
||||
@ -49,6 +53,10 @@ public class Plaintext implements Streamable {
|
||||
private Set<Label> labels;
|
||||
private byte[] initialHash;
|
||||
|
||||
private long ttl;
|
||||
private int retries;
|
||||
private Long nextTry;
|
||||
|
||||
private Plaintext(Builder builder) {
|
||||
id = builder.id;
|
||||
inventoryVector = builder.inventoryVector;
|
||||
@ -57,12 +65,21 @@ public class Plaintext implements Streamable {
|
||||
to = builder.to;
|
||||
encoding = builder.encoding;
|
||||
message = builder.message;
|
||||
ack = builder.ack;
|
||||
ackData = builder.ackData;
|
||||
if (builder.ackMessage != null && builder.ackMessage.length > 0) {
|
||||
ackMessage = Factory.getObjectMessage(
|
||||
3,
|
||||
new ByteArrayInputStream(builder.ackMessage),
|
||||
builder.ackMessage.length);
|
||||
}
|
||||
signature = builder.signature;
|
||||
status = builder.status;
|
||||
sent = builder.sent;
|
||||
received = builder.received;
|
||||
labels = builder.labels;
|
||||
ttl = builder.ttl;
|
||||
retries = builder.retries;
|
||||
nextTry = builder.nextTry;
|
||||
}
|
||||
|
||||
public static Plaintext read(Type type, InputStream in) throws IOException {
|
||||
@ -85,7 +102,7 @@ public class Plaintext implements Streamable {
|
||||
.destinationRipe(type == Type.MSG ? Decode.bytes(in, 20) : null)
|
||||
.encoding(Decode.varInt(in))
|
||||
.message(Decode.varBytes(in))
|
||||
.ack(type == Type.MSG ? Decode.varBytes(in) : null);
|
||||
.ackMessage(type == Type.MSG ? Decode.varBytes(in) : null);
|
||||
}
|
||||
|
||||
public InventoryVector getInventoryVector() {
|
||||
@ -163,8 +180,13 @@ public class Plaintext implements Streamable {
|
||||
Encode.varInt(message.length, out);
|
||||
out.write(message);
|
||||
if (type == Type.MSG) {
|
||||
Encode.varInt(ack.length, out);
|
||||
out.write(ack);
|
||||
if (to.has(Feature.DOES_ACK) && getAckMessage() != null) {
|
||||
ByteArrayOutputStream ack = new ByteArrayOutputStream();
|
||||
getAckMessage().write(ack);
|
||||
Encode.varBytes(ack.toByteArray(), out);
|
||||
} else {
|
||||
Encode.varInt(0, out);
|
||||
}
|
||||
}
|
||||
if (includeSignature) {
|
||||
if (signature == null) {
|
||||
@ -206,6 +228,30 @@ public class Plaintext implements Streamable {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public long getTTL() {
|
||||
return ttl;
|
||||
}
|
||||
|
||||
public int getRetries() {
|
||||
return retries;
|
||||
}
|
||||
|
||||
public Long getNextTry() {
|
||||
return nextTry;
|
||||
}
|
||||
|
||||
public void updateNextTry() {
|
||||
if (nextTry == null) {
|
||||
if (sent != null && to.has(Feature.DOES_ACK)) {
|
||||
nextTry = UnixTime.now(+ttl);
|
||||
retries++;
|
||||
}
|
||||
} else {
|
||||
nextTry = nextTry + (1 << retries) * ttl;
|
||||
retries++;
|
||||
}
|
||||
}
|
||||
|
||||
public String getSubject() {
|
||||
Scanner s = new Scanner(new ByteArrayInputStream(message), "UTF-8");
|
||||
String firstLine = s.nextLine();
|
||||
@ -238,7 +284,7 @@ public class Plaintext implements Streamable {
|
||||
return Objects.equals(encoding, plaintext.encoding) &&
|
||||
Objects.equals(from, plaintext.from) &&
|
||||
Arrays.equals(message, plaintext.message) &&
|
||||
Arrays.equals(ack, plaintext.ack) &&
|
||||
Objects.equals(getAckMessage(), plaintext.getAckMessage()) &&
|
||||
Arrays.equals(to.getRipe(), plaintext.to.getRipe()) &&
|
||||
Arrays.equals(signature, plaintext.signature) &&
|
||||
Objects.equals(status, plaintext.status) &&
|
||||
@ -249,7 +295,7 @@ public class Plaintext implements Streamable {
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(from, encoding, message, ack, to, signature, status, sent, received, labels);
|
||||
return Objects.hash(from, encoding, message, ackData, to, signature, status, sent, received, labels);
|
||||
}
|
||||
|
||||
public void addLabels(Label... labels) {
|
||||
@ -260,10 +306,33 @@ public class Plaintext implements Streamable {
|
||||
|
||||
public void addLabels(Collection<Label> labels) {
|
||||
if (labels != null) {
|
||||
this.labels.addAll(labels);
|
||||
for (Label label : labels) {
|
||||
this.labels.add(label);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void removeLabel(Label.Type type) {
|
||||
Iterator<Label> iterator = labels.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Label label = iterator.next();
|
||||
if (label.getType() == type) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public byte[] getAckData() {
|
||||
return ackData;
|
||||
}
|
||||
|
||||
public ObjectMessage getAckMessage() {
|
||||
if (ackMessage == null) {
|
||||
ackMessage = Factory.createAck(this);
|
||||
}
|
||||
return ackMessage;
|
||||
}
|
||||
|
||||
public void setInitialHash(byte[] initialHash) {
|
||||
this.initialHash = initialHash;
|
||||
}
|
||||
@ -316,12 +385,16 @@ public class Plaintext implements Streamable {
|
||||
private byte[] destinationRipe;
|
||||
private long encoding;
|
||||
private byte[] message = new byte[0];
|
||||
private byte[] ack = new byte[0];
|
||||
private byte[] ackData;
|
||||
private byte[] ackMessage;
|
||||
private byte[] signature;
|
||||
private long sent;
|
||||
private long received;
|
||||
private Status status;
|
||||
private Set<Label> labels = new HashSet<>();
|
||||
private long ttl;
|
||||
private int retries;
|
||||
private Long nextTry;
|
||||
|
||||
public Builder(Type type) {
|
||||
this.type = type;
|
||||
@ -415,9 +488,16 @@ public class Plaintext implements Streamable {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder ack(byte[] ack) {
|
||||
if (type != Type.MSG && ack != null) throw new IllegalArgumentException("ack only allowed for msg");
|
||||
this.ack = ack;
|
||||
public Builder ackMessage(byte[] ack) {
|
||||
if (type != Type.MSG && ack != null) throw new IllegalArgumentException("ackMessage only allowed for msg");
|
||||
this.ackMessage = ack;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder ackData(byte[] ackData) {
|
||||
if (type != Type.MSG && ackData != null)
|
||||
throw new IllegalArgumentException("ackMessage only allowed for msg");
|
||||
this.ackData = ackData;
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -446,6 +526,21 @@ public class Plaintext implements Streamable {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder ttl(long ttl) {
|
||||
this.ttl = ttl;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder retries(int retries) {
|
||||