From cde4f7b3ce50e5649b6b60e4fdd644d8bd43a281 Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Wed, 1 Jun 2016 17:38:49 +0200 Subject: [PATCH] Some refactoring to move some common code into an AbstractConnection --- .../entity/valueobject/NetworkAddress.java | 13 + .../bitmessage/factory/V3MessageFactory.java | 2 +- .../bitmessage/factory/V3MessageReader.java | 143 ++++++++ .../ports/AbstractCryptography.java | 6 + .../dissem/bitmessage/ports/Cryptography.java | 12 + .../bitmessage/ports/NetworkHandler.java | 10 + .../ch/dissem/bitmessage/utils/Decode.java | 4 + .../networking/AbstractConnection.java | 318 ++++++++++++++++++ .../bitmessage/networking/Connection.java | 281 ++-------------- .../networking/ConnectionOrganizer.java | 8 +- .../networking/DefaultNetworkHandler.java | 11 +- .../bitmessage/networking/ServerRunnable.java | 8 +- .../networking/nio/ConnectionInfo.java | 53 ++- .../networking/nio/NioNetworkHandler.java | 210 +++++++++--- .../networking/NetworkHandlerTest.java | 9 +- 15 files changed, 748 insertions(+), 340 deletions(-) create mode 100644 core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java create mode 100644 networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java b/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java index 0fef152..a496d19 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java @@ -24,6 +24,8 @@ import ch.dissem.bitmessage.utils.UnixTime; import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -215,6 +217,17 @@ public class NetworkAddress implements Streamable { return this; } + public Builder address(SocketAddress address) { + if (address instanceof InetSocketAddress) { + InetSocketAddress inetAddress = (InetSocketAddress) address; + ip(inetAddress.getAddress()); + port(inetAddress.getPort()); + } else { + throw new IllegalArgumentException("Unknown type of address: " + address.getClass()); + } + return this; + } + public NetworkAddress build() { if (time == 0) { time = UnixTime.now(); diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java index af9839f..478d77c 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java +++ b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java @@ -62,7 +62,7 @@ class V3MessageFactory { } } - private static MessagePayload getPayload(String command, InputStream stream, int length) throws IOException { + static MessagePayload getPayload(String command, InputStream stream, int length) throws IOException { switch (command) { case "version": return parseVersion(stream); diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java new file mode 100644 index 0000000..7006cdb --- /dev/null +++ b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java @@ -0,0 +1,143 @@ +/* + * Copyright 2016 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.factory; + +import ch.dissem.bitmessage.entity.MessagePayload; +import ch.dissem.bitmessage.entity.NetworkMessage; +import ch.dissem.bitmessage.exception.ApplicationException; +import ch.dissem.bitmessage.exception.NodeException; +import ch.dissem.bitmessage.utils.Decode; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; + +import static ch.dissem.bitmessage.entity.NetworkMessage.MAGIC_BYTES; +import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_PAYLOAD_SIZE; +import static ch.dissem.bitmessage.utils.Singleton.cryptography; + +/** + * Similar to the {@link V3MessageFactory}, but used for NIO buffers which may or may not contain a whole message. + */ +public class V3MessageReader { + private ReaderState state = ReaderState.MAGIC; + private String command; + private int length; + private byte[] checksum; + + private List messages = new LinkedList<>(); + + public void update(ByteBuffer buffer) { + while (buffer.hasRemaining()) { + switch (state) { + case MAGIC: + if (!findMagicBytes(buffer)) return; + state = ReaderState.HEADER; + case HEADER: + if (buffer.remaining() < 20) { + buffer.compact(); + return; + } + command = getCommand(buffer); + length = (int) Decode.uint32(buffer); + if (length > MAX_PAYLOAD_SIZE) { + throw new NodeException("Payload of " + length + " bytes received, no more than 1600003 was expected."); + } + checksum = new byte[4]; + buffer.get(checksum); + state = ReaderState.DATA; + if (buffer.remaining() < length) { + // We need to compact the buffer to make sure the message fits even if it's really big. + buffer.compact(); + } + case DATA: + if (buffer.remaining() < length) return; + if (!testChecksum(buffer)) { + throw new NodeException("Checksum failed for message '" + command + "'"); + } + try { + MessagePayload payload = V3MessageFactory.getPayload( + command, + new ByteArrayInputStream(buffer.array(), buffer.arrayOffset() + buffer.position(), length), + length); + if (payload != null) { + messages.add(new NetworkMessage(payload)); + } + } catch (IOException e) { + throw new NodeException(e.getMessage()); + } + state = ReaderState.MAGIC; + } + } + } + + public List getMessages() { + return messages; + } + + private boolean findMagicBytes(ByteBuffer buffer) { + int i = 0; + while (buffer.hasRemaining()) { + if (buffer.get() == MAGIC_BYTES[i]) { + buffer.mark(); + i++; + if (i == MAGIC_BYTES.length) return true; + } else { + i = 0; + } + } + if (i > 0) { + buffer.reset(); + buffer.compact(); + } else { + buffer.clear(); + } + return false; + } + + private static String getCommand(ByteBuffer buffer) { + int start = buffer.position(); + int i = 0; + while (i < 12 && buffer.get() != 0) i++; + int end = start + i; + while (i < 12) { + if (buffer.get() != 0) throw new NodeException("'\\0' padding expected for command"); + i++; + } + try { + return new String(buffer.array(), start, end, "ASCII"); + } catch (UnsupportedEncodingException e) { + throw new ApplicationException(e); + } + } + + private boolean testChecksum(ByteBuffer buffer) { + byte[] payloadChecksum = cryptography().sha512(buffer.array(), + buffer.arrayOffset() + buffer.position(), length); + for (int i = 0; i < checksum.length; i++) { + if (checksum[i] != payloadChecksum[i]) { + return false; + } + } + return true; + } + + private enum ReaderState {MAGIC, HEADER, DATA} +} diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/AbstractCryptography.java b/core/src/main/java/ch/dissem/bitmessage/ports/AbstractCryptography.java index 3b08377..f67b6d4 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/AbstractCryptography.java +++ b/core/src/main/java/ch/dissem/bitmessage/ports/AbstractCryptography.java @@ -61,6 +61,12 @@ public abstract class AbstractCryptography implements Cryptography, InternalCont this.context = context; } + public byte[] sha512(byte[] data, int offset, int length) { + MessageDigest mda = md("SHA-512"); + mda.update(data, offset, length); + return mda.digest(); + } + public byte[] sha512(byte[]... data) { return hash("SHA-512", data); } diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/Cryptography.java b/core/src/main/java/ch/dissem/bitmessage/ports/Cryptography.java index 48739ea..9ea6a9d 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/Cryptography.java +++ b/core/src/main/java/ch/dissem/bitmessage/ports/Cryptography.java @@ -30,6 +30,18 @@ import java.security.SecureRandom; * which should be secure enough. */ public interface Cryptography { + /** + * A helper method to calculate SHA-512 hashes. Please note that a new {@link MessageDigest} object is created at + * each call (to ensure thread safety), so you shouldn't use this if you need to do many hash calculations in + * success on the same thread. + * + * @param data to get hashed + * @param offset of the data to be hashed + * @param length of the data to be hashed + * @return SHA-512 hash of data within the given range + */ + byte[] sha512(byte[] data, int offset, int length); + /** * A helper method to calculate SHA-512 hashes. Please note that a new {@link MessageDigest} object is created at * each call (to ensure thread safety), so you shouldn't use this if you need to do many hash calculations in diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java b/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java index bfbb48c..96dec8e 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java +++ b/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java @@ -23,6 +23,7 @@ import ch.dissem.bitmessage.utils.Property; import java.io.IOException; import java.net.InetAddress; +import java.util.Collection; import java.util.concurrent.Future; /** @@ -30,6 +31,8 @@ import java.util.concurrent.Future; */ public interface NetworkHandler { int NETWORK_MAGIC_NUMBER = 8; + int MAX_PAYLOAD_SIZE = 1600003; + int MAX_MESSAGE_SIZE = 24 + MAX_PAYLOAD_SIZE; /** * Connects to the trusted host, fetches and offers new messages and disconnects afterwards. @@ -65,6 +68,13 @@ public interface NetworkHandler { */ void offer(InventoryVector iv); + /** + * Request each of those objects from a node that knows of the requested object. + * + * @param inventoryVectors of the objects to be requested + */ + void request(Collection inventoryVectors); + Property getNetworkStatus(); boolean isRunning(); diff --git a/core/src/main/java/ch/dissem/bitmessage/utils/Decode.java b/core/src/main/java/ch/dissem/bitmessage/utils/Decode.java index 47b0ee3..fb2f3c8 100644 --- a/core/src/main/java/ch/dissem/bitmessage/utils/Decode.java +++ b/core/src/main/java/ch/dissem/bitmessage/utils/Decode.java @@ -111,6 +111,10 @@ public class Decode { return stream.read() * 16777216L + stream.read() * 65536L + stream.read() * 256L + stream.read(); } + public static long uint32(ByteBuffer buffer) { + return buffer.get() * 16777216L + buffer.get() * 65536L + buffer.get() * 256L + buffer.get(); + } + public static int int32(InputStream stream) throws IOException { return int32(stream, null); } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java new file mode 100644 index 0000000..3027f04 --- /dev/null +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java @@ -0,0 +1,318 @@ +/* + * Copyright 2016 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking; + +import ch.dissem.bitmessage.BitmessageContext; +import ch.dissem.bitmessage.InternalContext; +import ch.dissem.bitmessage.entity.*; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException; +import ch.dissem.bitmessage.exception.NodeException; +import ch.dissem.bitmessage.ports.NetworkHandler; +import ch.dissem.bitmessage.utils.UnixTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; + +import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES; +import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE; +import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; +import static ch.dissem.bitmessage.networking.AbstractConnection.State.*; +import static ch.dissem.bitmessage.utils.Singleton.cryptography; +import static ch.dissem.bitmessage.utils.UnixTime.MINUTE; + +/** + * Contains everything used by both the old streams-oriented NetworkHandler and the new NioNetworkHandler, + * respectively their connection objects. + */ +public abstract class AbstractConnection { + private static final Logger LOG = LoggerFactory.getLogger(AbstractConnection.class); + protected final InternalContext ctx; + protected final Mode mode; + protected final NetworkAddress host; + protected final NetworkAddress node; + protected final NetworkHandler.MessageListener listener; + protected final Map ivCache; + protected final Deque sendingQueue; + protected final Set commonRequestedObjects; + protected final Set requestedObjects; + + protected volatile State state; + protected long lastObjectTime; + + protected long peerNonce; + protected int version; + protected long[] streams; + + public AbstractConnection(InternalContext context, Mode mode, + NetworkAddress node, + NetworkHandler.MessageListener listener, + Set commonRequestedObjects, + boolean threadsafe) { + this.ctx = context; + this.mode = mode; + this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); + this.node = node; + this.listener = listener; + if (threadsafe) { + this.ivCache = new ConcurrentHashMap<>(); + this.sendingQueue = new ConcurrentLinkedDeque<>(); + this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap(10_000)); + } else { + this.ivCache = new HashMap<>(); + this.sendingQueue = new LinkedList<>(); + this.requestedObjects = new HashSet<>(); + } + this.state = CONNECTING; + this.commonRequestedObjects = commonRequestedObjects; + } + + public Mode getMode() { + return mode; + } + + public NetworkAddress getNode() { + return node; + } + + public State getState() { + return state; + } + + protected void handleMessage(MessagePayload payload) { + switch (state) { + case ACTIVE: + receiveMessage(payload); + break; + + default: + handleCommand(payload); + break; + } + } + + private void receiveMessage(MessagePayload messagePayload) { + switch (messagePayload.getCommand()) { + case INV: + receiveMessage((Inv) messagePayload); + break; + case GETDATA: + receiveMessage((GetData) messagePayload); + break; + case OBJECT: + receiveMessage((ObjectMessage) messagePayload); + break; + case ADDR: + receiveMessage((Addr) messagePayload); + break; + case CUSTOM: + case VERACK: + case VERSION: + default: + throw new IllegalStateException("Unexpectedly received '" + messagePayload.getCommand() + "' command"); + } + } + + private void receiveMessage(Inv inv) { + int originalSize = inv.getInventory().size(); + updateIvCache(inv.getInventory()); + List missing = ctx.getInventory().getMissing(inv.getInventory(), streams); + missing.removeAll(commonRequestedObjects); + LOG.debug("Received inventory with " + originalSize + " elements, of which are " + + missing.size() + " missing."); + send(new GetData.Builder().inventory(missing).build()); + } + + private void receiveMessage(GetData getData) { + for (InventoryVector iv : getData.getInventory()) { + ObjectMessage om = ctx.getInventory().getObject(iv); + if (om != null) sendingQueue.offer(om); + } + } + + private void receiveMessage(ObjectMessage objectMessage) { + requestedObjects.remove(objectMessage.getInventoryVector()); + if (ctx.getInventory().contains(objectMessage)) { + LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory"); + return; + } + try { + listener.receive(objectMessage); + cryptography().checkProofOfWork(objectMessage, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES); + ctx.getInventory().storeObject(objectMessage); + // offer object to some random nodes so it gets distributed throughout the network: + ctx.getNetworkHandler().offer(objectMessage.getInventoryVector()); + lastObjectTime = UnixTime.now(); + } catch (InsufficientProofOfWorkException e) { + LOG.warn(e.getMessage()); + // DebugUtils.saveToFile(objectMessage); // this line must not be committed active + } catch (IOException e) { + LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); + } finally { + if (commonRequestedObjects.remove(objectMessage.getInventoryVector())) { + LOG.debug("Received object that wasn't requested."); + } + } + } + + private void receiveMessage(Addr addr) { + LOG.debug("Received " + addr.getAddresses().size() + " addresses."); + ctx.getNodeRegistry().offerAddresses(addr.getAddresses()); + } + + private void updateIvCache(List inventory) { + cleanupIvCache(); + Long now = UnixTime.now(); + for (InventoryVector iv : inventory) { + ivCache.put(iv, now); + } + } + + public void offer(InventoryVector iv) { + sendingQueue.offer(new Inv.Builder() + .addInventoryVector(iv) + .build()); + updateIvCache(Collections.singletonList(iv)); + } + + public boolean knowsOf(InventoryVector iv) { + return ivCache.containsKey(iv); + } + + protected void cleanupIvCache() { + Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE); + for (Map.Entry entry : ivCache.entrySet()) { + if (entry.getValue() < fiveMinutesAgo) { + ivCache.remove(entry.getKey()); + } + } + } + + private void handleCommand(MessagePayload payload) { + switch (payload.getCommand()) { + case VERSION: + handleVersion((Version) payload); + break; + case VERACK: + switch (mode) { + case SERVER: + activateConnection(); + break; + case CLIENT: + case SYNC: + default: + // NO OP + break; + } + break; + case CUSTOM: + MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload); + if (response != null) { + send(response); + } + disconnect(); + break; + default: + throw new NodeException("Command 'version' or 'verack' expected, but was '" + + payload.getCommand() + "'"); + } + } + + protected void activateConnection() { + LOG.info("Successfully established connection with node " + node); + state = ACTIVE; + node.setTime(UnixTime.now()); + if (mode != SYNC) { + sendAddresses(); + ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node)); + } + sendInventory(); + } + + private void sendAddresses() { + List addresses = ctx.getNodeRegistry().getKnownAddresses(1000, streams); + sendingQueue.offer(new Addr.Builder().addresses(addresses).build()); + } + + private void sendInventory() { + List inventory = ctx.getInventory().getInventory(streams); + for (int i = 0; i < inventory.size(); i += 50000) { + sendingQueue.offer(new Inv.Builder() + .inventory(inventory.subList(i, Math.min(inventory.size(), i + 50000))) + .build()); + } + } + + private void handleVersion(Version version) { + if (version.getNonce() == ctx.getClientNonce()) { + LOG.info("Tried to connect to self, disconnecting."); + disconnect(); + } else if (version.getVersion() >= BitmessageContext.CURRENT_VERSION) { + this.peerNonce = version.getNonce(); + if (peerNonce == ctx.getClientNonce()) disconnect(); + + this.version = version.getVersion(); + this.streams = version.getStreams(); + send(new VerAck()); + switch (mode) { + case SERVER: + send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build()); + break; + case CLIENT: + case SYNC: + activateConnection(); + break; + default: + // NO OP + } + } else { + LOG.info("Received unsupported version " + version.getVersion() + ", disconnecting."); + disconnect(); + } + } + + public void disconnect() { + state = DISCONNECTED; + + // Make sure objects that are still missing are requested from other nodes + ctx.getNetworkHandler().request(requestedObjects); + } + + protected abstract void send(MessagePayload payload); + + public enum Mode {SERVER, CLIENT, SYNC} + + public enum State {CONNECTING, ACTIVE, DISCONNECTED} + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AbstractConnection that = (AbstractConnection) o; + return Objects.equals(node, that.node); + } + + @Override + public int hashCode() { + return Objects.hash(node); + } +} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index 9ed19bc..3fe1b62 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -16,13 +16,13 @@ package ch.dissem.bitmessage.networking; -import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.entity.*; +import ch.dissem.bitmessage.entity.GetData; +import ch.dissem.bitmessage.entity.MessagePayload; +import ch.dissem.bitmessage.entity.NetworkMessage; +import ch.dissem.bitmessage.entity.Version; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException; -import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.ports.NetworkHandler.MessageListener; import ch.dissem.bitmessage.utils.UnixTime; @@ -36,94 +36,62 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentMap; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; -import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES; -import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE; -import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; -import static ch.dissem.bitmessage.networking.Connection.Mode.SYNC; -import static ch.dissem.bitmessage.networking.Connection.State.*; -import static ch.dissem.bitmessage.utils.Singleton.cryptography; +import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; +import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; +import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; +import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED; import static ch.dissem.bitmessage.utils.UnixTime.MINUTE; /** * A connection to a specific node */ -class Connection { +class Connection extends AbstractConnection { public static final int READ_TIMEOUT = 2000; private static final Logger LOG = LoggerFactory.getLogger(Connection.class); private static final int CONNECT_TIMEOUT = 5000; private final long startTime; - private final ConcurrentMap ivCache; - private final InternalContext ctx; - private final Mode mode; private final Socket socket; - private final MessageListener listener; - private final NetworkAddress host; - private final NetworkAddress node; - private final Queue sendingQueue = new ConcurrentLinkedDeque<>(); - private final Set commonRequestedObjects; - private final Set requestedObjects; private final long syncTimeout; private final ReaderRunnable reader = new ReaderRunnable(); private final WriterRunnable writer = new WriterRunnable(); - private final DefaultNetworkHandler networkHandler; - private final long clientNonce; - private volatile State state; private InputStream in; private OutputStream out; - private int version; - private long[] streams; private int readTimeoutCounter; private boolean socketInitialized; - private long lastObjectTime; public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, - Set requestedObjectsMap, long clientNonce) throws IOException { + Set requestedObjectsMap) throws IOException { this(context, mode, listener, socket, requestedObjectsMap, - Collections.newSetFromMap(new ConcurrentHashMap(10_000)), new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), - 0, clientNonce); + 0); } public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener, - Set requestedObjectsMap, long clientNonce) { + Set requestedObjectsMap) { this(context, mode, listener, new Socket(), requestedObjectsMap, - Collections.newSetFromMap(new ConcurrentHashMap(10_000)), - node, 0, clientNonce); + node, 0); } private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, - Set commonRequestedObjects, Set requestedObjects, - NetworkAddress node, long syncTimeout, long clientNonce) { + Set commonRequestedObjects, NetworkAddress node, long syncTimeout) { + super(context, mode, node, listener, commonRequestedObjects, true); this.startTime = UnixTime.now(); - this.ctx = context; - this.mode = mode; - this.state = CONNECTING; - this.listener = listener; this.socket = socket; - this.commonRequestedObjects = commonRequestedObjects; - this.requestedObjects = requestedObjects; - this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); - this.node = node; this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); - this.ivCache = new ConcurrentHashMap<>(); - this.networkHandler = (DefaultNetworkHandler) ctx.getNetworkHandler(); - this.clientNonce = clientNonce; } public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, long timeoutInSeconds) throws IOException { return new Connection(ctx, SYNC, listener, new Socket(address, port), - new HashSet(), new HashSet(), new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), - timeoutInSeconds, cryptography().randomNonce()); + timeoutInSeconds); } public long getStartTime() { @@ -169,133 +137,8 @@ class Connection { } } - private void activateConnection() { - LOG.info("Successfully established connection with node " + node); - state = ACTIVE; - if (mode != SYNC) { - sendAddresses(); - ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node)); - } - sendInventory(); - node.setTime(UnixTime.now()); - } - - private void cleanupIvCache() { - Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE); - for (Map.Entry entry : ivCache.entrySet()) { - if (entry.getValue() < fiveMinutesAgo) { - ivCache.remove(entry.getKey()); - } - } - } - - private void updateIvCache(InventoryVector... inventory) { - cleanupIvCache(); - Long now = UnixTime.now(); - for (InventoryVector iv : inventory) { - ivCache.put(iv, now); - } - } - - private void updateIvCache(List inventory) { - cleanupIvCache(); - Long now = UnixTime.now(); - for (InventoryVector iv : inventory) { - ivCache.put(iv, now); - } - } - - private void receiveMessage(MessagePayload messagePayload) { - switch (messagePayload.getCommand()) { - case INV: - receiveMessage((Inv) messagePayload); - break; - case GETDATA: - receiveMessage((GetData) messagePayload); - break; - case OBJECT: - receiveMessage((ObjectMessage) messagePayload); - break; - case ADDR: - receiveMessage((Addr) messagePayload); - break; - case CUSTOM: - case VERACK: - case VERSION: - default: - throw new IllegalStateException("Unexpectedly received '" + messagePayload.getCommand() + "' command"); - } - } - - private void receiveMessage(Inv inv) { - int originalSize = inv.getInventory().size(); - updateIvCache(inv.getInventory()); - List missing = ctx.getInventory().getMissing(inv.getInventory(), streams); - missing.removeAll(commonRequestedObjects); - LOG.debug("Received inventory with " + originalSize + " elements, of which are " - + missing.size() + " missing."); - send(new GetData.Builder().inventory(missing).build()); - } - - private void receiveMessage(GetData getData) { - for (InventoryVector iv : getData.getInventory()) { - ObjectMessage om = ctx.getInventory().getObject(iv); - if (om != null) sendingQueue.offer(om); - } - } - - private void receiveMessage(ObjectMessage objectMessage) { - requestedObjects.remove(objectMessage.getInventoryVector()); - if (ctx.getInventory().contains(objectMessage)) { - LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory"); - return; - } - try { - listener.receive(objectMessage); - cryptography().checkProofOfWork(objectMessage, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES); - ctx.getInventory().storeObject(objectMessage); - // offer object to some random nodes so it gets distributed throughout the network: - networkHandler.offer(objectMessage.getInventoryVector()); - lastObjectTime = UnixTime.now(); - } catch (InsufficientProofOfWorkException e) { - LOG.warn(e.getMessage()); - // DebugUtils.saveToFile(objectMessage); // this line must not be committed active - } catch (IOException e) { - LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); - } finally { - if (commonRequestedObjects.remove(objectMessage.getInventoryVector())) { - LOG.debug("Received object that wasn't requested."); - } - } - } - - private void receiveMessage(Addr addr) { - LOG.debug("Received " + addr.getAddresses().size() + " addresses."); - ctx.getNodeRegistry().offerAddresses(addr.getAddresses()); - } - - private void sendAddresses() { - List addresses = ctx.getNodeRegistry().getKnownAddresses(1000, streams); - sendingQueue.offer(new Addr.Builder().addresses(addresses).build()); - } - - private void sendInventory() { - List inventory = ctx.getInventory().getInventory(streams); - for (int i = 0; i < inventory.size(); i += 50000) { - sendingQueue.offer(new Inv.Builder() - .inventory(inventory.subList(i, Math.min(inventory.size(), i + 50000))) - .build()); - } - } - - public void disconnect() { - state = DISCONNECTED; - - // Make sure objects that are still missing are requested from other nodes - networkHandler.request(requestedObjects); - } - - void send(MessagePayload payload) { + @Override + protected void send(MessagePayload payload) { try { if (payload instanceof GetData) { requestedObjects.addAll(((GetData) payload).getInventory()); @@ -309,17 +152,6 @@ class Connection { } } - public void offer(InventoryVector iv) { - sendingQueue.offer(new Inv.Builder() - .addInventoryVector(iv) - .build()); - updateIvCache(iv); - } - - public boolean knowsOf(InventoryVector iv) { - return ivCache.containsKey(iv); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -354,18 +186,13 @@ class Connection { return writer; } - public enum Mode {SERVER, CLIENT, SYNC} - - public enum State {CONNECTING, ACTIVE, DISCONNECTED} - public class ReaderRunnable implements Runnable { @Override public void run() { - lastObjectTime = 0; try (Socket socket = Connection.this.socket) { initSocket(socket); if (mode == CLIENT || mode == SYNC) { - send(new Version.Builder().defaults(clientNonce).addrFrom(host).addrRecv(node).build()); + send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build()); } while (state != DISCONNECTED) { if (mode != SYNC) { @@ -394,75 +221,13 @@ class Connection { NetworkMessage msg = Factory.getNetworkMessage(version, in); if (msg == null) return; - switch (state) { - case ACTIVE: - receiveMessage(msg.getPayload()); - break; - - default: - handleCommand(msg.getPayload()); - break; - } + handleMessage(msg.getPayload()); if (socket.isClosed() || syncFinished(msg) || checkOpenRequests()) disconnect(); } catch (SocketTimeoutException ignore) { if (state == ACTIVE && syncFinished(null)) disconnect(); } } - private void handleCommand(MessagePayload payload) { - switch (payload.getCommand()) { - case VERSION: - handleVersion((Version) payload); - break; - case VERACK: - switch (mode) { - case SERVER: - activateConnection(); - break; - case CLIENT: - case SYNC: - default: - // NO OP - break; - } - break; - case CUSTOM: - MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload); - if (response != null) { - send(response); - } - disconnect(); - break; - default: - throw new NodeException("Command 'version' or 'verack' expected, but was '" - + payload.getCommand() + "'"); - } - } - - private void handleVersion(Version version) { - if (version.getNonce() == ctx.getClientNonce()) { - LOG.info("Tried to connect to self, disconnecting."); - disconnect(); - } else if (version.getVersion() >= BitmessageContext.CURRENT_VERSION) { - Connection.this.version = version.getVersion(); - streams = version.getStreams(); - send(new VerAck()); - switch (mode) { - case SERVER: - send(new Version.Builder().defaults(clientNonce).addrFrom(host).addrRecv(node).build()); - break; - case CLIENT: - case SYNC: - activateConnection(); - break; - default: - // NO OP - } - } else { - LOG.info("Received unsupported version " + version.getVersion() + ", disconnecting."); - disconnect(); - } - } } private boolean checkOpenRequests() { diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java index c25a31c..5c976e2 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java @@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory; import java.util.Iterator; import java.util.List; -import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; +import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.DefaultNetworkHandler.NETWORK_MAGIC_NUMBER; /** @@ -38,17 +38,15 @@ public class ConnectionOrganizer implements Runnable { private final InternalContext ctx; private final DefaultNetworkHandler networkHandler; private final NetworkHandler.MessageListener listener; - private final long clientNonce; private Connection initialConnection; public ConnectionOrganizer(InternalContext ctx, DefaultNetworkHandler networkHandler, - NetworkHandler.MessageListener listener, long clientNonce) { + NetworkHandler.MessageListener listener) { this.ctx = ctx; this.networkHandler = networkHandler; this.listener = listener; - this.clientNonce = clientNonce; } @Override @@ -94,7 +92,7 @@ public class ConnectionOrganizer implements Runnable { boolean first = active == 0 && initialConnection == null; for (NetworkAddress address : addresses) { Connection c = new Connection(ctx, CLIENT, address, listener, - networkHandler.requestedObjects, clientNonce); + networkHandler.requestedObjects); if (first) { initialConnection = c; first = false; diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index 2b06373..c4a4554 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -35,8 +35,8 @@ import java.net.Socket; import java.util.*; import java.util.concurrent.*; -import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; -import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE; +import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; +import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static java.util.Collections.newSetFromMap; @@ -107,9 +107,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { try { running = true; connections.clear(); - server = new ServerRunnable(ctx, this, listener, ctx.getClientNonce()); + server = new ServerRunnable(ctx, this, listener); pool.execute(server); - pool.execute(new ConnectionOrganizer(ctx, this, listener, ctx.getClientNonce())); + pool.execute(new ConnectionOrganizer(ctx, this, listener)); } catch (IOException e) { throw new ApplicationException(e); } @@ -198,7 +198,8 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { ); } - void request(Set inventoryVectors) { + @Override + public void request(Collection inventoryVectors) { if (!running || inventoryVectors.isEmpty()) return; Map> distribution = new HashMap<>(); diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java index 5a866b9..99b6a88 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java @@ -26,7 +26,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; -import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; +import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; /** * @author Christian Basler @@ -37,15 +37,13 @@ public class ServerRunnable implements Runnable, Closeable { private final ServerSocket serverSocket; private final DefaultNetworkHandler networkHandler; private final NetworkHandler.MessageListener listener; - private final long clientNonce; public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler, - NetworkHandler.MessageListener listener, long clientNonce) throws IOException { + NetworkHandler.MessageListener listener) throws IOException { this.ctx = ctx; this.networkHandler = networkHandler; this.listener = listener; this.serverSocket = new ServerSocket(ctx.getPort()); - this.clientNonce = clientNonce; } @Override @@ -55,7 +53,7 @@ public class ServerRunnable implements Runnable, Closeable { Socket socket = serverSocket.accept(); socket.setSoTimeout(Connection.READ_TIMEOUT); networkHandler.startConnection(new Connection(ctx, SERVER, socket, listener, - networkHandler.requestedObjects, clientNonce)); + networkHandler.requestedObjects)); } catch (IOException e) { LOG.debug(e.getMessage(), e); } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java index 3361693..223722a 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java @@ -16,25 +16,43 @@ package ch.dissem.bitmessage.networking.nio; +import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.entity.MessagePayload; +import ch.dissem.bitmessage.entity.NetworkMessage; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.factory.V3MessageReader; +import ch.dissem.bitmessage.networking.AbstractConnection; +import ch.dissem.bitmessage.ports.NetworkHandler; import java.nio.ByteBuffer; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ConcurrentLinkedDeque; +import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_MESSAGE_SIZE; + /** - * Created by chrig on 27.05.2016. + * Represents the current state of a connection. */ -public class ConnectionInfo { - private State state; - private final Queue sendingQueue = new ConcurrentLinkedDeque<>(); - private ByteBuffer in = ByteBuffer.allocate(10); - private ByteBuffer out = ByteBuffer.allocate(10); +public class ConnectionInfo extends AbstractConnection { + private ByteBuffer in = ByteBuffer.allocate(MAX_MESSAGE_SIZE); + private ByteBuffer out = ByteBuffer.allocate(MAX_MESSAGE_SIZE); + private V3MessageReader reader = new V3MessageReader(); + + public ConnectionInfo(InternalContext context, Mode mode, + NetworkAddress node, NetworkHandler.MessageListener listener, + Set commonRequestedObjects) { + super(context, mode, node, listener, commonRequestedObjects, false); + } public State getState() { return state; } + public boolean knowsOf(InventoryVector iv) { + return ivCache.containsKey(iv); + } + public Queue getSendingQueue() { return sendingQueue; } @@ -47,5 +65,24 @@ public class ConnectionInfo { return out; } - public enum State {CONNECTING, ACTIVE, DISCONNECTED} + public void updateReader() { + reader.update(in); + if (!reader.getMessages().isEmpty()) { + Iterator iterator = reader.getMessages().iterator(); + while (iterator.hasNext()) { + NetworkMessage msg = iterator.next(); + handleMessage(msg.getPayload()); + iterator.remove(); + } + } + } + + public List getMessages() { + return reader.getMessages(); + } + + @Override + protected void send(MessagePayload payload) { + sendingQueue.addFirst(payload); + } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index 4f56d42..c3689af 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -22,7 +22,10 @@ import ch.dissem.bitmessage.entity.GetData; import ch.dissem.bitmessage.entity.MessagePayload; import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.exception.ApplicationException; +import ch.dissem.bitmessage.exception.NodeException; +import ch.dissem.bitmessage.factory.V3MessageReader; import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.utils.Property; import org.slf4j.Logger; @@ -31,16 +34,16 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.*; import java.util.concurrent.Future; -import static java.nio.channels.SelectionKey.*; +import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; +import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; +import static ch.dissem.bitmessage.utils.DebugUtils.inc; +import static java.nio.channels.SelectionKey.OP_READ; +import static java.nio.channels.SelectionKey.OP_WRITE; /** * Network handler using java.nio, resulting in less threads. @@ -50,6 +53,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex private InternalContext ctx; private Selector selector; + private ServerSocketChannel serverChannel; @Override public Future synchronize(InetAddress server, int port, MessageListener listener, long timeoutInSeconds) { @@ -58,11 +62,38 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex @Override public CustomMessage send(InetAddress server, int port, CustomMessage request) { - return null; + try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) { + channel.configureBlocking(true); + ByteBuffer buffer = ByteBuffer.allocate(MAX_MESSAGE_SIZE); + new NetworkMessage(request).write(buffer); + channel.write(buffer); + buffer.clear(); + + V3MessageReader reader = new V3MessageReader(); + while (reader.getMessages().isEmpty()) { + channel.read(buffer); + buffer.flip(); + reader.update(buffer); + } + NetworkMessage networkMessage = reader.getMessages().get(0); + + if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) { + return (CustomMessage) networkMessage.getPayload(); + } else { + if (networkMessage == null) { + throw new NodeException("No response from node " + server); + } else { + throw new NodeException("Unexpected response from node " + + server + ": " + networkMessage.getPayload().getCommand()); + } + } + } catch (IOException e) { + throw new ApplicationException(e); + } } @Override - public void start(MessageListener listener) { + public void start(final MessageListener listener) { if (listener == null) { throw new IllegalStateException("Listener must be set at start"); } @@ -70,50 +101,83 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex throw new IllegalStateException("Network already running - you need to stop first."); } try { - final Set requestedObjects = new HashSet<>(); selector = Selector.open(); - { - ServerSocketChannel server = ServerSocketChannel.open(); - server.configureBlocking(false); - server.bind(new InetSocketAddress(ctx.getPort())); - server.register(selector, OP_ACCEPT); - } - while (selector.isOpen()) { - // TODO: establish outgoing connections - selector.select(); - Iterator keyIterator = selector.selectedKeys().iterator(); + } catch (IOException e) { + throw new ApplicationException(e); + } + final Set requestedObjects = new HashSet<>(); + new Thread(new Runnable() { + @Override + public void run() { + try { + serverChannel = ServerSocketChannel.open(); + serverChannel.bind(new InetSocketAddress(ctx.getPort())); - while (keyIterator.hasNext()) { - SelectionKey key = keyIterator.next(); - if (key.isAcceptable()) { - SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept(); - accepted.configureBlocking(false); - accepted.register(selector, OP_READ | OP_WRITE).attach(new ConnectionInfo()); - } - if (key.attachment() instanceof ConnectionInfo) { - SocketChannel channel = (SocketChannel) key.channel(); - ConnectionInfo connection = (ConnectionInfo) key.attachment(); - - if (key.isWritable()) { - if (connection.getOutBuffer().hasRemaining()) { - channel.write(connection.getOutBuffer()); - } - while (!connection.getOutBuffer().hasRemaining() && !connection.getSendingQueue().isEmpty()) { - MessagePayload payload = connection.getSendingQueue().poll(); - if (payload instanceof GetData) { - requestedObjects.addAll(((GetData) payload).getInventory()); - } - new NetworkMessage(payload).write(connection.getOutBuffer()); - } - } - if (key.isReadable()) { - // TODO - channel.read(connection.getInBuffer()); - } - } - keyIterator.remove(); + SocketChannel accepted = serverChannel.accept(); + accepted.configureBlocking(false); + // FIXME: apparently it isn't good practice to generally listen for OP_WRITE + accepted.register(selector, OP_READ | OP_WRITE).attach( + new ConnectionInfo(ctx, SERVER, + new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(), + listener, + requestedObjects + )); + } catch (ClosedSelectorException | AsynchronousCloseException ignore) { + } catch (IOException e) { + throw new ApplicationException(e); } } + }, "Server").start(); + new Thread(new Runnable() { + @Override + public void run() { + try { + while (selector.isOpen()) { + // TODO: establish outgoing connections + Iterator keyIterator = selector.selectedKeys().iterator(); + + while (keyIterator.hasNext()) { + SelectionKey key = keyIterator.next(); + if (key.attachment() instanceof ConnectionInfo) { + SocketChannel channel = (SocketChannel) key.channel(); + ConnectionInfo connection = (ConnectionInfo) key.attachment(); + + if (key.isWritable()) { + if (connection.getOutBuffer().hasRemaining()) { + channel.write(connection.getOutBuffer()); + } + while (!connection.getOutBuffer().hasRemaining() && !connection.getSendingQueue().isEmpty()) { + MessagePayload payload = connection.getSendingQueue().poll(); + if (payload instanceof GetData) { + requestedObjects.addAll(((GetData) payload).getInventory()); + } + new NetworkMessage(payload).write(connection.getOutBuffer()); + } + } + if (key.isReadable()) { + channel.read(connection.getInBuffer()); + connection.updateReader(); + } + } + keyIterator.remove(); + } + } + selector.close(); + } catch (ClosedSelectorException ignore) { + } catch (IOException e) { + throw new ApplicationException(e); + } + } + }, "Connections").start(); + } + + @Override + public void stop() { + try { + serverChannel.close(); + for (SelectionKey key : selector.keys()) { + key.channel().close(); + } selector.close(); } catch (IOException e) { throw new ApplicationException(e); @@ -121,23 +185,57 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } @Override - public void stop() { - + public void offer(InventoryVector iv) { + // TODO } @Override - public void offer(InventoryVector iv) { - + public void request(Collection inventoryVectors) { + // TODO } @Override public Property getNetworkStatus() { - return null; + TreeSet streams = new TreeSet<>(); + TreeMap incomingConnections = new TreeMap<>(); + TreeMap outgoingConnections = new TreeMap<>(); + + for (SelectionKey key : selector.keys()) { + if (key.attachment() instanceof ConnectionInfo) { + ConnectionInfo connection = (ConnectionInfo) key.attachment(); + if (connection.getState() == ACTIVE) { + long stream = connection.getNode().getStream(); + streams.add(stream); + if (connection.getMode() == SERVER) { + inc(incomingConnections, stream); + } else { + inc(outgoingConnections, stream); + } + } + } + } + Property[] streamProperties = new Property[streams.size()]; + int i = 0; + for (Long stream : streams) { + int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0; + int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0; + streamProperties[i] = new Property("stream " + stream, + null, new Property("nodes", incoming + outgoing), + new Property("incoming", incoming), + new Property("outgoing", outgoing) + ); + i++; + } + return new Property("network", null, + new Property("connectionManager", isRunning() ? "running" : "stopped"), + new Property("connections", null, streamProperties), + new Property("requestedObjects", "requestedObjects.size()") // TODO + ); } @Override public boolean isRunning() { - return false; + return selector != null && selector.isOpen(); } @Override diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java index 3841be3..5dbf077 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java @@ -24,7 +24,9 @@ import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.ports.*; import ch.dissem.bitmessage.utils.Property; -import org.junit.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import java.net.InetAddress; import java.util.concurrent.Future; @@ -89,6 +91,9 @@ public class NetworkHandlerTest { break; case 3: data[0] = 0; + break; + default: + break; } } return new CustomMessage("test response", request.getData()); @@ -115,7 +120,7 @@ public class NetworkHandlerTest { } while (ctx.isRunning()); } - @Test(timeout = 5_000) + @Test//(timeout = 5_000) public void ensureNodesAreConnecting() { node.startup(); Property status;