From 334a510743ceba8fd52e84c304a9810534b2dbaa Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Fri, 29 Jul 2016 07:49:53 +0200 Subject: [PATCH] Fixes, improved tests and other improvements --- .../entity/valueobject/NetworkAddress.java | 8 +- .../dissem/bitmessage/factory/BufferPool.java | 88 ++++++++----------- .../bitmessage/factory/V3MessageFactory.java | 34 ++++--- .../bitmessage/factory/V3MessageReader.java | 2 +- .../bitmessage/ports/NetworkHandler.java | 3 +- .../networking/AbstractConnection.java | 4 + .../networking/DefaultNetworkHandler.java | 13 +-- .../networking/nio/ConnectionInfo.java | 2 +- .../networking/nio/NioNetworkHandler.java | 19 ++-- .../networking/NetworkHandlerTest.java | 19 ++-- 10 files changed, 99 insertions(+), 93 deletions(-) diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java b/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java index a496d19..94bc7c0 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java @@ -41,19 +41,19 @@ public class NetworkAddress implements Streamable { /** * Stream number for this node */ - private long stream; + private final long stream; /** * same service(s) listed in version */ - private long services; + private final long services; /** * IPv6 address. IPv4 addresses are written into the message as a 16 byte IPv4-mapped IPv6 address * (12 bytes 00 00 00 00 00 00 00 00 00 00 FF FF, followed by the 4 bytes of the IPv4 address). */ - private byte[] ipv6; - private int port; + private final byte[] ipv6; + private final int port; private NetworkAddress(Builder builder) { time = builder.time; diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.java b/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.java index 15e5856..1b977d7 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.java +++ b/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.java @@ -16,12 +16,16 @@ package ch.dissem.bitmessage.factory; -import ch.dissem.bitmessage.ports.NetworkHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Map; +import java.util.Stack; +import java.util.TreeMap; + +import static ch.dissem.bitmessage.ports.NetworkHandler.HEADER_SIZE; +import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_PAYLOAD_SIZE; /** * A pool for {@link ByteBuffer}s. As they may use up a lot of memory, @@ -30,78 +34,58 @@ import java.util.*; class BufferPool { private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class); - public static final BufferPool bufferPool = new BufferPool(256, 2048); + public static final BufferPool bufferPool = new BufferPool(); - private final Map capacities = new EnumMap<>(Size.class); - private final Map> pools = new EnumMap<>(Size.class); + private final Map> pools = new TreeMap<>(); - private BufferPool(int small, int medium) { - capacities.put(Size.HEADER, 24); - capacities.put(Size.SMALL, small); - capacities.put(Size.MEDIUM, medium); - capacities.put(Size.LARGE, NetworkHandler.MAX_PAYLOAD_SIZE); - pools.put(Size.HEADER, new Stack()); - pools.put(Size.SMALL, new Stack()); - pools.put(Size.MEDIUM, new Stack()); - pools.put(Size.LARGE, new Stack()); + private BufferPool() { + pools.put(HEADER_SIZE, new Stack()); + pools.put(54, new Stack()); + pools.put(1000, new Stack()); + pools.put(60000, new Stack()); + pools.put(MAX_PAYLOAD_SIZE, new Stack()); } public synchronized ByteBuffer allocate(int capacity) { - Size targetSize = getTargetSize(capacity); - Size s = targetSize; - do { - Stack pool = pools.get(s); - if (!pool.isEmpty()) { - return pool.pop(); + for (Map.Entry> e : pools.entrySet()) { + if (e.getKey() >= capacity && !e.getValue().isEmpty()) { + return e.getValue().pop(); } - s = s.next(); - } while (s != null); + } + Integer targetSize = getTargetSize(capacity); LOG.debug("Creating new buffer of size " + targetSize); - return ByteBuffer.allocate(capacities.get(targetSize)); + return ByteBuffer.allocate(targetSize); } - public synchronized ByteBuffer allocate() { - Stack pool = pools.get(Size.HEADER); + /** + * Returns a buffer that has the size of the Bitmessage network message header, 24 bytes. + * + * @return a buffer of size 24 + */ + public synchronized ByteBuffer allocateHeaderBuffer() { + Stack pool = pools.get(HEADER_SIZE); if (!pool.isEmpty()) { return pool.pop(); } else { - return ByteBuffer.allocate(capacities.get(Size.HEADER)); + return ByteBuffer.allocate(HEADER_SIZE); } } public synchronized void deallocate(ByteBuffer buffer) { buffer.clear(); - Size size = getTargetSize(buffer.capacity()); - if (buffer.capacity() != capacities.get(size)) { + + if (!pools.keySet().contains(buffer.capacity())) { throw new IllegalArgumentException("Illegal buffer capacity " + buffer.capacity() + - " one of " + capacities.values() + " expected."); + " one of " + pools.keySet() + " expected."); } - pools.get(size).push(buffer); + pools.get(buffer.capacity()).push(buffer); } - private Size getTargetSize(int capacity) { - for (Size s : Size.values()) { - if (capacity <= capacities.get(s)) { - return s; - } + private Integer getTargetSize(int capacity) { + for (Integer size : pools.keySet()) { + if (size >= capacity) return size; } throw new IllegalArgumentException("Requested capacity too large: " + - "requested=" + capacity + "; max=" + capacities.get(Size.LARGE)); - } - - - private enum Size { - HEADER, SMALL, MEDIUM, LARGE; - - public Size next() { - switch (this) { - case SMALL: - return MEDIUM; - case MEDIUM: - return LARGE; - default: - return null; - } - } + "requested=" + capacity + "; max=" + MAX_PAYLOAD_SIZE); } } diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java index 478d77c..7b27d13 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java +++ b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageFactory.java @@ -107,12 +107,12 @@ class V3MessageFactory { } return new ObjectMessage.Builder() - .nonce(nonce) - .expiresTime(expiresTime) - .objectType(objectType) - .stream(stream) - .payload(payload) - .build(); + .nonce(nonce) + .expiresTime(expiresTime) + .objectType(objectType) + .stream(stream) + .payload(payload) + .build(); } private static GetData parseGetData(InputStream stream) throws IOException { @@ -153,13 +153,13 @@ class V3MessageFactory { long[] streamNumbers = Decode.varIntList(stream); return new Version.Builder() - .version(version) - .services(services) - .timestamp(timestamp) - .addrRecv(addrRecv).addrFrom(addrFrom) - .nonce(nonce) - .userAgent(userAgent) - .streams(streamNumbers).build(); + .version(version) + .services(services) + .timestamp(timestamp) + .addrRecv(addrRecv).addrFrom(addrFrom) + .nonce(nonce) + .userAgent(userAgent) + .streams(streamNumbers).build(); } private static InventoryVector parseInventoryVector(InputStream stream) throws IOException { @@ -179,7 +179,13 @@ class V3MessageFactory { long services = Decode.int64(stream); byte[] ipv6 = Decode.bytes(stream, 16); int port = Decode.uint16(stream); - return new NetworkAddress.Builder().time(time).stream(streamNumber).services(services).ipv6(ipv6).port(port).build(); + return new NetworkAddress.Builder() + .time(time) + .stream(streamNumber) + .services(services) + .ipv6(ipv6) + .port(port) + .build(); } private static boolean testChecksum(byte[] checksum, byte[] payload) { diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java index bb4460f..d26cabe 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java +++ b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java @@ -58,7 +58,7 @@ public class V3MessageReader { public ByteBuffer getActiveBuffer() { if (state != null && state != ReaderState.DATA) { if (headerBuffer == null) { - headerBuffer = bufferPool.allocate(); + headerBuffer = bufferPool.allocateHeaderBuffer(); } } return state == ReaderState.DATA ? dataBuffer : headerBuffer; diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java b/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java index 96dec8e..9597625 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java +++ b/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java @@ -31,8 +31,9 @@ import java.util.concurrent.Future; */ public interface NetworkHandler { int NETWORK_MAGIC_NUMBER = 8; + int HEADER_SIZE = 24; int MAX_PAYLOAD_SIZE = 1600003; - int MAX_MESSAGE_SIZE = 24 + MAX_PAYLOAD_SIZE; + int MAX_MESSAGE_SIZE = HEADER_SIZE + MAX_PAYLOAD_SIZE; /** * Connects to the trusted host, fetches and offers new messages and disconnects afterwards. diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java index 73f26f5..411a57a 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java @@ -99,6 +99,10 @@ public abstract class AbstractConnection { return state; } + public long[] getStreams() { + return streams; + } + protected void handleMessage(MessagePayload payload) { switch (state) { case ACTIVE: diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index b5f93b1..f5bfd47 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -170,12 +170,13 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { for (Connection connection : connections) { if (connection.getState() == ACTIVE) { - long stream = connection.getNode().getStream(); - streams.add(stream); - if (connection.getMode() == SERVER) { - inc(incomingConnections, stream); - } else { - inc(outgoingConnections, stream); + for (long stream : connection.getStreams()) { + streams.add(stream); + if (connection.getMode() == SERVER) { + inc(incomingConnections, stream); + } else { + inc(outgoingConnections, stream); + } } } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java index 647bc2c..217743e 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java @@ -44,7 +44,7 @@ public class ConnectionInfo extends AbstractConnection { private ByteBuffer payloadOut; private V3MessageReader reader = new V3MessageReader(); private boolean syncFinished; - private long lastUpdate = Long.MAX_VALUE; + private long lastUpdate = System.currentTimeMillis(); public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node, NetworkHandler.MessageListener listener, diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index 069662b..6c4ae10 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; @@ -46,7 +47,6 @@ import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; -import static java.util.Collections.synchronizedMap; /** * Network handler using java.nio, resulting in less threads. @@ -209,7 +209,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex connection, channel.register(selector, OP_READ | OP_WRITE, connection) ); - } catch (AsynchronousCloseException ignore) { + } catch (NoRouteToHostException | AsynchronousCloseException ignore) { } catch (IOException e) { LOG.error(e.getMessage(), e); } @@ -268,7 +268,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } else { key.interestOps(OP_READ | OP_WRITE); } - } catch (NodeException | IOException e) { + } catch (CancelledKeyException | NodeException | IOException e) { connection.disconnect(); } if (connection.getState() == DISCONNECTED) { @@ -413,12 +413,13 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex for (ConnectionInfo connection : connections.keySet()) { if (connection.getState() == ACTIVE) { - long stream = connection.getNode().getStream(); - streams.add(stream); - if (connection.getMode() == SERVER) { - inc(incomingConnections, stream); - } else { - inc(outgoingConnections, stream); + for (long stream : connection.getStreams()) { + streams.add(stream); + if (connection.getMode() == SERVER) { + inc(incomingConnections, stream); + } else { + inc(outgoingConnections, stream); + } } } } diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java index 475cbb1..e6aeb51 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java @@ -169,15 +169,24 @@ public class NetworkHandlerTest { } while (networkHandler.isRunning()); } - @Test - public void ensureNodesAreConnecting() throws Exception { - node.startup(); + private Property waitForNetworkStatus(BitmessageContext ctx) throws InterruptedException { Property status; do { Thread.sleep(100); - status = node.status().getProperty("network", "connections", "stream 0"); + status = ctx.status().getProperty("network", "connections", "stream 1"); } while (status == null); - assertEquals(1, status.getProperty("outgoing").getValue()); + return status; + } + + @Test + public void ensureNodesAreConnecting() throws Exception { + node.startup(); + + Property nodeStatus = waitForNetworkStatus(node); + Property peerStatus = waitForNetworkStatus(peer); + + assertEquals(1, nodeStatus.getProperty("outgoing").getValue()); + assertEquals(1, peerStatus.getProperty("incoming").getValue()); } @Test