Fixed / improved message sending and added a system test (it's a start)

This commit is contained in:
Christian Basler 2016-02-02 20:40:01 +01:00
parent 5f4dbfc985
commit edd8045327
7 changed files with 246 additions and 116 deletions

View File

@ -17,20 +17,21 @@
package ch.dissem.bitmessage; package ch.dissem.bitmessage;
import ch.dissem.bitmessage.entity.*; import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.payload.*; import ch.dissem.bitmessage.entity.payload.Broadcast;
import ch.dissem.bitmessage.entity.payload.Msg;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.payload.ObjectType;
import ch.dissem.bitmessage.entity.payload.Pubkey.Feature; import ch.dissem.bitmessage.entity.payload.Pubkey.Feature;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.Label; import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.entity.valueobject.PrivateKey; import ch.dissem.bitmessage.entity.valueobject.PrivateKey;
import ch.dissem.bitmessage.exception.DecryptionFailedException; import ch.dissem.bitmessage.exception.DecryptionFailedException;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.*; import ch.dissem.bitmessage.ports.*;
import ch.dissem.bitmessage.utils.Property; import ch.dissem.bitmessage.utils.Property;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.Arrays;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.*; import java.util.concurrent.*;
@ -38,9 +39,7 @@ import java.util.concurrent.*;
import static ch.dissem.bitmessage.entity.Plaintext.Status.*; import static ch.dissem.bitmessage.entity.Plaintext.Status.*;
import static ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST; import static ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST;
import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG; import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
import static ch.dissem.bitmessage.utils.UnixTime.DAY; import static ch.dissem.bitmessage.utils.UnixTime.*;
import static ch.dissem.bitmessage.utils.UnixTime.HOUR;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/** /**
* <p>Use this class if you want to create a Bitmessage client.</p> * <p>Use this class if you want to create a Bitmessage client.</p>
@ -122,67 +121,24 @@ public class BitmessageContext {
} }
public void broadcast(final BitmessageAddress from, final String subject, final String message) { public void broadcast(final BitmessageAddress from, final String subject, final String message) {
pool.submit(new Runnable() { Plaintext msg = new Plaintext.Builder(BROADCAST)
@Override .from(from)
public void run() { .message(subject, message)
Plaintext msg = new Plaintext.Builder(BROADCAST) .build();
.from(from) send(msg);
.message(subject, message)
.build();
LOG.info("Sending message.");
msg.setStatus(DOING_PROOF_OF_WORK);
ctx.getMessageRepository().save(msg);
ctx.send(
from,
from,
Factory.getBroadcast(from, msg),
+2 * DAY
);
msg.setStatus(SENT);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.BROADCAST, Label.Type.SENT));
ctx.getMessageRepository().save(msg);
}
});
} }
public void send(final BitmessageAddress from, final BitmessageAddress to, final String subject, final String message) { public void send(final BitmessageAddress from, final BitmessageAddress to, final String subject, final String message) {
if (from.getPrivateKey() == null) { if (from.getPrivateKey() == null) {
throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key."); throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key.");
} }
pool.submit(new Runnable() { Plaintext msg = new Plaintext.Builder(MSG)
@Override .from(from)
public void run() { .to(to)
Plaintext msg = new Plaintext.Builder(MSG) .message(subject, message)
.from(from) .labels(messages().getLabels(Label.Type.SENT))
.to(to) .build();
.message(subject, message) send(msg);
.labels(messages().getLabels(Label.Type.SENT))
.build();
if (to.getPubkey() == null) {
tryToFindMatchingPubkey(to);
}
if (to.getPubkey() == null) {
LOG.info("Public key is missing from recipient. Requesting.");
requestPubkey(from, to);
msg.setStatus(PUBKEY_REQUESTED);
ctx.getMessageRepository().save(msg);
} else {
LOG.info("Sending message.");
msg.setStatus(DOING_PROOF_OF_WORK);
ctx.getMessageRepository().save(msg);
ctx.send(
from,
to,
new Msg(msg),
+2 * DAY
);
msg.setStatus(SENT);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT));
ctx.getMessageRepository().save(msg);
}
}
});
} }
public void send(final Plaintext msg) { public void send(final Plaintext msg) {
@ -193,16 +149,18 @@ public class BitmessageContext {
@Override @Override
public void run() { public void run() {
BitmessageAddress to = msg.getTo(); BitmessageAddress to = msg.getTo();
if (to.getPubkey() == null) { if (to != null) {
tryToFindMatchingPubkey(to); if (to.getPubkey() == null) {
LOG.info("Public key is missing from recipient. Requesting.");
ctx.requestPubkey(to);
}
if (to.getPubkey() == null) {
msg.setStatus(PUBKEY_REQUESTED);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX));
ctx.getMessageRepository().save(msg);
}
} }
if (to.getPubkey() == null) { if (to == null || to.getPubkey() != null) {
LOG.info("Public key is missing from recipient. Requesting.");
requestPubkey(msg.getFrom(), to);
msg.setStatus(PUBKEY_REQUESTED);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX));
ctx.getMessageRepository().save(msg);
} else {
LOG.info("Sending message."); LOG.info("Sending message.");
msg.setStatus(DOING_PROOF_OF_WORK); msg.setStatus(DOING_PROOF_OF_WORK);
ctx.getMessageRepository().save(msg); ctx.getMessageRepository().save(msg);
@ -220,15 +178,6 @@ public class BitmessageContext {
}); });
} }
private void requestPubkey(BitmessageAddress requestingIdentity, BitmessageAddress address) {
ctx.send(
requestingIdentity,
address,
new GetPubkey(address),
+2 * DAY
);
}
public void startup() { public void startup() {
ctx.getNetworkHandler().start(networkListener); ctx.getNetworkHandler().start(networkListener);
} }
@ -281,41 +230,11 @@ public class BitmessageContext {
public void addContact(BitmessageAddress contact) { public void addContact(BitmessageAddress contact) {
ctx.getAddressRepository().save(contact); ctx.getAddressRepository().save(contact);
tryToFindMatchingPubkey(contact);
if (contact.getPubkey() == null) { if (contact.getPubkey() == null) {
ctx.requestPubkey(contact); ctx.requestPubkey(contact);
} }
} }
private void tryToFindMatchingPubkey(BitmessageAddress address) {
for (ObjectMessage object : ctx.getInventory().getObjects(address.getStream(), address.getVersion(), ObjectType.PUBKEY)) {
try {
Pubkey pubkey = (Pubkey) object.getPayload();
if (address.getVersion() == 4) {
V4Pubkey v4Pubkey = (V4Pubkey) pubkey;
if (Arrays.equals(address.getTag(), v4Pubkey.getTag())) {
v4Pubkey.decrypt(address.getPublicDecryptionKey());
if (object.isSignatureValid(v4Pubkey)) {
address.setPubkey(v4Pubkey);
ctx.getAddressRepository().save(address);
break;
} else {
LOG.info("Found pubkey for " + address + " but signature is invalid");
}
}
} else {
if (Arrays.equals(pubkey.getRipe(), address.getRipe())) {
address.setPubkey(pubkey);
ctx.getAddressRepository().save(address);
break;
}
}
} catch (Exception e) {
LOG.debug(e.getMessage(), e);
}
}
}
public void addSubscribtion(BitmessageAddress address) { public void addSubscribtion(BitmessageAddress address) {
address.setSubscribed(true); address.setSubscribed(true);
ctx.getAddressRepository().save(address); ctx.getAddressRepository().save(address);

View File

@ -19,9 +19,7 @@ package ch.dissem.bitmessage;
import ch.dissem.bitmessage.entity.BitmessageAddress; import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.Encrypted; import ch.dissem.bitmessage.entity.Encrypted;
import ch.dissem.bitmessage.entity.ObjectMessage; import ch.dissem.bitmessage.entity.ObjectMessage;
import ch.dissem.bitmessage.entity.payload.Broadcast; import ch.dissem.bitmessage.entity.payload.*;
import ch.dissem.bitmessage.entity.payload.GetPubkey;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.ports.*; import ch.dissem.bitmessage.ports.*;
import ch.dissem.bitmessage.utils.Singleton; import ch.dissem.bitmessage.utils.Singleton;
import ch.dissem.bitmessage.utils.UnixTime; import ch.dissem.bitmessage.utils.UnixTime;
@ -29,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.TreeSet; import java.util.TreeSet;
/** /**
@ -212,16 +211,71 @@ public class InternalContext {
} }
} }
/**
* Be aware that if the pubkey already exists in the inventory, the metods will not request it and the callback
* for freshly received pubkeys will not be called. Instead the pubkey is added to the contact and stored on DB.
*/
public void requestPubkey(final BitmessageAddress contact) { public void requestPubkey(final BitmessageAddress contact) {
BitmessageAddress stored = addressRepository.getAddress(contact.getAddress());
tryToFindMatchingPubkey(contact);
if (contact.getPubkey() != null) {
if (stored != null) {
stored.setPubkey(contact.getPubkey());
addressRepository.save(stored);
} else {
addressRepository.save(contact);
}
return;
}
if (stored == null) {
addressRepository.save(contact);
}
long expires = UnixTime.now(+pubkeyTTL); long expires = UnixTime.now(+pubkeyTTL);
LOG.info("Expires at " + expires); LOG.info("Expires at " + expires);
final ObjectMessage response = new ObjectMessage.Builder() final ObjectMessage request = new ObjectMessage.Builder()
.stream(contact.getStream()) .stream(contact.getStream())
.expiresTime(expires) .expiresTime(expires)
.payload(new GetPubkey(contact)) .payload(new GetPubkey(contact))
.build(); .build();
messageCallback.proofOfWorkStarted(response.getPayload()); messageCallback.proofOfWorkStarted(request.getPayload());
proofOfWorkService.doProofOfWork(response); proofOfWorkService.doProofOfWork(request);
}
private void tryToFindMatchingPubkey(BitmessageAddress address) {
BitmessageAddress stored = addressRepository.getAddress(address.getAddress());
if (stored != null) {
address.setAlias(stored.getAlias());
address.setSubscribed(stored.isSubscribed());
}
for (ObjectMessage object : inventory.getObjects(address.getStream(), address.getVersion(), ObjectType.PUBKEY)) {
try {
Pubkey pubkey = (Pubkey) object.getPayload();
if (address.getVersion() == 4) {
V4Pubkey v4Pubkey = (V4Pubkey) pubkey;
if (Arrays.equals(address.getTag(), v4Pubkey.getTag())) {
v4Pubkey.decrypt(address.getPublicDecryptionKey());
if (object.isSignatureValid(v4Pubkey)) {
address.setPubkey(v4Pubkey);
addressRepository.save(address);
break;
} else {
LOG.info("Found pubkey for " + address + " but signature is invalid");
}
}
} else {
if (Arrays.equals(pubkey.getRipe(), address.getRipe())) {
address.setPubkey(pubkey);
addressRepository.save(address);
break;
}
}
} catch (Exception e) {
LOG.debug(e.getMessage(), e);
}
}
} }
public long getClientNonce() { public long getClientNonce() {

View File

@ -1,7 +1,7 @@
package ch.dissem.bitmessage.utils; package ch.dissem.bitmessage.utils;
/** /**
* Created by chrig on 07.12.2015. * @author Christian Basler
*/ */
public class Numbers { public class Numbers {
public static long max(long a, long b) { public static long max(long a, long b) {

View File

@ -30,4 +30,5 @@ dependencies {
compile 'args4j:args4j:2.32' compile 'args4j:args4j:2.32'
compile 'com.h2database:h2:1.4.190' compile 'com.h2database:h2:1.4.190'
testCompile 'junit:junit:4.11' testCompile 'junit:junit:4.11'
testCompile 'org.mockito:mockito-core:1.10.19'
} }

View File

@ -0,0 +1,79 @@
package ch.dissem.bitmessage;
import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography;
import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.networking.DefaultNetworkHandler;
import ch.dissem.bitmessage.repository.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
/**
* @author Christian Basler
*/
public class SystemTest {
static BitmessageContext alice;
static TestListener aliceListener = new TestListener();
static BitmessageAddress aliceIdentity;
static BitmessageContext bob;
static TestListener bobListener = new TestListener();
static BitmessageAddress bobIdentity;
@BeforeClass
public static void setUp() {
JdbcConfig aliceDB = new JdbcConfig("jdbc:h2:mem:alice;DB_CLOSE_DELAY=-1", "sa", "");
alice = new BitmessageContext.Builder()
.addressRepo(new JdbcAddressRepository(aliceDB))
.inventory(new JdbcInventory(aliceDB))
.messageRepo(new JdbcMessageRepository(aliceDB))
.powRepo(new JdbcProofOfWorkRepository(aliceDB))
.port(6001)
.nodeRegistry(new TestNodeRegistry(6002))
.networkHandler(new DefaultNetworkHandler())
.cryptography(new BouncyCryptography())
.listener(aliceListener)
.build();
alice.startup();
aliceIdentity = alice.createIdentity(false);
JdbcConfig bobDB = new JdbcConfig("jdbc:h2:mem:bob;DB_CLOSE_DELAY=-1", "sa", "");
bob = new BitmessageContext.Builder()
.addressRepo(new JdbcAddressRepository(bobDB))
.inventory(new JdbcInventory(bobDB))
.messageRepo(new JdbcMessageRepository(bobDB))
.powRepo(new JdbcProofOfWorkRepository(bobDB))
.port(6002)
.nodeRegistry(new TestNodeRegistry(6001))
.networkHandler(new DefaultNetworkHandler())
.cryptography(new BouncyCryptography())
.listener(bobListener)
.build();
bob.startup();
bobIdentity = bob.createIdentity(false);
}
@AfterClass
public static void tearDown() {
alice.shutdown();
bob.shutdown();
}
@Test
public void ensureAliceCanSendMessageToBob() throws Exception {
bobListener.reset();
String originalMessage = UUID.randomUUID().toString();
alice.send(aliceIdentity, new BitmessageAddress(bobIdentity.getAddress()), "Subject", originalMessage);
Plaintext plaintext = bobListener.get(5, TimeUnit.MINUTES);
assertThat(plaintext.getText(), equalTo(originalMessage));
}
}

View File

@ -0,0 +1,26 @@
package ch.dissem.bitmessage;
import ch.dissem.bitmessage.entity.Plaintext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* Created by chrig on 02.02.2016.
*/
public class TestListener implements BitmessageContext.Listener {
private CompletableFuture<Plaintext> future = new CompletableFuture<>();
@Override
public void receive(Plaintext plaintext) {
future.complete(plaintext);
}
public void reset() {
future = new CompletableFuture<>();
}
public Plaintext get(long timeout, TimeUnit unit) throws Exception {
return future.get(timeout, unit);
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.ports.NodeRegistry;
import java.util.LinkedList;
import java.util.List;
/**
* Empty {@link NodeRegistry} that doesn't do anything, but shouldn't break things either.
*/
class TestNodeRegistry implements NodeRegistry {
private List<NetworkAddress> nodes = new LinkedList<>();
public TestNodeRegistry(int... ports) {
for (int port : ports) {
nodes.add(
new NetworkAddress.Builder()
.ipv4(127, 0, 0, 1)
.port(port)
.build()
);
}
}
@Override
public List<NetworkAddress> getKnownAddresses(int limit, long... streams) {
return nodes;
}
@Override
public void offerAddresses(List<NetworkAddress> addresses) {
// Ignore
}
}