Merge branch 'feature/nio' into develop

This commit is contained in:
Christian Basler 2016-09-21 19:38:48 +02:00
commit 1003e7a582
67 changed files with 2455 additions and 922 deletions

7
.editorconfig Normal file
View File

@ -0,0 +1,7 @@
root = true
[*]
end_of_line = lf
insert_final_newline = true
charset = utf-8
indent_size = 4

View File

@ -37,7 +37,11 @@ class GitFlowVersion implements Plugin<Project> {
if (project.ext.isRelease) {
return getTag(project)
} else {
return getBranch(project).replaceAll("/", "-") + "-SNAPSHOT"
def branch = getBranch(project)
if ("develop" == branch) {
return "development-SNAPSHOT"
}
return branch.replaceAll("/", "-") + "-SNAPSHOT"
}
}

View File

@ -25,7 +25,7 @@ artifacts {
dependencies {
compile 'org.slf4j:slf4j-api:1.7.12'
testCompile 'junit:junit:4.11'
testCompile 'junit:junit:4.12'
testCompile 'org.hamcrest:hamcrest-library:1.3'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile project(':cryptography-bc')

View File

@ -62,16 +62,16 @@ public class BitmessageContext {
private final InternalContext ctx;
private final Labeler labeler;
private final NetworkHandler.MessageListener networkListener;
private final boolean sendPubkeyOnIdentityCreation;
private BitmessageContext(Builder builder) {
if (builder.listener instanceof Listener.WithContext) {
((Listener.WithContext) builder.listener).setContext(this);
}
ctx = new InternalContext(builder);
labeler = builder.labeler;
ctx.getProofOfWorkService().doMissingProofOfWork(30_000); // TODO: this should be configurable
networkListener = new DefaultMessageListener(ctx, labeler, builder.listener);
sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation;
}
@ -89,11 +89,11 @@ public class BitmessageContext {
public BitmessageAddress createIdentity(boolean shorter, Feature... features) {
final BitmessageAddress identity = new BitmessageAddress(new PrivateKey(
shorter,
ctx.getStreams()[0],
NETWORK_NONCE_TRIALS_PER_BYTE,
NETWORK_EXTRA_BYTES,
features
shorter,
ctx.getStreams()[0],
NETWORK_NONCE_TRIALS_PER_BYTE,
NETWORK_EXTRA_BYTES,
features
));
ctx.getAddressRepository().save(identity);
if (sendPubkeyOnIdentityCreation) {
@ -117,9 +117,9 @@ public class BitmessageContext {
}
public List<BitmessageAddress> createDeterministicAddresses(
String passphrase, int numberOfAddresses, long version, long stream, boolean shorter) {
String passphrase, int numberOfAddresses, long version, long stream, boolean shorter) {
List<BitmessageAddress> result = BitmessageAddress.deterministic(
passphrase, numberOfAddresses, version, stream, shorter);
passphrase, numberOfAddresses, version, stream, shorter);
for (int i = 0; i < result.size(); i++) {
BitmessageAddress address = result.get(i);
address.setAlias("deterministic (" + (i + 1) + ")");
@ -130,9 +130,9 @@ public class BitmessageContext {
public void broadcast(final BitmessageAddress from, final String subject, final String message) {
Plaintext msg = new Plaintext.Builder(BROADCAST)
.from(from)
.message(subject, message)
.build();
.from(from)
.message(subject, message)
.build();
send(msg);
}
@ -141,10 +141,10 @@ public class BitmessageContext {
throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key.");
}
Plaintext msg = new Plaintext.Builder(MSG)
.from(from)
.to(to)
.message(subject, message)
.build();
.from(from)
.to(to)
.message(subject, message)
.build();
send(msg);
}
@ -170,17 +170,17 @@ public class BitmessageContext {
ctx.send(msg);
} else {
ctx.send(
msg.getFrom(),
to,
Factory.getBroadcast(msg),
msg.getTTL()
msg.getFrom(),
to,
Factory.getBroadcast(msg),
msg.getTTL()
);
}
}
}
public void startup() {
ctx.getNetworkHandler().start(networkListener);
ctx.getNetworkHandler().start();
}
public void shutdown() {
@ -195,7 +195,7 @@ public class BitmessageContext {
* @param wait waits for the synchronization thread to finish
*/
public void synchronize(InetAddress host, int port, long timeoutInSeconds, boolean wait) {
Future<?> future = ctx.getNetworkHandler().synchronize(host, port, networkListener, timeoutInSeconds);
Future<?> future = ctx.getNetworkHandler().synchronize(host, port, timeoutInSeconds);
if (wait) {
try {
future.get();
@ -271,7 +271,7 @@ public class BitmessageContext {
broadcast.decrypt(address);
// This decrypts it twice, but on the other hand it doesn't try to decrypt the objects with
// other subscriptions and the interface stays as simple as possible.
networkListener.receive(object);
ctx.getNetworkListener().receive(object);
} catch (DecryptionFailedException ignore) {
} catch (Exception e) {
LOG.debug(e.getMessage(), e);
@ -281,8 +281,8 @@ public class BitmessageContext {
public Property status() {
return new Property("status", null,
ctx.getNetworkHandler().getNetworkStatus(),
new Property("unacknowledged", ctx.getMessageRepository().findMessagesToResend().size())
ctx.getNetworkHandler().getNetworkStatus(),
new Property("unacknowledged", ctx.getMessageRepository().findMessagesToResend().size())
);
}
@ -296,6 +296,13 @@ public class BitmessageContext {
public interface Listener {
void receive(Plaintext plaintext);
/**
* A message listener that needs a {@link BitmessageContext}, i.e. for implementing some sort of chat bot.
*/
interface WithContext extends Listener {
void setContext(BitmessageContext ctx);
}
}
public static final class Builder {
@ -429,7 +436,7 @@ public class BitmessageContext {
@Override
public MessagePayload handle(CustomMessage request) {
throw new IllegalStateException(
"Received custom request, but no custom command handler configured.");
"Received custom request, but no custom command handler configured.");
}
};
}

View File

@ -24,7 +24,6 @@ import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.exception.DecryptionFailedException;
import ch.dissem.bitmessage.ports.Labeler;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.TTL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,21 +31,24 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static ch.dissem.bitmessage.entity.Plaintext.Status.*;
import static ch.dissem.bitmessage.utils.UnixTime.DAY;
import static ch.dissem.bitmessage.entity.Plaintext.Status.PUBKEY_REQUESTED;
class DefaultMessageListener implements NetworkHandler.MessageListener {
class DefaultMessageListener implements NetworkHandler.MessageListener, InternalContext.ContextHolder {
private final static Logger LOG = LoggerFactory.getLogger(DefaultMessageListener.class);
private final InternalContext ctx;
private final Labeler labeler;
private final BitmessageContext.Listener listener;
private InternalContext ctx;
public DefaultMessageListener(InternalContext context, Labeler labeler, BitmessageContext.Listener listener) {
this.ctx = context;
public DefaultMessageListener(Labeler labeler, BitmessageContext.Listener listener) {
this.labeler = labeler;
this.listener = listener;
}
@Override
public void setContext(InternalContext context) {
this.ctx = context;
}
@Override
@SuppressWarnings("ConstantConditions")
public void receive(ObjectMessage object) throws IOException {

View File

@ -56,6 +56,7 @@ public class InternalContext {
private final CustomCommandHandler customCommandHandler;
private final ProofOfWorkService proofOfWorkService;
private final Labeler labeler;
private final NetworkHandler.MessageListener networkListener;
private final TreeSet<Long> streams = new TreeSet<>();
private final int port;
@ -79,6 +80,7 @@ public class InternalContext {
this.connectionLimit = builder.connectionLimit;
this.connectionTTL = builder.connectionTTL;
this.labeler = builder.labeler;
this.networkListener = new DefaultMessageListener(labeler, builder.listener);
Singleton.initialize(cryptography);
@ -94,7 +96,8 @@ public class InternalContext {
}
init(cryptography, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository,
proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine, customCommandHandler, builder.labeler);
proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine, customCommandHandler, builder.labeler,
networkListener);
for (BitmessageAddress identity : addressRepository.getIdentities()) {
streams.add(identity.getStream());
}
@ -148,6 +151,10 @@ public class InternalContext {
return labeler;
}
public NetworkHandler.MessageListener getNetworkListener() {
return networkListener;
}
public long[] getStreams() {
long[] result = new long[streams.size()];
int i = 0;
@ -178,10 +185,10 @@ public class InternalContext {
long expires = UnixTime.now(+timeToLive);
LOG.info("Expires at " + expires);
final ObjectMessage object = new ObjectMessage.Builder()
.stream(recipient.getStream())
.expiresTime(expires)
.payload(payload)
.build();
.stream(recipient.getStream())
.expiresTime(expires)
.payload(payload)
.build();
if (object.isSigned()) {
object.sign(from.getPrivateKey());
}
@ -201,10 +208,10 @@ public class InternalContext {
long expires = UnixTime.now(TTL.pubkey());
LOG.info("Expires at " + expires);
final ObjectMessage response = new ObjectMessage.Builder()
.stream(targetStream)
.expiresTime(expires)
.payload(identity.getPubkey())
.build();
.stream(targetStream)
.expiresTime(expires)
.payload(identity.getPubkey())
.build();
response.sign(identity.getPrivateKey());
response.encrypt(cryptography.createPublicKey(identity.getPublicDecryptionKey()));
// TODO: remember that the pubkey is just about to be sent, and on which stream!
@ -239,10 +246,10 @@ public class InternalContext {
long expires = UnixTime.now(TTL.getpubkey());
LOG.info("Expires at " + expires);
final ObjectMessage request = new ObjectMessage.Builder()
.stream(contact.getStream())
.expiresTime(expires)
.payload(new GetPubkey(contact))
.build();
.stream(contact.getStream())
.expiresTime(expires)
.payload(new GetPubkey(contact))
.build();
proofOfWorkService.doProofOfWork(request);
}

View File

@ -11,6 +11,7 @@ import ch.dissem.bitmessage.ports.ProofOfWorkRepository.Item;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@ -42,7 +43,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
for (byte[] initialHash : items) {
Item item = powRepo.getItem(initialHash);
cryptography.doProofOfWork(item.object, item.nonceTrialsPerByte, item.extraBytes,
ProofOfWorkService.this);
ProofOfWorkService.this);
}
}
}, delayInMilliseconds);
@ -71,7 +72,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
final ObjectMessage ack = plaintext.getAckMessage();
messageRepo.save(plaintext);
Item item = new Item(ack, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES,
expirationTime, plaintext);
expirationTime, plaintext);
powRepo.putObject(item);
cryptography.doProofOfWork(ack, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES, this);
}
@ -89,15 +90,20 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
ctx.getLabeler().markAsSent(plaintext);
messageRepo.save(plaintext);
}
try {
ctx.getNetworkListener().receive(object);
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
ctx.getInventory().storeObject(object);
ctx.getNetworkHandler().offer(object.getInventoryVector());
} else {
item.message.getAckMessage().setNonce(nonce);
final ObjectMessage object = new ObjectMessage.Builder()
.stream(item.message.getStream())
.expiresTime(item.expirationTime)
.payload(new Msg(item.message))
.build();
.stream(item.message.getStream())
.expiresTime(item.expirationTime)
.payload(new Msg(item.message))
.build();
if (object.isSigned()) {
object.sign(item.message.getFrom().getPrivateKey());
}

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -47,10 +48,18 @@ public class Addr implements MessagePayload {
}
@Override
public void write(OutputStream stream) throws IOException {
Encode.varInt(addresses.size(), stream);
public void write(OutputStream out) throws IOException {
Encode.varInt(addresses.size(), out);
for (NetworkAddress address : addresses) {
address.write(stream);
address.write(out);
}
}
@Override
public void write(ByteBuffer buffer) {
Encode.varInt(addresses.size(), buffer);
for (NetworkAddress address : addresses) {
address.write(buffer);
}
}

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.utils.AccessCounter;
import ch.dissem.bitmessage.utils.Encode;
import java.io.*;
import java.nio.ByteBuffer;
import static ch.dissem.bitmessage.utils.Decode.bytes;
import static ch.dissem.bitmessage.utils.Decode.varString;
@ -85,6 +86,17 @@ public class CustomMessage implements MessagePayload {
}
}
@Override
public void write(ByteBuffer buffer) {
if (data != null) {
Encode.varString(command, buffer);
buffer.put(data);
} else {
throw new ApplicationException("Tried to write custom message without data. " +
"Programmer: did you forget to override #write()?");
}
}
public boolean isError() {
return COMMAND_ERROR.equals(command);
}

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
@ -55,6 +56,14 @@ public class GetData implements MessagePayload {
}
}
@Override
public void write(ByteBuffer buffer) {
Encode.varInt(inventory.size(), buffer);
for (InventoryVector iv : inventory) {
iv.write(buffer);
}
}
public static final class Builder {
private List<InventoryVector> inventory = new LinkedList<>();

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
@ -53,6 +54,14 @@ public class Inv implements MessagePayload {
}
}
@Override
public void write(ByteBuffer buffer) {
Encode.varInt(inventory.size(), buffer);
for (InventoryVector iv : inventory) {
iv.write(buffer);
}
}
public static final class Builder {
private List<InventoryVector> inventory = new LinkedList<>();

View File

@ -19,9 +19,9 @@ package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.utils.Encode;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
@ -74,9 +74,7 @@ public class NetworkMessage implements Streamable {
out.write('\0');
}
ByteArrayOutputStream payloadStream = new ByteArrayOutputStream();
payload.write(payloadStream);
byte[] payloadBytes = payloadStream.toByteArray();
byte[] payloadBytes = Encode.bytes(payload);
// Length of payload in number of bytes. Because of other restrictions, there is no reason why this length would
// ever be larger than 1600003 bytes. Some clients include a sanity-check to avoid processing messages which are
@ -93,4 +91,61 @@ public class NetworkMessage implements Streamable {
// message payload
out.write(payloadBytes);
}
/**
* A more efficient implementation of the write method, writing header data to the provided buffer and returning
* a new buffer containing the payload.
*
* @param headerBuffer where the header data is written to (24 bytes)
* @return a buffer containing the payload, ready to be read.
*/
public ByteBuffer writeHeaderAndGetPayloadBuffer(ByteBuffer headerBuffer) {
return ByteBuffer.wrap(writeHeader(headerBuffer));
}
/**
* For improved memory efficiency, you should use {@link #writeHeaderAndGetPayloadBuffer(ByteBuffer)}
* and write the header buffer as well as the returned payload buffer into the channel.
*
* @param buffer where everything gets written to. Needs to be large enough for the whole message
* to be written.
*/
@Override
public void write(ByteBuffer buffer) {
byte[] payloadBytes = writeHeader(buffer);
buffer.put(payloadBytes);
}
private byte[] writeHeader(ByteBuffer out) {
// magic
Encode.int32(MAGIC, out);
// ASCII string identifying the packet content, NULL padded (non-NULL padding results in packet rejected)
String command = payload.getCommand().name().toLowerCase();
try {
out.put(command.getBytes("ASCII"));
} catch (UnsupportedEncodingException e) {
throw new ApplicationException(e);
}
for (int i = command.length(); i < 12; i++) {
out.put((byte) 0);
}
byte[] payloadBytes = Encode.bytes(payload);
// Length of payload in number of bytes. Because of other restrictions, there is no reason why this length would
// ever be larger than 1600003 bytes. Some clients include a sanity-check to avoid processing messages which are
// larger than this.
Encode.int32(payloadBytes.length, out);
// checksum
try {
out.put(getChecksum(payloadBytes));
} catch (GeneralSecurityException e) {
throw new ApplicationException(e);
}
// message payload
return payloadBytes;
}
}

View File

@ -29,6 +29,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
@ -168,6 +169,16 @@ public class ObjectMessage implements MessagePayload {
out.write(getPayloadBytesWithoutNonce());
}
@Override
public void write(ByteBuffer buffer) {
if (nonce == null) {
buffer.put(new byte[8]);
} else {
buffer.put(nonce);
}
buffer.put(getPayloadBytesWithoutNonce());
}
private void writeHeaderWithoutNonce(OutputStream out) throws IOException {
Encode.int64(expiresTime, out);
Encode.int32(objectType, out);

View File

@ -25,6 +25,7 @@ import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.utils.*;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Collections;
@ -197,12 +198,49 @@ public class Plaintext implements Streamable {
}
}
}
public void write(ByteBuffer buffer, boolean includeSignature) {
Encode.varInt(from.getVersion(), buffer);
Encode.varInt(from.getStream(), buffer);
Encode.int32(from.getPubkey().getBehaviorBitfield(), buffer);
buffer.put(from.getPubkey().getSigningKey(), 1, 64);
buffer.put(from.getPubkey().getEncryptionKey(), 1, 64);
if (from.getVersion() >= 3) {
Encode.varInt(from.getPubkey().getNonceTrialsPerByte(), buffer);
Encode.varInt(from.getPubkey().getExtraBytes(), buffer);
}
if (type == Type.MSG) {
buffer.put(to.getRipe());
}
Encode.varInt(encoding, buffer);
Encode.varInt(message.length, buffer);
buffer.put(message);
if (type == Type.MSG) {
if (to.has(Feature.DOES_ACK) && getAckMessage() != null) {
Encode.varBytes(Encode.bytes(getAckMessage()), buffer);
} else {
Encode.varInt(0, buffer);
}
}
if (includeSignature) {
if (signature == null) {
Encode.varInt(0, buffer);
} else {
Encode.varInt(signature.length, buffer);
buffer.put(signature);
}
}
}
@Override
public void write(OutputStream out) throws IOException {
write(out, true);
}
@Override
public void write(ByteBuffer buffer) {
write(buffer, true);
}
public Object getId() {
return id;
}

View File

@ -19,10 +19,13 @@ package ch.dissem.bitmessage.entity;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
/**
* An object that can be written to an {@link OutputStream}
*/
public interface Streamable extends Serializable {
void write(OutputStream stream) throws IOException;
void write(ByteBuffer buffer);
}

View File

@ -18,6 +18,7 @@ package ch.dissem.bitmessage.entity;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* The 'verack' command answers a 'version' command, accepting the other node's version.
@ -34,4 +35,9 @@ public class VerAck implements MessagePayload {
public void write(OutputStream stream) throws IOException {
// 'verack' doesn't have any payload, so there is nothing to write
}
@Override
public void write(ByteBuffer buffer) {
// 'verack' doesn't have any payload, so there is nothing to write
}
}

View File

@ -23,7 +23,7 @@ import ch.dissem.bitmessage.utils.UnixTime;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Random;
import java.nio.ByteBuffer;
/**
* The 'version' command advertises this node's latest supported protocol version upon initiation.
@ -134,6 +134,18 @@ public class Version implements MessagePayload {
Encode.varIntList(streams, stream);
}
@Override
public void write(ByteBuffer buffer) {
Encode.int32(version, buffer);
Encode.int64(services, buffer);
Encode.int64(timestamp, buffer);
addrRecv.write(buffer, true);
addrFrom.write(buffer, true);
Encode.int64(nonce, buffer);
Encode.varString(userAgent, buffer);
Encode.varIntList(streams, buffer);
}
public static final class Builder {
private int version;

View File

@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static ch.dissem.bitmessage.entity.valueobject.PrivateKey.PRIVATE_KEY_SIZE;
@ -144,12 +145,29 @@ public class CryptoBox implements Streamable {
out.write(x, offset, length);
}
private void writeCoordinateComponent(ByteBuffer buffer, byte[] x) {
int offset = Bytes.numberOfLeadingZeros(x);
int length = x.length - offset;
Encode.int16(length, buffer);
buffer.put(x, offset, length);
}
@Override
public void write(OutputStream stream) throws IOException {
writeWithoutMAC(stream);
stream.write(mac);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(initializationVector);
Encode.int16(curveType, buffer);
writeCoordinateComponent(buffer, Points.getX(R));
writeCoordinateComponent(buffer, Points.getY(R));
buffer.put(encrypted);
buffer.put(mac);
}
public static final class Builder {
private byte[] initializationVector;
private int curveType;

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.utils.Decode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
@ -62,6 +63,11 @@ public class GenericPayload extends ObjectPayload {
stream.write(data);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(data);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.utils.Decode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* Request for a public key.
@ -73,4 +74,9 @@ public class GetPubkey extends ObjectPayload {
public void write(OutputStream stream) throws IOException {
stream.write(ripeTag);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(ripeTag);
}
}

View File

@ -24,6 +24,7 @@ import ch.dissem.bitmessage.exception.DecryptionFailedException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
@ -111,6 +112,12 @@ public class Msg extends ObjectPayload implements Encrypted, PlaintextHolder {
encrypted.write(out);
}
@Override
public void write(ByteBuffer buffer) {
if (encrypted == null) throw new IllegalStateException("Msg must be signed and encrypted before writing it.");
encrypted.write(buffer);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -18,6 +18,7 @@ package ch.dissem.bitmessage.entity.payload;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
@ -60,6 +61,10 @@ public abstract class Pubkey extends ObjectPayload {
write(out);
}
public void writeUnencrypted(ByteBuffer buffer){
write(buffer);
}
protected byte[] add0x04(byte[] key) {
if (key.length == 65) return key;
byte[] result = new byte[65];

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* A version 2 public key.
@ -86,10 +87,17 @@ public class V2Pubkey extends Pubkey {
}
@Override
public void write(OutputStream os) throws IOException {
Encode.int32(behaviorBitfield, os);
os.write(publicSigningKey, 1, 64);
os.write(publicEncryptionKey, 1, 64);
public void write(OutputStream out) throws IOException {
Encode.int32(behaviorBitfield, out);
out.write(publicSigningKey, 1, 64);
out.write(publicEncryptionKey, 1, 64);
}
@Override
public void write(ByteBuffer buffer) {
Encode.int32(behaviorBitfield, buffer);
buffer.put(publicSigningKey, 1, 64);
buffer.put(publicEncryptionKey, 1, 64);
}
public static class Builder {

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.entity.Plaintext;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* Users who are subscribed to the sending address will see the message appear in their inbox.
@ -58,4 +59,9 @@ public class V4Broadcast extends Broadcast {
public void write(OutputStream out) throws IOException {
encrypted.write(out);
}
@Override
public void write(ByteBuffer buffer) {
encrypted.write(buffer);
}
}

View File

@ -24,6 +24,7 @@ import ch.dissem.bitmessage.utils.Decode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
@ -85,11 +86,22 @@ public class V4Pubkey extends Pubkey implements Encrypted {
encrypted.write(stream);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(tag);
encrypted.write(buffer);
}
@Override
public void writeUnencrypted(OutputStream out) throws IOException {
decrypted.write(out);
}
@Override
public void writeUnencrypted(ByteBuffer buffer) {
decrypted.write(buffer);
}
@Override
public void writeBytesToSign(OutputStream out) throws IOException {
out.write(tag);

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.utils.Strings;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class InventoryVector implements Streamable, Serializable {
@ -56,8 +57,13 @@ public class InventoryVector implements Streamable, Serializable {
}
@Override
public void write(OutputStream stream) throws IOException {
stream.write(hash);
public void write(OutputStream out) throws IOException {
out.write(hash);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(hash);
}
@Override

View File

@ -24,7 +24,10 @@ import ch.dissem.bitmessage.utils.UnixTime;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
@ -38,19 +41,19 @@ public class NetworkAddress implements Streamable {
/**
* Stream number for this node
*/
private long stream;
private final long stream;
/**
* same service(s) listed in version
*/
private long services;
private final long services;
/**
* IPv6 address. IPv4 addresses are written into the message as a 16 byte IPv4-mapped IPv6 address
* (12 bytes 00 00 00 00 00 00 00 00 00 00 FF FF, followed by the 4 bytes of the IPv4 address).
*/
private byte[] ipv6;
private int port;
private final byte[] ipv6;
private final int port;
private NetworkAddress(Builder builder) {
time = builder.time;
@ -119,14 +122,29 @@ public class NetworkAddress implements Streamable {
write(stream, false);
}
public void write(OutputStream stream, boolean light) throws IOException {
public void write(OutputStream out, boolean light) throws IOException {
if (!light) {
Encode.int64(time, stream);
Encode.int32(this.stream, stream);
Encode.int64(time, out);
Encode.int32(stream, out);
}
Encode.int64(services, stream);
stream.write(ipv6);
Encode.int16(port, stream);
Encode.int64(services, out);
out.write(ipv6);
Encode.int16(port, out);
}
@Override
public void write(ByteBuffer buffer) {
write(buffer, false);
}
public void write(ByteBuffer buffer, boolean light) {
if (!light) {
Encode.int64(time, buffer);
Encode.int32(stream, buffer);
}
Encode.int64(services, buffer);
buffer.put(ipv6);
Encode.int16(port, buffer);
}
public static final class Builder {
@ -199,6 +217,17 @@ public class NetworkAddress implements Streamable {
return this;
}
public Builder address(SocketAddress address) {
if (address instanceof InetSocketAddress) {
InetSocketAddress inetAddress = (InetSocketAddress) address;
ip(inetAddress.getAddress());
port(inetAddress.getPort());
} else {
throw new IllegalArgumentException("Unknown type of address: " + address.getClass());
}
return this;
}
public NetworkAddress build() {
if (time == 0) {
time = UnixTime.now();

View File

@ -27,6 +27,7 @@ import ch.dissem.bitmessage.utils.Decode;
import ch.dissem.bitmessage.utils.Encode;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -113,24 +114,20 @@ public class PrivateKey implements Streamable {
}
Builder generate() {
try {
long signingKeyNonce = nextNonce;
long encryptionKeyNonce = nextNonce + 1;
byte[] ripe;
do {
privEK = Bytes.truncate(cryptography().sha512(seed, Encode.varInt(encryptionKeyNonce)), 32);
privSK = Bytes.truncate(cryptography().sha512(seed, Encode.varInt(signingKeyNonce)), 32);
pubSK = cryptography().createPublicKey(privSK);
pubEK = cryptography().createPublicKey(privEK);
ripe = cryptography().ripemd160(cryptography().sha512(pubSK, pubEK));
long signingKeyNonce = nextNonce;
long encryptionKeyNonce = nextNonce + 1;
byte[] ripe;
do {
privEK = Bytes.truncate(cryptography().sha512(seed, Encode.varInt(encryptionKeyNonce)), 32);
privSK = Bytes.truncate(cryptography().sha512(seed, Encode.varInt(signingKeyNonce)), 32);
pubSK = cryptography().createPublicKey(privSK);
pubEK = cryptography().createPublicKey(privEK);
ripe = cryptography().ripemd160(cryptography().sha512(pubSK, pubEK));
signingKeyNonce += 2;
encryptionKeyNonce += 2;
} while (ripe[0] != 0 || (shorter && ripe[1] != 0));
nextNonce = signingKeyNonce;
} catch (IOException e) {
throw new ApplicationException(e);
}
signingKeyNonce += 2;
encryptionKeyNonce += 2;
} while (ripe[0] != 0 || (shorter && ripe[1] != 0));
nextNonce = signingKeyNonce;
return this;
}
}
@ -182,4 +179,20 @@ public class PrivateKey implements Streamable {
Encode.varInt(privateEncryptionKey.length, out);
out.write(privateEncryptionKey);
}
@Override
public void write(ByteBuffer buffer) {
Encode.varInt(pubkey.getVersion(), buffer);
Encode.varInt(pubkey.getStream(), buffer);
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
pubkey.writeUnencrypted(baos);
Encode.varBytes(baos.toByteArray(), buffer);
} catch (IOException e) {
throw new ApplicationException(e);
}
Encode.varBytes(privateSigningKey, buffer);
Encode.varBytes(privateEncryptionKey, buffer);
}
}

View File

@ -0,0 +1,92 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Stack;
import java.util.TreeMap;
import static ch.dissem.bitmessage.ports.NetworkHandler.HEADER_SIZE;
import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_PAYLOAD_SIZE;
/**
* A pool for {@link ByteBuffer}s. As they may use up a lot of memory,
* they should be reused as efficiently as possible.
*/
class BufferPool {
private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);
public static final BufferPool bufferPool = new BufferPool();
private final Map<Integer, Stack<ByteBuffer>> pools = new TreeMap<>();
private BufferPool() {
pools.put(HEADER_SIZE, new Stack<ByteBuffer>());
pools.put(54, new Stack<ByteBuffer>());
pools.put(1000, new Stack<ByteBuffer>());
pools.put(60000, new Stack<ByteBuffer>());
pools.put(MAX_PAYLOAD_SIZE, new Stack<ByteBuffer>());
}
public synchronized ByteBuffer allocate(int capacity) {
Integer targetSize = getTargetSize(capacity);
Stack<ByteBuffer> pool = pools.get(targetSize);
if (pool.isEmpty()) {
LOG.trace("Creating new buffer of size " + targetSize);
return ByteBuffer.allocate(targetSize);
} else {
return pool.pop();
}
}
/**
* Returns a buffer that has the size of the Bitmessage network message header, 24 bytes.
*
* @return a buffer of size 24
*/
public synchronized ByteBuffer allocateHeaderBuffer() {
Stack<ByteBuffer> pool = pools.get(HEADER_SIZE);
if (pool.isEmpty()) {
return ByteBuffer.allocate(HEADER_SIZE);
} else {
return pool.pop();
}
}
public synchronized void deallocate(ByteBuffer buffer) {
buffer.clear();
Stack<ByteBuffer> pool = pools.get(buffer.capacity());
if (pool == null) {
throw new IllegalArgumentException("Illegal buffer capacity " + buffer.capacity() +
" one of " + pools.keySet() + " expected.");
} else {
pool.push(buffer);
}
}
private Integer getTargetSize(int capacity) {
for (Integer size : pools.keySet()) {
if (size >= capacity) return size;
}
throw new IllegalArgumentException("Requested capacity too large: " +
"requested=" + capacity + "; max=" + MAX_PAYLOAD_SIZE);
}
}

View File

@ -40,7 +40,7 @@ import static ch.dissem.bitmessage.utils.Singleton.cryptography;
* Creates {@link NetworkMessage} objects from {@link InputStream InputStreams}
*/
public class Factory {
public static final Logger LOG = LoggerFactory.getLogger(Factory.class);
private static final Logger LOG = LoggerFactory.getLogger(Factory.class);
public static NetworkMessage getNetworkMessage(int version, InputStream stream) throws SocketTimeoutException {
try {

View File

@ -62,7 +62,7 @@ class V3MessageFactory {
}
}
private static MessagePayload getPayload(String command, InputStream stream, int length) throws IOException {
static MessagePayload getPayload(String command, InputStream stream, int length) throws IOException {
switch (command) {
case "version":
return parseVersion(stream);
@ -107,12 +107,12 @@ class V3MessageFactory {
}
return new ObjectMessage.Builder()
.nonce(nonce)
.expiresTime(expiresTime)
.objectType(objectType)
.stream(stream)
.payload(payload)
.build();
.nonce(nonce)
.expiresTime(expiresTime)
.objectType(objectType)
.stream(stream)
.payload(payload)
.build();
}
private static GetData parseGetData(InputStream stream) throws IOException {
@ -153,13 +153,13 @@ class V3MessageFactory {
long[] streamNumbers = Decode.varIntList(stream);
return new Version.Builder()
.version(version)
.services(services)
.timestamp(timestamp)
.addrRecv(addrRecv).addrFrom(addrFrom)
.nonce(nonce)
.userAgent(userAgent)
.streams(streamNumbers).build();
.version(version)
.services(services)
.timestamp(timestamp)
.addrRecv(addrRecv).addrFrom(addrFrom)
.nonce(nonce)
.userAgent(userAgent)
.streams(streamNumbers).build();
}
private static InventoryVector parseInventoryVector(InputStream stream) throws IOException {
@ -179,7 +179,13 @@ class V3MessageFactory {
long services = Decode.int64(stream);
byte[] ipv6 = Decode.bytes(stream, 16);
int port = Decode.uint16(stream);
return new NetworkAddress.Builder().time(time).stream(streamNumber).services(services).ipv6(ipv6).port(port).build();
return new NetworkAddress.Builder()
.time(time)
.stream(streamNumber)
.services(services)
.ipv6(ipv6)
.port(port)
.build();
}
private static boolean testChecksum(byte[] checksum, byte[] payload) {

View File

@ -0,0 +1,189 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.factory;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.utils.Decode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import static ch.dissem.bitmessage.entity.NetworkMessage.MAGIC_BYTES;
import static ch.dissem.bitmessage.factory.BufferPool.bufferPool;
import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_PAYLOAD_SIZE;
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
/**
* Similar to the {@link V3MessageFactory}, but used for NIO buffers which may or may not contain a whole message.
*/
public class V3MessageReader {
private ByteBuffer headerBuffer;
private ByteBuffer dataBuffer;
private ReaderState state = ReaderState.MAGIC;
private String command;
private int length;
private byte[] checksum;
private List<NetworkMessage> messages = new LinkedList<>();
public ByteBuffer getActiveBuffer() {
if (state != null && state != ReaderState.DATA) {
if (headerBuffer == null) {
headerBuffer = bufferPool.allocateHeaderBuffer();
}
}
return state == ReaderState.DATA ? dataBuffer : headerBuffer;
}
public void update() {
if (state != ReaderState.DATA) {
getActiveBuffer();
headerBuffer.flip();
}
switch (state) {
case MAGIC:
if (!findMagicBytes(headerBuffer)) {
headerBuffer.compact();
return;
}
state = ReaderState.HEADER;
case HEADER:
if (headerBuffer.remaining() < 20) {
headerBuffer.compact();
headerBuffer.limit(20);
return;
}
command = getCommand(headerBuffer);
length = (int) Decode.uint32(headerBuffer);
if (length > MAX_PAYLOAD_SIZE) {
throw new NodeException("Payload of " + length + " bytes received, no more than " +
MAX_PAYLOAD_SIZE + " was expected.");
}
checksum = new byte[4];
headerBuffer.get(checksum);
state = ReaderState.DATA;
bufferPool.deallocate(headerBuffer);
headerBuffer = null;
dataBuffer = bufferPool.allocate(length);
dataBuffer.clear();
dataBuffer.limit(length);
case DATA:
if (dataBuffer.position() < length) {
return;
} else {
dataBuffer.flip();
}
if (!testChecksum(dataBuffer)) {
state = ReaderState.MAGIC;
throw new NodeException("Checksum failed for message '" + command + "'");
}
try {
MessagePayload payload = V3MessageFactory.getPayload(
command,
new ByteArrayInputStream(dataBuffer.array(),
dataBuffer.arrayOffset() + dataBuffer.position(), length),
length);
if (payload != null) {
messages.add(new NetworkMessage(payload));
}
} catch (IOException e) {
throw new NodeException(e.getMessage());
} finally {
state = ReaderState.MAGIC;
bufferPool.deallocate(dataBuffer);
dataBuffer = null;
dataBuffer = null;
}
}
}
public List<NetworkMessage> getMessages() {
return messages;
}
private boolean findMagicBytes(ByteBuffer buffer) {
int i = 0;
while (buffer.hasRemaining()) {
if (i == 0) {
buffer.mark();
}
if (buffer.get() == MAGIC_BYTES[i]) {
i++;
if (i == MAGIC_BYTES.length) {
return true;
}
} else {
i = 0;
}
}
if (i > 0) {
buffer.reset();
}
return false;
}
private static String getCommand(ByteBuffer buffer) {
int start = buffer.position();
int l = 0;
while (l < 12 && buffer.get() != 0) l++;
int i = l + 1;
while (i < 12) {
if (buffer.get() != 0) throw new NodeException("'\\0' padding expected for command");
i++;
}
try {
return new String(buffer.array(), start, l, "ASCII");
} catch (UnsupportedEncodingException e) {
throw new ApplicationException(e);
}
}
private boolean testChecksum(ByteBuffer buffer) {
byte[] payloadChecksum = cryptography().sha512(buffer.array(),
buffer.arrayOffset() + buffer.position(), length);
for (int i = 0; i < checksum.length; i++) {
if (checksum[i] != payloadChecksum[i]) {
return false;
}
}
return true;
}
/**
* De-allocates all buffers. This method should be called iff the reader isn't used anymore, i.e. when its
* connection is severed.
*/
public void cleanup() {
state = null;
if (headerBuffer != null) {
bufferPool.deallocate(headerBuffer);
}
if (dataBuffer != null) {
bufferPool.deallocate(dataBuffer);
}
}
private enum ReaderState {MAGIC, HEADER, DATA}
}

View File

@ -43,7 +43,7 @@ import static ch.dissem.bitmessage.utils.Numbers.max;
* Implements everything that isn't directly dependent on either Spongy- or Bouncycastle.
*/
public abstract class AbstractCryptography implements Cryptography, InternalContext.ContextHolder {
public static final Logger LOG = LoggerFactory.getLogger(Cryptography.class);
protected static final Logger LOG = LoggerFactory.getLogger(Cryptography.class);
private static final SecureRandom RANDOM = new SecureRandom();
private static final BigInteger TWO = BigInteger.valueOf(2);
private static final BigInteger TWO_POW_64 = TWO.pow(64);
@ -61,6 +61,12 @@ public abstract class AbstractCryptography implements Cryptography, InternalCont
this.context = context;
}
public byte[] sha512(byte[] data, int offset, int length) {
MessageDigest mda = md("SHA-512");
mda.update(data, offset, length);
return mda.digest();
}
public byte[] sha512(byte[]... data) {
return hash("SHA-512", data);
}

View File

@ -30,6 +30,18 @@ import java.security.SecureRandom;
* which should be secure enough.
*/
public interface Cryptography {
/**
* A helper method to calculate SHA-512 hashes. Please note that a new {@link MessageDigest} object is created at
* each call (to ensure thread safety), so you shouldn't use this if you need to do many hash calculations in
* success on the same thread.
*
* @param data to get hashed
* @param offset of the data to be hashed
* @param length of the data to be hashed
* @return SHA-512 hash of data within the given range
*/
byte[] sha512(byte[] data, int offset, int length);
/**
* A helper method to calculate SHA-512 hashes. Please note that a new {@link MessageDigest} object is created at
* each call (to ensure thread safety), so you shouldn't use this if you need to do many hash calculations in

View File

@ -1,125 +0,0 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static ch.dissem.bitmessage.utils.Collections.selectRandom;
import static ch.dissem.bitmessage.utils.UnixTime.HOUR;
import static java.util.Collections.newSetFromMap;
public class MemoryNodeRegistry implements NodeRegistry {
private static final Logger LOG = LoggerFactory.getLogger(MemoryNodeRegistry.class);
private final Map<Long, Set<NetworkAddress>> stableNodes = new ConcurrentHashMap<>();
private final Map<Long, Set<NetworkAddress>> knownNodes = new ConcurrentHashMap<>();
private void loadStableNodes() {
try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) {
Scanner scanner = new Scanner(in);
long stream = 0;
Set<NetworkAddress> streamSet = null;
while (scanner.hasNext()) {
try {
String line = scanner.nextLine().trim();
if (line.startsWith("[stream")) {
stream = Long.parseLong(line.substring(8, line.lastIndexOf(']')));
streamSet = new HashSet<>();
stableNodes.put(stream, streamSet);
} else if (streamSet != null && !line.isEmpty() && !line.startsWith("#")) {
int portIndex = line.lastIndexOf(':');
InetAddress[] inetAddresses = InetAddress.getAllByName(line.substring(0, portIndex));
int port = Integer.valueOf(line.substring(portIndex + 1));
for (InetAddress inetAddress : inetAddresses) {
streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build());
}
}
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
}
if (LOG.isDebugEnabled()) {
for (Map.Entry<Long, Set<NetworkAddress>> e : stableNodes.entrySet()) {
LOG.debug("Stream " + e.getKey() + ": loaded " + e.getValue().size() + " bootstrap nodes.");
}
}
} catch (IOException e) {
throw new ApplicationException(e);
}
}
@Override
public List<NetworkAddress> getKnownAddresses(int limit, long... streams) {
List<NetworkAddress> result = new LinkedList<>();
for (long stream : streams) {
Set<NetworkAddress> known = knownNodes.get(stream);
if (known != null && !known.isEmpty()) {
for (NetworkAddress node : known) {
if (node.getTime() > UnixTime.now(-3 * HOUR)) {
result.add(node);
} else {
known.remove(node);
}
}
}
if (result.isEmpty()) {
if (stableNodes.isEmpty()) {
loadStableNodes();
}
Set<NetworkAddress> nodes = stableNodes.get(stream);
if (nodes != null && !nodes.isEmpty()) {
// To reduce load on stable nodes, only return one
result.add(selectRandom(nodes));
}
}
}
return selectRandom(limit, result);
}
@Override
public void offerAddresses(List<NetworkAddress> addresses) {
for (NetworkAddress node : addresses) {
if (node.getTime() <= UnixTime.now()) {
if (!knownNodes.containsKey(node.getStream())) {
synchronized (knownNodes) {
if (!knownNodes.containsKey(node.getStream())) {
knownNodes.put(
node.getStream(),
newSetFromMap(new ConcurrentHashMap<NetworkAddress, Boolean>())
);
}
}
}
if (node.getTime() <= UnixTime.now()) {
// TODO: This isn't quite correct
// If the node is already known, the one with the more recent time should be used
knownNodes.get(node.getStream()).add(node);
}
}
}
}
}

View File

@ -23,19 +23,25 @@ import ch.dissem.bitmessage.utils.Property;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.concurrent.Future;
/**
* Handles incoming messages
*/
public interface NetworkHandler {
int NETWORK_MAGIC_NUMBER = 8;
int HEADER_SIZE = 24;
int MAX_PAYLOAD_SIZE = 1600003;
int MAX_MESSAGE_SIZE = HEADER_SIZE + MAX_PAYLOAD_SIZE;
/**
* Connects to the trusted host, fetches and offers new messages and disconnects afterwards.
* <p>
* An implementation should disconnect if either the timeout is reached or the returned thread is interrupted.
* </p>
*/
Future<?> synchronize(InetAddress server, int port, MessageListener listener, long timeoutInSeconds);
Future<?> synchronize(InetAddress server, int port, long timeoutInSeconds);
/**
* Send a custom message to a specific node (that should implement handling for this message type) and returns
@ -51,7 +57,7 @@ public interface NetworkHandler {
/**
* Start a full network node, accepting incoming connections and relaying objects.
*/
void start(MessageListener listener);
void start();
/**
* Stop the full network node.
@ -63,6 +69,13 @@ public interface NetworkHandler {
*/
void offer(InventoryVector iv);
/**
* Request each of those objects from a node that knows of the requested object.
*
* @param inventoryVectors of the objects to be requested
*/
void request(Collection<InventoryVector> inventoryVectors);
Property getNetworkStatus();
boolean isRunning();

View File

@ -0,0 +1,54 @@
package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.ApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.*;
/**
* Helper class to kick start node registries.
*/
public class NodeRegistryHelper {
private static final Logger LOG = LoggerFactory.getLogger(NodeRegistryHelper.class);
public static Map<Long, Set<NetworkAddress>> loadStableNodes() {
try (InputStream in = NodeRegistryHelper.class.getClassLoader().getResourceAsStream("nodes.txt")) {
Scanner scanner = new Scanner(in);
long stream = 0;
Map<Long, Set<NetworkAddress>> result = new HashMap<>();
Set<NetworkAddress> streamSet = null;
while (scanner.hasNext()) {
try {
String line = scanner.nextLine().trim();
if (line.startsWith("[stream")) {
stream = Long.parseLong(line.substring(8, line.lastIndexOf(']')));
streamSet = new HashSet<>();
result.put(stream, streamSet);
} else if (streamSet != null && !line.isEmpty() && !line.startsWith("#")) {
int portIndex = line.lastIndexOf(':');
InetAddress[] inetAddresses = InetAddress.getAllByName(line.substring(0, portIndex));
int port = Integer.valueOf(line.substring(portIndex + 1));
for (InetAddress inetAddress : inetAddresses) {
streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build());
}
}
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
}
if (LOG.isDebugEnabled()) {
for (Map.Entry<Long, Set<NetworkAddress>> e : result.entrySet()) {
LOG.debug("Stream " + e.getKey() + ": loaded " + e.getValue().size() + " bootstrap nodes.");
}
}
return result;
} catch (IOException e) {
throw new ApplicationException(e);
}
}
}

View File

@ -23,6 +23,8 @@ package ch.dissem.bitmessage.utils;
* situations.
*/
public class Bytes {
public static final byte BYTE_0x80 = (byte) 0x80;
public static void inc(byte[] nonce) {
for (int i = nonce.length - 1; i >= 0; i--) {
nonce[i]++;
@ -82,11 +84,7 @@ public class Bytes {
}
private static boolean lt(byte a, byte b) {
if (a < 0) return b < 0 && a < b;
if (b < 0) return a >= 0 || a < b;
return a < b;
// This would be easier to understand, but is (slightly) slower:
// return (a & 0xff) < (b & 0xff);
return (a ^ BYTE_0x80) < (b ^ BYTE_0x80);
}
/**

View File

@ -27,30 +27,29 @@ import static ch.dissem.bitmessage.utils.AccessCounter.inc;
* https://bitmessage.org/wiki/Protocol_specification#Common_structures
*/
public class Decode {
public static byte[] shortVarBytes(InputStream stream, AccessCounter counter) throws IOException {
int length = uint16(stream, counter);
return bytes(stream, length, counter);
public static byte[] shortVarBytes(InputStream in, AccessCounter counter) throws IOException {
int length = uint16(in, counter);
return bytes(in, length, counter);
}
public static byte[] varBytes(InputStream stream) throws IOException {
int length = (int) varInt(stream, null);
return bytes(stream, length, null);
public static byte[] varBytes(InputStream in) throws IOException {
return varBytes(in, null);
}
public static byte[] varBytes(InputStream stream, AccessCounter counter) throws IOException {
int length = (int) varInt(stream, counter);
return bytes(stream, length, counter);
public static byte[] varBytes(InputStream in, AccessCounter counter) throws IOException {
int length = (int) varInt(in, counter);
return bytes(in, length, counter);
}
public static byte[] bytes(InputStream stream, int count) throws IOException {
return bytes(stream, count, null);
public static byte[] bytes(InputStream in, int count) throws IOException {
return bytes(in, count, null);
}
public static byte[] bytes(InputStream stream, int count, AccessCounter counter) throws IOException {
public static byte[] bytes(InputStream in, int count, AccessCounter counter) throws IOException {
byte[] result = new byte[count];
int off = 0;
while (off < count) {
int read = stream.read(result, off, count - off);
int read = in.read(result, off, count - off);
if (read < 0) {
throw new IOException("Unexpected end of stream, wanted to read " + count + " bytes but only got " + off);
}
@ -60,83 +59,94 @@ public class Decode {
return result;
}
public static long[] varIntList(InputStream stream) throws IOException {
int length = (int) varInt(stream);
public static long[] varIntList(InputStream in) throws IOException {
int length = (int) varInt(in);
long[] result = new long[length];
for (int i = 0; i < length; i++) {
result[i] = varInt(stream);
result[i] = varInt(in);
}
return result;
}
public static long varInt(InputStream stream) throws IOException {
return varInt(stream, null);
public static long varInt(InputStream in) throws IOException {
return varInt(in, null);
}
public static long varInt(InputStream stream, AccessCounter counter) throws IOException {
int first = stream.read();
public static long varInt(InputStream in, AccessCounter counter) throws IOException {
int first = in.read();
inc(counter);
switch (first) {
case 0xfd:
return uint16(stream, counter);
return uint16(in, counter);
case 0xfe:
return uint32(stream, counter);
return uint32(in, counter);
case 0xff:
return int64(stream, counter);
return int64(in, counter);
default:
return first;
}
}
public static int uint8(InputStream stream) throws IOException {
return stream.read();
public static int uint8(InputStream in) throws IOException {
return in.read();
}
public static int uint16(InputStream stream) throws IOException {
return uint16(stream, null);
public static int uint16(InputStream in) throws IOException {
return uint16(in, null);
}
public static int uint16(InputStream stream, AccessCounter counter) throws IOException {
public static int uint16(InputStream in, AccessCounter counter) throws IOException {
inc(counter, 2);
return stream.read() * 256 + stream.read();
return in.read() << 8 | in.read();
}
public static long uint32(InputStream stream) throws IOException {
return uint32(stream, null);
public static long uint32(InputStream in) throws IOException {
return uint32(in, null);
}
public static long uint32(InputStream stream, AccessCounter counter) throws IOException {
public static long uint32(InputStream in, AccessCounter counter) throws IOException {
inc(counter, 4);
return stream.read() * 16777216L + stream.read() * 65536L + stream.read() * 256L + stream.read();
return in.read() << 24 | in.read() << 16 | in.read() << 8 | in.read();
}
public static int int32(InputStream stream) throws IOException {
return int32(stream, null);
public static long uint32(ByteBuffer in) {
return u(in.get()) << 24 | u(in.get()) << 16 | u(in.get()) << 8 | u(in.get());
}
public static int int32(InputStream stream, AccessCounter counter) throws IOException {
public static int int32(InputStream in) throws IOException {
return int32(in, null);
}
public static int int32(InputStream in, AccessCounter counter) throws IOException {
inc(counter, 4);
return ByteBuffer.wrap(bytes(stream, 4)).getInt();
return ByteBuffer.wrap(bytes(in, 4)).getInt();
}
public static long int64(InputStream stream) throws IOException {
return int64(stream, null);
public static long int64(InputStream in) throws IOException {
return int64(in, null);
}
public static long int64(InputStream stream, AccessCounter counter) throws IOException {
public static long int64(InputStream in, AccessCounter counter) throws IOException {
inc(counter, 8);
return ByteBuffer.wrap(bytes(stream, 8)).getLong();
return ByteBuffer.wrap(bytes(in, 8)).getLong();
}
public static String varString(InputStream stream) throws IOException {
return varString(stream, null);
public static String varString(InputStream in) throws IOException {
return varString(in, null);
}
public static String varString(InputStream stream, AccessCounter counter) throws IOException {
int length = (int) varInt(stream, counter);
public static String varString(InputStream in, AccessCounter counter) throws IOException {
int length = (int) varInt(in, counter);
// FIXME: technically, it says the length in characters, but I think this one might be correct
// otherwise it will get complicated, as we'll need to read UTF-8 char by char...
return new String(bytes(stream, length, counter), "utf-8");
return new String(bytes(in, length, counter), "utf-8");
}
/**
* Returns the given byte as if it were unsigned.
*/
private static int u(byte b) {
return b & 0xFF;
}
}

View File

@ -17,10 +17,13 @@
package ch.dissem.bitmessage.utils;
import ch.dissem.bitmessage.entity.Streamable;
import ch.dissem.bitmessage.exception.ApplicationException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import static ch.dissem.bitmessage.utils.AccessCounter.inc;
@ -37,62 +40,52 @@ public class Encode {
}
}
public static void varIntList(long[] values, ByteBuffer buffer) {
varInt(values.length, buffer);
for (long value : values) {
varInt(value, buffer);
}
}
public static void varInt(long value, OutputStream stream) throws IOException {
varInt(value, stream, null);
}
public static byte[] varInt(long value) throws IOException {
final byte[] result;
public static void varInt(long value, ByteBuffer buffer) {
if (value < 0) {
// This is due to the fact that Java doesn't really support unsigned values.
// Please be aware that this might be an error due to a smaller negative value being cast to long.
// Normally, negative values shouldn't occur within the protocol, and I large enough longs
// to being recognized as negatives aren't realistic.
ByteBuffer buffer = ByteBuffer.allocate(9);
// Normally, negative values shouldn't occur within the protocol, and longs large enough for being
// recognized as negatives aren't realistic.
buffer.put((byte) 0xff);
result = buffer.putLong(value).array();
buffer.putLong(value);
} else if (value < 0xfd) {
result = new byte[]{(byte) value};
buffer.put((byte) value);
} else if (value <= 0xffffL) {
ByteBuffer buffer = ByteBuffer.allocate(3);
buffer.put((byte) 0xfd);
result = buffer.putShort((short) value).array();
buffer.putShort((short) value);
} else if (value <= 0xffffffffL) {
ByteBuffer buffer = ByteBuffer.allocate(5);
buffer.put((byte) 0xfe);
result = buffer.putInt((int) value).array();
buffer.putInt((int) value);
} else {
ByteBuffer buffer = ByteBuffer.allocate(9);
buffer.put((byte) 0xff);
result = buffer.putLong(value).array();
buffer.putLong(value);
}
return result;
}
public static byte[] varInt(long value) {
ByteBuffer buffer = ByteBuffer.allocate(9);
varInt(value, buffer);
buffer.flip();
return Bytes.truncate(buffer.array(), buffer.limit());
}
public static void varInt(long value, OutputStream stream, AccessCounter counter) throws IOException {
if (value < 0) {
// This is due to the fact that Java doesn't really support unsigned values.
// Please be aware that this might be an error due to a smaller negative value being cast to long.
// Normally, negative values shouldn't occur within the protocol, and I large enough longs
// to being recognized as negatives aren't realistic.
stream.write(0xff);
inc(counter);
int64(value, stream, counter);
} else if (value < 0xfd) {
int8(value, stream, counter);
} else if (value <= 0xffffL) {
stream.write(0xfd);
inc(counter);
int16(value, stream, counter);
} else if (value <= 0xffffffffL) {
stream.write(0xfe);
inc(counter);
int32(value, stream, counter);
} else {
stream.write(0xff);
inc(counter);
int64(value, stream, counter);
}
ByteBuffer buffer = ByteBuffer.allocate(9);
varInt(value, buffer);
buffer.flip();
stream.write(buffer.array(), 0, buffer.limit());
inc(counter, buffer.limit());
}
public static void int8(long value, OutputStream stream) throws IOException {
@ -113,6 +106,10 @@ public class Encode {
inc(counter, 2);
}
public static void int16(long value, ByteBuffer buffer) {
buffer.putShort((short) value);
}
public static void int32(long value, OutputStream stream) throws IOException {
int32(value, stream, null);
}
@ -122,6 +119,10 @@ public class Encode {
inc(counter, 4);
}
public static void int32(long value, ByteBuffer buffer) {
buffer.putInt((int) value);
}
public static void int64(long value, OutputStream stream) throws IOException {
int64(value, stream, null);
}
@ -131,6 +132,10 @@ public class Encode {
inc(counter, 8);
}
public static void int64(long value, ByteBuffer buffer) {
buffer.putLong(value);
}
public static void varString(String value, OutputStream out) throws IOException {
byte[] bytes = value.getBytes("utf-8");
// Technically, it says the length in characters, but I think this one might be correct.
@ -140,23 +145,44 @@ public class Encode {
out.write(bytes);
}
public static void varString(String value, ByteBuffer buffer) {
try {
byte[] bytes = value.getBytes("utf-8");
// Technically, it says the length in characters, but I think this one might be correct.
// It doesn't really matter, as only ASCII characters are being used.
// see also Decode#varString()
buffer.put(varInt(bytes.length));
buffer.put(bytes);
} catch (UnsupportedEncodingException e) {
throw new ApplicationException(e);
}
}
public static void varBytes(byte[] data, OutputStream out) throws IOException {
varInt(data.length, out);
out.write(data);
}
public static void varBytes(byte[] data, ByteBuffer buffer) {
varInt(data.length, buffer);
buffer.put(data);
}
/**
* Serializes a {@link Streamable} object and returns the byte array.
*
* @param streamable the object to be serialized
* @return an array of bytes representing the given streamable object.
* @throws IOException if an I/O error occurs.
*/
public static byte[] bytes(Streamable streamable) throws IOException {
public static byte[] bytes(Streamable streamable) {
if (streamable == null) return null;
ByteArrayOutputStream stream = new ByteArrayOutputStream();
streamable.write(stream);
try {
streamable.write(stream);
} catch (IOException e) {
throw new ApplicationException(e);
}
return stream.toByteArray();
}
@ -164,11 +190,14 @@ public class Encode {
* @param streamable the object to be serialized
* @param padding the result will be padded such that its length is a multiple of <em>padding</em>
* @return the bytes of the given {@link Streamable} object, 0-padded such that the final length is x*padding.
* @throws IOException if an I/O error occurs.
*/
public static byte[] bytes(Streamable streamable, int padding) throws IOException {
public static byte[] bytes(Streamable streamable, int padding) {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
streamable.write(stream);
try {
streamable.write(stream);
} catch (IOException e) {
throw new ApplicationException(e);
}
int offset = padding - stream.size() % padding;
int length = stream.size() + offset;
byte[] result = new byte[length];

View File

@ -5,4 +5,4 @@ bootstrap8080.bitmessage.org:8080
bootstrap8444.bitmessage.org:8444
[stream 2]
# none yet
# none yet

View File

@ -68,7 +68,9 @@ public class DefaultMessageListenerTest extends TestBase {
when(ctx.getNetworkHandler()).thenReturn(networkHandler);
when(ctx.getLabeler()).thenReturn(mock(Labeler.class));
listener = new DefaultMessageListener(ctx, mock(Labeler.class), mock(BitmessageContext.Listener.class));
listener = new DefaultMessageListener(mock(Labeler.class), mock(BitmessageContext.Listener.class));
when(ctx.getNetworkListener()).thenReturn(listener);
listener.setContext(ctx);
}
@Test

View File

@ -66,6 +66,7 @@ public class ProofOfWorkServiceTest {
when(ctx.getNetworkHandler()).thenReturn(networkHandler);
when(ctx.getMessageRepository()).thenReturn(messageRepo);
when(ctx.getLabeler()).thenReturn(mock(Labeler.class));
when(ctx.getNetworkListener()).thenReturn(mock(NetworkHandler.MessageListener.class));
proofOfWorkService = new ProofOfWorkService();
proofOfWorkService.setContext(ctx);
@ -80,7 +81,7 @@ public class ProofOfWorkServiceTest {
proofOfWorkService.doMissingProofOfWork(10);
verify(cryptography, timeout(1000)).doProofOfWork((ObjectMessage) isNull(), eq(1001L), eq(1002L),
any(ProofOfWorkEngine.Callback.class));
any(ProofOfWorkEngine.Callback.class));
}
@Test
@ -89,8 +90,8 @@ public class ProofOfWorkServiceTest {
BitmessageAddress address = TestUtils.loadContact();
Plaintext plaintext = new Plaintext.Builder(MSG).from(identity).to(address).message("", "").build();
ObjectMessage object = new ObjectMessage.Builder()
.payload(new Msg(plaintext))
.build();
.payload(new Msg(plaintext))
.build();
object.sign(identity.getPrivateKey());
object.encrypt(address.getPubkey());
byte[] initialHash = new byte[64];

View File

@ -1,99 +0,0 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.utils.UnixTime;
import org.junit.Test;
import java.util.Arrays;
import static ch.dissem.bitmessage.utils.UnixTime.HOUR;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThat;
public class NodeRegistryTest {
private NodeRegistry registry = new MemoryNodeRegistry();
@Test
public void ensureGetKnownNodesWithoutStreamsYieldsEmpty() {
assertThat(registry.getKnownAddresses(10), empty());
}
/**
* Please note that this test fails if there is no internet connection,
* as the initial nodes' IP addresses are determined by DNS lookup.
*/
@Test
public void ensureGetKnownNodesForStream1YieldsResult() {
assertThat(registry.getKnownAddresses(10, 1), hasSize(1));
}
@Test
public void ensureNodeIsStored() {
registry.offerAddresses(Arrays.asList(
new NetworkAddress.Builder()
.ipv4(127, 0, 0, 1)
.port(42)
.stream(1)
.time(UnixTime.now())
.build(),
new NetworkAddress.Builder()
.ipv4(127, 0, 0, 2)
.port(42)
.stream(1)
.time(UnixTime.now())
.build(),
new NetworkAddress.Builder()
.ipv4(127, 0, 0, 2)
.port(42)
.stream(2)
.time(UnixTime.now())
.build()
));
assertThat(registry.getKnownAddresses(10, 1).size(), is(2));
assertThat(registry.getKnownAddresses(10, 2).size(), is(1));
assertThat(registry.getKnownAddresses(10, 1, 2).size(), is(3));
}
@Test
public void ensureOldNodesAreRemoved() {
registry.offerAddresses(Arrays.asList(
new NetworkAddress.Builder()
.ipv4(127, 0, 0, 1)
.port(42)
.stream(1)
.time(UnixTime.now())
.build(),
new NetworkAddress.Builder()
.ipv4(127, 0, 0, 2)
.port(42)
.stream(1)
.time(UnixTime.now(-4 * HOUR))
.build(),
new NetworkAddress.Builder()
.ipv4(127, 0, 0, 2)
.port(42)
.stream(2)
.time(UnixTime.now())
.build()
));
assertThat(registry.getKnownAddresses(10, 1).size(), is(1));
assertThat(registry.getKnownAddresses(10, 2).size(), is(1));
assertThat(registry.getKnownAddresses(10, 1, 2).size(), is(2));
}
}

View File

@ -13,6 +13,6 @@ uploadArchives {
dependencies {
compile project(':core')
compile 'org.bouncycastle:bcprov-jdk15on:1.52'
testCompile 'junit:junit:4.11'
testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:1.10.19'
}

View File

@ -13,5 +13,5 @@ uploadArchives {
dependencies {
compile project(':core')
compile 'com.madgag.spongycastle:prov:1.52.0.0'
testCompile 'junit:junit:4.11'
testCompile 'junit:junit:4.12'
}

View File

@ -30,8 +30,8 @@ dependencies {
compile project(':wif')
compile 'org.slf4j:slf4j-simple:1.7.12'
compile 'args4j:args4j:2.32'
compile 'com.h2database:h2:1.4.190'
compile 'com.h2database:h2:1.4.192'
compile 'org.apache.commons:commons-lang3:3.4'
testCompile 'junit:junit:4.11'
testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:1.10.19'
}

View File

@ -17,14 +17,10 @@
package ch.dissem.bitmessage.demo;
import ch.dissem.bitmessage.BitmessageContext;
import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography;
import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.payload.Pubkey;
import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.networking.DefaultNetworkHandler;
import ch.dissem.bitmessage.ports.MemoryNodeRegistry;
import ch.dissem.bitmessage.repository.*;
import org.apache.commons.lang3.text.WordUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,19 +41,10 @@ public class Application {
private BitmessageContext ctx;
public Application(InetAddress syncServer, int syncPort) {
JdbcConfig jdbcConfig = new JdbcConfig();
ctx = new BitmessageContext.Builder()
.addressRepo(new JdbcAddressRepository(jdbcConfig))
.inventory(new JdbcInventory(jdbcConfig))
.nodeRegistry(new MemoryNodeRegistry())
.messageRepo(new JdbcMessageRepository(jdbcConfig))
.powRepo(new JdbcProofOfWorkRepository(jdbcConfig))
.networkHandler(new DefaultNetworkHandler())
.cryptography(new BouncyCryptography())
.port(48444)
.listener(plaintext -> System.out.println("New Message from " + plaintext.getFrom() + ": " + plaintext.getSubject()))
.build();
public Application(BitmessageContext.Builder ctxBuilder, InetAddress syncServer, int syncPort) {
ctx = ctxBuilder
.listener(plaintext -> System.out.println("New Message from " + plaintext.getFrom() + ": " + plaintext.getSubject()))
.build();
if (syncServer == null) {
ctx.startup();
@ -392,7 +379,7 @@ public class Application {
System.out.println(WordUtils.wrap(message.getText(), 120));
System.out.println();
System.out.println(message.getLabels().stream().map(Label::toString).collect(
Collectors.joining(", ", "Labels: ", "")));
Collectors.joining(", ", "Labels: ", "")));
System.out.println();
ctx.labeler().markAsRead(message);
ctx.messages().save(message);

View File

@ -18,20 +18,28 @@ package ch.dissem.bitmessage.demo;
import ch.dissem.bitmessage.BitmessageContext;
import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography;
import ch.dissem.bitmessage.networking.DefaultNetworkHandler;
import ch.dissem.bitmessage.ports.MemoryNodeRegistry;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.networking.nio.NioNetworkHandler;
import ch.dissem.bitmessage.ports.NodeRegistry;
import ch.dissem.bitmessage.repository.*;
import ch.dissem.bitmessage.wif.WifExporter;
import ch.dissem.bitmessage.wif.WifImporter;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class Main {
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws IOException {
if (System.getProperty("org.slf4j.simpleLogger.defaultLogLevel") == null)
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "ERROR");
@ -45,18 +53,39 @@ public class Main {
} catch (CmdLineException e) {
parser.printUsage(System.err);
}
JdbcConfig jdbcConfig = new JdbcConfig();
BitmessageContext.Builder ctxBuilder = new BitmessageContext.Builder()
.addressRepo(new JdbcAddressRepository(jdbcConfig))
.inventory(new JdbcInventory(jdbcConfig))
.messageRepo(new JdbcMessageRepository(jdbcConfig))
.powRepo(new JdbcProofOfWorkRepository(jdbcConfig))
.networkHandler(new NioNetworkHandler())
.cryptography(new BouncyCryptography())
.port(48444);
if (options.localPort != null) {
ctxBuilder.nodeRegistry(new NodeRegistry() {
@Override
public List<NetworkAddress> getKnownAddresses(int limit, long... streams) {
return Arrays.stream(streams)
.mapToObj(s -> new NetworkAddress.Builder()
.ipv4(127, 0, 0, 1)
.port(options.localPort)
.stream(s).build())
.collect(Collectors.toList());
}
@Override
public void offerAddresses(List<NetworkAddress> addresses) {
LOG.info("Local node registry ignored offered addresses: " + addresses);
}
});
} else {
ctxBuilder.nodeRegistry(new JdbcNodeRegistry(jdbcConfig));
}
if (options.exportWIF != null || options.importWIF != null) {
JdbcConfig jdbcConfig = new JdbcConfig();
BitmessageContext ctx = new BitmessageContext.Builder()
.addressRepo(new JdbcAddressRepository(jdbcConfig))
.inventory(new JdbcInventory(jdbcConfig))
.nodeRegistry(new MemoryNodeRegistry())
.messageRepo(new JdbcMessageRepository(jdbcConfig))
.powRepo(new JdbcProofOfWorkRepository(jdbcConfig))
.networkHandler(new DefaultNetworkHandler())
.cryptography(new BouncyCryptography())
.port(48444)
.build();
BitmessageContext ctx = ctxBuilder.build();
if (options.exportWIF != null) {
new WifExporter(ctx).addAll().write(options.exportWIF);
@ -66,11 +95,14 @@ public class Main {
}
} else {
InetAddress syncServer = options.syncServer == null ? null : InetAddress.getByName(options.syncServer);
new Application(syncServer, options.syncPort);
new Application(ctxBuilder, syncServer, options.syncPort);
}
}
private static class CmdLineOptions {
@Option(name = "-local", usage = "Connect to local Bitmessage client on given port, instead of the usual connections from node.txt")
private Integer localPort;
@Option(name = "-import", usage = "Import from keys.dat or other WIF file.")
private File importWIF;

View File

@ -4,17 +4,23 @@ import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography;
import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.networking.DefaultNetworkHandler;
import ch.dissem.bitmessage.networking.nio.NioNetworkHandler;
import ch.dissem.bitmessage.ports.DefaultLabeler;
import ch.dissem.bitmessage.ports.Labeler;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.repository.*;
import ch.dissem.bitmessage.utils.TTL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -27,8 +33,11 @@ import static org.mockito.Matchers.any;
/**
* @author Christian Basler
*/
@RunWith(Parameterized.class)
public class SystemTest {
private static int port = 6000;
private final NetworkHandler aliceNetworkHandler;
private final NetworkHandler bobNetworkHandler;
private BitmessageContext alice;
private TestListener aliceListener = new TestListener();
@ -39,6 +48,19 @@ public class SystemTest {
private TestListener bobListener = new TestListener();
private BitmessageAddress bobIdentity;
public SystemTest(NetworkHandler peer, NetworkHandler node) {
this.aliceNetworkHandler = peer;
this.bobNetworkHandler = node;
}
@Parameterized.Parameters
public static List<Object[]> parameters() {
return Arrays.asList(new Object[][]{
{new NioNetworkHandler(), new DefaultNetworkHandler()},
{new NioNetworkHandler(), new NioNetworkHandler()}
});
}
@Before
public void setUp() {
int alicePort = port++;
@ -54,7 +76,7 @@ public class SystemTest {
.powRepo(new JdbcProofOfWorkRepository(aliceDB))
.port(alicePort)
.nodeRegistry(new TestNodeRegistry(bobPort))
.networkHandler(new DefaultNetworkHandler())
.networkHandler(aliceNetworkHandler)
.cryptography(new BouncyCryptography())
.listener(aliceListener)
.labeler(aliceLabeler)
@ -70,7 +92,7 @@ public class SystemTest {
.powRepo(new JdbcProofOfWorkRepository(bobDB))
.port(bobPort)
.nodeRegistry(new TestNodeRegistry(alicePort))
.networkHandler(new DefaultNetworkHandler())
.networkHandler(bobNetworkHandler)
.cryptography(new BouncyCryptography())
.listener(bobListener)
.labeler(new DebugLabeler("Bob"))
@ -88,7 +110,7 @@ public class SystemTest {
bob.shutdown();
}
@Test
@Test(timeout = 60_000)
public void ensureAliceCanSendMessageToBob() throws Exception {
String originalMessage = UUID.randomUUID().toString();
alice.send(aliceIdentity, new BitmessageAddress(bobIdentity.getAddress()), "Subject", originalMessage);
@ -102,7 +124,7 @@ public class SystemTest {
.markAsAcknowledged(any());
}
@Test
@Test(timeout = 30_000)
public void ensureBobCanReceiveBroadcastFromAlice() throws Exception {
String originalMessage = UUID.randomUUID().toString();
bob.addSubscribtion(new BitmessageAddress(aliceIdentity.getAddress()));

View File

@ -28,7 +28,7 @@ uploadArchives {
dependencies {
compile project(':core')
testCompile 'junit:junit:4.11'
testCompile 'junit:junit:4.12'
testCompile 'org.slf4j:slf4j-simple:1.7.12'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile project(path: ':core', configuration: 'testArtifacts')

View File

@ -24,6 +24,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static ch.dissem.bitmessage.utils.Decode.*;
@ -83,6 +84,13 @@ public class ProofOfWorkRequest implements Streamable {
Encode.varBytes(data, out);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(initialHash);
Encode.varString(request.name(), buffer);
Encode.varBytes(data, buffer);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -12,7 +12,7 @@ uploadArchives {
dependencies {
compile project(':core')
testCompile 'junit:junit:4.11'
testCompile 'junit:junit:4.12'
testCompile 'org.slf4j:slf4j-simple:1.7.12'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile project(path: ':core', configuration: 'testArtifacts')

View File

@ -0,0 +1,343 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.BitmessageContext;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES;
import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.*;
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/**
* Contains everything used by both the old streams-oriented NetworkHandler and the new NioNetworkHandler,
* respectively their connection objects.
*/
public abstract class AbstractConnection {
private static final Logger LOG = LoggerFactory.getLogger(AbstractConnection.class);
protected final InternalContext ctx;
protected final Mode mode;
protected final NetworkAddress host;
protected final NetworkAddress node;
protected final NetworkHandler.MessageListener listener;
protected final Map<InventoryVector, Long> ivCache;
protected final Deque<MessagePayload> sendingQueue;
protected final Set<InventoryVector> commonRequestedObjects;
protected final Set<InventoryVector> requestedObjects;
protected volatile State state;
protected long lastObjectTime;
private final long syncTimeout;
private long syncReadTimeout = Long.MAX_VALUE;
protected long peerNonce;
protected int version;
protected long[] streams;
private boolean verackSent;
private boolean verackReceived;
public AbstractConnection(InternalContext context, Mode mode,
NetworkAddress node,
Set<InventoryVector> commonRequestedObjects,
long syncTimeout) {
this.ctx = context;
this.mode = mode;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = node;
this.listener = context.getNetworkListener();
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0);
this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000));
this.ivCache = new ConcurrentHashMap<>();
this.sendingQueue = new ConcurrentLinkedDeque<>();
this.state = CONNECTING;
this.commonRequestedObjects = commonRequestedObjects;
}
public Mode getMode() {
return mode;
}
public NetworkAddress getNode() {
return node;
}
public State getState() {
return state;
}
public long[] getStreams() {
return streams;
}
protected void handleMessage(MessagePayload payload) {
switch (state) {
case ACTIVE:
receiveMessage(payload);
break;
case DISCONNECTED:
break;
default:
handleCommand(payload);
break;
}
}
private void receiveMessage(MessagePayload messagePayload) {
switch (messagePayload.getCommand()) {
case INV:
receiveMessage((Inv) messagePayload);
break;
case GETDATA:
receiveMessage((GetData) messagePayload);
break;
case OBJECT:
receiveMessage((ObjectMessage) messagePayload);
break;
case ADDR:
receiveMessage((Addr) messagePayload);
break;
case CUSTOM:
case VERACK:
case VERSION:
default:
throw new IllegalStateException("Unexpectedly received '" + messagePayload.getCommand() + "' command");
}
}
private void receiveMessage(Inv inv) {
int originalSize = inv.getInventory().size();
updateIvCache(inv.getInventory());
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
missing.removeAll(commonRequestedObjects);
LOG.trace("Received inventory with " + originalSize + " elements, of which are "
+ missing.size() + " missing.");
send(new GetData.Builder().inventory(missing).build());
}
private void receiveMessage(GetData getData) {
for (InventoryVector iv : getData.getInventory()) {
ObjectMessage om = ctx.getInventory().getObject(iv);
if (om != null) sendingQueue.offer(om);
}
}
private void receiveMessage(ObjectMessage objectMessage) {
requestedObjects.remove(objectMessage.getInventoryVector());
if (ctx.getInventory().contains(objectMessage)) {
LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory");
return;
}
try {
listener.receive(objectMessage);
cryptography().checkProofOfWork(objectMessage, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES);
ctx.getInventory().storeObject(objectMessage);
// offer object to some random nodes so it gets distributed throughout the network:
ctx.getNetworkHandler().offer(objectMessage.getInventoryVector());
lastObjectTime = UnixTime.now();
} catch (InsufficientProofOfWorkException e) {
LOG.warn(e.getMessage());
// DebugUtils.saveToFile(objectMessage); // this line must not be committed active
} catch (IOException e) {
LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e);
} finally {
if (!commonRequestedObjects.remove(objectMessage.getInventoryVector())) {
LOG.debug("Received object that wasn't requested.");
}
}
}
private void receiveMessage(Addr addr) {
LOG.trace("Received " + addr.getAddresses().size() + " addresses.");
ctx.getNodeRegistry().offerAddresses(addr.getAddresses());
}
private void updateIvCache(List<InventoryVector> inventory) {
cleanupIvCache();
Long now = UnixTime.now();
for (InventoryVector iv : inventory) {
ivCache.put(iv, now);
}
}
public void offer(InventoryVector iv) {
sendingQueue.offer(new Inv.Builder()
.addInventoryVector(iv)
.build());
updateIvCache(Collections.singletonList(iv));
}
public boolean knowsOf(InventoryVector iv) {
return ivCache.containsKey(iv);
}
private void cleanupIvCache() {
Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE);
for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) {
if (entry.getValue() < fiveMinutesAgo) {
ivCache.remove(entry.getKey());
}
}
}
private void handleCommand(MessagePayload payload) {
switch (payload.getCommand()) {
case VERSION:
handleVersion((Version) payload);
break;
case VERACK:
if (verackSent) {
activateConnection();
}
verackReceived = true;
break;
case CUSTOM:
MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload);
if (response == null) {
disconnect();
} else {
send(response);
}
break;
default:
throw new NodeException("Command 'version' or 'verack' expected, but was '"
+ payload.getCommand() + "'");
}
}
private void activateConnection() {
LOG.info("Successfully established connection with node " + node);
state = ACTIVE;
node.setTime(UnixTime.now());
if (mode != SYNC) {
sendAddresses();
ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node));
}
sendInventory();
}
private void sendAddresses() {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(1000, streams);
sendingQueue.offer(new Addr.Builder().addresses(addresses).build());
}
private void sendInventory() {
List<InventoryVector> inventory = ctx.getInventory().getInventory(streams);
for (int i = 0; i < inventory.size(); i += 50000) {
sendingQueue.offer(new Inv.Builder()
.inventory(inventory.subList(i, Math.min(inventory.size(), i + 50000)))
.build());
}
}
private void handleVersion(Version version) {
if (version.getNonce() == ctx.getClientNonce()) {
LOG.info("Tried to connect to self, disconnecting.");
disconnect();
} else if (version.getVersion() >= BitmessageContext.CURRENT_VERSION) {
this.peerNonce = version.getNonce();
if (peerNonce == ctx.getClientNonce()) disconnect();
this.version = version.getVersion();
this.streams = version.getStreams();
verackSent = true;
send(new VerAck());
if (mode == SERVER) {
send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build());
}
if (verackReceived) {
activateConnection();
}
} else {
LOG.info("Received unsupported version " + version.getVersion() + ", disconnecting.");
disconnect();
}
}
@SuppressWarnings("RedundantIfStatement")
protected boolean syncFinished(NetworkMessage msg) {
if (mode != SYNC) {
return false;
}
if (Thread.interrupted()) {
return true;
}
if (state != ACTIVE) {
return false;
}
if (syncTimeout < UnixTime.now()) {
LOG.info("Synchronization timed out");
return true;
}
if (!sendingQueue.isEmpty()) {
syncReadTimeout = System.currentTimeMillis() + 1000;
return false;
}
if (msg == null) {
return syncReadTimeout < System.currentTimeMillis();
} else {
syncReadTimeout = System.currentTimeMillis() + 1000;
return false;
}
}
public void disconnect() {
state = DISCONNECTED;
// Make sure objects that are still missing are requested from other nodes
ctx.getNetworkHandler().request(requestedObjects);
}
protected abstract void send(MessagePayload payload);
public enum Mode {SERVER, CLIENT, SYNC}
public enum State {CONNECTING, ACTIVE, DISCONNECTED}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AbstractConnection that = (AbstractConnection) o;
return Objects.equals(node, that.node);
}
@Override
public int hashCode() {
return Objects.hash(node);
}
}

View File

@ -16,13 +16,13 @@
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.BitmessageContext;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.NetworkHandler.MessageListener;
import ch.dissem.bitmessage.utils.UnixTime;
@ -36,94 +36,59 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES;
import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE;
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.Connection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.Connection.State.*;
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/**
* A connection to a specific node
*/
class Connection {
class Connection extends AbstractConnection {
public static final int READ_TIMEOUT = 2000;
private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
private static final int CONNECT_TIMEOUT = 5000;
private final long startTime;
private final ConcurrentMap<InventoryVector, Long> ivCache;
private final InternalContext ctx;
private final Mode mode;
private final Socket socket;
private final MessageListener listener;
private final NetworkAddress host;
private final NetworkAddress node;
private final Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>();
private final Set<InventoryVector> commonRequestedObjects;
private final Set<InventoryVector> requestedObjects;
private final long syncTimeout;
private final ReaderRunnable reader = new ReaderRunnable();
private final WriterRunnable writer = new WriterRunnable();
private final DefaultNetworkHandler networkHandler;
private final long clientNonce;
private volatile State state;
private InputStream in;
private OutputStream out;
private int version;
private long[] streams;
private int readTimeoutCounter;
private boolean socketInitialized;
private long lastObjectTime;
public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener,
Set<InventoryVector> requestedObjectsMap, long clientNonce) throws IOException {
this(context, mode, listener, socket, requestedObjectsMap,
Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)),
public Connection(InternalContext context, Mode mode, Socket socket,
Set<InventoryVector> requestedObjectsMap) throws IOException {
this(context, mode, socket, requestedObjectsMap,
new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(),
0, clientNonce);
0);
}
public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener,
Set<InventoryVector> requestedObjectsMap, long clientNonce) {
this(context, mode, listener, new Socket(), requestedObjectsMap,
Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)),
node, 0, clientNonce);
public Connection(InternalContext context, Mode mode, NetworkAddress node,
Set<InventoryVector> requestedObjectsMap) {
this(context, mode, new Socket(), requestedObjectsMap,
node, 0);
}
private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket,
Set<InventoryVector> commonRequestedObjects, Set<InventoryVector> requestedObjects,
NetworkAddress node, long syncTimeout, long clientNonce) {
private Connection(InternalContext context, Mode mode, Socket socket,
Set<InventoryVector> commonRequestedObjects, NetworkAddress node, long syncTimeout) {
super(context, mode, node, commonRequestedObjects, syncTimeout);
this.startTime = UnixTime.now();
this.ctx = context;
this.mode = mode;
this.state = CONNECTING;
this.listener = listener;
this.socket = socket;
this.commonRequestedObjects = commonRequestedObjects;
this.requestedObjects = requestedObjects;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = node;
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0);
this.ivCache = new ConcurrentHashMap<>();
this.networkHandler = (DefaultNetworkHandler) ctx.getNetworkHandler();
this.clientNonce = clientNonce;
}
public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener,
long timeoutInSeconds) throws IOException {
return new Connection(ctx, SYNC, listener, new Socket(address, port),
new HashSet<InventoryVector>(),
return new Connection(ctx, SYNC, new Socket(address, port),
new HashSet<InventoryVector>(),
new NetworkAddress.Builder().ip(address).port(port).stream(1).build(),
timeoutInSeconds, cryptography().randomNonce());
timeoutInSeconds);
}
public long getStartTime() {
@ -142,160 +107,8 @@ class Connection {
return node;
}
@SuppressWarnings("RedundantIfStatement")
private boolean syncFinished(NetworkMessage msg) {
if (mode != SYNC) {
return false;
}
if (Thread.interrupted()) {
return true;
}
if (state != ACTIVE) {
return false;
}
if (syncTimeout < UnixTime.now()) {
LOG.info("Synchronization timed out");
return true;
}
if (msg == null) {
if (requestedObjects.isEmpty() && sendingQueue.isEmpty())
return true;
readTimeoutCounter++;
return readTimeoutCounter > 1;
} else {
readTimeoutCounter = 0;
return false;
}
}
private void activateConnection() {
LOG.info("Successfully established connection with node " + node);
state = ACTIVE;
if (mode != SYNC) {
sendAddresses();
ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node));
}
sendInventory();
node.setTime(UnixTime.now());
}
private void cleanupIvCache() {
Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE);
for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) {
if (entry.getValue() < fiveMinutesAgo) {
ivCache.remove(entry.getKey());
}
}
}
private void updateIvCache(InventoryVector... inventory) {
cleanupIvCache();
Long now = UnixTime.now();
for (InventoryVector iv : inventory) {
ivCache.put(iv, now);
}
}
private void updateIvCache(List<InventoryVector> inventory) {
cleanupIvCache();
Long now = UnixTime.now();
for (InventoryVector iv : inventory) {
ivCache.put(iv, now);
}
}
private void receiveMessage(MessagePayload messagePayload) {
switch (messagePayload.getCommand()) {
case INV:
receiveMessage((Inv) messagePayload);
break;
case GETDATA:
receiveMessage((GetData) messagePayload);
break;
case OBJECT:
receiveMessage((ObjectMessage) messagePayload);
break;
case ADDR:
receiveMessage((Addr) messagePayload);
break;
case CUSTOM:
case VERACK:
case VERSION:
default:
throw new IllegalStateException("Unexpectedly received '" + messagePayload.getCommand() + "' command");
}
}
private void receiveMessage(Inv inv) {
int originalSize = inv.getInventory().size();
updateIvCache(inv.getInventory());
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
missing.removeAll(commonRequestedObjects);
LOG.debug("Received inventory with " + originalSize + " elements, of which are "
+ missing.size() + " missing.");
send(new GetData.Builder().inventory(missing).build());
}
private void receiveMessage(GetData getData) {
for (InventoryVector iv : getData.getInventory()) {
ObjectMessage om = ctx.getInventory().getObject(iv);
if (om != null) sendingQueue.offer(om);
}
}
private void receiveMessage(ObjectMessage objectMessage) {
requestedObjects.remove(objectMessage.getInventoryVector());
if (ctx.getInventory().contains(objectMessage)) {
LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory");
return;
}
try {
listener.receive(objectMessage);
cryptography().checkProofOfWork(objectMessage, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES);
ctx.getInventory().storeObject(objectMessage);
// offer object to some random nodes so it gets distributed throughout the network:
networkHandler.offer(objectMessage.getInventoryVector());
lastObjectTime = UnixTime.now();
} catch (InsufficientProofOfWorkException e) {
LOG.warn(e.getMessage());
// DebugUtils.saveToFile(objectMessage); // this line must not be committed active
} catch (IOException e) {
LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e);
} finally {
if (commonRequestedObjects.remove(objectMessage.getInventoryVector())) {
LOG.debug("Received object that wasn't requested.");
}
}
}
private void receiveMessage(Addr addr) {
LOG.debug("Received " + addr.getAddresses().size() + " addresses.");
ctx.getNodeRegistry().offerAddresses(addr.getAddresses());
}
private void sendAddresses() {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(1000, streams);
sendingQueue.offer(new Addr.Builder().addresses(addresses).build());
}
private void sendInventory() {
List<InventoryVector> inventory = ctx.getInventory().getInventory(streams);
for (int i = 0; i < inventory.size(); i += 50000) {
sendingQueue.offer(new Inv.Builder()
.inventory(inventory.subList(i, Math.min(inventory.size(), i + 50000)))
.build());
}
}
public void disconnect() {
state = DISCONNECTED;
// Make sure objects that are still missing are requested from other nodes
networkHandler.request(requestedObjects);
}
void send(MessagePayload payload) {
@Override
protected void send(MessagePayload payload) {
try {
if (payload instanceof GetData) {
requestedObjects.addAll(((GetData) payload).getInventory());
@ -309,17 +122,6 @@ class Connection {
}
}
public void offer(InventoryVector iv) {
sendingQueue.offer(new Inv.Builder()
.addInventoryVector(iv)
.build());
updateIvCache(iv);
}
public boolean knowsOf(InventoryVector iv) {
return ivCache.containsKey(iv);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -354,18 +156,13 @@ class Connection {
return writer;
}
public enum Mode {SERVER, CLIENT, SYNC}
public enum State {CONNECTING, ACTIVE, DISCONNECTED}
public class ReaderRunnable implements Runnable {
@Override
public void run() {
lastObjectTime = 0;
try (Socket socket = Connection.this.socket) {
initSocket(socket);
if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults(clientNonce).addrFrom(host).addrRecv(node).build());
send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build());
}
while (state != DISCONNECTED) {
if (mode != SYNC) {
@ -394,75 +191,13 @@ class Connection {
NetworkMessage msg = Factory.getNetworkMessage(version, in);
if (msg == null)
return;
switch (state) {
case ACTIVE:
receiveMessage(msg.getPayload());
break;
default:
handleCommand(msg.getPayload());
break;
}
handleMessage(msg.getPayload());
if (socket.isClosed() || syncFinished(msg) || checkOpenRequests()) disconnect();
} catch (SocketTimeoutException ignore) {
if (state == ACTIVE && syncFinished(null)) disconnect();
}
}
private void handleCommand(MessagePayload payload) {
switch (payload.getCommand()) {
case VERSION:
handleVersion((Version) payload);
break;
case VERACK:
switch (mode) {
case SERVER:
activateConnection();
break;
case CLIENT:
case SYNC:
default:
// NO OP
break;
}
break;
case CUSTOM:
MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload);
if (response != null) {
send(response);
}
disconnect();
break;
default:
throw new NodeException("Command 'version' or 'verack' expected, but was '"
+ payload.getCommand() + "'");
}
}
private void handleVersion(Version version) {
if (version.getNonce() == ctx.getClientNonce()) {
LOG.info("Tried to connect to self, disconnecting.");
disconnect();
} else if (version.getVersion() >= BitmessageContext.CURRENT_VERSION) {
Connection.this.version = version.getVersion();
streams = version.getStreams();
send(new VerAck());
switch (mode) {
case SERVER:
send(new Version.Builder().defaults(clientNonce).addrFrom(host).addrRecv(node).build());
break;
case CLIENT:
case SYNC:
activateConnection();
break;
default:
// NO OP
}
} else {
LOG.info("Received unsupported version " + version.getVersion() + ", disconnecting.");
disconnect();
}
}
}
private boolean checkOpenRequests() {

View File

@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.DefaultNetworkHandler.NETWORK_MAGIC_NUMBER;
/**
@ -38,17 +38,14 @@ public class ConnectionOrganizer implements Runnable {
private final InternalContext ctx;
private final DefaultNetworkHandler networkHandler;
private final NetworkHandler.MessageListener listener;
private final long clientNonce;
private Connection initialConnection;
public ConnectionOrganizer(InternalContext ctx,
DefaultNetworkHandler networkHandler,
NetworkHandler.MessageListener listener, long clientNonce) {
DefaultNetworkHandler networkHandler) {
this.ctx = ctx;
this.networkHandler = networkHandler;
this.listener = listener;
this.clientNonce = clientNonce;
this.listener = ctx.getNetworkListener();
}
@Override
@ -93,8 +90,7 @@ public class ConnectionOrganizer implements Runnable {
NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
boolean first = active == 0 && initialConnection == null;
for (NetworkAddress address : addresses) {
Connection c = new Connection(ctx, CLIENT, address, listener,
networkHandler.requestedObjects, clientNonce);
Connection c = new Connection(ctx, CLIENT, address, networkHandler.requestedObjects);
if (first) {
initialConnection = c;
first = false;

View File

@ -35,24 +35,26 @@ import java.net.Socket;
import java.util.*;
import java.util.concurrent.*;
import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE;
import static ch.dissem.bitmessage.utils.DebugUtils.inc;
import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool;
import static java.util.Collections.newSetFromMap;
/**
* Handles all the networky stuff.
*
* @deprecated use {@link ch.dissem.bitmessage.networking.nio.NioNetworkHandler NioNetworkHandler} instead.
*/
@Deprecated
public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
public final static int NETWORK_MAGIC_NUMBER = 8;
final Collection<Connection> connections = new ConcurrentLinkedQueue<>();
private final ExecutorService pool = Executors.newCachedThreadPool(
pool("network")
.lowPrio()
.daemon()
.build());
pool("network")
.lowPrio()
.daemon()
.build());
private InternalContext ctx;
private ServerRunnable server;
private volatile boolean running;
@ -65,9 +67,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
}
@Override
public Future<?> synchronize(InetAddress server, int port, MessageListener listener, long timeoutInSeconds) {
public Future<?> synchronize(InetAddress server, int port, long timeoutInSeconds) {
try {
Connection connection = Connection.sync(ctx, server, port, listener, timeoutInSeconds);
Connection connection = Connection.sync(ctx, server, port, ctx.getNetworkListener(), timeoutInSeconds);
Future<?> reader = pool.submit(connection.getReader());
pool.execute(connection.getWriter());
return reader;
@ -89,28 +91,25 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
throw new NodeException("No response from node " + server);
} else {
throw new NodeException("Unexpected response from node " +
server + ": " + networkMessage.getPayload().getCommand());
server + ": " + networkMessage.getPayload().getCommand());
}
}
} catch (IOException e) {
throw new ApplicationException(e);
throw new NodeException(e.getMessage(), e);
}
}
@Override
public void start(final MessageListener listener) {
if (listener == null) {
throw new IllegalStateException("Listener must be set at start");
}
public void start() {
if (running) {
throw new IllegalStateException("Network already running - you need to stop first.");
}
try {
running = true;
connections.clear();
server = new ServerRunnable(ctx, this, listener, ctx.getClientNonce());
server = new ServerRunnable(ctx, this);
pool.execute(server);
pool.execute(new ConnectionOrganizer(ctx, this, listener, ctx.getClientNonce()));
pool.execute(new ConnectionOrganizer(ctx, this));
} catch (IOException e) {
throw new ApplicationException(e);
}
@ -171,12 +170,13 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
for (Connection connection : connections) {
if (connection.getState() == ACTIVE) {
long stream = connection.getNode().getStream();
streams.add(stream);
if (connection.getMode() == SERVER) {
inc(incomingConnections, stream);
} else {
inc(outgoingConnections, stream);
for (long stream : connection.getStreams()) {
streams.add(stream);
if (connection.getMode() == SERVER) {
inc(incomingConnections, stream);
} else {
inc(outgoingConnections, stream);
}
}
}
}
@ -186,20 +186,21 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0;
int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0;
streamProperties[i] = new Property("stream " + stream,
null, new Property("nodes", incoming + outgoing),
new Property("incoming", incoming),
new Property("outgoing", outgoing)
null, new Property("nodes", incoming + outgoing),
new Property("incoming", incoming),
new Property("outgoing", outgoing)
);
i++;
}
return new Property("network", null,
new Property("connectionManager", running ? "running" : "stopped"),
new Property("connections", null, streamProperties),
new Property("requestedObjects", requestedObjects.size())
new Property("connectionManager", running ? "running" : "stopped"),
new Property("connections", null, streamProperties),
new Property("requestedObjects", requestedObjects.size())
);
}
void request(Set<InventoryVector> inventoryVectors) {
@Override
public void request(Collection<InventoryVector> inventoryVectors) {
if (!running || inventoryVectors.isEmpty()) return;
Map<Connection, List<InventoryVector>> distribution = new HashMap<>();

View File

@ -26,7 +26,7 @@ import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
/**
* @author Christian Basler
@ -37,15 +37,12 @@ public class ServerRunnable implements Runnable, Closeable {
private final ServerSocket serverSocket;
private final DefaultNetworkHandler networkHandler;
private final NetworkHandler.MessageListener listener;
private final long clientNonce;
public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler,
NetworkHandler.MessageListener listener, long clientNonce) throws IOException {
public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler) throws IOException {
this.ctx = ctx;
this.networkHandler = networkHandler;
this.listener = listener;
this.listener = ctx.getNetworkListener();
this.serverSocket = new ServerSocket(ctx.getPort());
this.clientNonce = clientNonce;
}
@Override
@ -54,8 +51,7 @@ public class ServerRunnable implements Runnable, Closeable {
try {
Socket socket = serverSocket.accept();
socket.setSoTimeout(Connection.READ_TIMEOUT);
networkHandler.startConnection(new Connection(ctx, SERVER, socket, listener,
networkHandler.requestedObjects, clientNonce));
networkHandler.startConnection(new Connection(ctx, SERVER, socket, networkHandler.requestedObjects));
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}

View File

@ -0,0 +1,160 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.V3MessageReader;
import ch.dissem.bitmessage.networking.AbstractConnection;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
/**
* Represents the current state of a connection.
*/
public class ConnectionInfo extends AbstractConnection {
private final ByteBuffer headerOut = ByteBuffer.allocate(24);
private ByteBuffer payloadOut;
private V3MessageReader reader = new V3MessageReader();
private boolean syncFinished;
private long lastUpdate = System.currentTimeMillis();
public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node,
Set<InventoryVector> commonRequestedObjects, long syncTimeout) {
super(context, mode, node, commonRequestedObjects, syncTimeout);
headerOut.flip();
if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build());
}
}
public State getState() {
return state;
}
public boolean knowsOf(InventoryVector iv) {
return ivCache.containsKey(iv);
}
public Queue<MessagePayload> getSendingQueue() {
return sendingQueue;
}
public ByteBuffer getInBuffer() {
if (reader == null) {
throw new NodeException("Node is disconnected");
}
return reader.getActiveBuffer();
}
public void updateWriter() {
if (!headerOut.hasRemaining() && !sendingQueue.isEmpty()) {
headerOut.clear();
MessagePayload payload = sendingQueue.poll();
payloadOut = new NetworkMessage(payload).writeHeaderAndGetPayloadBuffer(headerOut);
headerOut.flip();
lastUpdate = System.currentTimeMillis();
}
}
public ByteBuffer[] getOutBuffers() {
return new ByteBuffer[]{headerOut, payloadOut};
}
public void cleanupBuffers() {
if (payloadOut != null && !payloadOut.hasRemaining()) {
payloadOut = null;
}
}
public void updateReader() {
reader.update();
if (!reader.getMessages().isEmpty()) {
Iterator<NetworkMessage> iterator = reader.getMessages().iterator();
NetworkMessage msg = null;
while (iterator.hasNext()) {
msg = iterator.next();
handleMessage(msg.getPayload());
iterator.remove();
}
syncFinished = syncFinished(msg);
}
lastUpdate = System.currentTimeMillis();
}
public void updateSyncStatus() {
if (!syncFinished) {
syncFinished = (reader == null || reader.getMessages().isEmpty()) && syncFinished(null);
}
}
public boolean isExpired() {
switch (state) {
case CONNECTING:
// the TCP timeout starts out at 20 seconds
return lastUpdate < System.currentTimeMillis() - 20_000;
case ACTIVE:
// after verack messages are exchanged, the timeout is raised to 10 minutes
return lastUpdate < System.currentTimeMillis() - 600_000;
case DISCONNECTED:
return true;
default:
throw new IllegalStateException("Unknown state: " + state);
}
}
@Override
public void disconnect() {
super.disconnect();
if (reader != null) {
reader.cleanup();
reader = null;
}
payloadOut = null;
}
public boolean isSyncFinished() {
return syncFinished;
}
@Override
protected void send(MessagePayload payload) {
sendingQueue.add(payload);
if (payload instanceof GetData) {
requestedObjects.addAll(((GetData) payload).getInventory());
commonRequestedObjects.addAll(((GetData) payload).getInventory());
}
}
public boolean isWritePending() {
return !sendingQueue.isEmpty()
|| headerOut != null && headerOut.hasRemaining()
|| payloadOut != null && payloadOut.hasRemaining();
}
}

View File

@ -0,0 +1,506 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.CustomMessage;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.V3MessageReader;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.Property;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.*;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED;
import static ch.dissem.bitmessage.utils.Collections.selectRandom;
import static ch.dissem.bitmessage.utils.DebugUtils.inc;
import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool;
import static java.nio.channels.SelectionKey.*;
import static java.util.Collections.newSetFromMap;
/**
* Network handler using java.nio, resulting in less threads.
*/
public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder {
private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class);
private static final long REQUESTED_OBJECTS_MAX_TIME = 30 * 60_000; // 30 minutes
private final ExecutorService threadPool = Executors.newCachedThreadPool(
pool("network")
.lowPrio()
.daemon()
.build());
private InternalContext ctx;
private Selector selector;
private ServerSocketChannel serverChannel;
private Queue<NetworkAddress> connectionQueue = new ConcurrentLinkedQueue<>();
private Map<ConnectionInfo, SelectionKey> connections = new ConcurrentHashMap<>();
private final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000));
private long requestedObjectsTimeout = 0;
private Thread starter;
@Override
public Future<Void> synchronize(final InetAddress server, final int port, final long timeoutInSeconds) {
return threadPool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) {
channel.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SYNC,
new NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
new HashSet<InventoryVector>(), timeoutInSeconds);
while (channel.isConnected() && !connection.isSyncFinished()) {
write(channel, connection);
read(channel, connection);
Thread.sleep(10);
}
LOG.info("Synchronization finished");
}
return null;
}
});
}
@Override
public CustomMessage send(InetAddress server, int port, CustomMessage request) {
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) {
channel.configureBlocking(true);
ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
ByteBuffer payloadBuffer = new NetworkMessage(request).writeHeaderAndGetPayloadBuffer(headerBuffer);
headerBuffer.flip();
while (headerBuffer.hasRemaining()) {
channel.write(headerBuffer);
}
while (payloadBuffer.hasRemaining()) {
channel.write(payloadBuffer);
}
V3MessageReader reader = new V3MessageReader();
while (channel.isConnected() && reader.getMessages().isEmpty()) {
if (channel.read(reader.getActiveBuffer()) > 0) {
reader.update();
} else {
throw new NodeException("No response from node " + server);
}
}
NetworkMessage networkMessage;
if (reader.getMessages().isEmpty()) {
throw new NodeException("No response from node " + server);
} else {
networkMessage = reader.getMessages().get(0);
}
if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) {
return (CustomMessage) networkMessage.getPayload();
} else {
if (networkMessage == null || networkMessage.getPayload() == null) {
throw new NodeException("Empty response from node " + server);
} else {
throw new NodeException("Unexpected response from node " + server + ": "
+ networkMessage.getPayload().getClass());
}
}
} catch (IOException e) {
throw new ApplicationException(e);
}
}
@Override
public void start() {
if (selector != null && selector.isOpen()) {
throw new IllegalStateException("Network already running - you need to stop first.");
}
try {
selector = Selector.open();
} catch (IOException e) {
throw new ApplicationException(e);
}
requestedObjectsTimeout = System.currentTimeMillis() + REQUESTED_OBJECTS_MAX_TIME;
requestedObjects.clear();
starter = thread("connection manager", new Runnable() {
@Override
public void run() {
while (selector.isOpen()) {
int missing = NETWORK_MAGIC_NUMBER;
for (ConnectionInfo connectionInfo : connections.keySet()) {
if (connectionInfo.getState() == ACTIVE) {
missing--;
if (missing == 0) break;
}
}
if (missing > 0) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(100, ctx.getStreams());
addresses = selectRandom(missing, addresses);
for (NetworkAddress address : addresses) {
if (!isConnectedTo(address)) {
connectionQueue.offer(address);
}
}
}
Iterator<Map.Entry<ConnectionInfo, SelectionKey>> it = connections.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<ConnectionInfo, SelectionKey> e = it.next();
if (!e.getValue().isValid() || e.getKey().isExpired()) {
try {
e.getValue().channel().close();
} catch (Exception ignore) {
}
e.getValue().cancel();
e.getValue().attach(null);
e.getKey().disconnect();
it.remove();
}
}
// The list 'requested objects' helps to prevent downloading an object
// twice. From time to time there is an error though, and an object is
// never downloaded. To prevent a large list of failed objects and give
// them a chance to get downloaded again, let's clear the list from time
// to time. The timeout should be such that most of the initial object
// sync should be done by then, but small enough to prevent objects with
// a normal time out from not being downloaded at all.
long now = System.currentTimeMillis();
if (now > requestedObjectsTimeout) {
requestedObjectsTimeout = now + REQUESTED_OBJECTS_MAX_TIME;
requestedObjects.clear();
}
try {
Thread.sleep(30_000);
} catch (InterruptedException e) {
return;
}
}
}
});
thread("selector worker", new Runnable() {
@Override
public void run() {
try {
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(ctx.getPort()));
serverChannel.register(selector, OP_ACCEPT, null);
while (selector.isOpen()) {
selector.select(1000);
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.attachment() == null) {
try {
if (key.isAcceptable()) {
// handle accept
try {
SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept();
accepted.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SERVER,
new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(),
requestedObjects, 0
);
connections.put(
connection,
accepted.register(selector, OP_READ | OP_WRITE, connection)
);
} catch (AsynchronousCloseException e) {
LOG.trace(e.getMessage());
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
} catch (CancelledKeyException e) {
LOG.error(e.getMessage(), e);
}
} else {
// handle read/write
SocketChannel channel = (SocketChannel) key.channel();
ConnectionInfo connection = (ConnectionInfo) key.attachment();
try {
if (key.isConnectable()) {
if (!channel.finishConnect()) {
continue;
}
}
if (key.isWritable()) {
write(channel, connection);
}
if (key.isReadable()) {
read(channel, connection);
}
if (connection.getState() == DISCONNECTED) {
key.interestOps(0);
channel.close();
} else if (connection.isWritePending()) {
key.interestOps(OP_READ | OP_WRITE);
} else {
key.interestOps(OP_READ);
}
} catch (CancelledKeyException | NodeException | IOException e) {
connection.disconnect();
}
}
}
// set interest ops
for (Map.Entry<ConnectionInfo, SelectionKey> e : connections.entrySet()) {
if (e.getValue().isValid()
&& (e.getValue().interestOps() & OP_WRITE) == 0
&& (e.getValue().interestOps() & OP_CONNECT) == 0
&& !e.getKey().getSendingQueue().isEmpty()) {
e.getValue().interestOps(OP_READ | OP_WRITE);
}
}
// start new connections
if (!connectionQueue.isEmpty()) {
NetworkAddress address = connectionQueue.poll();
try {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort()));
ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT,
address,
requestedObjects, 0
);
connections.put(
connection,
channel.register(selector, OP_CONNECT, connection)
);
} catch (NoRouteToHostException ignore) {
// We'll try to connect to many offline nodes, so
// this is expected to happen quite a lot.
} catch (AsynchronousCloseException e) {
// The exception is expected if the network is being
// shut down, as we actually do asynchronously close
// the connections.
if (isRunning()) {
LOG.error(e.getMessage(), e);
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
}
selector.close();
} catch (ClosedSelectorException ignore) {
} catch (IOException e) {
throw new ApplicationException(e);
}
}
});
}
private static void write(SocketChannel channel, ConnectionInfo connection)
throws IOException {
writeBuffer(connection.getOutBuffers(), channel);
connection.updateWriter();
writeBuffer(connection.getOutBuffers(), channel);
connection.cleanupBuffers();
}
private static void writeBuffer(ByteBuffer[] buffers, SocketChannel channel) throws IOException {
if (buffers[1] == null) {
if (buffers[0].hasRemaining()) {
channel.write(buffers[0]);
}
} else if (buffers[1].hasRemaining() || buffers[0].hasRemaining()) {
channel.write(buffers);
}
}
private static void read(SocketChannel channel, ConnectionInfo connection) throws IOException {
if (channel.read(connection.getInBuffer()) > 0) {
connection.updateReader();
}
connection.updateSyncStatus();
}
private Thread thread(String threadName, Runnable runnable) {
Thread thread = new Thread(runnable, threadName);
thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY);
thread.start();
return thread;
}
@Override
public void stop() {
try {
serverChannel.socket().close();
selector.close();
for (SelectionKey selectionKey : connections.values()) {
selectionKey.channel().close();
}
} catch (IOException e) {
throw new ApplicationException(e);
}
}
@Override
public void offer(InventoryVector iv) {
List<ConnectionInfo> target = new LinkedList<>();
for (ConnectionInfo connection : connections.keySet()) {
if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) {
target.add(connection);
}
}
List<ConnectionInfo> randomSubset = selectRandom(NETWORK_MAGIC_NUMBER, target);
for (ConnectionInfo connection : randomSubset) {
connection.offer(iv);
}
}
@Override
public void request(Collection<InventoryVector> inventoryVectors) {
if (!isRunning()) {
requestedObjects.clear();
return;
}
Iterator<InventoryVector> iterator = inventoryVectors.iterator();
if (!iterator.hasNext()) {
return;
}
Map<ConnectionInfo, List<InventoryVector>> distribution = new HashMap<>();
for (ConnectionInfo connection : connections.keySet()) {
if (connection.getState() == ACTIVE) {
distribution.put(connection, new LinkedList<InventoryVector>());
}
}
if (distribution.isEmpty()) {
return;
}
InventoryVector next = iterator.next();
ConnectionInfo previous = null;
do {
for (ConnectionInfo connection : distribution.keySet()) {
if (connection == previous || previous == null) {
if (iterator.hasNext()) {
previous = connection;
next = iterator.next();
} else {
break;
}
}
if (connection.knowsOf(next)) {
List<InventoryVector> ivs = distribution.get(connection);
if (ivs.size() == GetData.MAX_INVENTORY_SIZE) {
connection.send(new GetData.Builder().inventory(ivs).build());
ivs.clear();
}
ivs.add(next);
iterator.remove();
if (iterator.hasNext()) {
next = iterator.next();
previous = connection;
} else {
break;
}
}
}
} while (iterator.hasNext());
// remove objects nobody knows of
requestedObjects.removeAll(inventoryVectors);
for (ConnectionInfo connection : distribution.keySet()) {
List<InventoryVector> ivs = distribution.get(connection);
if (!ivs.isEmpty()) {
connection.send(new GetData.Builder().inventory(ivs).build());
}
}
}
@Override
public Property getNetworkStatus() {
TreeSet<Long> streams = new TreeSet<>();
TreeMap<Long, Integer> incomingConnections = new TreeMap<>();
TreeMap<Long, Integer> outgoingConnections = new TreeMap<>();
for (ConnectionInfo connection : connections.keySet()) {
if (connection.getState() == ACTIVE) {
for (long stream : connection.getStreams()) {
streams.add(stream);
if (connection.getMode() == SERVER) {
inc(incomingConnections, stream);
} else {
inc(outgoingConnections, stream);
}
}
}
}
Property[] streamProperties = new Property[streams.size()];
int i = 0;
for (Long stream : streams) {
int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0;
int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0;
streamProperties[i] = new Property("stream " + stream,
null, new Property("nodes", incoming + outgoing),
new Property("incoming", incoming),
new Property("outgoing", outgoing)
);
i++;
}
return new Property("network", null,
new Property("connectionManager", isRunning() ? "running" : "stopped"),
new Property("connections", null, streamProperties),
new Property("requestedObjects", requestedObjects.size())
);
}
private boolean isConnectedTo(NetworkAddress address) {
for (ConnectionInfo c : connections.keySet()) {
if (c.getNode().equals(address)) {
return true;
}
}
return false;
}
@Override
public boolean isRunning() {
return selector != null && selector.isOpen() && starter.isAlive();
}
@Override
public void setContext(InternalContext context) {
this.ctx = context;
}
}

View File

@ -22,11 +22,23 @@ import ch.dissem.bitmessage.entity.CustomMessage;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.networking.nio.NioNetworkHandler;
import ch.dissem.bitmessage.ports.*;
import ch.dissem.bitmessage.utils.Property;
import org.junit.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
@ -37,70 +49,98 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
/**
* FIXME: there really should be sensible tests for the network handler
* Tests network handlers. This test is parametrized, so it can test both the nio and classic implementation
* as well as their combinations. It might be slightly over the top and will most probably be cleaned up once
* the nio implementation is deemed stable.
*/
@RunWith(Parameterized.class)
public class NetworkHandlerTest {
private static NetworkAddress localhost = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(6001).build();
private static final Logger LOG = LoggerFactory.getLogger(NetworkHandlerTest.class);
private static NetworkAddress peerAddress = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(6001).build();
private TestInventory peerInventory;
private TestInventory nodeInventory;
private BitmessageContext peer;
private BitmessageContext node;
private NetworkHandler networkHandler;
private final NetworkHandler peerNetworkHandler;
private final NetworkHandler nodeNetworkHandler;
@Rule
public final TestRule timeout = new DisableOnDebug(Timeout.seconds(60));
public NetworkHandlerTest(NetworkHandler peer, NetworkHandler node) {
this.peerNetworkHandler = peer;
this.nodeNetworkHandler = node;
}
@Parameterized.Parameters
public static List<Object[]> parameters() {
return Arrays.asList(new Object[][]{
{new DefaultNetworkHandler(), new DefaultNetworkHandler()},
{new DefaultNetworkHandler(), new NioNetworkHandler()},
{new NioNetworkHandler(), new DefaultNetworkHandler()},
{new NioNetworkHandler(), new NioNetworkHandler()}
});
}
@Before
public void setUp() {
public void setUp() throws InterruptedException {
peerInventory = new TestInventory();
peer = new BitmessageContext.Builder()
.addressRepo(mock(AddressRepository.class))
.inventory(peerInventory)
.messageRepo(mock(MessageRepository.class))
.powRepo(mock(ProofOfWorkRepository.class))
.port(6001)
.nodeRegistry(new TestNodeRegistry())
.networkHandler(new DefaultNetworkHandler())
.cryptography(new BouncyCryptography())
.listener(mock(BitmessageContext.Listener.class))
.build();
.addressRepo(mock(AddressRepository.class))
.inventory(peerInventory)
.messageRepo(mock(MessageRepository.class))
.powRepo(mock(ProofOfWorkRepository.class))
.port(peerAddress.getPort())
.nodeRegistry(new TestNodeRegistry())
.networkHandler(peerNetworkHandler)
.cryptography(new BouncyCryptography())
.listener(mock(BitmessageContext.Listener.class))
.customCommandHandler(new CustomCommandHandler() {
@Override
public MessagePayload handle(CustomMessage request) {
byte[] data = request.getData();
if (data.length > 0) {
switch (data[0]) {
case 0:
return null;
case 1:
break;
case 3:
data[0] = 0;
break;
default:
break;
}
}
return new CustomMessage("test response", request.getData());
}
})
.build();
peer.startup();
Thread.sleep(100);
nodeInventory = new TestInventory();
networkHandler = new DefaultNetworkHandler();
node = new BitmessageContext.Builder()
.addressRepo(mock(AddressRepository.class))
.inventory(nodeInventory)
.messageRepo(mock(MessageRepository.class))
.powRepo(mock(ProofOfWorkRepository.class))
.port(6002)
.nodeRegistry(new TestNodeRegistry(localhost))
.networkHandler(networkHandler)
.cryptography(new BouncyCryptography())
.listener(mock(BitmessageContext.Listener.class))
.customCommandHandler(new CustomCommandHandler() {
@Override
public MessagePayload handle(CustomMessage request) {
byte[] data = request.getData();
if (data.length > 0) {
switch (data[0]) {
case 0:
return null;
case 1:
break;
case 3:
data[0] = 0;
}
}
return new CustomMessage("test response", request.getData());
}
})
.build();
.addressRepo(mock(AddressRepository.class))
.inventory(nodeInventory)
.messageRepo(mock(MessageRepository.class))
.powRepo(mock(ProofOfWorkRepository.class))
.port(6002)
.nodeRegistry(new TestNodeRegistry(peerAddress))
.networkHandler(nodeNetworkHandler)
.cryptography(new BouncyCryptography())
.listener(mock(BitmessageContext.Listener.class))
.build();
}
@After
public void cleanUp() {
shutdown(peer);
shutdown(node);
shutdown(nodeNetworkHandler);
}
private static void shutdown(BitmessageContext ctx) {
@ -115,93 +155,112 @@ public class NetworkHandlerTest {
} while (ctx.isRunning());
}
@Test(timeout = 5_000)
public void ensureNodesAreConnecting() {
node.startup();
Property status;
private static void shutdown(NetworkHandler networkHandler) {
if (!networkHandler.isRunning()) return;
networkHandler.stop();
do {
Thread.yield();
status = node.status().getProperty("network", "connections", "stream 0");
} while (status == null);
assertEquals(1, status.getProperty("outgoing").getValue());
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
if (networkHandler.isRunning()) {
LOG.warn("Thread interrupted while waiting for network shutdown - " +
"this could cause problems in subsequent tests.");
}
return;
}
} while (networkHandler.isRunning());
}
@Test(timeout = 5_000)
private Property waitForNetworkStatus(BitmessageContext ctx) throws InterruptedException {
Property status;
do {
Thread.sleep(100);
status = ctx.status().getProperty("network", "connections", "stream 1");
} while (status == null);
return status;
}
@Test
public void ensureNodesAreConnecting() throws Exception {
node.startup();
Property nodeStatus = waitForNetworkStatus(node);
Property peerStatus = waitForNetworkStatus(peer);
assertEquals(1, nodeStatus.getProperty("outgoing").getValue());
assertEquals(1, peerStatus.getProperty("incoming").getValue());
}
@Test
public void ensureCustomMessageIsSentAndResponseRetrieved() throws Exception {
byte[] data = cryptography().randomBytes(8);
data[0] = (byte) 1;
CustomMessage request = new CustomMessage("test request", data);
node.startup();
CustomMessage response = networkHandler.send(InetAddress.getLocalHost(), 6002, request);
CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request);
assertThat(response, notNullValue());
assertThat(response.getCustomCommand(), is("test response"));
assertThat(response.getData(), is(data));
}
@Test(timeout = 5_000, expected = NodeException.class)
public void ensureCustomMessageWithoutResponsYieldsException() throws Exception {
@Test(expected = NodeException.class)
public void ensureCustomMessageWithoutResponseYieldsException() throws Exception {
byte[] data = cryptography().randomBytes(8);
data[0] = (byte) 0;
CustomMessage request = new CustomMessage("test request", data);
node.startup();
CustomMessage response = networkHandler.send(InetAddress.getLocalHost(), 6002, request);
CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request);
assertThat(response, notNullValue());
assertThat(response.getCustomCommand(), is("test response"));
assertThat(response.getData(), is(request.getData()));
}
@Test(timeout = 5_000)
@Test
public void ensureObjectsAreSynchronizedIfBothHaveObjects() throws Exception {
peerInventory.init(
"V4Pubkey.payload",
"V5Broadcast.payload"
"V4Pubkey.payload",
"V5Broadcast.payload"
);
nodeInventory.init(
"V1Msg.payload",
"V4Pubkey.payload"
"V1Msg.payload",
"V4Pubkey.payload"
);
Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001,
mock(NetworkHandler.MessageListener.class),
10);
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10);
future.get();
assertInventorySize(3, nodeInventory);
assertInventorySize(3, peerInventory);
}
@Test(timeout = 5_000)
@Test
public void ensureObjectsAreSynchronizedIfOnlyPeerHasObjects() throws Exception {
peerInventory.init(
"V4Pubkey.payload",
"V5Broadcast.payload"
"V4Pubkey.payload",
"V5Broadcast.payload"
);
nodeInventory.init();
Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001,
mock(NetworkHandler.MessageListener.class),
10);
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10);
future.get();
assertInventorySize(2, nodeInventory);
assertInventorySize(2, peerInventory);
}
@Test(timeout = 5_000)
@Test
public void ensureObjectsAreSynchronizedIfOnlyNodeHasObjects() throws Exception {
peerInventory.init();
nodeInventory.init(
"V1Msg.payload"
"V1Msg.payload"
);
Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001,
mock(NetworkHandler.MessageListener.class),
10);
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10);
future.get();
assertInventorySize(1, nodeInventory);
assertInventorySize(1, peerInventory);

View File

@ -14,10 +14,10 @@ sourceCompatibility = 1.8
dependencies {
compile project(':core')
compile 'org.flywaydb:flyway-core:3.2.1'
compile 'org.flywaydb:flyway-core:4.0.3'
testCompile 'junit:junit:4.12'
testCompile 'com.h2database:h2:1.4.190'
testCompile 'com.h2database:h2:1.4.192'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile project(path: ':core', configuration: 'testArtifacts')
testCompile project(':cryptography-bc')
}
}

View File

@ -81,8 +81,7 @@ public class JdbcMessageRepository extends AbstractMessageRepository implements
try (
Connection connection = config.getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT count(*) FROM Message WHERE " + where
+ " ORDER BY received DESC")
ResultSet rs = stmt.executeQuery("SELECT count(*) FROM Message WHERE " + where)
) {
if (rs.next()) {
return rs.getInt(1);
@ -121,7 +120,7 @@ public class JdbcMessageRepository extends AbstractMessageRepository implements
builder.retries(rs.getInt("retries"));
builder.nextTry(rs.getLong("next_try"));
builder.labels(findLabels(connection,
"WHERE id IN (SELECT label_id FROM Message_Label WHERE message_id=" + id + ") ORDER BY ord"));
"id IN (SELECT label_id FROM Message_Label WHERE message_id=" + id + ") ORDER BY ord"));
Plaintext message = builder.build();
message.setInitialHash(rs.getBytes("initial_hash"));
result.add(message);

View File

@ -0,0 +1,167 @@
package ch.dissem.bitmessage.repository;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.ports.NodeRegistry;
import ch.dissem.bitmessage.utils.Collections;
import ch.dissem.bitmessage.utils.SqlStrings;
import ch.dissem.bitmessage.utils.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static ch.dissem.bitmessage.ports.NodeRegistryHelper.loadStableNodes;
import static ch.dissem.bitmessage.utils.UnixTime.*;
public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry {
private static final Logger LOG = LoggerFactory.getLogger(JdbcNodeRegistry.class);
private Map<Long, Set<NetworkAddress>> stableNodes;
public JdbcNodeRegistry(JdbcConfig config) {
super(config);
cleanUp();
}
private void cleanUp() {
try (
Connection connection = config.getConnection();
PreparedStatement ps = connection.prepareStatement(
"DELETE FROM Node WHERE time<?")
) {
ps.setLong(1, now(-28 * DAY));
ps.executeUpdate();
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
private NetworkAddress loadExisting(NetworkAddress node) {
String query =
"SELECT stream, address, port, services, time" +
" FROM Node" +
" WHERE stream = " + node.getStream() +
" AND address = X'" + Strings.hex(node.getIPv6()) + "'" +
" AND port = " + node.getPort();
try (
Connection connection = config.getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(query)
) {
if (rs.next()) {
return new NetworkAddress.Builder()
.stream(rs.getLong("stream"))
.ipv6(rs.getBytes("address"))
.port(rs.getInt("port"))
.services(rs.getLong("services"))
.time(rs.getLong("time"))
.build();
} else {
return null;
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new ApplicationException(e);
}
}
@Override
public List<NetworkAddress> getKnownAddresses(int limit, long... streams) {
List<NetworkAddress> result = new LinkedList<>();
String query =
"SELECT stream, address, port, services, time" +
" FROM Node WHERE stream IN (" + SqlStrings.join(streams) + ")" +
" ORDER BY TIME DESC" +
" LIMIT " + limit;
try (
Connection connection = config.getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(query)
) {
while (rs.next()) {
result.add(
new NetworkAddress.Builder()
.stream(rs.getLong("stream"))
.ipv6(rs.getBytes("address"))
.port(rs.getInt("port"))
.services(rs.getLong("services"))
.time(rs.getLong("time"))
.build()
);
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new ApplicationException(e);
}
if (result.isEmpty()) {
synchronized (this) {
if (stableNodes == null) {
stableNodes = loadStableNodes();
}
}
for (long stream : streams) {
Set<NetworkAddress> nodes = stableNodes.get(stream);
if (nodes != null && !nodes.isEmpty()) {
result.add(Collections.selectRandom(nodes));
}
}
}
return result;
}
@Override
public void offerAddresses(List<NetworkAddress> nodes) {
cleanUp();
nodes.stream()
.filter(node -> node.getTime() < now(+2 * MINUTE) && node.getTime() > now(-28 * DAY))
.forEach(node -> {
synchronized (this) {
NetworkAddress existing = loadExisting(node);
if (existing == null) {
insert(node);
} else if (node.getTime() > existing.getTime()) {
update(node);
}
}
});
}
private void insert(NetworkAddress node) {
try (
Connection connection = config.getConnection();
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO Node (stream, address, port, services, time) " +
"VALUES (?, ?, ?, ?, ?)")
) {
ps.setLong(1, node.getStream());
ps.setBytes(2, node.getIPv6());
ps.setInt(3, node.getPort());
ps.setLong(4, node.getServices());
ps.setLong(5, node.getTime());
ps.executeUpdate();
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
private void update(NetworkAddress node) {
try (
Connection connection = config.getConnection();
PreparedStatement ps = connection.prepareStatement(
"UPDATE Node SET services=?, time=? WHERE stream=? AND address=? AND port=?")
) {
ps.setLong(1, node.getServices());
ps.setLong(2, node.getTime());
ps.setLong(3, node.getStream());
ps.setBytes(4, node.getIPv6());
ps.setInt(5, node.getPort());
ps.executeUpdate();
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,9 @@
CREATE TABLE Node (
stream BIGINT NOT NULL,
address BINARY(32) NOT NULL,
port INT NOT NULL,
services BIGINT NOT NULL,
time BIGINT NOT NULL,
PRIMARY KEY (stream, address, port)
);
CREATE INDEX idx_time on Node(time);

View File

@ -17,8 +17,8 @@
package ch.dissem.bitmessage.repository;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.ports.MemoryNodeRegistry;
import ch.dissem.bitmessage.ports.NodeRegistry;
import ch.dissem.bitmessage.utils.UnixTime;
import org.junit.Before;
import org.junit.Test;
@ -27,8 +27,15 @@ import java.util.Collections;
import java.util.List;
import static ch.dissem.bitmessage.utils.UnixTime.now;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/**
* Please note that some tests fail if there is no internet connection,
* as the initial nodes' IP addresses are determined by DNS lookup.
*/
public class JdbcNodeRegistryTest extends TestBase {
private TestJdbcConfig config;
private NodeRegistry registry;
@ -37,21 +44,26 @@ public class JdbcNodeRegistryTest extends TestBase {
public void setUp() throws Exception {
config = new TestJdbcConfig();
config.reset();
registry = new MemoryNodeRegistry();
registry = new JdbcNodeRegistry(config);
registry.offerAddresses(Arrays.asList(
createAddress(1, 8444, 1, now()),
createAddress(2, 8444, 1, now()),
createAddress(3, 8444, 1, now()),
createAddress(4, 8444, 2, now())
createAddress(1, 8444, 1, now()),
createAddress(2, 8444, 1, now()),
createAddress(3, 8444, 1, now()),
createAddress(4, 8444, 2, now())
));
}
@Test
public void testInitNodes() throws Exception {
public void ensureGetKnownNodesWithoutStreamsYieldsEmpty() {
assertThat(registry.getKnownAddresses(10), empty());
}
@Test
public void ensurePredefinedNodeIsReturnedWhenDatabaseIsEmpty() throws Exception {
config.reset();
List<NetworkAddress> knownAddresses = registry.getKnownAddresses(2, 1);
assertEquals(2, knownAddresses.size());
assertEquals(1, knownAddresses.size());
}
@Test
@ -66,16 +78,16 @@ public class JdbcNodeRegistryTest extends TestBase {
@Test
public void testOfferAddresses() throws Exception {
registry.offerAddresses(Arrays.asList(
createAddress(1, 8444, 1, now()),
createAddress(10, 8444, 1, now()),
createAddress(11, 8444, 1, now())
createAddress(1, 8444, 1, now()),
createAddress(10, 8444, 1, now()),
createAddress(11, 8444, 1, now())
));
List<NetworkAddress> knownAddresses = registry.getKnownAddresses(1000, 1);
assertEquals(5, knownAddresses.size());
registry.offerAddresses(Collections.singletonList(
createAddress(1, 8445, 1, now())
createAddress(1, 8445, 1, now())
));
knownAddresses = registry.getKnownAddresses(1000, 1);
@ -84,10 +96,10 @@ public class JdbcNodeRegistryTest extends TestBase {
private NetworkAddress createAddress(int lastByte, int port, long stream, long time) {
return new NetworkAddress.Builder()
.ipv6(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, lastByte)
.port(port)
.stream(stream)
.time(time)
.build();
.ipv6(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, lastByte)
.port(port)
.stream(stream)
.time(time)
.build();
}
}
}

View File

@ -13,7 +13,7 @@ uploadArchives {
dependencies {
compile project(':core')
compile 'org.ini4j:ini4j:0.5.4'
testCompile 'junit:junit:4.11'
testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile project(':cryptography-bc')
}