Fixes, improved tests and other improvements

This commit is contained in:
Christian Basler 2016-07-29 07:49:53 +02:00
parent 56ebb7b8fa
commit 334a510743
10 changed files with 99 additions and 93 deletions

View File

@ -41,19 +41,19 @@ public class NetworkAddress implements Streamable {
/**
* Stream number for this node
*/
private long stream;
private final long stream;
/**
* same service(s) listed in version
*/
private long services;
private final long services;
/**
* IPv6 address. IPv4 addresses are written into the message as a 16 byte IPv4-mapped IPv6 address
* (12 bytes 00 00 00 00 00 00 00 00 00 00 FF FF, followed by the 4 bytes of the IPv4 address).
*/
private byte[] ipv6;
private int port;
private final byte[] ipv6;
private final int port;
private NetworkAddress(Builder builder) {
time = builder.time;

View File

@ -16,12 +16,16 @@
package ch.dissem.bitmessage.factory;
import ch.dissem.bitmessage.ports.NetworkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Map;
import java.util.Stack;
import java.util.TreeMap;
import static ch.dissem.bitmessage.ports.NetworkHandler.HEADER_SIZE;
import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_PAYLOAD_SIZE;
/**
* A pool for {@link ByteBuffer}s. As they may use up a lot of memory,
@ -30,78 +34,58 @@ import java.util.*;
class BufferPool {
private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);
public static final BufferPool bufferPool = new BufferPool(256, 2048);
public static final BufferPool bufferPool = new BufferPool();
private final Map<Size, Integer> capacities = new EnumMap<>(Size.class);
private final Map<Size, Stack<ByteBuffer>> pools = new EnumMap<>(Size.class);
private final Map<Integer, Stack<ByteBuffer>> pools = new TreeMap<>();
private BufferPool(int small, int medium) {
capacities.put(Size.HEADER, 24);
capacities.put(Size.SMALL, small);
capacities.put(Size.MEDIUM, medium);
capacities.put(Size.LARGE, NetworkHandler.MAX_PAYLOAD_SIZE);
pools.put(Size.HEADER, new Stack<ByteBuffer>());
pools.put(Size.SMALL, new Stack<ByteBuffer>());
pools.put(Size.MEDIUM, new Stack<ByteBuffer>());
pools.put(Size.LARGE, new Stack<ByteBuffer>());
private BufferPool() {
pools.put(HEADER_SIZE, new Stack<ByteBuffer>());
pools.put(54, new Stack<ByteBuffer>());
pools.put(1000, new Stack<ByteBuffer>());
pools.put(60000, new Stack<ByteBuffer>());
pools.put(MAX_PAYLOAD_SIZE, new Stack<ByteBuffer>());
}
public synchronized ByteBuffer allocate(int capacity) {
Size targetSize = getTargetSize(capacity);
Size s = targetSize;
do {
Stack<ByteBuffer> pool = pools.get(s);
if (!pool.isEmpty()) {
return pool.pop();
for (Map.Entry<Integer, Stack<ByteBuffer>> e : pools.entrySet()) {
if (e.getKey() >= capacity && !e.getValue().isEmpty()) {
return e.getValue().pop();
}
s = s.next();
} while (s != null);
}
Integer targetSize = getTargetSize(capacity);
LOG.debug("Creating new buffer of size " + targetSize);
return ByteBuffer.allocate(capacities.get(targetSize));
return ByteBuffer.allocate(targetSize);
}
public synchronized ByteBuffer allocate() {
Stack<ByteBuffer> pool = pools.get(Size.HEADER);
/**
* Returns a buffer that has the size of the Bitmessage network message header, 24 bytes.
*
* @return a buffer of size 24
*/
public synchronized ByteBuffer allocateHeaderBuffer() {
Stack<ByteBuffer> pool = pools.get(HEADER_SIZE);
if (!pool.isEmpty()) {
return pool.pop();
} else {
return ByteBuffer.allocate(capacities.get(Size.HEADER));
return ByteBuffer.allocate(HEADER_SIZE);
}
}
public synchronized void deallocate(ByteBuffer buffer) {
buffer.clear();
Size size = getTargetSize(buffer.capacity());
if (buffer.capacity() != capacities.get(size)) {
if (!pools.keySet().contains(buffer.capacity())) {
throw new IllegalArgumentException("Illegal buffer capacity " + buffer.capacity() +
" one of " + capacities.values() + " expected.");
" one of " + pools.keySet() + " expected.");
}
pools.get(size).push(buffer);
pools.get(buffer.capacity()).push(buffer);
}
private Size getTargetSize(int capacity) {
for (Size s : Size.values()) {
if (capacity <= capacities.get(s)) {
return s;
}
private Integer getTargetSize(int capacity) {
for (Integer size : pools.keySet()) {
if (size >= capacity) return size;
}
throw new IllegalArgumentException("Requested capacity too large: " +
"requested=" + capacity + "; max=" + capacities.get(Size.LARGE));
}
private enum Size {
HEADER, SMALL, MEDIUM, LARGE;
public Size next() {
switch (this) {
case SMALL:
return MEDIUM;
case MEDIUM:
return LARGE;
default:
return null;
}
}
"requested=" + capacity + "; max=" + MAX_PAYLOAD_SIZE);
}
}

View File

@ -107,12 +107,12 @@ class V3MessageFactory {
}
return new ObjectMessage.Builder()
.nonce(nonce)
.expiresTime(expiresTime)
.objectType(objectType)
.stream(stream)
.payload(payload)
.build();
.nonce(nonce)
.expiresTime(expiresTime)
.objectType(objectType)
.stream(stream)
.payload(payload)
.build();
}
private static GetData parseGetData(InputStream stream) throws IOException {
@ -153,13 +153,13 @@ class V3MessageFactory {
long[] streamNumbers = Decode.varIntList(stream);
return new Version.Builder()
.version(version)
.services(services)
.timestamp(timestamp)
.addrRecv(addrRecv).addrFrom(addrFrom)
.nonce(nonce)
.userAgent(userAgent)
.streams(streamNumbers).build();
.version(version)
.services(services)
.timestamp(timestamp)
.addrRecv(addrRecv).addrFrom(addrFrom)
.nonce(nonce)
.userAgent(userAgent)
.streams(streamNumbers).build();
}
private static InventoryVector parseInventoryVector(InputStream stream) throws IOException {
@ -179,7 +179,13 @@ class V3MessageFactory {
long services = Decode.int64(stream);
byte[] ipv6 = Decode.bytes(stream, 16);
int port = Decode.uint16(stream);
return new NetworkAddress.Builder().time(time).stream(streamNumber).services(services).ipv6(ipv6).port(port).build();
return new NetworkAddress.Builder()
.time(time)
.stream(streamNumber)
.services(services)
.ipv6(ipv6)
.port(port)
.build();
}
private static boolean testChecksum(byte[] checksum, byte[] payload) {

View File

@ -58,7 +58,7 @@ public class V3MessageReader {
public ByteBuffer getActiveBuffer() {
if (state != null && state != ReaderState.DATA) {
if (headerBuffer == null) {
headerBuffer = bufferPool.allocate();
headerBuffer = bufferPool.allocateHeaderBuffer();
}
}
return state == ReaderState.DATA ? dataBuffer : headerBuffer;

View File

@ -31,8 +31,9 @@ import java.util.concurrent.Future;
*/
public interface NetworkHandler {
int NETWORK_MAGIC_NUMBER = 8;
int HEADER_SIZE = 24;
int MAX_PAYLOAD_SIZE = 1600003;
int MAX_MESSAGE_SIZE = 24 + MAX_PAYLOAD_SIZE;
int MAX_MESSAGE_SIZE = HEADER_SIZE + MAX_PAYLOAD_SIZE;
/**
* Connects to the trusted host, fetches and offers new messages and disconnects afterwards.

View File

@ -99,6 +99,10 @@ public abstract class AbstractConnection {
return state;
}
public long[] getStreams() {
return streams;
}
protected void handleMessage(MessagePayload payload) {
switch (state) {
case ACTIVE:

View File

@ -170,12 +170,13 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
for (Connection connection : connections) {
if (connection.getState() == ACTIVE) {
long stream = connection.getNode().getStream();
streams.add(stream);
if (connection.getMode() == SERVER) {
inc(incomingConnections, stream);
} else {
inc(outgoingConnections, stream);
for (long stream : connection.getStreams()) {
streams.add(stream);
if (connection.getMode() == SERVER) {
inc(incomingConnections, stream);
} else {
inc(outgoingConnections, stream);
}
}
}
}

View File

@ -44,7 +44,7 @@ public class ConnectionInfo extends AbstractConnection {
private ByteBuffer payloadOut;
private V3MessageReader reader = new V3MessageReader();
private boolean syncFinished;
private long lastUpdate = Long.MAX_VALUE;
private long lastUpdate = System.currentTimeMillis();
public ConnectionInfo(InternalContext context, Mode mode,
NetworkAddress node, NetworkHandler.MessageListener listener,

View File

@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
@ -46,7 +47,6 @@ import static ch.dissem.bitmessage.utils.DebugUtils.inc;
import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool;
import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE;
import static java.util.Collections.synchronizedMap;
/**
* Network handler using java.nio, resulting in less threads.
@ -209,7 +209,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
connection,
channel.register(selector, OP_READ | OP_WRITE, connection)
);
} catch (AsynchronousCloseException ignore) {
} catch (NoRouteToHostException | AsynchronousCloseException ignore) {
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
@ -268,7 +268,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} else {
key.interestOps(OP_READ | OP_WRITE);
}
} catch (NodeException | IOException e) {
} catch (CancelledKeyException | NodeException | IOException e) {
connection.disconnect();
}
if (connection.getState() == DISCONNECTED) {
@ -413,12 +413,13 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
for (ConnectionInfo connection : connections.keySet()) {
if (connection.getState() == ACTIVE) {
long stream = connection.getNode().getStream();
streams.add(stream);
if (connection.getMode() == SERVER) {
inc(incomingConnections, stream);
} else {
inc(outgoingConnections, stream);
for (long stream : connection.getStreams()) {
streams.add(stream);
if (connection.getMode() == SERVER) {
inc(incomingConnections, stream);
} else {
inc(outgoingConnections, stream);
}
}
}
}

View File

@ -169,15 +169,24 @@ public class NetworkHandlerTest {
} while (networkHandler.isRunning());
}
@Test
public void ensureNodesAreConnecting() throws Exception {
node.startup();
private Property waitForNetworkStatus(BitmessageContext ctx) throws InterruptedException {
Property status;
do {
Thread.sleep(100);
status = node.status().getProperty("network", "connections", "stream 0");
status = ctx.status().getProperty("network", "connections", "stream 1");
} while (status == null);
assertEquals(1, status.getProperty("outgoing").getValue());
return status;
}
@Test
public void ensureNodesAreConnecting() throws Exception {
node.startup();
Property nodeStatus = waitForNetworkStatus(node);
Property peerStatus = waitForNetworkStatus(peer);
assertEquals(1, nodeStatus.getProperty("outgoing").getValue());
assertEquals(1, peerStatus.getProperty("incoming").getValue());
}
@Test