diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java index e239e0b..05d2e36 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java @@ -18,10 +18,11 @@ package ch.dissem.bitmessage.demo; import ch.dissem.bitmessage.Context; import ch.dissem.bitmessage.entity.payload.ObjectPayload; -import ch.dissem.bitmessage.inventory.SimpleAddressRepository; -import ch.dissem.bitmessage.inventory.SimpleInventory; +import ch.dissem.bitmessage.inventory.DatabaseRepository; import ch.dissem.bitmessage.networking.NetworkNode; import ch.dissem.bitmessage.ports.NetworkHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; @@ -31,14 +32,18 @@ import java.io.InputStreamReader; * Created by chris on 06.04.15. */ public class Main { + private final static Logger LOG = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws IOException { NetworkNode networkNode = new NetworkNode(); - Context.init(new SimpleInventory(), new SimpleAddressRepository(), networkNode, 48444); + DatabaseRepository repo = new DatabaseRepository(); + Context.init(repo, repo, networkNode, 48444); Context.getInstance().addStream(1); networkNode.setListener(new NetworkHandler.MessageListener() { @Override public void receive(ObjectPayload payload) { - // TODO +// LOG.info("message received: " + payload); +// System.out.print('.'); } }); networkNode.start(); diff --git a/domain/src/main/java/ch/dissem/bitmessage/Context.java b/domain/src/main/java/ch/dissem/bitmessage/Context.java index e09f1ad..2474779 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/Context.java +++ b/domain/src/main/java/ch/dissem/bitmessage/Context.java @@ -21,8 +21,6 @@ import ch.dissem.bitmessage.ports.Inventory; import ch.dissem.bitmessage.ports.NetworkHandler; import java.util.Collection; -import java.util.LinkedList; -import java.util.List; import java.util.TreeSet; /** @@ -41,6 +39,9 @@ public class Context { private int port; + private long networkNonceTrialsPerByte = 1000; + private long networkExtraBytes = 1000; + private Context(Inventory inventory, AddressRepository addressRepo, NetworkHandler networkHandler, int port) { this.inventory = inventory; @@ -85,4 +86,12 @@ public class Context { public void removeStream(long stream) { streams.remove(stream); } + + public long getNetworkNonceTrialsPerByte() { + return networkNonceTrialsPerByte; + } + + public long getNetworkExtraBytes() { + return networkExtraBytes; + } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java b/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java index efe708d..51a388d 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java @@ -18,7 +18,9 @@ package ch.dissem.bitmessage.entity; import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; +import ch.dissem.bitmessage.utils.Bytes; import ch.dissem.bitmessage.utils.Encode; +import ch.dissem.bitmessage.utils.Security; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -35,7 +37,7 @@ public class ObjectMessage implements MessagePayload { * The object's version */ private long version; - private long streamNumber; + private long stream; private ObjectPayload payload; private byte[] payloadBytes; @@ -45,7 +47,7 @@ public class ObjectMessage implements MessagePayload { expiresTime = builder.expiresTime; objectType = builder.objectType; version = builder.version; - streamNumber = builder.streamNumber; + stream = builder.streamNumber; payload = builder.payload; } @@ -58,6 +60,10 @@ public class ObjectMessage implements MessagePayload { return nonce; } + public void setNonce(byte[] nonce) { + this.nonce = nonce; + } + public long getExpiresTime() { return expiresTime; } @@ -66,34 +72,33 @@ public class ObjectMessage implements MessagePayload { return payload; } - public InventoryVector getInventoryVector() { - // TODO - return null; + public long getStream() { + return stream; + } + + public InventoryVector getInventoryVector() throws IOException { + return new InventoryVector(Bytes.truncate(Security.doubleSha512(nonce, getPayloadBytesWithoutNonce()), 32)); } @Override public void write(OutputStream stream) throws IOException { stream.write(nonce); - stream.write(getPayloadBytes()); + stream.write(getPayloadBytesWithoutNonce()); } - public byte[] getPayloadBytes() throws IOException { + public byte[] getPayloadBytesWithoutNonce() throws IOException { if (payloadBytes == null) { ByteArrayOutputStream stream = new ByteArrayOutputStream(); Encode.int64(expiresTime, stream); Encode.int32(objectType, stream); Encode.varInt(version, stream); - Encode.varInt(streamNumber, stream); + Encode.varInt(this.stream, stream); payload.write(stream); payloadBytes = stream.toByteArray(); } return payloadBytes; } - public void setNonce(byte[] nonce) { - this.nonce = nonce; - } - public static final class Builder { private byte[] nonce; private long expiresTime; diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java b/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java index af950a8..e32cbe9 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java @@ -69,7 +69,7 @@ public class Version implements MessagePayload { * The stream numbers that the emitting node is interested in. Sending nodes must not include more than 160000 * stream numbers. */ - private final long[] streamNumbers; + private final long[] streams; private Version(Builder builder) { version = builder.version; @@ -79,7 +79,7 @@ public class Version implements MessagePayload { addrFrom = builder.addrFrom; nonce = builder.nonce; userAgent = builder.userAgent; - streamNumbers = builder.streamNumbers; + streams = builder.streamNumbers; } public int getVersion() { @@ -111,7 +111,7 @@ public class Version implements MessagePayload { } public long[] getStreams() { - return streamNumbers; + return streams; } @Override @@ -128,7 +128,7 @@ public class Version implements MessagePayload { addrFrom.write(stream, true); Encode.int64(nonce, stream); Encode.varString(userAgent, stream); - Encode.varIntList(streamNumbers, stream); + Encode.varIntList(streams, stream); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Pubkey.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Pubkey.java index 80caf94..5bfedde 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Pubkey.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Pubkey.java @@ -20,6 +20,17 @@ package ch.dissem.bitmessage.entity.payload; * Public keys for signing and encryption, the answer to a 'getpubkey' request. */ public interface Pubkey extends ObjectPayload { + // bits 0 through 29 are yet undefined + /** + * Receiving node expects that the RIPE hash encoded in their address preceedes the encrypted message data of msg + * messages bound for them. + */ + int FEATURE_INCLUDE_DESTINATION = 30; + /** + * If true, the receiving node does send acknowledgements (rather than dropping them). + */ + int FEATURE_DOES_ACK = 31; + long getVersion(); byte[] getSigningKey(); diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/InventoryVector.java b/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/InventoryVector.java index 4cd1b62..2b56e62 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/InventoryVector.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/InventoryVector.java @@ -17,6 +17,7 @@ package ch.dissem.bitmessage.entity.valueobject; import ch.dissem.bitmessage.entity.Streamable; +import ch.dissem.bitmessage.utils.Strings; import java.io.IOException; import java.io.OutputStream; @@ -30,6 +31,10 @@ public class InventoryVector implements Streamable { */ private final byte[] hash; + public byte[] getHash() { + return hash; + } + public InventoryVector(byte[] hash) { this.hash = hash; } @@ -38,4 +43,9 @@ public class InventoryVector implements Streamable { public void write(OutputStream stream) throws IOException { stream.write(hash); } + + @Override + public String toString() { + return Strings.hex(hash).toString(); + } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java b/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java index a08e31d..0be2d7e 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java @@ -56,10 +56,26 @@ public class NetworkAddress implements Streamable { port = builder.port; } + public byte[] getIPv6() { + return ipv6; + } + public int getPort() { return port; } + public long getServices() { + return services; + } + + public long getStream() { + return stream; + } + + public long getTime() { + return time; + } + public InetAddress toInetAddress() { try { return InetAddress.getByAddress(ipv6); diff --git a/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java b/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java index 0212ca3..8650651 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java @@ -17,6 +17,7 @@ package ch.dissem.bitmessage.factory; import ch.dissem.bitmessage.entity.NetworkMessage; +import ch.dissem.bitmessage.entity.ObjectMessage; import ch.dissem.bitmessage.entity.payload.*; import ch.dissem.bitmessage.utils.Decode; import org.slf4j.Logger; @@ -24,6 +25,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; +import java.net.SocketTimeoutException; /** * Creates {@link NetworkMessage} objects from {@link InputStream InputStreams} @@ -31,8 +33,24 @@ import java.io.InputStream; public class Factory { public static final Logger LOG = LoggerFactory.getLogger(Factory.class); - public static NetworkMessage getNetworkMessage(int version, InputStream stream) throws IOException { - return new V3MessageFactory().read(stream); + public static NetworkMessage getNetworkMessage(int version, InputStream stream) throws SocketTimeoutException { + try { + return new V3MessageFactory().read(stream); + } catch (SocketTimeoutException e) { + throw e; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + return null; + } + } + + public static ObjectMessage getObjectMessage(int version, InputStream stream, int length) { + try { + return new V3MessageFactory().readObject(stream, length); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + return null; + } } static ObjectPayload getObjectPayload(long objectType, long version, long streamNumber, InputStream stream, int length) throws IOException { @@ -46,11 +64,12 @@ public class Factory { return parseMsg((int) version, streamNumber, stream, length); case 3: // broadcast return parseBroadcast((int) version, streamNumber, stream, length); + default: + LOG.error("This should not happen, someone broke something in the code!"); } - LOG.error("This should not happen, someone broke something in the code!"); } // fallback: just store the message - we don't really care what it is - LOG.error("Unexpected object type: " + objectType); + LOG.warn("Unexpected object type: " + objectType); return new GenericPayload(streamNumber, Decode.bytes(stream, length)); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java b/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java index bb6143f..a5d1798 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java @@ -20,6 +20,7 @@ import ch.dissem.bitmessage.entity.*; import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.utils.AccessCounter; import ch.dissem.bitmessage.utils.Decode; import ch.dissem.bitmessage.utils.Security; import org.slf4j.Logger; @@ -68,21 +69,22 @@ class V3MessageFactory { case "getdata": return parseGetData(stream); case "object": - return parseObject(stream, length); + return readObject(stream, length); default: LOG.debug("Unknown command: " + command); return null; } } - private ObjectMessage parseObject(InputStream stream, int length) throws IOException { - byte nonce[] = Decode.bytes(stream, 8); - long expiresTime = Decode.int64(stream); - long objectType = Decode.uint32(stream); - long version = Decode.varInt(stream); - long streamNumber = Decode.varInt(stream); + public ObjectMessage readObject(InputStream stream, int length) throws IOException { + AccessCounter counter = new AccessCounter(); + byte nonce[] = Decode.bytes(stream, 8, counter); + long expiresTime = Decode.int64(stream, counter); + long objectType = Decode.uint32(stream, counter); + long version = Decode.varInt(stream, counter); + long streamNumber = Decode.varInt(stream, counter); - ObjectPayload payload = Factory.getObjectPayload(objectType, version, streamNumber, stream, length); + ObjectPayload payload = Factory.getObjectPayload(objectType, version, streamNumber, stream, length-counter.length()); return new ObjectMessage.Builder() .nonce(nonce) diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java b/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java index 310a7bb..837e865 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java @@ -27,11 +27,11 @@ import java.util.List; public interface Inventory { List getInventory(long... streams); - List getMissing(List offer); + List getMissing(List offer, long... streams); ObjectMessage getObject(InventoryVector vector); - void storeObject(ObjectMessage object); + void storeObject(int version, ObjectMessage object); void cleanup(); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/AccessCounter.java b/domain/src/main/java/ch/dissem/bitmessage/utils/AccessCounter.java new file mode 100644 index 0000000..aa662fa --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/AccessCounter.java @@ -0,0 +1,44 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.utils; + +/** + * Created by chris on 13.04.15. + */ +public class AccessCounter { + private int count; + + public static void inc(AccessCounter counter) { + if (counter != null) counter.inc(); + } + + public static void inc(AccessCounter counter, int count) { + if (counter != null) counter.inc(count); + } + + private void inc() { + count++; + } + + private void inc(int length) { + count += length; + } + + public int length() { + return count; + } +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Bytes.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Bytes.java index bcd505d..b83be90 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/Bytes.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Bytes.java @@ -63,4 +63,10 @@ public class Bytes { System.arraycopy(source, 0, result, size - source.length, source.length); return result; } + + public static byte[] truncate(byte[] source, int size) { + byte[] result = new byte[size]; + System.arraycopy(source, 0, result, 0, size); + return result; + } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Decode.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Decode.java index e70305c..20a18f7 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/Decode.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Decode.java @@ -20,17 +20,28 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import static ch.dissem.bitmessage.utils.AccessCounter.inc; + /** * This class handles decoding simple types from byte stream, according to * https://bitmessage.org/wiki/Protocol_specification#Common_structures */ public class Decode { public static byte[] bytes(InputStream stream, int count) throws IOException { + return bytes(stream, count, null); + } + + public static byte[] bytes(InputStream stream, int count, AccessCounter counter) throws IOException { byte[] result = new byte[count]; int off = 0; while (off < count) { - off += stream.read(result, off, count - off); + int read = stream.read(result, off, count - off); + if (read < 0) { + throw new IOException("Unexpected end of stream, wanted to read " + count + " bytes but only got " + off); + } + off += read; } + inc(counter, count); return result; } @@ -45,14 +56,19 @@ public class Decode { } public static long varInt(InputStream stream) throws IOException { + return varInt(stream, null); + } + + public static long varInt(InputStream stream, AccessCounter counter) throws IOException { int first = stream.read(); + inc(counter); switch (first) { case 0xfd: - return uint16(stream); + return uint16(stream, counter); case 0xfe: - return uint32(stream); + return uint32(stream, counter); case 0xff: - return int64(stream); + return int64(stream, counter); default: return first; } @@ -63,10 +79,20 @@ public class Decode { } public static int uint16(InputStream stream) throws IOException { + return uint16(stream, null); + } + + public static int uint16(InputStream stream, AccessCounter counter) throws IOException { + inc(counter, 2); return stream.read() * 256 + stream.read(); } public static long uint32(InputStream stream) throws IOException { + return uint32(stream, null); + } + + public static long uint32(InputStream stream, AccessCounter counter) throws IOException { + inc(counter, 4); return stream.read() * 16777216L + stream.read() * 65536L + stream.read() * 256L + stream.read(); } @@ -75,6 +101,11 @@ public class Decode { } public static long int64(InputStream stream) throws IOException { + return int64(stream, null); + } + + public static long int64(InputStream stream, AccessCounter counter) throws IOException { + inc(counter, 8); return ByteBuffer.wrap(bytes(stream, 8)).getLong(); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Encode.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Encode.java index d98d2b9..6669e3a 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/Encode.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Encode.java @@ -22,7 +22,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.Arrays; + +import static ch.dissem.bitmessage.utils.AccessCounter.inc; /** * This class handles encoding simple types from byte stream, according to @@ -37,41 +38,69 @@ public class Encode { } public static void varInt(long value, OutputStream stream) throws IOException { + varInt(value, stream, null); + } + + public static void varInt(long value, OutputStream stream, AccessCounter counter) throws IOException { if (value < 0) { // This is due to the fact that Java doesn't really support unsigned values. // Please be aware that this might be an error due to a smaller negative value being cast to long. // Normally, negative values shouldn't occur within the protocol, and I large enough longs // to being recognized as negatives aren't realistic. stream.write(0xff); - int64(value, stream); + inc(counter); + int64(value, stream, counter); } else if (value < 0xfd) { - int8(value, stream); + int8(value, stream, counter); } else if (value <= 0xffffL) { stream.write(0xfd); - int16(value, stream); + inc(counter); + int16(value, stream, counter); } else if (value <= 0xffffffffL) { stream.write(0xfe); - int32(value, stream); + inc(counter); + int32(value, stream, counter); } else { stream.write(0xff); - int64(value, stream); + inc(counter); + int64(value, stream, counter); } } public static void int8(long value, OutputStream stream) throws IOException { + int8(value, stream, null); + } + + public static void int8(long value, OutputStream stream, AccessCounter counter) throws IOException { stream.write((int) value); + inc(counter); } public static void int16(long value, OutputStream stream) throws IOException { + int16(value, stream, null); + } + + public static void int16(long value, OutputStream stream, AccessCounter counter) throws IOException { stream.write(ByteBuffer.allocate(4).putInt((int) value).array(), 2, 2); + inc(counter, 2); } public static void int32(long value, OutputStream stream) throws IOException { + int32(value, stream, null); + } + + public static void int32(long value, OutputStream stream, AccessCounter counter) throws IOException { stream.write(ByteBuffer.allocate(4).putInt((int) value).array()); + inc(counter, 4); } public static void int64(long value, OutputStream stream) throws IOException { + int64(value, stream, null); + } + + public static void int64(long value, OutputStream stream, AccessCounter counter) throws IOException { stream.write(ByteBuffer.allocate(8).putLong(value).array()); + inc(counter, 8); } public static void varString(String value, OutputStream stream) throws IOException { diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Security.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Security.java index fc0ab8e..30cfef3 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/Security.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Security.java @@ -69,20 +69,12 @@ public class Security { } public static void doProofOfWork(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) throws IOException { - // payload = embeddedTime + encodedObjectVersion + encodedStreamNumber + encrypted - byte[] payload = object.getPayloadBytes(); - // payloadLength = the length of payload, in bytes, + 8 (to account for the nonce which we will append later) - // TTL = the number of seconds in between now and the object expiresTime. - // initialHash = hash(payload) byte[] initialHash = getInitialHash(object); byte[] target = getProofOfWorkTarget(object, nonceTrialsPerByte, extraBytes); - if (target.length < 8) { - target = Bytes.expand(target, 8); - } // also start with nonce = 0 where nonce is 8 bytes in length and can be hashed as if it is a string. byte[] nonce = new byte[8]; - MessageDigest mda = null; + MessageDigest mda; try { mda = MessageDigest.getInstance("SHA-512"); } catch (NoSuchAlgorithmException e) { @@ -100,30 +92,25 @@ public class Security { * @throws IOException if proof of work doesn't check out */ public static void checkProofOfWork(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) throws IOException { - // nonce = the first 8 bytes of payload - byte[] nonce = object.getNonce(); - byte[] initialHash = getInitialHash(object); - // resultHash = hash(hash( nonce || initialHash )) - byte[] resultHash = Security.doubleSha512(nonce, initialHash); - // POWValue = the first eight bytes of resultHash converted to an integer - byte[] powValue = bytes(resultHash, 8); - - if (Bytes.lt(getProofOfWorkTarget(object, nonceTrialsPerByte, extraBytes), powValue)) { + if (Bytes.lt( + getProofOfWorkTarget(object, nonceTrialsPerByte, extraBytes), + Security.doubleSha512(object.getNonce(), getInitialHash(object)), + 8)) { throw new IOException("Insufficient proof of work"); } } private static byte[] getInitialHash(ObjectMessage object) throws IOException { - return Security.sha512(object.getPayloadBytes()); + return Security.sha512(object.getPayloadBytesWithoutNonce()); } private static byte[] getProofOfWorkTarget(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) throws IOException { BigInteger TTL = BigInteger.valueOf(object.getExpiresTime() - (System.currentTimeMillis() / 1000)); LOG.debug("TTL: " + TTL + "s"); BigInteger numerator = TWO.pow(64); - BigInteger powLength = BigInteger.valueOf(object.getPayloadBytes().length + extraBytes); + BigInteger powLength = BigInteger.valueOf(object.getPayloadBytesWithoutNonce().length + extraBytes); BigInteger denominator = BigInteger.valueOf(nonceTrialsPerByte).multiply(powLength.add(powLength.multiply(TTL).divide(BigInteger.valueOf(2).pow(16)))); - return numerator.divide(denominator).toByteArray(); + return Bytes.expand(numerator.divide(denominator).toByteArray(), 8); } private static byte[] hash(String algorithm, byte[]... data) { @@ -141,10 +128,4 @@ public class Security { throw new RuntimeException(e); } } - - private static byte[] bytes(byte[] data, int count) { - byte[] result = new byte[count]; - System.arraycopy(data, 0, result, 0, count); - return result; - } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Strings.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Strings.java new file mode 100644 index 0000000..46da936 --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Strings.java @@ -0,0 +1,58 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.utils; + +/** + * Created by chris on 13.04.15. + */ +public class Strings { + public static CharSequence join(byte[]... objects) { + StringBuilder streamList = new StringBuilder(); + for (int i = 0; i < objects.length; i++) { + if (i > 0) streamList.append(", "); + streamList.append(hex(objects[i])); + } + return streamList; + } + + public static CharSequence join(long... objects) { + StringBuilder streamList = new StringBuilder(); + for (int i = 0; i < objects.length; i++) { + if (i > 0) streamList.append(", "); + streamList.append(objects[i]); + } + return streamList; + } + + public static CharSequence join(Object... objects) { + StringBuilder streamList = new StringBuilder(); + for (int i = 0; i < objects.length; i++) { + if (i > 0) streamList.append(", "); + streamList.append(objects[i]); + } + return streamList; + } + + public static CharSequence hex(byte[] bytes) { + StringBuilder hex = new StringBuilder(bytes.length + 2); + hex.append("0x"); + for (byte b : bytes) { + hex.append(String.format("%02x", b)); + } + return hex; + } +} diff --git a/domain/src/test/java/ch/dissem/bitmessage/utils/StringsTest.java b/domain/src/test/java/ch/dissem/bitmessage/utils/StringsTest.java new file mode 100644 index 0000000..9fb87a4 --- /dev/null +++ b/domain/src/test/java/ch/dissem/bitmessage/utils/StringsTest.java @@ -0,0 +1,34 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.utils; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class StringsTest { + @Test + public void ensureJoinWorksWithLongArray() { + long[] test = {1L, 2L}; + assertEquals("1, 2", Strings.join(test).toString()); + } + + @Test + public void testHexString() { + assertEquals("0x48656c6c6f21", Strings.hex("Hello!".getBytes())); + } +} diff --git a/inventory/build.gradle b/inventory/build.gradle index f8d7bba..85761de 100644 --- a/inventory/build.gradle +++ b/inventory/build.gradle @@ -9,5 +9,7 @@ repositories { dependencies { compile project(':domain') - testCompile group: 'junit', name: 'junit', version: '4.11' + compile 'com.h2database:h2:1.4.187' + compile 'org.flywaydb:flyway-core:3.2.1' + testCompile 'junit:junit:4.11' } \ No newline at end of file diff --git a/inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java b/inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java new file mode 100644 index 0000000..eb84886 --- /dev/null +++ b/inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java @@ -0,0 +1,183 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.inventory; + +import ch.dissem.bitmessage.entity.ObjectMessage; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.factory.Factory; +import ch.dissem.bitmessage.ports.AddressRepository; +import ch.dissem.bitmessage.ports.Inventory; +import org.flywaydb.core.Flyway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.sql.*; +import java.util.LinkedList; +import java.util.List; + +import static ch.dissem.bitmessage.utils.Strings.join; + +/** + * Stores everything in a database + */ +public class DatabaseRepository implements Inventory, AddressRepository { + private static final Logger LOG = LoggerFactory.getLogger(DatabaseRepository.class); + + private static final String DB_URL = "jdbc:h2:~/jabit"; + private static final String DB_USER = "sa"; + private static final String DB_PWD = null; + + + public DatabaseRepository() { + Flyway flyway = new Flyway(); + flyway.setDataSource(DB_URL, DB_USER, null); + flyway.migrate(); + } + + @Override + public List getKnownAddresses(int limit, long... streams) { + List result = new LinkedList<>(); + try { + Statement stmt = getConnection().createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM Node WHERE Stream IN (" + join(streams) + ")"); + while (rs.next()) { +// result.add(new NetworkAddress.Builder() +// .ipv6(rs.getBytes("ip")) +// .port(rs.getByte("port")) +// .services(rs.getLong("services")) +// .stream(rs.getLong("stream")) +// .time(rs.getLong("time")) +// .build()); + } + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + if (result.isEmpty()) { + // FIXME: this is for testing purposes, remove it! + result.add(new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build()); + } + return result; + } + + @Override + public void offerAddresses(List addresses) { + try { + Connection connection = getConnection(); + PreparedStatement exists = connection.prepareStatement("SELECT port FROM Node WHERE ip = ? AND port = ?"); + PreparedStatement insert = connection.prepareStatement( + "INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)"); + PreparedStatement update = connection.prepareStatement( + "UPDATE Node SET services = ?, stream = ?, time = ? WHERE ip = ? AND port = ?"); + for (NetworkAddress node : addresses) { + exists.setBytes(1, node.getIPv6()); + exists.setInt(2, node.getPort()); + if (exists.executeQuery().next()) { + update.setLong(1, node.getServices()); + update.setLong(2, node.getStream()); + update.setLong(3, node.getTime()); + + update.setBytes(4, node.getIPv6()); + update.setInt(5, node.getPort()); + update.executeUpdate(); + } else { + insert.setBytes(1, node.getIPv6()); + insert.setInt(2, node.getPort()); + insert.setLong(3, node.getServices()); + insert.setLong(4, node.getStream()); + insert.setLong(5, node.getTime()); + insert.executeUpdate(); + } + } + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + } + + @Override + public List getInventory(long... streams) { + List result = new LinkedList<>(); + try { + Statement stmt = getConnection().createStatement(); + ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE Stream IN (" + join(streams) + ")"); + while (rs.next()) { + result.add(new InventoryVector(rs.getBytes("hash"))); + } + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + return result; + } + + @Override + public List getMissing(List offer, long... streams) { + offer.removeAll(getInventory(streams)); + return offer; + } + + @Override + public ObjectMessage getObject(InventoryVector vector) { + try { + Statement stmt = getConnection().createStatement(); + ResultSet rs = stmt.executeQuery("SELECT data, version FROM Inventory WHERE hash = " + vector); + Blob data = rs.getBlob("data"); + return Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length()); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + @Override + public void storeObject(int version, ObjectMessage object) { + try { + PreparedStatement ps = getConnection().prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, version) VALUES (?, ?, ?, ?, ?)"); + InventoryVector iv = object.getInventoryVector(); + LOG.error("Storing object " + iv); + ps.setBytes(1, iv.getHash()); + ps.setLong(2, object.getStream()); + ps.setLong(3, object.getExpiresTime()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + object.write(os); + ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray()); + ps.setBlob(4, is); + ps.setInt(5, version); + ps.executeUpdate(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + @Override + public void cleanup() { + try { + getConnection().createStatement().executeUpdate("DELETE FROM Inventory WHERE time < " + (System.currentTimeMillis() / 1000)); + } catch (SQLException e) { + LOG.debug(e.getMessage(), e); + } + } + + private Connection getConnection() { + try { + return DriverManager.getConnection(DB_URL, DB_USER, DB_PWD); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleInventory.java b/inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleInventory.java index 55e6e68..d0efec7 100644 --- a/inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleInventory.java +++ b/inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleInventory.java @@ -34,7 +34,7 @@ public class SimpleInventory implements Inventory { } @Override - public List getMissing(List offer) { + public List getMissing(List offer, long... streams) { return offer; } @@ -44,7 +44,7 @@ public class SimpleInventory implements Inventory { } @Override - public void storeObject(ObjectMessage object) { + public void storeObject(int version, ObjectMessage object) { throw new NotImplementedException(); } diff --git a/inventory/src/main/resources/db/migration/V1.0__Create_node_table.sql b/inventory/src/main/resources/db/migration/V1.0__Create_node_table.sql new file mode 100644 index 0000000..874b1f5 --- /dev/null +++ b/inventory/src/main/resources/db/migration/V1.0__Create_node_table.sql @@ -0,0 +1,9 @@ +CREATE TABLE Node ( + ip BINARY(16) NOT NULL, + port INT NOT NULL, + services BIGINT NOT NULL, + stream BIGINT NOT NULL, + time BIGINT NOT NULL, + + PRIMARY KEY (ip, port) +); \ No newline at end of file diff --git a/inventory/src/main/resources/db/migration/V1.1__Create_inventory_table.sql b/inventory/src/main/resources/db/migration/V1.1__Create_inventory_table.sql new file mode 100644 index 0000000..0edd765 --- /dev/null +++ b/inventory/src/main/resources/db/migration/V1.1__Create_inventory_table.sql @@ -0,0 +1,7 @@ +CREATE TABLE Inventory ( + hash BINARY(32) NOT NULL PRIMARY KEY, + stream BIGINT NOT NULL, + expires BIGINT NOT NULL, + data BLOB NOT NULL, + version INT NOT NULL +); \ No newline at end of file diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index 329667c..e68e55d 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -22,6 +22,7 @@ import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.ports.NetworkHandler.MessageListener; +import ch.dissem.bitmessage.utils.Security; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,7 @@ public class Connection implements Runnable { this.out = socket.getOutputStream(); this.listener = listener; this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); - this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).build(); + this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(); } public State getState() { @@ -124,8 +125,6 @@ public class Connection implements Runnable { send(msg); } } - } catch (IOException e) { - e.printStackTrace(); } } } @@ -134,23 +133,34 @@ public class Connection implements Runnable { switch (messagePayload.getCommand()) { case INV: Inv inv = (Inv) messagePayload; - List missing = ctx.getInventory().getMissing(inv.getInventory()); + List missing = ctx.getInventory().getMissing(inv.getInventory(), streams); + LOG.debug("Received inventory with " + inv.getInventory().size() + " elements, of which are " + + missing.size() + " missing."); send(new GetData.Builder().inventory(missing).build()); break; case GETDATA: GetData getData = (GetData) messagePayload; - for (InventoryVector iv : getData.getInventory()) { - ObjectMessage om = ctx.getInventory().getObject(iv); - sendingQueue.offer(om); - } +// for (InventoryVector iv : getData.getInventory()) { +// ObjectMessage om = ctx.getInventory().getObject(iv); +// sendingQueue.offer(om); +// } + LOG.error("Node requests data!!!! This shouldn't happen, the hash is done wrong!!!"); break; case OBJECT: ObjectMessage objectMessage = (ObjectMessage) messagePayload; - ctx.getInventory().storeObject(objectMessage); + try { + LOG.debug("Received object " + objectMessage.getInventoryVector()); + Security.checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); + ctx.getInventory().storeObject(version, objectMessage); + } catch (IOException e) { + LOG.debug(e.getMessage(), e); + } + // It's probably pointless, but let the listener decide if we accept the message for the client. listener.receive(objectMessage.getPayload()); break; case ADDR: Addr addr = (Addr) messagePayload; + LOG.debug("Received " + addr.getAddresses().size() + " addresses."); ctx.getAddressRepository().offerAddresses(addr.getAddresses()); break; case VERACK: