From 35088ca03304bbbe64df04643d2c99dec3af554e Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Tue, 7 Apr 2015 18:48:58 +0200 Subject: [PATCH] Network code now works well enough for the server to think it successfully established a connection --- demo/build.gradle | 16 +++ .../java/ch/dissem/bitmessage/demo/Main.java | 50 +++++++ .../java/ch/dissem/bitmessage/Context.java | 46 ++++-- .../ch/dissem/bitmessage/entity/Version.java | 2 +- .../bitmessage/entity/payload/Broadcast.java | 45 ++++++ .../entity/payload/GenericPayload.java | 9 +- .../bitmessage/entity/payload/GetPubkey.java | 9 +- .../dissem/bitmessage/entity/payload/Msg.java | 43 ++++++ .../entity/payload/ObjectPayload.java | 1 + .../bitmessage/entity/payload/Pubkey.java | 2 - .../bitmessage/entity/payload/V2Pubkey.java | 48 ++++++- .../bitmessage/entity/payload/V3Pubkey.java | 64 +++++++++ .../bitmessage/entity/payload/V4Pubkey.java | 13 +- .../entity/valueobject/NetworkAddress.java | 24 ++-- .../ch/dissem/bitmessage/factory/Factory.java | 61 ++++++-- .../bitmessage/factory/V3MessageFactory.java | 44 ++++-- .../ch/dissem/bitmessage/ports/Inventory.java | 11 +- ...MessageSender.java => NetworkHandler.java} | 15 +- .../ch/dissem/bitmessage/utils/Decode.java | 5 +- inventory/build.gradle | 13 ++ .../inventory/SimpleAddressRepository.java | 23 +-- .../bitmessage/inventory/SimpleInventory.java | 55 +++++++ networking/build.gradle | 2 +- .../bitmessage/networking/Connection.java | 28 ++-- .../bitmessage/networking/NetworkNode.java | 135 +++++++++++------- .../networking/NetworkNodeTest.java | 18 +-- settings.gradle | 4 + 27 files changed, 636 insertions(+), 150 deletions(-) create mode 100644 demo/build.gradle create mode 100644 demo/src/main/java/ch/dissem/bitmessage/demo/Main.java create mode 100644 domain/src/main/java/ch/dissem/bitmessage/entity/payload/Broadcast.java create mode 100644 domain/src/main/java/ch/dissem/bitmessage/entity/payload/Msg.java rename domain/src/main/java/ch/dissem/bitmessage/ports/{NetworkMessageSender.java => NetworkHandler.java} (73%) create mode 100644 inventory/build.gradle rename domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageReceiver.java => inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleAddressRepository.java (56%) create mode 100644 inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleInventory.java diff --git a/demo/build.gradle b/demo/build.gradle new file mode 100644 index 0000000..dbc81ea --- /dev/null +++ b/demo/build.gradle @@ -0,0 +1,16 @@ +apply plugin: 'java' + +sourceCompatibility = 1.7 +version = '1.0' + +repositories { + mavenCentral() +} + +dependencies { + compile project(':domain') + compile project(':networking') + compile project(':inventory') + compile 'org.slf4j:slf4j-simple:1.7.12' + testCompile group: 'junit', name: 'junit', version: '4.11' +} \ No newline at end of file diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java new file mode 100644 index 0000000..e239e0b --- /dev/null +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java @@ -0,0 +1,50 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.demo; + +import ch.dissem.bitmessage.Context; +import ch.dissem.bitmessage.entity.payload.ObjectPayload; +import ch.dissem.bitmessage.inventory.SimpleAddressRepository; +import ch.dissem.bitmessage.inventory.SimpleInventory; +import ch.dissem.bitmessage.networking.NetworkNode; +import ch.dissem.bitmessage.ports.NetworkHandler; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * Created by chris on 06.04.15. + */ +public class Main { + public static void main(String[] args) throws IOException { + NetworkNode networkNode = new NetworkNode(); + Context.init(new SimpleInventory(), new SimpleAddressRepository(), networkNode, 48444); + Context.getInstance().addStream(1); + networkNode.setListener(new NetworkHandler.MessageListener() { + @Override + public void receive(ObjectPayload payload) { + // TODO + } + }); + networkNode.start(); + + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + System.out.print("Enter String"); + br.readLine(); + } +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/Context.java b/domain/src/main/java/ch/dissem/bitmessage/Context.java index 0524e4d..e09f1ad 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/Context.java +++ b/domain/src/main/java/ch/dissem/bitmessage/Context.java @@ -18,8 +18,12 @@ package ch.dissem.bitmessage; import ch.dissem.bitmessage.ports.AddressRepository; import ch.dissem.bitmessage.ports.Inventory; -import ch.dissem.bitmessage.ports.NetworkMessageReceiver; -import ch.dissem.bitmessage.ports.NetworkMessageSender; +import ch.dissem.bitmessage.ports.NetworkHandler; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.TreeSet; /** * Created by chris on 05.04.15. @@ -31,19 +35,22 @@ public class Context { private Inventory inventory; private AddressRepository addressRepo; - private NetworkMessageSender sender; - private NetworkMessageReceiver receiver; + private NetworkHandler networkHandler; + + private Collection streams = new TreeSet<>(); + + private int port; private Context(Inventory inventory, AddressRepository addressRepo, - NetworkMessageSender sender, NetworkMessageReceiver receiver) { + NetworkHandler networkHandler, int port) { this.inventory = inventory; this.addressRepo = addressRepo; - this.sender = sender; - this.receiver = receiver; + this.networkHandler = networkHandler; + this.port = port; } - public static void init(Inventory inventory, AddressRepository addressRepository, NetworkMessageSender sender, NetworkMessageReceiver receiver) { - instance = new Context(inventory, addressRepository, sender, receiver); + public static void init(Inventory inventory, AddressRepository addressRepository, NetworkHandler networkHandler, int port) { + instance = new Context(inventory, addressRepository, networkHandler, port); } public static Context getInstance() { @@ -57,4 +64,25 @@ public class Context { public AddressRepository getAddressRepository() { return addressRepo; } + + public int getPort() { + return port; + } + + public long[] getStreams() { + long[] result = new long[streams.size()]; + int i = 0; + for (long stream : streams) { + result[i++] = stream; + } + return result; + } + + public void addStream(long stream) { + streams.add(stream); + } + + public void removeStream(long stream) { + streams.remove(stream); + } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java b/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java index 24936d4..af950a8 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java @@ -146,7 +146,7 @@ public class Version implements MessagePayload { } public Builder defaults() { - version = Context.CURRENT; + version = Context.CURRENT_VERSION; services = 1; timestamp = System.currentTimeMillis() / 1000; nonce = new Random().nextInt(); diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Broadcast.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Broadcast.java new file mode 100644 index 0000000..9d16766 --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Broadcast.java @@ -0,0 +1,45 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.entity.payload; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Created by chris on 07.04.15. + */ +public class Broadcast implements ObjectPayload { + private long stream; + private byte[] tag; + private byte[] encrypted; + + public Broadcast(long stream, byte[] tag, byte[] encrypted) { + this.stream = stream; + this.tag = tag; + this.encrypted = encrypted; + } + + @Override + public long getStream() { + return stream; + } + + @Override + public void write(OutputStream stream) throws IOException { + + } +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/GenericPayload.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/GenericPayload.java index 874d1bc..5742dee 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/GenericPayload.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/GenericPayload.java @@ -23,12 +23,19 @@ import java.io.OutputStream; * Created by chris on 24.03.15. */ public class GenericPayload implements ObjectPayload { + private long stream; private byte[] data; - public GenericPayload(byte[] data) { + public GenericPayload(long stream, byte[] data) { + this.stream=stream; this.data = data; } + @Override + public long getStream() { + return stream; + } + @Override public void write(OutputStream stream) throws IOException { stream.write(data); diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/GetPubkey.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/GetPubkey.java index 9a551d0..132e573 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/GetPubkey.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/GetPubkey.java @@ -23,10 +23,12 @@ import java.io.OutputStream; * Created by chris on 24.03.15. */ public class GetPubkey implements ObjectPayload { + private long stream; private byte[] ripe; private byte[] tag; - public GetPubkey(byte[] ripeOrTag) { + public GetPubkey(long stream, byte[] ripeOrTag) { + this.stream=stream; switch (ripeOrTag.length) { case 20: ripe = ripeOrTag; @@ -39,6 +41,11 @@ public class GetPubkey implements ObjectPayload { } } + @Override + public long getStream() { + return stream; + } + @Override public void write(OutputStream stream) throws IOException { if (tag != null) { diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Msg.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Msg.java new file mode 100644 index 0000000..b93e618 --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Msg.java @@ -0,0 +1,43 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.entity.payload; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Created by chris on 07.04.15. + */ +public class Msg implements ObjectPayload { + private long stream; + private byte[] encrypted; + + public Msg(long stream, byte[] encrypted) { + this.stream = stream; + this.encrypted = encrypted; + } + + @Override + public long getStream() { + return stream; + } + + @Override + public void write(OutputStream stream) throws IOException { + stream.write(encrypted); + } +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/ObjectPayload.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/ObjectPayload.java index 52599be..d3aefe2 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/ObjectPayload.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/ObjectPayload.java @@ -22,4 +22,5 @@ import ch.dissem.bitmessage.entity.Streamable; * The payload of an 'object' command. This is shared by the network. */ public interface ObjectPayload extends Streamable { + long getStream(); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Pubkey.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Pubkey.java index 859effc..c36e1c7 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Pubkey.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/Pubkey.java @@ -22,8 +22,6 @@ package ch.dissem.bitmessage.entity.payload; public interface Pubkey extends ObjectPayload { long getVersion(); - long getStream(); - byte[] getSigningKey(); byte[] getEncryptionKey(); diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V2Pubkey.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V2Pubkey.java index fef5874..1f77af3 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V2Pubkey.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V2Pubkey.java @@ -25,11 +25,21 @@ import java.io.OutputStream; * Created by chris on 24.03.15. */ public class V2Pubkey implements Pubkey { - protected long streamNumber; + protected long stream; protected long behaviorBitfield; protected byte[] publicSigningKey; protected byte[] publicEncryptionKey; + protected V2Pubkey() { + } + + private V2Pubkey(Builder builder) { + stream = builder.streamNumber; + behaviorBitfield = builder.behaviorBitfield; + publicSigningKey = builder.publicSigningKey; + publicEncryptionKey = builder.publicEncryptionKey; + } + @Override public long getVersion() { return 2; @@ -37,7 +47,7 @@ public class V2Pubkey implements Pubkey { @Override public long getStream() { - return streamNumber; + return stream; } @Override @@ -56,4 +66,38 @@ public class V2Pubkey implements Pubkey { stream.write(publicSigningKey); stream.write(publicEncryptionKey); } + + public static class Builder { + private long streamNumber; + private long behaviorBitfield; + private byte[] publicSigningKey; + private byte[] publicEncryptionKey; + + public Builder() { + } + + public Builder streamNumber(long streamNumber) { + this.streamNumber = streamNumber; + return this; + } + + public Builder behaviorBitfield(long behaviorBitfield) { + this.behaviorBitfield = behaviorBitfield; + return this; + } + + public Builder publicSigningKey(byte[] publicSigningKey) { + this.publicSigningKey = publicSigningKey; + return this; + } + + public Builder publicEncryptionKey(byte[] publicEncryptionKey) { + this.publicEncryptionKey = publicEncryptionKey; + return this; + } + + public V2Pubkey build() { + return new V2Pubkey(this); + } + } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V3Pubkey.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V3Pubkey.java index 8b9b088..081063d 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V3Pubkey.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V3Pubkey.java @@ -29,6 +29,17 @@ public class V3Pubkey extends V2Pubkey { long extraBytes; byte[] signature; + protected V3Pubkey(Builder builder) { + stream = builder.streamNumber; + behaviorBitfield = builder.behaviorBitfield; + publicSigningKey = builder.publicSigningKey; + publicEncryptionKey = builder.publicEncryptionKey; + + nonceTrialsPerByte = builder.nonceTrialsPerByte; + extraBytes = builder.extraBytes; + signature = builder.signature; + } + @Override public void write(OutputStream stream) throws IOException { super.write(stream); @@ -42,4 +53,57 @@ public class V3Pubkey extends V2Pubkey { public long getVersion() { return 3; } + + public static class Builder extends V2Pubkey.Builder { + private long streamNumber; + private long behaviorBitfield; + private byte[] publicSigningKey; + private byte[] publicEncryptionKey; + + private long nonceTrialsPerByte; + private long extraBytes; + private byte[] signature; + + public Builder() { + } + + public Builder streamNumber(long streamNumber) { + this.streamNumber = streamNumber; + return this; + } + + public Builder behaviorBitfield(long behaviorBitfield) { + this.behaviorBitfield = behaviorBitfield; + return this; + } + + public Builder publicSigningKey(byte[] publicSigningKey) { + this.publicSigningKey = publicSigningKey; + return this; + } + + public Builder publicEncryptionKey(byte[] publicEncryptionKey) { + this.publicEncryptionKey = publicEncryptionKey; + return this; + } + + public Builder nonceTrialsPerByte(long nonceTrialsPerByte) { + this.nonceTrialsPerByte = nonceTrialsPerByte; + return this; + } + + public Builder extraBytes(long extraBytes) { + this.extraBytes = extraBytes; + return this; + } + + public Builder signature(byte[] signature) { + this.signature = signature; + return this; + } + + public V3Pubkey build() { + return new V3Pubkey(this); + } + } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V4Pubkey.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V4Pubkey.java index bb31ab8..2737744 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V4Pubkey.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/V4Pubkey.java @@ -23,15 +23,22 @@ import java.io.OutputStream; * Created by chris on 27.03.15. */ public class V4Pubkey implements Pubkey { - private long streamNumber; + private long stream; private byte[] tag; private byte[] encrypted; private V3Pubkey decrypted; + public V4Pubkey(long stream, byte[] tag, byte[] encrypted) { + this.stream = stream; + this.tag = tag; + this.encrypted = encrypted; + } + public V4Pubkey(V3Pubkey decrypted) { + this.stream = decrypted.stream; + // TODO: this.tag = new BitmessageAddress(this).doubleHash this.decrypted = decrypted; - // TODO: this.tag = new BitmessageAddress(this).doubleHash } @Override @@ -47,7 +54,7 @@ public class V4Pubkey implements Pubkey { @Override public long getStream() { - return streamNumber; + return stream; } @Override diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java b/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java index 545781c..a08e31d 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java @@ -21,12 +21,9 @@ import ch.dissem.bitmessage.utils.Encode; import java.io.IOException; import java.io.OutputStream; -import java.net.Inet6Address; import java.net.InetAddress; -import java.net.Socket; import java.net.UnknownHostException; import java.util.Arrays; -import java.util.Objects; /** * A node's address. It's written in IPv6 format. @@ -51,6 +48,14 @@ public class NetworkAddress implements Streamable { private byte[] ipv6; private int port; + private NetworkAddress(Builder builder) { + time = builder.time; + stream = builder.stream; + services = builder.services; + ipv6 = builder.ipv6; + port = builder.port; + } + public int getPort() { return port; } @@ -63,14 +68,6 @@ public class NetworkAddress implements Streamable { } } - private NetworkAddress(Builder builder) { - time = builder.time; - stream = builder.stream; - services = builder.services; - ipv6 = builder.ipv6; - port = builder.port; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -88,6 +85,11 @@ public class NetworkAddress implements Streamable { return result; } + @Override + public String toString() { + return toInetAddress() + ":" + port; + } + @Override public void write(OutputStream stream) throws IOException { write(stream, false); diff --git a/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java b/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java index 2a57a0a..4bdde80 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java @@ -17,10 +17,10 @@ package ch.dissem.bitmessage.factory; import ch.dissem.bitmessage.entity.NetworkMessage; -import ch.dissem.bitmessage.entity.payload.GenericPayload; -import ch.dissem.bitmessage.entity.payload.GetPubkey; -import ch.dissem.bitmessage.entity.payload.ObjectPayload; +import ch.dissem.bitmessage.entity.payload.*; import ch.dissem.bitmessage.utils.Decode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; @@ -29,26 +29,63 @@ import java.io.InputStream; * Creates {@link NetworkMessage} objects from {@link InputStream InputStreams} */ public class Factory { + public static final Logger LOG = LoggerFactory.getLogger(Factory.class); + public static NetworkMessage getNetworkMessage(int version, InputStream stream) throws IOException { return new V3MessageFactory().read(stream); } - static ObjectPayload getObjectPayload(long objectType, long version, InputStream stream, int length) throws IOException { + static ObjectPayload getObjectPayload(long objectType, long version, long streamNumber, InputStream stream, int length) throws IOException { if (objectType < 4) { switch ((int) objectType) { case 0: // getpubkey - return new GetPubkey(Decode.bytes(stream, length)); + return new GetPubkey(streamNumber, Decode.bytes(stream, length)); case 1: // pubkey - break; + return parsePubkey((int) version, streamNumber, stream, length); case 2: // msg - break; + return parseMsg((int) version, streamNumber, stream, length); case 3: // broadcast - break; + return parseBroadcast((int) version, streamNumber, stream, length); } - throw new RuntimeException("This must not happen, someone broke something in the code!"); - } else { - // passthrough message - return new GenericPayload(Decode.bytes(stream, length)); + LOG.error("This should not happen, someone broke something in the code!"); } + // fallback: just store the message - we don't really care what it is + LOG.error("Unexpected object type: " + objectType); + return new GenericPayload(streamNumber, Decode.bytes(stream, length)); + } + + private static ObjectPayload parsePubkey(int version, long streamNumber, InputStream stream, int length) throws IOException { + switch (version) { + case 2: + return new V2Pubkey.Builder() + .streamNumber(streamNumber) + .behaviorBitfield(Decode.int64(stream)) + .publicSigningKey(Decode.bytes(stream, 64)) + .publicEncryptionKey(Decode.bytes(stream, 64)) + .build(); + case 3: + V3Pubkey.Builder v3 = new V3Pubkey.Builder() + .streamNumber(streamNumber) + .behaviorBitfield(Decode.int64(stream)) + .publicSigningKey(Decode.bytes(stream, 64)) + .publicEncryptionKey(Decode.bytes(stream, 64)) + .nonceTrialsPerByte(Decode.varInt(stream)) + .extraBytes(Decode.varInt(stream)); + int sigLength = (int) Decode.varInt(stream); + v3.signature(Decode.bytes(stream, sigLength)); + return v3.build(); + case 4: + // TODO + } + LOG.debug("Unexpected pubkey version " + version + ", handling as generic payload object"); + return new GenericPayload(streamNumber, Decode.bytes(stream, length)); + } + + private static ObjectPayload parseMsg(int version, long streamNumber, InputStream stream, int length) throws IOException { + return new Msg(streamNumber, Decode.bytes(stream, length)); + } + + private static ObjectPayload parseBroadcast(int version, long streamNumber, InputStream stream, int length) throws IOException { + return new Broadcast(streamNumber, Decode.bytes(stream, 32), Decode.bytes(stream, length - 32)); } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java b/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java index a5bf939..6c3a9f6 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java @@ -22,6 +22,8 @@ import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.utils.Decode; import ch.dissem.bitmessage.utils.Security; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -31,6 +33,8 @@ import java.io.InputStream; * Creates protocol v3 network messages from {@link InputStream InputStreams} */ class V3MessageFactory { + private Logger LOG = LoggerFactory.getLogger(V3MessageFactory.class); + public NetworkMessage read(InputStream stream) throws IOException { if (testMagic(stream)) { String command = getCommand(stream); @@ -45,8 +49,10 @@ class V3MessageFactory { } else { throw new IOException("Checksum failed for message '" + command + "'"); } + } else { + LOG.debug("Failed test for MAGIC bytes"); + return null; } - return null; } private MessagePayload getPayload(String command, InputStream stream, int length) throws IOException { @@ -64,6 +70,7 @@ class V3MessageFactory { case "object": return parseObject(stream, length); default: + LOG.debug("Unknown command: " + command); return null; } } @@ -75,7 +82,7 @@ class V3MessageFactory { long version = Decode.varInt(stream); long streamNumber = Decode.varInt(stream); - ObjectPayload payload = Factory.getObjectPayload(objectType, version, stream, length); + ObjectPayload payload = Factory.getObjectPayload(objectType, version, streamNumber, stream, length); return new ObjectMessage.Builder() .nonce(nonce) @@ -109,7 +116,7 @@ class V3MessageFactory { long count = Decode.varInt(stream); Addr.Builder builder = new Addr.Builder(); for (int i = 0; i < count; i++) { - builder.addAddress(parseAddress(stream)); + builder.addAddress(parseAddress(stream, false)); } return builder.build(); } @@ -118,8 +125,8 @@ class V3MessageFactory { int version = Decode.int32(stream); long services = Decode.int64(stream); long timestamp = Decode.int64(stream); - NetworkAddress addrRecv = parseAddress(stream); - NetworkAddress addrFrom = parseAddress(stream); + NetworkAddress addrRecv = parseAddress(stream, true); + NetworkAddress addrFrom = parseAddress(stream, true); long nonce = Decode.int64(stream); String userAgent = Decode.varString(stream); long[] streamNumbers = Decode.varIntList(stream); @@ -138,9 +145,16 @@ class V3MessageFactory { return new InventoryVector(Decode.bytes(stream, 32)); } - private NetworkAddress parseAddress(InputStream stream) throws IOException { - long time = Decode.int64(stream); - long streamNumber = Decode.uint32(stream); // This isn't consistent, not sure if this is correct + private NetworkAddress parseAddress(InputStream stream, boolean light) throws IOException { + long time; + long streamNumber; + if (!light) { + time = Decode.int64(stream); + streamNumber = Decode.uint32(stream); // This isn't consistent, not sure if this is correct + } else { + time = 0; + streamNumber = 0; + } long services = Decode.int64(stream); byte[] ipv6 = Decode.bytes(stream, 16); int port = Decode.uint16(stream); @@ -159,13 +173,21 @@ class V3MessageFactory { private String getCommand(InputStream stream) throws IOException { byte[] bytes = new byte[12]; - stream.read(bytes); - return new String(bytes, "ASCII"); + int end = -1; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (byte) stream.read(); + if (end == -1) { + if (bytes[i] == 0) end = i; + } else { + if (bytes[i] != 0) throw new IOException("'\\0' padding expected for command"); + } + } + return new String(bytes, 0, end, "ASCII"); } private boolean testMagic(InputStream stream) throws IOException { for (byte b : NetworkMessage.MAGIC_BYTES) { - if (b != stream.read()) return false; + if (b != (byte) stream.read()) return false; } return true; } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java b/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java index 73584f9..310a7bb 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java @@ -17,7 +17,6 @@ package ch.dissem.bitmessage.ports; import ch.dissem.bitmessage.entity.ObjectMessage; -import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import java.util.List; @@ -26,13 +25,13 @@ import java.util.List; * The Inventory stores and retrieves objects, cleans up outdated objects and can tell which objects are still missing. */ public interface Inventory { - public List getInventory(long... streams); + List getInventory(long... streams); - public List getMissing(List offer); + List getMissing(List offer); - public ObjectMessage getObject(InventoryVector vector); + ObjectMessage getObject(InventoryVector vector); - public void storeObject(ObjectMessage object); + void storeObject(ObjectMessage object); - public void cleanup(); + void cleanup(); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageSender.java b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java similarity index 73% rename from domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageSender.java rename to domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java index 532f8fc..95b37da 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageSender.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java @@ -17,11 +17,18 @@ package ch.dissem.bitmessage.ports; import ch.dissem.bitmessage.entity.payload.ObjectPayload; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; /** - * Sends messages + * Handles incoming messages */ -public interface NetworkMessageSender { - void send(NetworkAddress node, ObjectPayload payload); +public interface NetworkHandler { + void setListener(MessageListener listener); + + void start(); + + void send(ObjectPayload payload); + + interface MessageListener { + void receive(ObjectPayload payload); + } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Decode.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Decode.java index dad6b4a..e70305c 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/Decode.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Decode.java @@ -27,7 +27,10 @@ import java.nio.ByteBuffer; public class Decode { public static byte[] bytes(InputStream stream, int count) throws IOException { byte[] result = new byte[count]; - stream.read(result); + int off = 0; + while (off < count) { + off += stream.read(result, off, count - off); + } return result; } diff --git a/inventory/build.gradle b/inventory/build.gradle new file mode 100644 index 0000000..f8d7bba --- /dev/null +++ b/inventory/build.gradle @@ -0,0 +1,13 @@ +apply plugin: 'java' + +sourceCompatibility = 1.7 +version = '1.0' + +repositories { + mavenCentral() +} + +dependencies { + compile project(':domain') + testCompile group: 'junit', name: 'junit', version: '4.11' +} \ No newline at end of file diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageReceiver.java b/inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleAddressRepository.java similarity index 56% rename from domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageReceiver.java rename to inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleAddressRepository.java index d86ae60..3fb19de 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageReceiver.java +++ b/inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleAddressRepository.java @@ -14,23 +14,24 @@ * limitations under the License. */ -package ch.dissem.bitmessage.ports; +package ch.dissem.bitmessage.inventory; -import ch.dissem.bitmessage.entity.NetworkMessage; -import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.ports.AddressRepository; -import java.io.IOException; +import java.util.Collections; +import java.util.List; /** - * Handles incoming messages + * Created by chris on 06.04.15. */ -public interface NetworkMessageReceiver { - void registerListener(int port, MessageListener listener) throws IOException; +public class SimpleAddressRepository implements AddressRepository { + @Override + public List getKnownAddresses(int limit, long... streams) { + return Collections.singletonList(new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build()); + } - void registerListener(NetworkAddress node, MessageListener listener) throws IOException; - - interface MessageListener { - void receive(ObjectPayload payload); + @Override + public void offerAddresses(List addresses) { } } diff --git a/inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleInventory.java b/inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleInventory.java new file mode 100644 index 0000000..55e6e68 --- /dev/null +++ b/inventory/src/main/java/ch/dissem/bitmessage/inventory/SimpleInventory.java @@ -0,0 +1,55 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.inventory; + +import ch.dissem.bitmessage.entity.ObjectMessage; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; +import ch.dissem.bitmessage.ports.Inventory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.util.LinkedList; +import java.util.List; + +/** + * Created by chris on 06.04.15. + */ +public class SimpleInventory implements Inventory { + @Override + public List getInventory(long... streams) { + return new LinkedList<>(); + } + + @Override + public List getMissing(List offer) { + return offer; + } + + @Override + public ObjectMessage getObject(InventoryVector vector) { + throw new NotImplementedException(); + } + + @Override + public void storeObject(ObjectMessage object) { + throw new NotImplementedException(); + } + + @Override + public void cleanup() { + throw new NotImplementedException(); + } +} diff --git a/networking/build.gradle b/networking/build.gradle index befb2d1..efa381c 100644 --- a/networking/build.gradle +++ b/networking/build.gradle @@ -8,7 +8,7 @@ repositories { } dependencies { - compile ':domain' + compile project(':domain') compile 'com.google.guava:guava-concurrent:r03' testCompile 'org.slf4j:slf4j-simple:1.7.12' testCompile 'junit:junit:4.11' diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index 58c1e97..329667c 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -21,7 +21,7 @@ import ch.dissem.bitmessage.entity.*; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.factory.Factory; -import ch.dissem.bitmessage.ports.NetworkMessageReceiver.MessageListener; +import ch.dissem.bitmessage.ports.NetworkHandler.MessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +69,14 @@ public class Connection implements Runnable { this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).build(); } + public State getState() { + return state; + } + + public NetworkAddress getNode() { + return node; + } + @Override public void run() { if (state == CLIENT) { @@ -77,6 +85,8 @@ public class Connection implements Runnable { while (state != DISCONNECTED) { try { NetworkMessage msg = Factory.getNetworkMessage(version, in); + if (msg == null) + continue; switch (state) { case ACTIVE: receiveMessage(msg.getPayload()); @@ -90,20 +100,16 @@ public class Connection implements Runnable { this.version = payload.getVersion(); this.streams = payload.getStreams(); send(new VerAck()); - if (state == SERVER) { - state = ACTIVE; - } + state = ACTIVE; + sendAddresses(); + sendInventory(); } else { + LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting."); disconnect(); } break; case VERACK: - if (state == CLIENT) { - sendAddresses(); - sendInventory(); - - state = ACTIVE; - } else { + if (state == SERVER) { send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); } break; @@ -155,7 +161,7 @@ public class Connection implements Runnable { private void sendAddresses() { List addresses = ctx.getAddressRepository().getKnownAddresses(1000, streams); - send(new Addr.Builder().addresses(addresses).build()); + sendingQueue.offer(new Addr.Builder().addresses(addresses).build()); } private void sendInventory() { diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java index 11b8f07..f5e19a9 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java @@ -16,86 +16,113 @@ package ch.dissem.bitmessage.networking; -import ch.dissem.bitmessage.entity.NetworkMessage; -import ch.dissem.bitmessage.entity.Version; +import ch.dissem.bitmessage.Context; +import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.factory.Factory; -import ch.dissem.bitmessage.ports.NetworkMessageReceiver; -import ch.dissem.bitmessage.ports.NetworkMessageSender; +import ch.dissem.bitmessage.ports.NetworkHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static ch.dissem.bitmessage.networking.Connection.State.CLIENT; -import static ch.dissem.bitmessage.networking.Connection.State.SERVER; +import static ch.dissem.bitmessage.networking.Connection.State.*; /** * Handles all the networky stuff. */ -public class NetworkNode implements NetworkMessageSender, NetworkMessageReceiver { +public class NetworkNode implements NetworkHandler { private final static Logger LOG = LoggerFactory.getLogger(NetworkNode.class); - /** - * This is only to be used where it's ignored - */ - private final static NetworkAddress LOCALHOST = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build(); private final ExecutorService pool; + private final List connections = new LinkedList<>(); + private MessageListener listener; public NetworkNode() { pool = Executors.newCachedThreadPool(); - - // TODO: sending -// Thread sender = new Thread(new Runnable() { -// @Override -// public void run() { -// while (true) { -// try { -// NetworkMessage message = sendingQueue.take(); -// -// try (Socket socket = getSocket(message.getTargetNode())) { -// message.write(socket.getOutputStream()); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } catch (InterruptedException e) { -// // Ignore? -// } -// } -// } -// }, "Sender"); -// sender.setDaemon(true); -// sender.start(); } @Override - public void registerListener(final int port, final MessageListener listener) throws IOException { - final ServerSocket serverSocket = new ServerSocket(port); - pool.execute(new Runnable() { - @Override - public void run() { - NetworkAddress address = null; - try { - Socket socket = serverSocket.accept(); - socket.setSoTimeout(20000); - pool.execute(new Connection(SERVER, socket, listener)); - } catch (IOException e) { - LOG.debug(e.getMessage(), e); + public void setListener(final MessageListener listener) { + if (this.listener != null) { + throw new IllegalStateException("Listener can only be set once"); + } + this.listener = listener; + } + + @Override + public void start() { + final Context ctx = Context.getInstance(); + if (listener == null) { + throw new IllegalStateException("Listener must be set at start"); + } + try { + final ServerSocket serverSocket = new ServerSocket(Context.getInstance().getPort()); + pool.execute(new Runnable() { + @Override + public void run() { + try { + Socket socket = serverSocket.accept(); + socket.setSoTimeout(20000); + startConnection(new Connection(SERVER, socket, listener)); + } catch (IOException e) { + LOG.debug(e.getMessage(), e); + } } - } - }); + }); + Thread connectionManager = new Thread(new Runnable() { + @Override + public void run() { + while (!Thread.interrupted()) { + synchronized (connections) { + for (Iterator iterator = connections.iterator(); iterator.hasNext(); ) { + Connection c = iterator.next(); + if (c.getState() == DISCONNECTED) { + // Remove the current element from the iterator and the list. + iterator.remove(); + } + } + } + if (connections.size() < 1) { + List addresses = ctx.getAddressRepository().getKnownAddresses(8, ctx.getStreams()); + for (NetworkAddress address : addresses) { + try { + startConnection(new Connection(CLIENT, new Socket(address.toInetAddress(), address.getPort()), listener)); + } catch (IOException e) { + LOG.debug(e.getMessage(), e); + } + } + // FIXME: prevent connecting twice to the same node + } + try { + Thread.sleep(30000); + } catch (InterruptedException e) { + LOG.debug(e.getMessage(), e); + Thread.currentThread().interrupt(); + } + } + } + }, "connection-manager"); + connectionManager.start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void startConnection(Connection c) { + synchronized (connections) { + connections.add(c); + } + pool.execute(c); } @Override - public void registerListener(final NetworkAddress node, final MessageListener listener) throws IOException { - pool.execute(new Connection(CLIENT, new Socket(node.toInetAddress(), node.getPort()), listener)); - } - - @Override - public void send(final NetworkAddress node, final NetworkMessage message) { + public void send(final ObjectPayload payload) { // TODO: sendingQueue.add(message); } } diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java index 163e1f7..a835d4a 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java @@ -20,7 +20,7 @@ import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.entity.Version; import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.ports.NetworkMessageReceiver; +import ch.dissem.bitmessage.ports.NetworkHandler; import org.junit.Test; /** @@ -33,13 +33,13 @@ public class NetworkNodeTest { public void testSendMessage() throws Exception { final Thread baseThread = Thread.currentThread(); NetworkNode net = new NetworkNode(); - net.registerListener(localhost, new NetworkMessageReceiver.MessageListener() { - @Override - public void receive(ObjectPayload payload) { - System.out.println(payload); - baseThread.interrupt(); - } - }); +// net.setListener(localhost, new NetworkHandler.MessageListener() { +// @Override +// public void receive(ObjectPayload payload) { +// System.out.println(payload); +// baseThread.interrupt(); +// } +// }); NetworkMessage ver = new NetworkMessage( new Version.Builder() .version(3) @@ -52,7 +52,7 @@ public class NetworkNodeTest { .streams(1, 2) .build() ); - net.send(localhost, ver); +// net.send(localhost, ver); Thread.sleep(20000); } } diff --git a/settings.gradle b/settings.gradle index dfd719a..b66fc83 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,3 +4,7 @@ include 'domain' include 'networking' +include 'inventory' + +include 'demo' +