diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..656384c --- /dev/null +++ b/.editorconfig @@ -0,0 +1,7 @@ +root = true + +[*] +end_of_line = lf +insert_final_newline = true +charset = utf-8 +indent_size = 4 diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.java b/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.java new file mode 100644 index 0000000..15e5856 --- /dev/null +++ b/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.java @@ -0,0 +1,107 @@ +/* + * Copyright 2016 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.factory; + +import ch.dissem.bitmessage.ports.NetworkHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.*; + +/** + * A pool for {@link ByteBuffer}s. As they may use up a lot of memory, + * they should be reused as efficiently as possible. + */ +class BufferPool { + private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class); + + public static final BufferPool bufferPool = new BufferPool(256, 2048); + + private final Map capacities = new EnumMap<>(Size.class); + private final Map> pools = new EnumMap<>(Size.class); + + private BufferPool(int small, int medium) { + capacities.put(Size.HEADER, 24); + capacities.put(Size.SMALL, small); + capacities.put(Size.MEDIUM, medium); + capacities.put(Size.LARGE, NetworkHandler.MAX_PAYLOAD_SIZE); + pools.put(Size.HEADER, new Stack()); + pools.put(Size.SMALL, new Stack()); + pools.put(Size.MEDIUM, new Stack()); + pools.put(Size.LARGE, new Stack()); + } + + public synchronized ByteBuffer allocate(int capacity) { + Size targetSize = getTargetSize(capacity); + Size s = targetSize; + do { + Stack pool = pools.get(s); + if (!pool.isEmpty()) { + return pool.pop(); + } + s = s.next(); + } while (s != null); + LOG.debug("Creating new buffer of size " + targetSize); + return ByteBuffer.allocate(capacities.get(targetSize)); + } + + public synchronized ByteBuffer allocate() { + Stack pool = pools.get(Size.HEADER); + if (!pool.isEmpty()) { + return pool.pop(); + } else { + return ByteBuffer.allocate(capacities.get(Size.HEADER)); + } + } + + public synchronized void deallocate(ByteBuffer buffer) { + buffer.clear(); + Size size = getTargetSize(buffer.capacity()); + if (buffer.capacity() != capacities.get(size)) { + throw new IllegalArgumentException("Illegal buffer capacity " + buffer.capacity() + + " one of " + capacities.values() + " expected."); + } + pools.get(size).push(buffer); + } + + private Size getTargetSize(int capacity) { + for (Size s : Size.values()) { + if (capacity <= capacities.get(s)) { + return s; + } + } + throw new IllegalArgumentException("Requested capacity too large: " + + "requested=" + capacity + "; max=" + capacities.get(Size.LARGE)); + } + + + private enum Size { + HEADER, SMALL, MEDIUM, LARGE; + + public Size next() { + switch (this) { + case SMALL: + return MEDIUM; + case MEDIUM: + return LARGE; + default: + return null; + } + } + } +} diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java index 80220f1..bb4460f 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java +++ b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java @@ -21,15 +21,20 @@ import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.utils.Decode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; +import java.io.FileWriter; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; +import java.util.UUID; import static ch.dissem.bitmessage.entity.NetworkMessage.MAGIC_BYTES; +import static ch.dissem.bitmessage.factory.BufferPool.bufferPool; import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_PAYLOAD_SIZE; import static ch.dissem.bitmessage.utils.Singleton.cryptography; @@ -37,53 +42,89 @@ import static ch.dissem.bitmessage.utils.Singleton.cryptography; * Similar to the {@link V3MessageFactory}, but used for NIO buffers which may or may not contain a whole message. */ public class V3MessageReader { + private static final Logger LOG = LoggerFactory.getLogger(V3MessageReader.class); + + private ByteBuffer headerBuffer; + private ByteBuffer dataBuffer; + private ReaderState state = ReaderState.MAGIC; private String command; private int length; private byte[] checksum; private List messages = new LinkedList<>(); + private SizeInfo sizeInfo = new SizeInfo(); - public void update(ByteBuffer buffer) { - while (buffer.hasRemaining()) { - switch (state) { - case MAGIC: - if (!findMagicBytes(buffer)) return; - state = ReaderState.HEADER; - case HEADER: - if (buffer.remaining() < 20) { - return; - } - command = getCommand(buffer); - length = (int) Decode.uint32(buffer); - if (length > MAX_PAYLOAD_SIZE) { - throw new NodeException("Payload of " + length + " bytes received, no more than " + - MAX_PAYLOAD_SIZE + " was expected."); - } - checksum = new byte[4]; - buffer.get(checksum); - state = ReaderState.DATA; - case DATA: - if (buffer.remaining() < length) { - return; - } - if (!testChecksum(buffer)) { - throw new NodeException("Checksum failed for message '" + command + "'"); - } - try { - MessagePayload payload = V3MessageFactory.getPayload( - command, - new ByteArrayInputStream(buffer.array(), buffer.arrayOffset() + buffer.position(), length), - length); - if (payload != null) { - messages.add(new NetworkMessage(payload)); - } - } catch (IOException e) { - throw new NodeException(e.getMessage()); - } - state = ReaderState.MAGIC; + public ByteBuffer getActiveBuffer() { + if (state != null && state != ReaderState.DATA) { + if (headerBuffer == null) { + headerBuffer = bufferPool.allocate(); } } + return state == ReaderState.DATA ? dataBuffer : headerBuffer; + } + + public void update() { + if (state != ReaderState.DATA) { + getActiveBuffer(); + headerBuffer.flip(); + } + switch (state) { + case MAGIC: + if (!findMagicBytes(headerBuffer)) { + headerBuffer.compact(); + return; + } + state = ReaderState.HEADER; + case HEADER: + if (headerBuffer.remaining() < 20) { + headerBuffer.compact(); + headerBuffer.limit(20); + return; + } + command = getCommand(headerBuffer); + length = (int) Decode.uint32(headerBuffer); + if (length > MAX_PAYLOAD_SIZE) { + throw new NodeException("Payload of " + length + " bytes received, no more than " + + MAX_PAYLOAD_SIZE + " was expected."); + } + sizeInfo.add(length); // FIXME: remove this once we have some values to work with + checksum = new byte[4]; + headerBuffer.get(checksum); + state = ReaderState.DATA; + bufferPool.deallocate(headerBuffer); + headerBuffer = null; + dataBuffer = bufferPool.allocate(length); + dataBuffer.clear(); + dataBuffer.limit(length); + case DATA: + if (dataBuffer.position() < length) { + return; + } else { + dataBuffer.flip(); + } + if (!testChecksum(dataBuffer)) { + state = ReaderState.MAGIC; + throw new NodeException("Checksum failed for message '" + command + "'"); + } + try { + MessagePayload payload = V3MessageFactory.getPayload( + command, + new ByteArrayInputStream(dataBuffer.array(), + dataBuffer.arrayOffset() + dataBuffer.position(), length), + length); + if (payload != null) { + messages.add(new NetworkMessage(payload)); + } + } catch (IOException e) { + throw new NodeException(e.getMessage()); + } finally { + state = ReaderState.MAGIC; + bufferPool.deallocate(dataBuffer); + dataBuffer = null; + dataBuffer = null; + } + } } public List getMessages() { @@ -129,7 +170,7 @@ public class V3MessageReader { private boolean testChecksum(ByteBuffer buffer) { byte[] payloadChecksum = cryptography().sha512(buffer.array(), - buffer.arrayOffset() + buffer.position(), length); + buffer.arrayOffset() + buffer.position(), length); for (int i = 0; i < checksum.length; i++) { if (checksum[i] != payloadChecksum[i]) { return false; @@ -138,5 +179,52 @@ public class V3MessageReader { return true; } + /** + * De-allocates all buffers. This method should be called iff the reader isn't used anymore, i.e. when its + * connection is severed. + */ + public void cleanup() { + state = null; + if (headerBuffer != null) { + bufferPool.deallocate(headerBuffer); + } + if (dataBuffer != null) { + bufferPool.deallocate(dataBuffer); + } + } + private enum ReaderState {MAGIC, HEADER, DATA} + + private class SizeInfo { + private FileWriter file; + private long min = Long.MAX_VALUE; + private long avg = 0; + private long max = Long.MIN_VALUE; + private long count = 0; + + private SizeInfo() { + try { + file = new FileWriter("D:/message_size_info-" + UUID.randomUUID() + ".csv"); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } + + private void add(long length) { + avg = (count * avg + length) / (count + 1); + if (length < min) { + min = length; + } + if (length > max) { + max = length; + } + count++; + LOG.info("Received message with data size " + length + "; Min: " + min + "; Max: " + max + "; Avg: " + avg); + try { + file.write(length + "\n"); + } catch (IOException e) { + e.printStackTrace(); + } + } + } } diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java b/core/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java index ff2ad95..a56b4ff 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java +++ b/core/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java @@ -87,7 +87,7 @@ public class MemoryNodeRegistry implements NodeRegistry { } } if (result.isEmpty()) { - if (stableNodes.isEmpty()) { + if (stableNodes.isEmpty() || stableNodes.get(stream).isEmpty()) { loadStableNodes(); } Set nodes = stableNodes.get(stream); @@ -108,8 +108,8 @@ public class MemoryNodeRegistry implements NodeRegistry { synchronized (knownNodes) { if (!knownNodes.containsKey(node.getStream())) { knownNodes.put( - node.getStream(), - newSetFromMap(new ConcurrentHashMap()) + node.getStream(), + newSetFromMap(new ConcurrentHashMap()) ); } } diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java index 583669b..4f155fa 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java @@ -22,7 +22,7 @@ import ch.dissem.bitmessage.entity.BitmessageAddress; import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.payload.Pubkey; import ch.dissem.bitmessage.entity.valueobject.Label; -import ch.dissem.bitmessage.networking.DefaultNetworkHandler; +import ch.dissem.bitmessage.networking.nio.NioNetworkHandler; import ch.dissem.bitmessage.ports.MemoryNodeRegistry; import ch.dissem.bitmessage.repository.*; import org.apache.commons.lang3.text.WordUtils; @@ -53,7 +53,7 @@ public class Application { .nodeRegistry(new MemoryNodeRegistry()) .messageRepo(new JdbcMessageRepository(jdbcConfig)) .powRepo(new JdbcProofOfWorkRepository(jdbcConfig)) - .networkHandler(new DefaultNetworkHandler()) + .networkHandler(new NioNetworkHandler()) .cryptography(new BouncyCryptography()) .port(48444) .listener(plaintext -> System.out.println("New Message from " + plaintext.getFrom() + ": " + plaintext.getSubject())) diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java index 2532796..402fa91 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java @@ -18,7 +18,7 @@ package ch.dissem.bitmessage.demo; import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography; -import ch.dissem.bitmessage.networking.DefaultNetworkHandler; +import ch.dissem.bitmessage.networking.nio.NioNetworkHandler; import ch.dissem.bitmessage.ports.MemoryNodeRegistry; import ch.dissem.bitmessage.repository.*; import ch.dissem.bitmessage.wif.WifExporter; @@ -53,7 +53,7 @@ public class Main { .nodeRegistry(new MemoryNodeRegistry()) .messageRepo(new JdbcMessageRepository(jdbcConfig)) .powRepo(new JdbcProofOfWorkRepository(jdbcConfig)) - .networkHandler(new DefaultNetworkHandler()) + .networkHandler(new NioNetworkHandler()) .cryptography(new BouncyCryptography()) .port(48444) .build(); diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java index edb26a2..73f26f5 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java @@ -73,20 +73,15 @@ public abstract class AbstractConnection { NetworkAddress node, NetworkHandler.MessageListener listener, Set commonRequestedObjects, - long syncTimeout, boolean threadsafe) { + long syncTimeout) { this.ctx = context; this.mode = mode; this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); this.node = node; this.listener = listener; this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); - if (threadsafe) { - this.ivCache = new ConcurrentHashMap<>(); - this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap(10_000)); - } else { - this.ivCache = new HashMap<>(); - this.requestedObjects = new HashSet<>(); - } + this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap(10_000)); + this.ivCache = new ConcurrentHashMap<>(); this.sendingQueue = new ConcurrentLinkedDeque<>(); this.state = CONNECTING; this.commonRequestedObjects = commonRequestedObjects; @@ -177,7 +172,7 @@ public abstract class AbstractConnection { } catch (IOException e) { LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); } finally { - if (commonRequestedObjects.remove(objectMessage.getInventoryVector())) { + if (!commonRequestedObjects.remove(objectMessage.getInventoryVector())) { LOG.debug("Received object that wasn't requested."); } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index d9cb2cc..bb5f370 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -78,7 +78,7 @@ class Connection extends AbstractConnection { private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, Set commonRequestedObjects, NetworkAddress node, long syncTimeout) { - super(context, mode, node, listener, commonRequestedObjects, syncTimeout, true); + super(context, mode, node, listener, commonRequestedObjects, syncTimeout); this.startTime = UnixTime.now(); this.socket = socket; } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index c4a4554..b5f93b1 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -48,10 +48,10 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { final Collection connections = new ConcurrentLinkedQueue<>(); private final ExecutorService pool = Executors.newCachedThreadPool( - pool("network") - .lowPrio() - .daemon() - .build()); + pool("network") + .lowPrio() + .daemon() + .build()); private InternalContext ctx; private ServerRunnable server; private volatile boolean running; @@ -88,11 +88,11 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { throw new NodeException("No response from node " + server); } else { throw new NodeException("Unexpected response from node " + - server + ": " + networkMessage.getPayload().getCommand()); + server + ": " + networkMessage.getPayload().getCommand()); } } } catch (IOException e) { - throw new ApplicationException(e); + throw new NodeException(e.getMessage(), e); } } @@ -185,16 +185,16 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0; int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0; streamProperties[i] = new Property("stream " + stream, - null, new Property("nodes", incoming + outgoing), - new Property("incoming", incoming), - new Property("outgoing", outgoing) + null, new Property("nodes", incoming + outgoing), + new Property("incoming", incoming), + new Property("outgoing", outgoing) ); i++; } return new Property("network", null, - new Property("connectionManager", running ? "running" : "stopped"), - new Property("connections", null, streamProperties), - new Property("requestedObjects", requestedObjects.size()) + new Property("connectionManager", running ? "running" : "stopped"), + new Property("connections", null, streamProperties), + new Property("requestedObjects", requestedObjects.size()) ); } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java index afe3d14..ed0462d 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java @@ -17,11 +17,13 @@ package ch.dissem.bitmessage.networking.nio; import ch.dissem.bitmessage.InternalContext; +import ch.dissem.bitmessage.entity.GetData; import ch.dissem.bitmessage.entity.MessagePayload; import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.entity.Version; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.factory.V3MessageReader; import ch.dissem.bitmessage.networking.AbstractConnection; import ch.dissem.bitmessage.ports.NetworkHandler; @@ -39,15 +41,15 @@ import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_MESSAGE_SIZE; * Represents the current state of a connection. */ public class ConnectionInfo extends AbstractConnection { - private ByteBuffer in = ByteBuffer.allocate(MAX_MESSAGE_SIZE); private ByteBuffer out = ByteBuffer.allocate(MAX_MESSAGE_SIZE); private V3MessageReader reader = new V3MessageReader(); private boolean syncFinished; + private long lastUpdate = Long.MAX_VALUE; public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node, NetworkHandler.MessageListener listener, Set commonRequestedObjects, long syncTimeout) { - super(context, mode, node, listener, commonRequestedObjects, syncTimeout, false); + super(context, mode, node, listener, commonRequestedObjects, syncTimeout); out.flip(); if (mode == CLIENT || mode == SYNC) { send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build()); @@ -67,7 +69,20 @@ public class ConnectionInfo extends AbstractConnection { } public ByteBuffer getInBuffer() { - return in; + if (reader == null) { + throw new NodeException("Node is disconnected"); + } + return reader.getActiveBuffer(); + } + + public void updateWriter() { + if ((out == null || !out.hasRemaining()) && !sendingQueue.isEmpty()) { + out.clear(); + MessagePayload payload = sendingQueue.poll(); + new NetworkMessage(payload).write(out); + out.flip(); + lastUpdate = System.currentTimeMillis(); + } } public ByteBuffer getOutBuffer() { @@ -75,7 +90,7 @@ public class ConnectionInfo extends AbstractConnection { } public void updateReader() { - reader.update(in); + reader.update(); if (!reader.getMessages().isEmpty()) { Iterator iterator = reader.getMessages().iterator(); NetworkMessage msg = null; @@ -86,6 +101,7 @@ public class ConnectionInfo extends AbstractConnection { } syncFinished = syncFinished(msg); } + lastUpdate = System.currentTimeMillis(); } public void updateSyncStatus() { @@ -94,6 +110,28 @@ public class ConnectionInfo extends AbstractConnection { } } + public boolean isExpired() { + switch (state) { + case CONNECTING: + return lastUpdate < System.currentTimeMillis() - 30000; + case ACTIVE: + return lastUpdate < System.currentTimeMillis() - 30000; + case DISCONNECTED: + return true; + default: + throw new IllegalStateException("Unknown state: " + state); + } + } + + @Override + public synchronized void disconnect() { + super.disconnect(); + if (reader != null) { + reader.cleanup(); + reader = null; + } + } + public boolean isSyncFinished() { return syncFinished; } @@ -101,5 +139,9 @@ public class ConnectionInfo extends AbstractConnection { @Override protected void send(MessagePayload payload) { sendingQueue.add(payload); + if (payload instanceof GetData) { + requestedObjects.addAll(((GetData) payload).getInventory()); + commonRequestedObjects.addAll(((GetData) payload).getInventory()); + } } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index c49d324..7ccea71 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -19,7 +19,6 @@ package ch.dissem.bitmessage.networking.nio; import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.entity.CustomMessage; import ch.dissem.bitmessage.entity.GetData; -import ch.dissem.bitmessage.entity.MessagePayload; import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; @@ -50,8 +49,7 @@ import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; -import static java.util.Collections.newSetFromMap; -import static java.util.Collections.synchronizedSet; +import static java.util.Collections.synchronizedMap; /** * Network handler using java.nio, resulting in less threads. @@ -59,31 +57,33 @@ import static java.util.Collections.synchronizedSet; public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder { private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class); - private final ExecutorService pool = Executors.newCachedThreadPool( - pool("network") - .lowPrio() - .daemon() - .build()); + private final ExecutorService threadPool = Executors.newCachedThreadPool( + pool("network") + .lowPrio() + .daemon() + .build()); private InternalContext ctx; private Selector selector; private ServerSocketChannel serverChannel; - private Set connections = synchronizedSet(newSetFromMap(new WeakHashMap())); + private Map connections = synchronizedMap(new WeakHashMap()); + private int requestedObjectsCount; + + private Thread starter; @Override public Future synchronize(final InetAddress server, final int port, final MessageListener listener, final long timeoutInSeconds) { - return pool.submit(new Callable() { + return threadPool.submit(new Callable() { @Override public Void call() throws Exception { - Set requestedObjects = new HashSet<>(); try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) { channel.configureBlocking(false); ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, - new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), - listener, new HashSet(), timeoutInSeconds); - connections.add(connection); + new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), + listener, new HashSet(), timeoutInSeconds); + connections.put(connection, null); while (channel.isConnected() && !connection.isSyncFinished()) { - write(requestedObjects, channel, connection); + write(channel, connection); read(channel, connection); Thread.sleep(10); } @@ -109,10 +109,8 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex V3MessageReader reader = new V3MessageReader(); while (channel.isConnected() && reader.getMessages().isEmpty()) { - if (channel.read(buffer) > 0) { - buffer.flip(); - reader.update(buffer); - buffer.compact(); + if (channel.read(reader.getActiveBuffer()) > 0) { + reader.update(); } else { throw new NodeException("No response from node " + server); } @@ -131,7 +129,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex throw new NodeException("Empty response from node " + server); } else { throw new NodeException("Unexpected response from node " + server + ": " - + networkMessage.getPayload().getClass()); + + networkMessage.getPayload().getClass()); } } } catch (IOException e) { @@ -164,12 +162,14 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex SocketChannel accepted = serverChannel.accept(); accepted.configureBlocking(false); ConnectionInfo connection = new ConnectionInfo(ctx, SERVER, - new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(), - listener, - requestedObjects, 0 + new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(), + listener, + requestedObjects, 0 + ); + connections.put( + connection, + accepted.register(selector, OP_READ | OP_WRITE, connection) ); - accepted.register(selector, OP_READ | OP_WRITE, connection); - connections.add(connection); } catch (AsynchronousCloseException ignore) { LOG.trace(ignore.getMessage()); } catch (IOException e) { @@ -186,27 +186,53 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } }); - thread("connection starter", new Runnable() { + starter = thread("connection starter", new Runnable() { @Override public void run() { while (selector.isOpen()) { - List addresses = ctx.getNodeRegistry().getKnownAddresses( - 2, ctx.getStreams()); - for (NetworkAddress address : addresses) { - try { - SocketChannel channel = SocketChannel.open( - new InetSocketAddress(address.toInetAddress(), address.getPort())); - channel.configureBlocking(false); - ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT, + int missing = NETWORK_MAGIC_NUMBER; + for (ConnectionInfo connectionInfo : connections.keySet()) { + if (connectionInfo.getState() == ACTIVE) { + missing--; + if (missing == 0) break; + } + } + if (missing > 0) { + List addresses = ctx.getNodeRegistry().getKnownAddresses(missing, ctx.getStreams()); + for (NetworkAddress address : addresses) { + try { + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); + channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort())); + channel.finishConnect(); + ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT, address, listener, requestedObjects, 0 - ); - channel.register(selector, OP_READ | OP_WRITE, connection); - connections.add(connection); - } catch (AsynchronousCloseException ignore) { - } catch (IOException e) { - LOG.error(e.getMessage(), e); + ); + connections.put( + connection, + channel.register(selector, OP_READ | OP_WRITE, connection) + ); + } catch (AsynchronousCloseException ignore) { + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } + } + + Iterator> it = connections.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + if (!e.getValue().isValid() || e.getKey().isExpired()) { + try { + e.getValue().channel().close(); + } catch (Exception ignore) { + } + e.getValue().cancel(); + e.getValue().attach(null); + it.remove(); + e.getKey().disconnect(); } } try { @@ -230,33 +256,37 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex if (key.attachment() instanceof ConnectionInfo) { SocketChannel channel = (SocketChannel) key.channel(); ConnectionInfo connection = (ConnectionInfo) key.attachment(); - if (key.isWritable()) { - write(requestedObjects, channel, connection); - } - if (key.isReadable()) { - read(channel, connection); - } - if (connection.getSendingQueue().isEmpty()) { - if (connection.getState() == DISCONNECTED) { - key.interestOps(0); - key.channel().close(); - } else { - key.interestOps(OP_READ); + try { + if (key.isWritable()) { + write(channel, connection); } - } else { - key.interestOps(OP_READ | OP_WRITE); + if (key.isReadable()) { + read(channel, connection); + } + if (connection.getSendingQueue().isEmpty()) { + if (connection.getState() == DISCONNECTED) { + key.interestOps(0); + key.channel().close(); + } else { + key.interestOps(OP_READ); + } + } else { + key.interestOps(OP_READ | OP_WRITE); + } + } catch (NodeException | IOException e) { + connection.disconnect(); } if (connection.getState() == DISCONNECTED) { connections.remove(connection); } } keyIterator.remove(); + requestedObjectsCount = requestedObjects.size(); } - for (SelectionKey key : selector.keys()) { - if ((key.interestOps() & OP_WRITE) == 0) { - if (key.attachment() instanceof ConnectionInfo && - !((ConnectionInfo) key.attachment()).getSendingQueue().isEmpty()) { - key.interestOps(OP_READ | OP_WRITE); + for (Map.Entry e : connections.entrySet()) { + if (e.getValue().isValid() && (e.getValue().interestOps() & OP_WRITE) == 0) { + if (!e.getKey().getSendingQueue().isEmpty()) { + e.getValue().interestOps(OP_READ | OP_WRITE); } } } @@ -270,54 +300,44 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex }); } - private static void write(Set requestedObjects, SocketChannel channel, ConnectionInfo connection) - throws IOException { - if (!connection.getSendingQueue().isEmpty()) { - ByteBuffer buffer = connection.getOutBuffer(); - if (buffer.hasRemaining()) { - channel.write(buffer); - } - while (!buffer.hasRemaining() - && !connection.getSendingQueue().isEmpty()) { - buffer.clear(); - MessagePayload payload = connection.getSendingQueue().poll(); - if (payload instanceof GetData) { - requestedObjects.addAll(((GetData) payload).getInventory()); - } - new NetworkMessage(payload).write(buffer); - buffer.flip(); - if (buffer.hasRemaining()) { - channel.write(buffer); - } - } + private static void write(SocketChannel channel, ConnectionInfo connection) + throws IOException { + writeBuffer(connection.getOutBuffer(), channel); + + connection.updateWriter(); + + writeBuffer(connection.getOutBuffer(), channel); + } + + private static void writeBuffer(ByteBuffer buffer, SocketChannel channel) throws IOException { + if (buffer != null && buffer.hasRemaining()) { + channel.write(buffer); } } private static void read(SocketChannel channel, ConnectionInfo connection) throws IOException { - ByteBuffer buffer = connection.getInBuffer(); - while (channel.read(buffer) > 0) { - buffer.flip(); + while (channel.read(connection.getInBuffer()) > 0) { connection.updateReader(); - buffer.compact(); } connection.updateSyncStatus(); } - private void thread(String threadName, Runnable runnable) { + private Thread thread(String threadName, Runnable runnable) { Thread thread = new Thread(runnable, threadName); thread.setDaemon(true); thread.setPriority(Thread.MIN_PRIORITY); thread.start(); + return thread; } @Override public void stop() { try { serverChannel.socket().close(); - for (SelectionKey selectionKey : selector.keys()) { + selector.close(); + for (SelectionKey selectionKey : connections.values()) { selectionKey.channel().close(); } - selector.close(); } catch (IOException e) { throw new ApplicationException(e); } @@ -326,7 +346,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex @Override public void offer(InventoryVector iv) { List target = new LinkedList<>(); - for (ConnectionInfo connection : connections) { + for (ConnectionInfo connection : connections.keySet()) { if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) { target.add(connection); } @@ -346,7 +366,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } Map> distribution = new HashMap<>(); - for (ConnectionInfo connection : connections) { + for (ConnectionInfo connection : connections.keySet()) { if (connection.getState() == ACTIVE) { distribution.put(connection, new LinkedList()); } @@ -391,7 +411,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex TreeMap incomingConnections = new TreeMap<>(); TreeMap outgoingConnections = new TreeMap<>(); - for (ConnectionInfo connection : connections) { + for (ConnectionInfo connection : connections.keySet()) { if (connection.getState() == ACTIVE) { long stream = connection.getNode().getStream(); streams.add(stream); @@ -408,22 +428,22 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0; int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0; streamProperties[i] = new Property("stream " + stream, - null, new Property("nodes", incoming + outgoing), - new Property("incoming", incoming), - new Property("outgoing", outgoing) + null, new Property("nodes", incoming + outgoing), + new Property("incoming", incoming), + new Property("outgoing", outgoing) ); i++; } return new Property("network", null, - new Property("connectionManager", isRunning() ? "running" : "stopped"), - new Property("connections", null, streamProperties), - new Property("requestedObjects", "requestedObjects.size()") // TODO + new Property("connectionManager", isRunning() ? "running" : "stopped"), + new Property("connections", null, streamProperties), + new Property("requestedObjects", requestedObjectsCount) ); } @Override public boolean isRunning() { - return selector != null && selector.isOpen(); + return selector != null && selector.isOpen() && starter.isAlive(); } @Override