From 3299d8ca4ae051da9b8ae32a83d4cf487d609443 Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Mon, 6 Apr 2015 10:01:03 +0200 Subject: [PATCH] Networking code, untested --- domain/build.gradle | 1 + .../java/ch/dissem/bitmessage/Context.java | 60 ++++++ .../ch/dissem/bitmessage/entity/Addr.java | 16 +- .../ch/dissem/bitmessage/entity/GetData.java | 10 +- .../java/ch/dissem/bitmessage/entity/Inv.java | 10 +- .../{Command.java => MessagePayload.java} | 8 +- .../bitmessage/entity/NetworkMessage.java | 19 +- .../bitmessage/entity/ObjectMessage.java | 16 +- .../ch/dissem/bitmessage/entity/VerAck.java | 6 +- .../ch/dissem/bitmessage/entity/Version.java | 34 ++-- .../bitmessage/factory/V3MessageFactory.java | 4 +- .../bitmessage/ports/AddressRepository.java | 30 +++ .../ch/dissem/bitmessage/ports/Inventory.java | 7 +- .../ports/NetworkMessageReceiver.java | 9 +- .../ports/NetworkMessageSender.java | 4 +- .../utils/AddressFormatException.java | 26 +++ .../ch/dissem/bitmessage/utils/Base58.java | 145 +++++++++++++- networking/build.gradle | 2 + .../bitmessage/networking/Connection.java | 189 ++++++++++++++++++ .../bitmessage/networking/NetworkNode.java | 118 ++++------- .../networking/NetworkNodeTest.java | 10 +- 21 files changed, 573 insertions(+), 151 deletions(-) create mode 100644 domain/src/main/java/ch/dissem/bitmessage/Context.java rename domain/src/main/java/ch/dissem/bitmessage/entity/{Command.java => MessagePayload.java} (81%) create mode 100644 domain/src/main/java/ch/dissem/bitmessage/ports/AddressRepository.java create mode 100644 domain/src/main/java/ch/dissem/bitmessage/utils/AddressFormatException.java create mode 100644 networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java diff --git a/domain/build.gradle b/domain/build.gradle index 34a213a..f216ccc 100644 --- a/domain/build.gradle +++ b/domain/build.gradle @@ -8,5 +8,6 @@ repositories { } dependencies { + compile 'org.slf4j:slf4j-api:1.7.12' testCompile group: 'junit', name: 'junit', version: '4.11' } \ No newline at end of file diff --git a/domain/src/main/java/ch/dissem/bitmessage/Context.java b/domain/src/main/java/ch/dissem/bitmessage/Context.java new file mode 100644 index 0000000..0524e4d --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/Context.java @@ -0,0 +1,60 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage; + +import ch.dissem.bitmessage.ports.AddressRepository; +import ch.dissem.bitmessage.ports.Inventory; +import ch.dissem.bitmessage.ports.NetworkMessageReceiver; +import ch.dissem.bitmessage.ports.NetworkMessageSender; + +/** + * Created by chris on 05.04.15. + */ +public class Context { + public static final int CURRENT_VERSION = 3; + + private static Context instance; + + private Inventory inventory; + private AddressRepository addressRepo; + private NetworkMessageSender sender; + private NetworkMessageReceiver receiver; + + private Context(Inventory inventory, AddressRepository addressRepo, + NetworkMessageSender sender, NetworkMessageReceiver receiver) { + this.inventory = inventory; + this.addressRepo = addressRepo; + this.sender = sender; + this.receiver = receiver; + } + + public static void init(Inventory inventory, AddressRepository addressRepository, NetworkMessageSender sender, NetworkMessageReceiver receiver) { + instance = new Context(inventory, addressRepository, sender, receiver); + } + + public static Context getInstance() { + return instance; + } + + public Inventory getInventory() { + return inventory; + } + + public AddressRepository getAddressRepository() { + return addressRepo; + } +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/Addr.java b/domain/src/main/java/ch/dissem/bitmessage/entity/Addr.java index 8fc13cc..d00623e 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/Addr.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/Addr.java @@ -22,12 +22,13 @@ import ch.dissem.bitmessage.utils.Encode; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.List; /** * The 'addr' command holds a list of known active Bitmessage nodes. */ -public class Addr implements Command { +public class Addr implements MessagePayload { private final List addresses; private Addr(Builder builder) { @@ -35,8 +36,12 @@ public class Addr implements Command { } @Override - public String getCommand() { - return "addr"; + public Command getCommand() { + return Command.ADDR; + } + + public List getAddresses() { + return addresses; } @Override @@ -53,6 +58,11 @@ public class Addr implements Command { public Builder() { } + public Builder addresses(Collection addresses){ + this.addresses.addAll(addresses); + return this; + } + public Builder addAddress(final NetworkAddress address) { this.addresses.add(address); return this; diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/GetData.java b/domain/src/main/java/ch/dissem/bitmessage/entity/GetData.java index 5cab3b5..b62e6e5 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/GetData.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/GetData.java @@ -27,7 +27,7 @@ import java.util.List; /** * The 'getdata' command is used to request objects from a node. */ -public class GetData implements Command { +public class GetData implements MessagePayload { List inventory; private GetData(Builder builder) { @@ -35,8 +35,12 @@ public class GetData implements Command { } @Override - public String getCommand() { - return "getdata"; + public Command getCommand() { + return Command.GETDATA; + } + + public List getInventory() { + return inventory; } @Override diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/Inv.java b/domain/src/main/java/ch/dissem/bitmessage/entity/Inv.java index 19d8161..df1b380 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/Inv.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/Inv.java @@ -27,16 +27,20 @@ import java.util.List; /** * The 'inv' command holds up to 50000 inventory vectors, i.e. hashes of inventory items. */ -public class Inv implements Command { +public class Inv implements MessagePayload { private List inventory; private Inv(Builder builder) { inventory = builder.inventory; } + public List getInventory() { + return inventory; + } + @Override - public String getCommand() { - return "inv"; + public Command getCommand() { + return Command.INV; } @Override diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/Command.java b/domain/src/main/java/ch/dissem/bitmessage/entity/MessagePayload.java similarity index 81% rename from domain/src/main/java/ch/dissem/bitmessage/entity/Command.java rename to domain/src/main/java/ch/dissem/bitmessage/entity/MessagePayload.java index 68e0a62..e6f6f0a 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/Command.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/MessagePayload.java @@ -19,6 +19,10 @@ package ch.dissem.bitmessage.entity; /** * A command can hold a network message payload */ -public interface Command extends Streamable { - String getCommand(); +public interface MessagePayload extends Streamable { + Command getCommand(); + + enum Command { + VERSION, VERACK, ADDR, INV, GETDATA, OBJECT + } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/NetworkMessage.java b/domain/src/main/java/ch/dissem/bitmessage/entity/NetworkMessage.java index db6dcd9..8c2fb29 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/NetworkMessage.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/NetworkMessage.java @@ -16,7 +16,6 @@ package ch.dissem.bitmessage.entity; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.utils.Encode; import java.io.ByteArrayOutputStream; @@ -39,12 +38,9 @@ public class NetworkMessage implements Streamable { public final static int MAGIC = 0xE9BEB4D9; public final static byte[] MAGIC_BYTES = ByteBuffer.allocate(4).putInt(MAGIC).array(); - private final NetworkAddress targetNode; + private final MessagePayload payload; - private final Command payload; - - public NetworkMessage(NetworkAddress target, Command payload) { - this.targetNode = target; + public NetworkMessage(MessagePayload payload) { this.payload = payload; } @@ -59,22 +55,19 @@ public class NetworkMessage implements Streamable { /** * The actual data, a message or an object. Not to be confused with objectPayload. */ - public Command getPayload() { + public MessagePayload getPayload() { return payload; } - public NetworkAddress getTargetNode() { - return targetNode; - } - @Override public void write(OutputStream stream) throws IOException { // magic Encode.int32(MAGIC, stream); // ASCII string identifying the packet content, NULL padded (non-NULL padding results in packet rejected) - stream.write(payload.getCommand().getBytes("ASCII")); - for (int i = payload.getCommand().length(); i < 12; i++) { + String command = payload.getCommand().name().toLowerCase(); + stream.write(command.getBytes("ASCII")); + for (int i = command.length(); i < 12; i++) { stream.write('\0'); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java b/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java index b6a245c..23fcd16 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java @@ -17,6 +17,7 @@ package ch.dissem.bitmessage.entity; import ch.dissem.bitmessage.entity.payload.ObjectPayload; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.utils.Encode; import java.io.IOException; @@ -25,7 +26,7 @@ import java.io.OutputStream; /** * The 'object' command sends an object that is shared throughout the network. */ -public class ObjectMessage implements Command { +public class ObjectMessage implements MessagePayload { private long nonce; private long expiresTime; private long objectType; @@ -47,8 +48,17 @@ public class ObjectMessage implements Command { } @Override - public String getCommand() { - return "object"; + public Command getCommand() { + return Command.OBJECT; + } + + public ObjectPayload getPayload() { + return payload; + } + + public InventoryVector getInventoryVector() { + // TODO + return null; } @Override diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/VerAck.java b/domain/src/main/java/ch/dissem/bitmessage/entity/VerAck.java index 19748d0..1aad501 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/VerAck.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/VerAck.java @@ -22,10 +22,10 @@ import java.io.OutputStream; /** * The 'verack' command answers a 'version' command, accepting the other node's version. */ -public class VerAck implements Command { +public class VerAck implements MessagePayload { @Override - public String getCommand() { - return "verack"; + public Command getCommand() { + return Command.VERACK; } @Override diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java b/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java index ea621eb..24936d4 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java @@ -16,6 +16,7 @@ package ch.dissem.bitmessage.entity; +import ch.dissem.bitmessage.Context; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.utils.Encode; @@ -26,8 +27,7 @@ import java.util.Random; /** * The 'version' command advertises this node's latest supported protocol version upon initiation. */ -public class Version implements Command { - public static final int CURRENT = 3; +public class Version implements MessagePayload { /** * Identifies protocol version being used by the node. Should equal 3. Nodes should disconnect if the remote node's * version is lower but continue with the connection if it is higher. @@ -71,6 +71,17 @@ public class Version implements Command { */ private final long[] streamNumbers; + private Version(Builder builder) { + version = builder.version; + services = builder.services; + timestamp = builder.timestamp; + addrRecv = builder.addrRecv; + addrFrom = builder.addrFrom; + nonce = builder.nonce; + userAgent = builder.userAgent; + streamNumbers = builder.streamNumbers; + } + public int getVersion() { return version; } @@ -99,24 +110,13 @@ public class Version implements Command { return userAgent; } - public long[] getStreamNumbers() { + public long[] getStreams() { return streamNumbers; } - private Version(Builder builder) { - version = builder.version; - services = builder.services; - timestamp = builder.timestamp; - addrRecv = builder.addrRecv; - addrFrom = builder.addrFrom; - nonce = builder.nonce; - userAgent = builder.userAgent; - streamNumbers = builder.streamNumbers; - } - @Override - public String getCommand() { - return "version"; + public Command getCommand() { + return Command.VERSION; } @Override @@ -146,7 +146,7 @@ public class Version implements Command { } public Builder defaults() { - version = CURRENT; + version = Context.CURRENT; services = 1; timestamp = System.currentTimeMillis() / 1000; nonce = new Random().nextInt(); diff --git a/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java b/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java index a6d6bbe..a5bf939 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java @@ -40,7 +40,7 @@ class V3MessageFactory { byte[] payloadBytes = Decode.bytes(stream, length); if (testChecksum(checksum, payloadBytes)) { - Command payload = getPayload(command, new ByteArrayInputStream(payloadBytes), length); + MessagePayload payload = getPayload(command, new ByteArrayInputStream(payloadBytes), length); return new NetworkMessage(payload); } else { throw new IOException("Checksum failed for message '" + command + "'"); @@ -49,7 +49,7 @@ class V3MessageFactory { return null; } - private Command getPayload(String command, InputStream stream, int length) throws IOException { + private MessagePayload getPayload(String command, InputStream stream, int length) throws IOException { switch (command) { case "version": return parseVersion(stream); diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/AddressRepository.java b/domain/src/main/java/ch/dissem/bitmessage/ports/AddressRepository.java new file mode 100644 index 0000000..6469c8d --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/AddressRepository.java @@ -0,0 +1,30 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.ports; + +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; + +import java.util.List; + +/** + * Stores and provides known peers. + */ +public interface AddressRepository { + List getKnownAddresses(int limit, long... streams); + + void offerAddresses(List addresses); +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java b/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java index 7fe2eda..73584f9 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java @@ -16,6 +16,7 @@ package ch.dissem.bitmessage.ports; +import ch.dissem.bitmessage.entity.ObjectMessage; import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; @@ -25,13 +26,13 @@ import java.util.List; * The Inventory stores and retrieves objects, cleans up outdated objects and can tell which objects are still missing. */ public interface Inventory { - public List getInventory(); + public List getInventory(long... streams); public List getMissing(List offer); - public ObjectPayload getObject(InventoryVector vector); + public ObjectMessage getObject(InventoryVector vector); - public void storeObject(InventoryVector vector, ObjectPayload object); + public void storeObject(ObjectMessage object); public void cleanup(); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageReceiver.java b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageReceiver.java index 530f1c8..d86ae60 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageReceiver.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageReceiver.java @@ -17,6 +17,7 @@ package ch.dissem.bitmessage.ports; import ch.dissem.bitmessage.entity.NetworkMessage; +import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import java.io.IOException; @@ -25,11 +26,11 @@ import java.io.IOException; * Handles incoming messages */ public interface NetworkMessageReceiver { - public void registerListener(int port) throws IOException; + void registerListener(int port, MessageListener listener) throws IOException; - public void registerListener(NetworkAddress node, MessageListener listener) throws IOException; + void registerListener(NetworkAddress node, MessageListener listener) throws IOException; - public static interface MessageListener { - public void receive(NetworkMessage message); + interface MessageListener { + void receive(ObjectPayload payload); } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageSender.java b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageSender.java index bac6803..532f8fc 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageSender.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkMessageSender.java @@ -16,12 +16,12 @@ package ch.dissem.bitmessage.ports; -import ch.dissem.bitmessage.entity.NetworkMessage; +import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; /** * Sends messages */ public interface NetworkMessageSender { - public void send(NetworkAddress node, NetworkMessage message); + void send(NetworkAddress node, ObjectPayload payload); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/AddressFormatException.java b/domain/src/main/java/ch/dissem/bitmessage/utils/AddressFormatException.java new file mode 100644 index 0000000..2120a9e --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/AddressFormatException.java @@ -0,0 +1,26 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.utils; + +/** + * Indicates an illegal Bitmessage address + */ +public class AddressFormatException extends RuntimeException { + public AddressFormatException(String message) { + super(message); + } +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Base58.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Base58.java index 95de3a6..658a5b1 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/Base58.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Base58.java @@ -1,11 +1,12 @@ /* + * Copyright 2011 Google Inc. * Copyright 2015 Christian Basler * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,17 +17,151 @@ package ch.dissem.bitmessage.utils; +import java.io.UnsupportedEncodingException; + /** * Base58 encoder and decoder */ public class Base58 { private static char[] ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz".toCharArray(); - public static String encode(byte[] input) { - return null; // TODO + private static final int[] INDEXES = new int[128]; + + static { + for (int i = 0; i < INDEXES.length; i++) { + INDEXES[i] = -1; + } + for (int i = 0; i < ALPHABET.length; i++) { + INDEXES[ALPHABET[i]] = i; + } } - public static byte[] decode(String input) { - return null; // TODO + /** + * Encodes the given bytes in base58. No checksum is appended. + */ + public static String encode(byte[] input) { + if (input.length == 0) { + return ""; + } + input = copyOfRange(input, 0, input.length); + // Count leading zeroes. + int zeroCount = 0; + while (zeroCount < input.length && input[zeroCount] == 0) { + ++zeroCount; + } + // The actual encoding. + byte[] temp = new byte[input.length * 2]; + int j = temp.length; + + int startAt = zeroCount; + while (startAt < input.length) { + byte mod = divmod58(input, startAt); + if (input[startAt] == 0) { + ++startAt; + } + temp[--j] = (byte) ALPHABET[mod]; + } + + // Strip extra '1' if there are some after decoding. + while (j < temp.length && temp[j] == ALPHABET[0]) { + ++j; + } + // Add as many leading '1' as there were leading zeros. + while (--zeroCount >= 0) { + temp[--j] = (byte) ALPHABET[0]; + } + + byte[] output = copyOfRange(temp, j, temp.length); + try { + return new String(output, "US-ASCII"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); // Cannot happen. + } + } + + public static byte[] decode(String input) throws AddressFormatException { + if (input.length() == 0) { + return new byte[0]; + } + byte[] input58 = new byte[input.length()]; + // Transform the String to a base58 byte sequence + for (int i = 0; i < input.length(); ++i) { + char c = input.charAt(i); + + int digit58 = -1; + if (c >= 0 && c < 128) { + digit58 = INDEXES[c]; + } + if (digit58 < 0) { + throw new AddressFormatException("Illegal character " + c + " at " + i); + } + + input58[i] = (byte) digit58; + } + // Count leading zeroes + int zeroCount = 0; + while (zeroCount < input58.length && input58[zeroCount] == 0) { + ++zeroCount; + } + // The encoding + byte[] temp = new byte[input.length()]; + int j = temp.length; + + int startAt = zeroCount; + while (startAt < input58.length) { + byte mod = divmod256(input58, startAt); + if (input58[startAt] == 0) { + ++startAt; + } + + temp[--j] = mod; + } + // Do no add extra leading zeroes, move j to first non null byte. + while (j < temp.length && temp[j] == 0) { + ++j; + } + + return copyOfRange(temp, j - zeroCount, temp.length); + } + + // + // number -> number / 58, returns number % 58 + // + private static byte divmod58(byte[] number, int startAt) { + int remainder = 0; + for (int i = startAt; i < number.length; i++) { + int digit256 = (int) number[i] & 0xFF; + int temp = remainder * 256 + digit256; + + number[i] = (byte) (temp / 58); + + remainder = temp % 58; + } + + return (byte) remainder; + } + + // + // number -> number / 256, returns number % 256 + // + private static byte divmod256(byte[] number58, int startAt) { + int remainder = 0; + for (int i = startAt; i < number58.length; i++) { + int digit58 = (int) number58[i] & 0xFF; + int temp = remainder * 58 + digit58; + + number58[i] = (byte) (temp / 256); + + remainder = temp % 256; + } + + return (byte) remainder; + } + + private static byte[] copyOfRange(byte[] source, int from, int to) { + byte[] range = new byte[to - from]; + System.arraycopy(source, from, range, 0, range.length); + + return range; } } \ No newline at end of file diff --git a/networking/build.gradle b/networking/build.gradle index 53552cf..befb2d1 100644 --- a/networking/build.gradle +++ b/networking/build.gradle @@ -9,5 +9,7 @@ repositories { dependencies { compile ':domain' + compile 'com.google.guava:guava-concurrent:r03' + testCompile 'org.slf4j:slf4j-simple:1.7.12' testCompile 'junit:junit:4.11' } \ No newline at end of file diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java new file mode 100644 index 0000000..58c1e97 --- /dev/null +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -0,0 +1,189 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking; + +import ch.dissem.bitmessage.Context; +import ch.dissem.bitmessage.entity.*; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.factory.Factory; +import ch.dissem.bitmessage.ports.NetworkMessageReceiver.MessageListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; + +import static ch.dissem.bitmessage.networking.Connection.State.*; + +/** + * A connection to a specific node + */ +public class Connection implements Runnable { + private final static Logger LOG = LoggerFactory.getLogger(Connection.class); + + private Context ctx; + + private State state; + private Socket socket; + private InputStream in; + private OutputStream out; + private MessageListener listener; + + private int version; + private long[] streams; + + private NetworkAddress host; + private NetworkAddress node; + + private Queue sendingQueue = new ConcurrentLinkedDeque<>(); + + public Connection(State state, Socket socket, MessageListener listener) throws IOException { + this.ctx = Context.getInstance(); + this.state = state; + this.socket = socket; + this.in = socket.getInputStream(); + this.out = socket.getOutputStream(); + this.listener = listener; + this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); + this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).build(); + } + + @Override + public void run() { + if (state == CLIENT) { + send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); + } + while (state != DISCONNECTED) { + try { + NetworkMessage msg = Factory.getNetworkMessage(version, in); + switch (state) { + case ACTIVE: + receiveMessage(msg.getPayload()); + break; + + default: + switch (msg.getPayload().getCommand()) { + case VERSION: + Version payload = (Version) msg.getPayload(); + if (payload.getVersion() >= Context.CURRENT_VERSION) { + this.version = payload.getVersion(); + this.streams = payload.getStreams(); + send(new VerAck()); + if (state == SERVER) { + state = ACTIVE; + } + } else { + disconnect(); + } + break; + case VERACK: + if (state == CLIENT) { + sendAddresses(); + sendInventory(); + + state = ACTIVE; + } else { + send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); + } + break; + default: + throw new RuntimeException("Command 'version' or 'verack' expected, but was " + + msg.getPayload().getCommand()); + } + } + } catch (SocketTimeoutException e) { + if (state == ACTIVE) { + for (MessagePayload msg = sendingQueue.poll(); msg != null; msg = sendingQueue.poll()) { + send(msg); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + private void receiveMessage(MessagePayload messagePayload) { + switch (messagePayload.getCommand()) { + case INV: + Inv inv = (Inv) messagePayload; + List missing = ctx.getInventory().getMissing(inv.getInventory()); + send(new GetData.Builder().inventory(missing).build()); + break; + case GETDATA: + GetData getData = (GetData) messagePayload; + for (InventoryVector iv : getData.getInventory()) { + ObjectMessage om = ctx.getInventory().getObject(iv); + sendingQueue.offer(om); + } + break; + case OBJECT: + ObjectMessage objectMessage = (ObjectMessage) messagePayload; + ctx.getInventory().storeObject(objectMessage); + listener.receive(objectMessage.getPayload()); + break; + case ADDR: + Addr addr = (Addr) messagePayload; + ctx.getAddressRepository().offerAddresses(addr.getAddresses()); + break; + case VERACK: + case VERSION: + throw new RuntimeException("Unexpectedly received '" + messagePayload.getCommand() + "' command"); + } + } + + private void sendAddresses() { + List addresses = ctx.getAddressRepository().getKnownAddresses(1000, streams); + send(new Addr.Builder().addresses(addresses).build()); + } + + private void sendInventory() { + List inventory = ctx.getInventory().getInventory(streams); + for (int i = 0; i < inventory.size(); i += 50000) { + sendingQueue.offer(new Inv.Builder() + .inventory(inventory.subList(i, Math.min(inventory.size(), i + 50000))) + .build()); + } + } + + private void disconnect() { + try { + state = DISCONNECTED; + socket.close(); + } catch (IOException e) { + LOG.debug(e.getMessage(), e); + } + } + + private void send(MessagePayload payload) { + try { + new NetworkMessage(payload).write(out); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + disconnect(); + } + } + + public enum State {SERVER, CLIENT, ACTIVE, DISCONNECTED} +} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java index dc21c4a..11b8f07 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java @@ -22,69 +22,68 @@ import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.ports.NetworkMessageReceiver; import ch.dissem.bitmessage.ports.NetworkMessageSender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; + +import static ch.dissem.bitmessage.networking.Connection.State.CLIENT; +import static ch.dissem.bitmessage.networking.Connection.State.SERVER; /** * Handles all the networky stuff. */ public class NetworkNode implements NetworkMessageSender, NetworkMessageReceiver { - private final BlockingQueue sendingQueue = new LinkedBlockingQueue<>(); - private final ExecutorService pool; - - private final Map sockets = new HashMap<>(); - private final Map versions = new HashMap<>(); - + private final static Logger LOG = LoggerFactory.getLogger(NetworkNode.class); /** * This is only to be used where it's ignored */ private final static NetworkAddress LOCALHOST = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build(); + private final ExecutorService pool; public NetworkNode() { pool = Executors.newCachedThreadPool(); - new Thread(new Runnable() { - @Override - public void run() { - while (true) { - try { - NetworkMessage message = sendingQueue.take(); - Socket socket = getSocket(message.getTargetNode()); - message.write(socket.getOutputStream()); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - }, "Sender"); + // TODO: sending +// Thread sender = new Thread(new Runnable() { +// @Override +// public void run() { +// while (true) { +// try { +// NetworkMessage message = sendingQueue.take(); +// +// try (Socket socket = getSocket(message.getTargetNode())) { +// message.write(socket.getOutputStream()); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } catch (InterruptedException e) { +// // Ignore? +// } +// } +// } +// }, "Sender"); +// sender.setDaemon(true); +// sender.start(); } @Override - public void registerListener(final int port) throws IOException { + public void registerListener(final int port, final MessageListener listener) throws IOException { final ServerSocket serverSocket = new ServerSocket(port); pool.execute(new Runnable() { @Override public void run() { + NetworkAddress address = null; try { Socket socket = serverSocket.accept(); socket.setSoTimeout(20000); - // FIXME: addd to sockets - registerListener(getVersion(null), socket, new MessageListener() { - @Override - public void receive(NetworkMessage message) { - // TODO - } - }); + pool.execute(new Connection(SERVER, socket, listener)); } catch (IOException e) { - e.printStackTrace(); + LOG.debug(e.getMessage(), e); } } }); @@ -92,60 +91,11 @@ public class NetworkNode implements NetworkMessageSender, NetworkMessageReceiver @Override public void registerListener(final NetworkAddress node, final MessageListener listener) throws IOException { - final Socket socket = getSocket(node); - final int version = getVersion(node); - sendVersion(node); - pool.execute(new Runnable() { - @Override - public void run() { - try { - registerListener(version, socket, listener); - } catch (IOException e) { - e.printStackTrace(); - } - } - }); - } - - private void sendVersion(NetworkAddress node) { - send(node, new NetworkMessage(node, new Version.Builder().defaults().addrFrom(LOCALHOST).addrRecv(node).build())); - } - - private void registerListener(int version, Socket socket, MessageListener listener) throws IOException { - NetworkMessage message = Factory.getNetworkMessage(version, socket.getInputStream()); - if (message.getPayload() instanceof Version) { - version = ((Version) message.getPayload()).getVersion(); - synchronized (versions) { - - versions.put(new NetworkAddress.Builder() - .ip(socket.getInetAddress()) - .port(socket.getPort()) - .build(), version); - } - } - listener.receive(message); + pool.execute(new Connection(CLIENT, new Socket(node.toInetAddress(), node.getPort()), listener)); } @Override public void send(final NetworkAddress node, final NetworkMessage message) { - sendingQueue.add(message); - } - - private Socket getSocket(NetworkAddress node) throws IOException { - synchronized (sockets) { - Socket socket = sockets.get(node); - if (socket == null) { - socket = new Socket(node.toInetAddress(), node.getPort()); - sockets.put(node, socket); - } - return socket; - } - } - - private synchronized int getVersion(NetworkAddress node) { - synchronized (versions) { - Integer version = versions.get(node); - return version == null ? 3 : version; - } + // TODO: sendingQueue.add(message); } } diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java index 1d98b37..163e1f7 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java @@ -18,6 +18,7 @@ package ch.dissem.bitmessage.networking; import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.entity.Version; +import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.ports.NetworkMessageReceiver; import org.junit.Test; @@ -34,12 +35,12 @@ public class NetworkNodeTest { NetworkNode net = new NetworkNode(); net.registerListener(localhost, new NetworkMessageReceiver.MessageListener() { @Override - public void receive(NetworkMessage message) { - System.out.println(message); + public void receive(ObjectPayload payload) { + System.out.println(payload); baseThread.interrupt(); } }); - NetworkMessage ver = new NetworkMessage(localhost, + NetworkMessage ver = new NetworkMessage( new Version.Builder() .version(3) .services(1) @@ -49,7 +50,8 @@ public class NetworkNodeTest { .nonce(-1) .userAgent("Test") .streams(1, 2) - .build()); + .build() + ); net.send(localhost, ver); Thread.sleep(20000); }