Improved labeler to cover all cases, and fixed when labels are set while sending (outbox vs sent)

Removed message callback with both didn't work and is obsolete (use a labeler descendant)
This commit is contained in:
Christian Basler 2016-04-29 15:29:22 +02:00
parent 8df42a6cf0
commit ea2cd7bf53
9 changed files with 58 additions and 170 deletions

View File

@ -1,47 +0,0 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
/**
* Default implementation that doesn't do anything.
*
* @author Christian Basler
*/
public class BaseMessageCallback implements MessageCallback {
@Override
public void proofOfWorkStarted(ObjectPayload message) {
// No op
}
@Override
public void proofOfWorkCompleted(ObjectPayload message) {
// No op
}
@Override
public void messageOffered(ObjectPayload message, InventoryVector iv) {
// No op
}
@Override
public void messageAcknowledged(InventoryVector iv) {
// No op
}
}

View File

@ -22,7 +22,6 @@ import ch.dissem.bitmessage.entity.payload.Msg;
import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.payload.ObjectType; import ch.dissem.bitmessage.entity.payload.ObjectType;
import ch.dissem.bitmessage.entity.payload.Pubkey.Feature; import ch.dissem.bitmessage.entity.payload.Pubkey.Feature;
import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.entity.valueobject.PrivateKey; import ch.dissem.bitmessage.entity.valueobject.PrivateKey;
import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.exception.DecryptionFailedException; import ch.dissem.bitmessage.exception.DecryptionFailedException;
@ -37,11 +36,12 @@ import java.net.InetAddress;
import java.util.List; import java.util.List;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.*; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES; import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES;
import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE; import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE;
import static ch.dissem.bitmessage.entity.Plaintext.Status.*;
import static ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST; import static ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST;
import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG; import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
import static ch.dissem.bitmessage.utils.UnixTime.*; import static ch.dissem.bitmessage.utils.UnixTime.*;
@ -155,8 +155,8 @@ public class BitmessageContext {
.from(from) .from(from)
.to(to) .to(to)
.message(subject, message) .message(subject, message)
.labels(messages().getLabels(Label.Type.SENT))
.build(); .build();
labeler().markAsSending(msg);
send(msg); send(msg);
} }
@ -171,15 +171,11 @@ public class BitmessageContext {
ctx.requestPubkey(to); ctx.requestPubkey(to);
} }
if (to.getPubkey() == null) { if (to.getPubkey() == null) {
msg.setStatus(PUBKEY_REQUESTED);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX));
ctx.getMessageRepository().save(msg); ctx.getMessageRepository().save(msg);
} }
} }
if (to == null || to.getPubkey() != null) { if (to == null || to.getPubkey() != null) {
LOG.info("Sending message."); LOG.info("Sending message.");
msg.setStatus(DOING_PROOF_OF_WORK);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX));
ctx.getMessageRepository().save(msg); ctx.getMessageRepository().save(msg);
ctx.send( ctx.send(
msg.getFrom(), msg.getFrom(),
@ -187,9 +183,6 @@ public class BitmessageContext {
wrapInObjectPayload(msg), wrapInObjectPayload(msg),
TTL.msg() TTL.msg()
); );
msg.setStatus(SENT);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT));
ctx.getMessageRepository().save(msg);
} }
} }
@ -310,7 +303,6 @@ public class BitmessageContext {
ProofOfWorkRepository proofOfWorkRepository; ProofOfWorkRepository proofOfWorkRepository;
ProofOfWorkEngine proofOfWorkEngine; ProofOfWorkEngine proofOfWorkEngine;
Cryptography cryptography; Cryptography cryptography;
MessageCallback messageCallback;
CustomCommandHandler customCommandHandler; CustomCommandHandler customCommandHandler;
Labeler labeler; Labeler labeler;
Listener listener; Listener listener;
@ -358,11 +350,6 @@ public class BitmessageContext {
return this; return this;
} }
public Builder messageCallback(MessageCallback callback) {
this.messageCallback = callback;
return this;
}
public Builder customCommandHandler(CustomCommandHandler handler) { public Builder customCommandHandler(CustomCommandHandler handler) {
this.customCommandHandler = handler; this.customCommandHandler = handler;
return this; return this;
@ -429,9 +416,6 @@ public class BitmessageContext {
if (proofOfWorkEngine == null) { if (proofOfWorkEngine == null) {
proofOfWorkEngine = new MultiThreadedPOWEngine(); proofOfWorkEngine = new MultiThreadedPOWEngine();
} }
if (messageCallback == null) {
messageCallback = new BaseMessageCallback();
}
if (labeler == null) { if (labeler == null) {
labeler = new DefaultLabeler(); labeler = new DefaultLabeler();
} }

View File

@ -109,7 +109,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
List<Plaintext> messages = ctx.getMessageRepository().findMessages(PUBKEY_REQUESTED, address); List<Plaintext> messages = ctx.getMessageRepository().findMessages(PUBKEY_REQUESTED, address);
LOG.info("Sending " + messages.size() + " messages for contact " + address); LOG.info("Sending " + messages.size() + " messages for contact " + address);
for (Plaintext msg : messages) { for (Plaintext msg : messages) {
msg.setStatus(DOING_PROOF_OF_WORK); ctx.getLabeler().markAsSending(msg);
ctx.getMessageRepository().save(msg); ctx.getMessageRepository().save(msg);
ctx.send( ctx.send(
msg.getFrom(), msg.getFrom(),
@ -117,8 +117,6 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
new Msg(msg), new Msg(msg),
+2 * DAY +2 * DAY
); );
msg.setStatus(SENT);
ctx.getMessageRepository().save(msg);
} }
} }
@ -158,7 +156,6 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
} }
protected void receive(InventoryVector iv, Plaintext msg) { protected void receive(InventoryVector iv, Plaintext msg) {
msg.setStatus(RECEIVED);
msg.setInventoryVector(iv); msg.setInventoryVector(iv);
labeler.setLabels(msg); labeler.setLabels(msg);
ctx.getMessageRepository().save(msg); ctx.getMessageRepository().save(msg);

View File

@ -18,7 +18,6 @@ package ch.dissem.bitmessage;
import ch.dissem.bitmessage.entity.*; import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.payload.*; import ch.dissem.bitmessage.entity.payload.*;
import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.ports.*; import ch.dissem.bitmessage.ports.*;
import ch.dissem.bitmessage.utils.Singleton; import ch.dissem.bitmessage.utils.Singleton;
@ -31,8 +30,6 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.TreeSet; import java.util.TreeSet;
import static ch.dissem.bitmessage.entity.Plaintext.Status.SENT;
/** /**
* The internal context should normally only be used for port implementations. If you need it in your client * The internal context should normally only be used for port implementations. If you need it in your client
* implementation, you're either doing something wrong, something very weird, or the BitmessageContext should * implementation, you're either doing something wrong, something very weird, or the BitmessageContext should
@ -55,9 +52,9 @@ public class InternalContext {
private final MessageRepository messageRepository; private final MessageRepository messageRepository;
private final ProofOfWorkRepository proofOfWorkRepository; private final ProofOfWorkRepository proofOfWorkRepository;
private final ProofOfWorkEngine proofOfWorkEngine; private final ProofOfWorkEngine proofOfWorkEngine;
private final MessageCallback messageCallback;
private final CustomCommandHandler customCommandHandler; private final CustomCommandHandler customCommandHandler;
private final ProofOfWorkService proofOfWorkService; private final ProofOfWorkService proofOfWorkService;
private final Labeler labeler;
private final TreeSet<Long> streams = new TreeSet<>(); private final TreeSet<Long> streams = new TreeSet<>();
private final int port; private final int port;
@ -76,11 +73,11 @@ public class InternalContext {
this.proofOfWorkService = new ProofOfWorkService(); this.proofOfWorkService = new ProofOfWorkService();
this.proofOfWorkEngine = builder.proofOfWorkEngine; this.proofOfWorkEngine = builder.proofOfWorkEngine;
this.clientNonce = cryptography.randomNonce(); this.clientNonce = cryptography.randomNonce();
this.messageCallback = builder.messageCallback;
this.customCommandHandler = builder.customCommandHandler; this.customCommandHandler = builder.customCommandHandler;
this.port = builder.port; this.port = builder.port;
this.connectionLimit = builder.connectionLimit; this.connectionLimit = builder.connectionLimit;
this.connectionTTL = builder.connectionTTL; this.connectionTTL = builder.connectionTTL;
this.labeler = builder.labeler;
Singleton.initialize(cryptography); Singleton.initialize(cryptography);
@ -96,8 +93,7 @@ public class InternalContext {
} }
init(cryptography, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, init(cryptography, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository,
proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine, proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine, customCommandHandler, builder.labeler);
messageCallback, customCommandHandler, builder.labeler);
for (BitmessageAddress identity : addressRepository.getIdentities()) { for (BitmessageAddress identity : addressRepository.getIdentities()) {
streams.add(identity.getStream()); streams.add(identity.getStream());
} }
@ -147,6 +143,10 @@ public class InternalContext {
return proofOfWorkService; return proofOfWorkService;
} }
public Labeler getLabeler() {
return labeler;
}
public long[] getStreams() { public long[] getStreams() {
long[] result = new long[streams.size()]; long[] result = new long[streams.size()];
int i = 0; int i = 0;
@ -176,7 +176,6 @@ public class InternalContext {
} }
if (payload instanceof Msg && recipient.has(Pubkey.Feature.DOES_ACK)) { if (payload instanceof Msg && recipient.has(Pubkey.Feature.DOES_ACK)) {
ObjectMessage ackMessage = ((Msg) payload).getPlaintext().getAckMessage(); ObjectMessage ackMessage = ((Msg) payload).getPlaintext().getAckMessage();
messageCallback.proofOfWorkStarted(payload);
cryptography.doProofOfWork(ackMessage, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES, new ProofOfWorkEngine.Callback() { cryptography.doProofOfWork(ackMessage, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES, new ProofOfWorkEngine.Callback() {
@Override @Override
public void onNonceCalculated(byte[] initialHash, byte[] nonce) { public void onNonceCalculated(byte[] initialHash, byte[] nonce) {
@ -208,7 +207,6 @@ public class InternalContext {
.build(); .build();
response.sign(identity.getPrivateKey()); response.sign(identity.getPrivateKey());
response.encrypt(cryptography.createPublicKey(identity.getPublicDecryptionKey())); response.encrypt(cryptography.createPublicKey(identity.getPublicDecryptionKey()));
messageCallback.proofOfWorkStarted(identity.getPubkey());
// TODO: remember that the pubkey is just about to be sent, and on which stream! // TODO: remember that the pubkey is just about to be sent, and on which stream!
proofOfWorkService.doProofOfWork(response); proofOfWorkService.doProofOfWork(response);
} catch (IOException e) { } catch (IOException e) {
@ -245,7 +243,6 @@ public class InternalContext {
.expiresTime(expires) .expiresTime(expires)
.payload(new GetPubkey(contact)) .payload(new GetPubkey(contact))
.build(); .build();
messageCallback.proofOfWorkStarted(request.getPayload());
proofOfWorkService.doProofOfWork(request); proofOfWorkService.doProofOfWork(request);
} }
@ -302,31 +299,4 @@ public class InternalContext {
public interface ContextHolder { public interface ContextHolder {
void setContext(InternalContext context); void setContext(InternalContext context);
} }
private class ProofOfWorkCallback implements ProofOfWorkEngine.Callback {
private final ObjectMessage object;
private final ObjectPayload payload;
private ProofOfWorkCallback(ObjectMessage object, ObjectPayload payload) {
this.object = object;
this.payload = payload;
}
@Override
public void onNonceCalculated(byte[] initialHash, byte[] nonce) {
object.setNonce(nonce);
messageCallback.proofOfWorkCompleted(payload);
if (payload instanceof PlaintextHolder) {
Plaintext plaintext = ((PlaintextHolder) payload).getPlaintext();
plaintext.setInventoryVector(object.getInventoryVector());
plaintext.setStatus(SENT);
plaintext.removeLabel(Label.Type.OUTBOX);
plaintext.addLabels(messageRepository.getLabels(Label.Type.SENT));
messageRepository.save(plaintext);
}
inventory.storeObject(object);
networkHandler.offer(object.getInventoryVector());
messageCallback.messageOffered(payload, object.getInventoryVector());
}
}
} }

View File

@ -1,52 +0,0 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
/**
* Callback for message sending events, mostly so the user can be notified when POW is done.
*/
public interface MessageCallback {
/**
* Called before calculation of proof of work begins.
*/
void proofOfWorkStarted(ObjectPayload message);
/**
* Called after calculation of proof of work finished.
*/
void proofOfWorkCompleted(ObjectPayload message);
/**
* Called once the message is offered to the network. Please note that this doesn't mean the message was sent,
* if the client is not connected to the network it's just stored in the inventory.
* <p>
* Also, please note that this is where the original payload as well as the {@link InventoryVector} of the sent
* message is available. If the callback needs the IV for some reason, it should be retrieved here. (Plaintext
* and Broadcast messages will have their IV property set automatically though.)
* </p>
*/
void messageOffered(ObjectPayload message, InventoryVector iv);
/**
* This isn't called yet, as ACK messages aren't being processed yet. Also, this is only relevant for Plaintext
* messages.
*/
void messageAcknowledged(InventoryVector iv);
}

View File

@ -66,6 +66,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
Plaintext plaintext = messageRepo.getMessage(initialHash); Plaintext plaintext = messageRepo.getMessage(initialHash);
if (plaintext != null) { if (plaintext != null) {
plaintext.setInventoryVector(object.getInventoryVector()); plaintext.setInventoryVector(object.getInventoryVector());
ctx.getLabeler().markAsSent(plaintext);
messageRepo.save(plaintext); messageRepo.save(plaintext);
} }
ctx.getInventory().storeObject(object); ctx.getInventory().storeObject(object);

View File

@ -20,13 +20,14 @@ import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.valueobject.Label; import ch.dissem.bitmessage.entity.valueobject.Label;
import java.util.Iterator; import static ch.dissem.bitmessage.entity.Plaintext.Status.*;
public class DefaultLabeler implements Labeler, InternalContext.ContextHolder { public class DefaultLabeler implements Labeler, InternalContext.ContextHolder {
private InternalContext ctx; private InternalContext ctx;
@Override @Override
public void setLabels(Plaintext msg) { public void setLabels(Plaintext msg) {
msg.setStatus(RECEIVED);
if (msg.getType() == Plaintext.Type.BROADCAST) { if (msg.getType() == Plaintext.Type.BROADCAST) {
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.INBOX, Label.Type.BROADCAST, Label.Type.UNREAD)); msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.INBOX, Label.Type.BROADCAST, Label.Type.UNREAD));
} else { } else {
@ -34,15 +35,38 @@ public class DefaultLabeler implements Labeler, InternalContext.ContextHolder {
} }
} }
@Override
public void markAsDraft(Plaintext msg) {
msg.setStatus(DRAFT);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.DRAFT));
}
@Override
public void markAsSending(Plaintext msg) {
if (msg.getTo() != null || msg.getTo().getPubkey() == null) {
msg.setStatus(PUBKEY_REQUESTED);
} else {
msg.setStatus(DOING_PROOF_OF_WORK);
}
msg.removeLabel(Label.Type.DRAFT);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX));
}
@Override
public void markAsSent(Plaintext msg) {
msg.setStatus(SENT);
msg.removeLabel(Label.Type.OUTBOX);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT));
}
@Override
public void markAsAcknowledged(Plaintext msg) {
msg.setStatus(SENT_ACKNOWLEDGED);
}
@Override @Override
public void markAsRead(Plaintext msg) { public void markAsRead(Plaintext msg) {
Iterator<Label> iterator = msg.getLabels().iterator(); msg.removeLabel(Label.Type.UNREAD);
while (iterator.hasNext()) {
Label label = iterator.next();
if (label.getType() == Label.Type.UNREAD) {
iterator.remove();
}
}
} }
@Override @Override

View File

@ -19,7 +19,11 @@ package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.Plaintext;
/** /**
* Defines and sets labels * Defines and sets labels. Note that it should also update the status field of a message.
* <p>
* As the labeler gets called whenever the state of a message changes, it can also be used
* as a listener.
* </p>
*/ */
public interface Labeler { public interface Labeler {
/** /**
@ -29,6 +33,14 @@ public interface Labeler {
*/ */
void setLabels(Plaintext msg); void setLabels(Plaintext msg);
void markAsDraft(Plaintext msg);
void markAsSending(Plaintext msg);
void markAsSent(Plaintext msg);
void markAsAcknowledged(Plaintext msg);
void markAsRead(Plaintext msg); void markAsRead(Plaintext msg);
void markAsUnread(Plaintext msg); void markAsUnread(Plaintext msg);

View File

@ -56,7 +56,6 @@ public class BitmessageContextTest {
.cryptography(new BouncyCryptography()) .cryptography(new BouncyCryptography())
.inventory(mock(Inventory.class)) .inventory(mock(Inventory.class))
.listener(listener) .listener(listener)
.messageCallback(mock(MessageCallback.class))
.messageRepo(mock(MessageRepository.class)) .messageRepo(mock(MessageRepository.class))
.networkHandler(mock(NetworkHandler.class)) .networkHandler(mock(NetworkHandler.class))
.nodeRegistry(mock(NodeRegistry.class)) .nodeRegistry(mock(NodeRegistry.class))