Added write(ByteBuffer) to Streamable interface and a first draft for a NioNetworkHandler

This commit is contained in:
Christian Basler 2016-05-28 10:22:47 +02:00
parent b8f88b02d1
commit 08f2d5d6f1
27 changed files with 561 additions and 79 deletions

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -47,10 +48,18 @@ public class Addr implements MessagePayload {
}
@Override
public void write(OutputStream stream) throws IOException {
Encode.varInt(addresses.size(), stream);
public void write(OutputStream out) throws IOException {
Encode.varInt(addresses.size(), out);
for (NetworkAddress address : addresses) {
address.write(stream);
address.write(out);
}
}
@Override
public void write(ByteBuffer buffer) {
Encode.varInt(addresses.size(), buffer);
for (NetworkAddress address : addresses) {
address.write(buffer);
}
}

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.utils.AccessCounter;
import ch.dissem.bitmessage.utils.Encode;
import java.io.*;
import java.nio.ByteBuffer;
import static ch.dissem.bitmessage.utils.Decode.bytes;
import static ch.dissem.bitmessage.utils.Decode.varString;
@ -85,6 +86,17 @@ public class CustomMessage implements MessagePayload {
}
}
@Override
public void write(ByteBuffer buffer) {
if (data != null) {
Encode.varString(command, buffer);
buffer.put(data);
} else {
throw new ApplicationException("Tried to write custom message without data. " +
"Programmer: did you forget to override #write()?");
}
}
public boolean isError() {
return COMMAND_ERROR.equals(command);
}

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
@ -55,6 +56,14 @@ public class GetData implements MessagePayload {
}
}
@Override
public void write(ByteBuffer buffer) {
Encode.varInt(inventory.size(), buffer);
for (InventoryVector iv : inventory) {
iv.write(buffer);
}
}
public static final class Builder {
private List<InventoryVector> inventory = new LinkedList<>();

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
@ -53,6 +54,14 @@ public class Inv implements MessagePayload {
}
}
@Override
public void write(ByteBuffer buffer) {
Encode.varInt(inventory.size(), buffer);
for (InventoryVector iv : inventory) {
iv.write(buffer);
}
}
public static final class Builder {
private List<InventoryVector> inventory = new LinkedList<>();

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
@ -74,9 +75,7 @@ public class NetworkMessage implements Streamable {
out.write('\0');
}
ByteArrayOutputStream payloadStream = new ByteArrayOutputStream();
payload.write(payloadStream);
byte[] payloadBytes = payloadStream.toByteArray();
byte[] payloadBytes = Encode.bytes(payload);
// Length of payload in number of bytes. Because of other restrictions, there is no reason why this length would
// ever be larger than 1600003 bytes. Some clients include a sanity-check to avoid processing messages which are
@ -93,4 +92,38 @@ public class NetworkMessage implements Streamable {
// message payload
out.write(payloadBytes);
}
@Override
public void write(ByteBuffer out) {
// magic
Encode.int32(MAGIC, out);
// ASCII string identifying the packet content, NULL padded (non-NULL padding results in packet rejected)
String command = payload.getCommand().name().toLowerCase();
try {
out.put(command.getBytes("ASCII"));
} catch (UnsupportedEncodingException e) {
throw new ApplicationException(e);
}
for (int i = command.length(); i < 12; i++) {
out.put((byte) 0);
}
byte[] payloadBytes = Encode.bytes(payload);
// Length of payload in number of bytes. Because of other restrictions, there is no reason why this length would
// ever be larger than 1600003 bytes. Some clients include a sanity-check to avoid processing messages which are
// larger than this.
Encode.int32(payloadBytes.length, out);
// checksum
try {
out.put(getChecksum(payloadBytes));
} catch (GeneralSecurityException e) {
throw new ApplicationException(e);
}
// message payload
out.put(payloadBytes);
}
}

View File

@ -29,6 +29,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
@ -168,6 +169,16 @@ public class ObjectMessage implements MessagePayload {
out.write(getPayloadBytesWithoutNonce());
}
@Override
public void write(ByteBuffer buffer) {
if (nonce == null) {
buffer.put(new byte[8]);
} else {
buffer.put(nonce);
}
buffer.put(getPayloadBytesWithoutNonce());
}
private void writeHeaderWithoutNonce(OutputStream out) throws IOException {
Encode.int64(expiresTime, out);
Encode.int32(objectType, out);

View File

@ -25,6 +25,7 @@ import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.utils.*;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Collections;
@ -197,12 +198,49 @@ public class Plaintext implements Streamable {
}
}
}
public void write(ByteBuffer buffer, boolean includeSignature) {
Encode.varInt(from.getVersion(), buffer);
Encode.varInt(from.getStream(), buffer);
Encode.int32(from.getPubkey().getBehaviorBitfield(), buffer);
buffer.put(from.getPubkey().getSigningKey(), 1, 64);
buffer.put(from.getPubkey().getEncryptionKey(), 1, 64);
if (from.getVersion() >= 3) {
Encode.varInt(from.getPubkey().getNonceTrialsPerByte(), buffer);
Encode.varInt(from.getPubkey().getExtraBytes(), buffer);
}
if (type == Type.MSG) {
buffer.put(to.getRipe());
}
Encode.varInt(encoding, buffer);
Encode.varInt(message.length, buffer);
buffer.put(message);
if (type == Type.MSG) {
if (to.has(Feature.DOES_ACK) && getAckMessage() != null) {
Encode.varBytes(Encode.bytes(getAckMessage()), buffer);
} else {
Encode.varInt(0, buffer);
}
}
if (includeSignature) {
if (signature == null) {
Encode.varInt(0, buffer);
} else {
Encode.varInt(signature.length, buffer);
buffer.put(signature);
}
}
}
@Override
public void write(OutputStream out) throws IOException {
write(out, true);
}
@Override
public void write(ByteBuffer buffer) {
write(buffer, true);
}
public Object getId() {
return id;
}

View File

@ -19,10 +19,13 @@ package ch.dissem.bitmessage.entity;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
/**
* An object that can be written to an {@link OutputStream}
*/
public interface Streamable extends Serializable {
void write(OutputStream stream) throws IOException;
void write(ByteBuffer buffer);
}

View File

@ -18,6 +18,7 @@ package ch.dissem.bitmessage.entity;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* The 'verack' command answers a 'version' command, accepting the other node's version.
@ -34,4 +35,9 @@ public class VerAck implements MessagePayload {
public void write(OutputStream stream) throws IOException {
// 'verack' doesn't have any payload, so there is nothing to write
}
@Override
public void write(ByteBuffer buffer) {
// 'verack' doesn't have any payload, so there is nothing to write
}
}

View File

@ -23,6 +23,7 @@ import ch.dissem.bitmessage.utils.UnixTime;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Random;
/**
@ -134,6 +135,18 @@ public class Version implements MessagePayload {
Encode.varIntList(streams, stream);
}
@Override
public void write(ByteBuffer buffer) {
Encode.int32(version, buffer);
Encode.int64(services, buffer);
Encode.int64(timestamp, buffer);
addrRecv.write(buffer, true);
addrFrom.write(buffer, true);
Encode.int64(nonce, buffer);
Encode.varString(userAgent, buffer);
Encode.varIntList(streams, buffer);
}
public static final class Builder {
private int version;

View File

@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static ch.dissem.bitmessage.entity.valueobject.PrivateKey.PRIVATE_KEY_SIZE;
@ -144,12 +145,29 @@ public class CryptoBox implements Streamable {
out.write(x, offset, length);
}
private void writeCoordinateComponent(ByteBuffer buffer, byte[] x) {
int offset = Bytes.numberOfLeadingZeros(x);
int length = x.length - offset;
Encode.int16(length, buffer);
buffer.put(x, offset, length);
}
@Override
public void write(OutputStream stream) throws IOException {
writeWithoutMAC(stream);
stream.write(mac);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(initializationVector);
Encode.int16(curveType, buffer);
writeCoordinateComponent(buffer, Points.getX(R));
writeCoordinateComponent(buffer, Points.getY(R));
buffer.put(encrypted);
buffer.put(mac);
}
public static final class Builder {
private byte[] initializationVector;
private int curveType;

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.utils.Decode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
@ -62,6 +63,11 @@ public class GenericPayload extends ObjectPayload {
stream.write(data);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(data);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.utils.Decode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* Request for a public key.
@ -73,4 +74,9 @@ public class GetPubkey extends ObjectPayload {
public void write(OutputStream stream) throws IOException {
stream.write(ripeTag);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(ripeTag);
}
}

View File

@ -24,6 +24,7 @@ import ch.dissem.bitmessage.exception.DecryptionFailedException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
@ -111,6 +112,12 @@ public class Msg extends ObjectPayload implements Encrypted, PlaintextHolder {
encrypted.write(out);
}
@Override
public void write(ByteBuffer buffer) {
if (encrypted == null) throw new IllegalStateException("Msg must be signed and encrypted before writing it.");
encrypted.write(buffer);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -18,6 +18,7 @@ package ch.dissem.bitmessage.entity.payload;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
@ -60,6 +61,10 @@ public abstract class Pubkey extends ObjectPayload {
write(out);
}
public void writeUnencrypted(ByteBuffer buffer){
write(buffer);
}
protected byte[] add0x04(byte[] key) {
if (key.length == 65) return key;
byte[] result = new byte[65];

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* A version 2 public key.
@ -86,10 +87,17 @@ public class V2Pubkey extends Pubkey {
}
@Override
public void write(OutputStream os) throws IOException {
Encode.int32(behaviorBitfield, os);
os.write(publicSigningKey, 1, 64);
os.write(publicEncryptionKey, 1, 64);
public void write(OutputStream out) throws IOException {
Encode.int32(behaviorBitfield, out);
out.write(publicSigningKey, 1, 64);
out.write(publicEncryptionKey, 1, 64);
}
@Override
public void write(ByteBuffer buffer) {
Encode.int32(behaviorBitfield, buffer);
buffer.put(publicSigningKey, 1, 64);
buffer.put(publicEncryptionKey, 1, 64);
}
public static class Builder {

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.entity.Plaintext;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* Users who are subscribed to the sending address will see the message appear in their inbox.
@ -58,4 +59,9 @@ public class V4Broadcast extends Broadcast {
public void write(OutputStream out) throws IOException {
encrypted.write(out);
}
@Override
public void write(ByteBuffer buffer) {
encrypted.write(buffer);
}
}

View File

@ -24,6 +24,7 @@ import ch.dissem.bitmessage.utils.Decode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
@ -85,11 +86,22 @@ public class V4Pubkey extends Pubkey implements Encrypted {
encrypted.write(stream);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(tag);
encrypted.write(buffer);
}
@Override
public void writeUnencrypted(OutputStream out) throws IOException {
decrypted.write(out);
}
@Override
public void writeUnencrypted(ByteBuffer buffer) {
decrypted.write(buffer);
}
@Override
public void writeBytesToSign(OutputStream out) throws IOException {
out.write(tag);

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.utils.Strings;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class InventoryVector implements Streamable, Serializable {
@ -56,8 +57,13 @@ public class InventoryVector implements Streamable, Serializable {
}
@Override
public void write(OutputStream stream) throws IOException {
stream.write(hash);
public void write(OutputStream out) throws IOException {
out.write(hash);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(hash);
}
@Override

View File

@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
@ -119,14 +120,29 @@ public class NetworkAddress implements Streamable {
write(stream, false);
}
public void write(OutputStream stream, boolean light) throws IOException {
public void write(OutputStream out, boolean light) throws IOException {
if (!light) {
Encode.int64(time, stream);
Encode.int32(this.stream, stream);
Encode.int64(time, out);
Encode.int32(stream, out);
}
Encode.int64(services, stream);
stream.write(ipv6);
Encode.int16(port, stream);
Encode.int64(services, out);
out.write(ipv6);
Encode.int16(port, out);
}
@Override
public void write(ByteBuffer buffer) {
write(buffer, false);
}
public void write(ByteBuffer buffer, boolean light) {
if (!light) {
Encode.int64(time, buffer);
Encode.int32(stream, buffer);
}
Encode.int64(services, buffer);
buffer.put(ipv6);
Encode.int16(port, buffer);
}
public static final class Builder {

View File

@ -27,6 +27,7 @@ import ch.dissem.bitmessage.utils.Decode;
import ch.dissem.bitmessage.utils.Encode;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -113,24 +114,20 @@ public class PrivateKey implements Streamable {
}
Builder generate() {
try {
long signingKeyNonce = nextNonce;
long encryptionKeyNonce = nextNonce + 1;
byte[] ripe;
do {
privEK = Bytes.truncate(cryptography().sha512(seed, Encode.varInt(encryptionKeyNonce)), 32);
privSK = Bytes.truncate(cryptography().sha512(seed, Encode.varInt(signingKeyNonce)), 32);
pubSK = cryptography().createPublicKey(privSK);
pubEK = cryptography().createPublicKey(privEK);
ripe = cryptography().ripemd160(cryptography().sha512(pubSK, pubEK));
long signingKeyNonce = nextNonce;
long encryptionKeyNonce = nextNonce + 1;
byte[] ripe;
do {
privEK = Bytes.truncate(cryptography().sha512(seed, Encode.varInt(encryptionKeyNonce)), 32);
privSK = Bytes.truncate(cryptography().sha512(seed, Encode.varInt(signingKeyNonce)), 32);
pubSK = cryptography().createPublicKey(privSK);
pubEK = cryptography().createPublicKey(privEK);
ripe = cryptography().ripemd160(cryptography().sha512(pubSK, pubEK));
signingKeyNonce += 2;
encryptionKeyNonce += 2;
} while (ripe[0] != 0 || (shorter && ripe[1] != 0));
nextNonce = signingKeyNonce;
} catch (IOException e) {
throw new ApplicationException(e);
}
signingKeyNonce += 2;
encryptionKeyNonce += 2;
} while (ripe[0] != 0 || (shorter && ripe[1] != 0));
nextNonce = signingKeyNonce;
return this;
}
}
@ -182,4 +179,20 @@ public class PrivateKey implements Streamable {
Encode.varInt(privateEncryptionKey.length, out);
out.write(privateEncryptionKey);
}
@Override
public void write(ByteBuffer buffer) {
Encode.varInt(pubkey.getVersion(), buffer);
Encode.varInt(pubkey.getStream(), buffer);
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
pubkey.writeUnencrypted(baos);
Encode.varBytes(baos.toByteArray(), buffer);
} catch (IOException e) {
throw new ApplicationException(e);
}
Encode.varBytes(privateSigningKey, buffer);
Encode.varBytes(privateEncryptionKey, buffer);
}
}

View File

@ -29,6 +29,8 @@ import java.util.concurrent.Future;
* Handles incoming messages
*/
public interface NetworkHandler {
int NETWORK_MAGIC_NUMBER = 8;
/**
* Connects to the trusted host, fetches and offers new messages and disconnects afterwards.
* <p>

View File

@ -17,10 +17,13 @@
package ch.dissem.bitmessage.utils;
import ch.dissem.bitmessage.entity.Streamable;
import ch.dissem.bitmessage.exception.ApplicationException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import static ch.dissem.bitmessage.utils.AccessCounter.inc;
@ -37,62 +40,52 @@ public class Encode {
}
}
public static void varIntList(long[] values, ByteBuffer buffer) {
varInt(values.length, buffer);
for (long value : values) {
varInt(value, buffer);
}
}
public static void varInt(long value, OutputStream stream) throws IOException {
varInt(value, stream, null);
}
public static byte[] varInt(long value) throws IOException {
final byte[] result;
public static void varInt(long value, ByteBuffer buffer) {
if (value < 0) {
// This is due to the fact that Java doesn't really support unsigned values.
// Please be aware that this might be an error due to a smaller negative value being cast to long.
// Normally, negative values shouldn't occur within the protocol, and I large enough longs
// to being recognized as negatives aren't realistic.
ByteBuffer buffer = ByteBuffer.allocate(9);
// Normally, negative values shouldn't occur within the protocol, and longs large enough for being
// recognized as negatives aren't realistic.
buffer.put((byte) 0xff);
result = buffer.putLong(value).array();
buffer.putLong(value);
} else if (value < 0xfd) {
result = new byte[]{(byte) value};
buffer.put((byte) value);
} else if (value <= 0xffffL) {
ByteBuffer buffer = ByteBuffer.allocate(3);
buffer.put((byte) 0xfd);
result = buffer.putShort((short) value).array();
buffer.putShort((short) value);
} else if (value <= 0xffffffffL) {
ByteBuffer buffer = ByteBuffer.allocate(5);
buffer.put((byte) 0xfe);
result = buffer.putInt((int) value).array();
buffer.putInt((int) value);
} else {
ByteBuffer buffer = ByteBuffer.allocate(9);
buffer.put((byte) 0xff);
result = buffer.putLong(value).array();
buffer.putLong(value);
}
return result;
}
public static byte[] varInt(long value) {
ByteBuffer buffer = ByteBuffer.allocate(9);
varInt(value, buffer);
buffer.flip();
return Bytes.truncate(buffer.array(), buffer.limit());
}
public static void varInt(long value, OutputStream stream, AccessCounter counter) throws IOException {
if (value < 0) {
// This is due to the fact that Java doesn't really support unsigned values.
// Please be aware that this might be an error due to a smaller negative value being cast to long.
// Normally, negative values shouldn't occur within the protocol, and I large enough longs
// to being recognized as negatives aren't realistic.
stream.write(0xff);
inc(counter);
int64(value, stream, counter);
} else if (value < 0xfd) {
int8(value, stream, counter);
} else if (value <= 0xffffL) {
stream.write(0xfd);
inc(counter);
int16(value, stream, counter);
} else if (value <= 0xffffffffL) {
stream.write(0xfe);
inc(counter);
int32(value, stream, counter);
} else {
stream.write(0xff);
inc(counter);
int64(value, stream, counter);
}
ByteBuffer buffer = ByteBuffer.allocate(9);
varInt(value, buffer);
buffer.flip();
stream.write(buffer.array(), 0, buffer.limit());
inc(counter, buffer.limit());
}
public static void int8(long value, OutputStream stream) throws IOException {
@ -113,6 +106,10 @@ public class Encode {
inc(counter, 2);
}
public static void int16(long value, ByteBuffer buffer) {
buffer.putShort((short) value);
}
public static void int32(long value, OutputStream stream) throws IOException {
int32(value, stream, null);
}
@ -122,6 +119,10 @@ public class Encode {
inc(counter, 4);
}
public static void int32(long value, ByteBuffer buffer) {
buffer.putInt((int) value);
}
public static void int64(long value, OutputStream stream) throws IOException {
int64(value, stream, null);
}
@ -131,6 +132,10 @@ public class Encode {
inc(counter, 8);
}
public static void int64(long value, ByteBuffer buffer) {
buffer.putLong(value);
}
public static void varString(String value, OutputStream out) throws IOException {
byte[] bytes = value.getBytes("utf-8");
// Technically, it says the length in characters, but I think this one might be correct.
@ -140,23 +145,44 @@ public class Encode {
out.write(bytes);
}
public static void varString(String value, ByteBuffer buffer) {
try {
byte[] bytes = value.getBytes("utf-8");
// Technically, it says the length in characters, but I think this one might be correct.
// It doesn't really matter, as only ASCII characters are being used.
// see also Decode#varString()
buffer.put(varInt(bytes.length));
buffer.put(bytes);
} catch (UnsupportedEncodingException e) {
throw new ApplicationException(e);
}
}
public static void varBytes(byte[] data, OutputStream out) throws IOException {
varInt(data.length, out);
out.write(data);
}
public static void varBytes(byte[] data, ByteBuffer buffer) {
varInt(data.length, buffer);
buffer.put(data);
}
/**
* Serializes a {@link Streamable} object and returns the byte array.
*
* @param streamable the object to be serialized
* @return an array of bytes representing the given streamable object.
* @throws IOException if an I/O error occurs.
*/
public static byte[] bytes(Streamable streamable) throws IOException {
public static byte[] bytes(Streamable streamable) {
if (streamable == null) return null;
ByteArrayOutputStream stream = new ByteArrayOutputStream();
streamable.write(stream);
try {
streamable.write(stream);
} catch (IOException e) {
throw new ApplicationException(e);
}
return stream.toByteArray();
}
@ -164,11 +190,14 @@ public class Encode {
* @param streamable the object to be serialized
* @param padding the result will be padded such that its length is a multiple of <em>padding</em>
* @return the bytes of the given {@link Streamable} object, 0-padded such that the final length is x*padding.
* @throws IOException if an I/O error occurs.
*/
public static byte[] bytes(Streamable streamable, int padding) throws IOException {
public static byte[] bytes(Streamable streamable, int padding) {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
streamable.write(stream);
try {
streamable.write(stream);
} catch (IOException e) {
throw new ApplicationException(e);
}
int offset = padding - stream.size() % padding;
int length = stream.size() + offset;
byte[] result = new byte[length];

View File

@ -24,6 +24,7 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static ch.dissem.bitmessage.utils.Decode.*;
@ -83,6 +84,13 @@ public class ProofOfWorkRequest implements Streamable {
Encode.varBytes(data, out);
}
@Override
public void write(ByteBuffer buffer) {
buffer.put(initialHash);
Encode.varString(request.name(), buffer);
Encode.varBytes(data, buffer);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -45,7 +45,6 @@ import static java.util.Collections.newSetFromMap;
* Handles all the networky stuff.
*/
public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
public final static int NETWORK_MAGIC_NUMBER = 8;
final Collection<Connection> connections = new ConcurrentLinkedQueue<>();
private final ExecutorService pool = Executors.newCachedThreadPool(

View File

@ -0,0 +1,51 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.entity.MessagePayload;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
/**
* Created by chrig on 27.05.2016.
*/
public class ConnectionInfo {
private State state;
private final Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>();
private ByteBuffer in = ByteBuffer.allocate(10);
private ByteBuffer out = ByteBuffer.allocate(10);
public State getState() {
return state;
}
public Queue<MessagePayload> getSendingQueue() {
return sendingQueue;
}
public ByteBuffer getInBuffer() {
return in;
}
public ByteBuffer getOutBuffer() {
return out;
}
public enum State {CONNECTING, ACTIVE, DISCONNECTED}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.CustomMessage;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.Property;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Future;
import static java.nio.channels.SelectionKey.*;
/**
* Network handler using java.nio, resulting in less threads.
*/
public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder {
private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class);
private InternalContext ctx;
private Selector selector;
@Override
public Future<?> synchronize(InetAddress server, int port, MessageListener listener, long timeoutInSeconds) {
return null;
}
@Override
public CustomMessage send(InetAddress server, int port, CustomMessage request) {
return null;
}
@Override
public void start(MessageListener listener) {
if (listener == null) {
throw new IllegalStateException("Listener must be set at start");
}
if (selector != null && selector.isOpen()) {
throw new IllegalStateException("Network already running - you need to stop first.");
}
try {
final Set<InventoryVector> requestedObjects = new HashSet<>();
selector = Selector.open();
{
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(ctx.getPort()));
server.register(selector, OP_ACCEPT);
}
while (selector.isOpen()) {
// TODO: establish outgoing connections
selector.select();
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept();
accepted.configureBlocking(false);
accepted.register(selector, OP_READ | OP_WRITE).attach(new ConnectionInfo());
}
if (key.attachment() instanceof ConnectionInfo) {
SocketChannel channel = (SocketChannel) key.channel();
ConnectionInfo connection = (ConnectionInfo) key.attachment();
if (key.isWritable()) {
if (connection.getOutBuffer().hasRemaining()) {
channel.write(connection.getOutBuffer());
}
while (!connection.getOutBuffer().hasRemaining() && !connection.getSendingQueue().isEmpty()) {
MessagePayload payload = connection.getSendingQueue().poll();
if (payload instanceof GetData) {
requestedObjects.addAll(((GetData) payload).getInventory());
}
new NetworkMessage(payload).write(connection.getOutBuffer());
}
}
if (key.isReadable()) {
// TODO
channel.read(connection.getInBuffer());
}
}
keyIterator.remove();
}
}
selector.close();
} catch (IOException e) {
throw new ApplicationException(e);
}
}
@Override
public void stop() {
}
@Override
public void offer(InventoryVector iv) {
}
@Override
public Property getNetworkStatus() {
return null;
}
@Override
public boolean isRunning() {
return false;
}
@Override
public void setContext(InternalContext context) {
this.ctx = context;
}
}