Better memory management for the in buffer (the same TODO for the out buffer.

This commit is contained in:
Christian Basler 2016-07-25 07:52:27 +02:00
parent 82ee4d05bb
commit 48ff975ffd
11 changed files with 427 additions and 168 deletions
.editorconfig
core/src/main/java/ch/dissem/bitmessage
demo/src/main/java/ch/dissem/bitmessage/demo
networking/src/main/java/ch/dissem/bitmessage/networking

7
.editorconfig Normal file
View File

@ -0,0 +1,7 @@
root = true
[*]
end_of_line = lf
insert_final_newline = true
charset = utf-8
indent_size = 4

View File

@ -0,0 +1,107 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.factory;
import ch.dissem.bitmessage.ports.NetworkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.*;
/**
* A pool for {@link ByteBuffer}s. As they may use up a lot of memory,
* they should be reused as efficiently as possible.
*/
class BufferPool {
private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);
public static final BufferPool bufferPool = new BufferPool(256, 2048);
private final Map<Size, Integer> capacities = new EnumMap<>(Size.class);
private final Map<Size, Stack<ByteBuffer>> pools = new EnumMap<>(Size.class);
private BufferPool(int small, int medium) {
capacities.put(Size.HEADER, 24);
capacities.put(Size.SMALL, small);
capacities.put(Size.MEDIUM, medium);
capacities.put(Size.LARGE, NetworkHandler.MAX_PAYLOAD_SIZE);
pools.put(Size.HEADER, new Stack<ByteBuffer>());
pools.put(Size.SMALL, new Stack<ByteBuffer>());
pools.put(Size.MEDIUM, new Stack<ByteBuffer>());
pools.put(Size.LARGE, new Stack<ByteBuffer>());
}
public synchronized ByteBuffer allocate(int capacity) {
Size targetSize = getTargetSize(capacity);
Size s = targetSize;
do {
Stack<ByteBuffer> pool = pools.get(s);
if (!pool.isEmpty()) {
return pool.pop();
}
s = s.next();
} while (s != null);
LOG.debug("Creating new buffer of size " + targetSize);
return ByteBuffer.allocate(capacities.get(targetSize));
}
public synchronized ByteBuffer allocate() {
Stack<ByteBuffer> pool = pools.get(Size.HEADER);
if (!pool.isEmpty()) {
return pool.pop();
} else {
return ByteBuffer.allocate(capacities.get(Size.HEADER));
}
}
public synchronized void deallocate(ByteBuffer buffer) {
buffer.clear();
Size size = getTargetSize(buffer.capacity());
if (buffer.capacity() != capacities.get(size)) {
throw new IllegalArgumentException("Illegal buffer capacity " + buffer.capacity() +
" one of " + capacities.values() + " expected.");
}
pools.get(size).push(buffer);
}
private Size getTargetSize(int capacity) {
for (Size s : Size.values()) {
if (capacity <= capacities.get(s)) {
return s;
}
}
throw new IllegalArgumentException("Requested capacity too large: " +
"requested=" + capacity + "; max=" + capacities.get(Size.LARGE));
}
private enum Size {
HEADER, SMALL, MEDIUM, LARGE;
public Size next() {
switch (this) {
case SMALL:
return MEDIUM;
case MEDIUM:
return LARGE;
default:
return null;
}
}
}
}

View File

@ -21,15 +21,20 @@ import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.utils.Decode; import ch.dissem.bitmessage.utils.Decode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.UUID;
import static ch.dissem.bitmessage.entity.NetworkMessage.MAGIC_BYTES; import static ch.dissem.bitmessage.entity.NetworkMessage.MAGIC_BYTES;
import static ch.dissem.bitmessage.factory.BufferPool.bufferPool;
import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_PAYLOAD_SIZE; import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_PAYLOAD_SIZE;
import static ch.dissem.bitmessage.utils.Singleton.cryptography; import static ch.dissem.bitmessage.utils.Singleton.cryptography;
@ -37,51 +42,87 @@ import static ch.dissem.bitmessage.utils.Singleton.cryptography;
* Similar to the {@link V3MessageFactory}, but used for NIO buffers which may or may not contain a whole message. * Similar to the {@link V3MessageFactory}, but used for NIO buffers which may or may not contain a whole message.
*/ */
public class V3MessageReader { public class V3MessageReader {
private static final Logger LOG = LoggerFactory.getLogger(V3MessageReader.class);
private ByteBuffer headerBuffer;
private ByteBuffer dataBuffer;
private ReaderState state = ReaderState.MAGIC; private ReaderState state = ReaderState.MAGIC;
private String command; private String command;
private int length; private int length;
private byte[] checksum; private byte[] checksum;
private List<NetworkMessage> messages = new LinkedList<>(); private List<NetworkMessage> messages = new LinkedList<>();
private SizeInfo sizeInfo = new SizeInfo();
public void update(ByteBuffer buffer) { public ByteBuffer getActiveBuffer() {
while (buffer.hasRemaining()) { if (state != null && state != ReaderState.DATA) {
if (headerBuffer == null) {
headerBuffer = bufferPool.allocate();
}
}
return state == ReaderState.DATA ? dataBuffer : headerBuffer;
}
public void update() {
if (state != ReaderState.DATA) {
getActiveBuffer();
headerBuffer.flip();
}
switch (state) { switch (state) {
case MAGIC: case MAGIC:
if (!findMagicBytes(buffer)) return; if (!findMagicBytes(headerBuffer)) {
state = ReaderState.HEADER; headerBuffer.compact();
case HEADER:
if (buffer.remaining() < 20) {
return; return;
} }
command = getCommand(buffer); state = ReaderState.HEADER;
length = (int) Decode.uint32(buffer); case HEADER:
if (headerBuffer.remaining() < 20) {
headerBuffer.compact();
headerBuffer.limit(20);
return;
}
command = getCommand(headerBuffer);
length = (int) Decode.uint32(headerBuffer);
if (length > MAX_PAYLOAD_SIZE) { if (length > MAX_PAYLOAD_SIZE) {
throw new NodeException("Payload of " + length + " bytes received, no more than " + throw new NodeException("Payload of " + length + " bytes received, no more than " +
MAX_PAYLOAD_SIZE + " was expected."); MAX_PAYLOAD_SIZE + " was expected.");
} }
sizeInfo.add(length); // FIXME: remove this once we have some values to work with
checksum = new byte[4]; checksum = new byte[4];
buffer.get(checksum); headerBuffer.get(checksum);
state = ReaderState.DATA; state = ReaderState.DATA;
bufferPool.deallocate(headerBuffer);
headerBuffer = null;
dataBuffer = bufferPool.allocate(length);
dataBuffer.clear();
dataBuffer.limit(length);
case DATA: case DATA:
if (buffer.remaining() < length) { if (dataBuffer.position() < length) {
return; return;
} else {
dataBuffer.flip();
} }
if (!testChecksum(buffer)) { if (!testChecksum(dataBuffer)) {
state = ReaderState.MAGIC;
throw new NodeException("Checksum failed for message '" + command + "'"); throw new NodeException("Checksum failed for message '" + command + "'");
} }
try { try {
MessagePayload payload = V3MessageFactory.getPayload( MessagePayload payload = V3MessageFactory.getPayload(
command, command,
new ByteArrayInputStream(buffer.array(), buffer.arrayOffset() + buffer.position(), length), new ByteArrayInputStream(dataBuffer.array(),
dataBuffer.arrayOffset() + dataBuffer.position(), length),
length); length);
if (payload != null) { if (payload != null) {
messages.add(new NetworkMessage(payload)); messages.add(new NetworkMessage(payload));
} }
} catch (IOException e) { } catch (IOException e) {
throw new NodeException(e.getMessage()); throw new NodeException(e.getMessage());
} } finally {
state = ReaderState.MAGIC; state = ReaderState.MAGIC;
bufferPool.deallocate(dataBuffer);
dataBuffer = null;
dataBuffer = null;
} }
} }
} }
@ -138,5 +179,52 @@ public class V3MessageReader {
return true; return true;
} }
private enum ReaderState {MAGIC, HEADER, DATA} /**
* De-allocates all buffers. This method should be called iff the reader isn't used anymore, i.e. when its
* connection is severed.
*/
public void cleanup() {
state = null;
if (headerBuffer != null) {
bufferPool.deallocate(headerBuffer);
}
if (dataBuffer != null) {
bufferPool.deallocate(dataBuffer);
}
}
private enum ReaderState {MAGIC, HEADER, DATA}
private class SizeInfo {
private FileWriter file;
private long min = Long.MAX_VALUE;
private long avg = 0;
private long max = Long.MIN_VALUE;
private long count = 0;
private SizeInfo() {
try {
file = new FileWriter("D:/message_size_info-" + UUID.randomUUID() + ".csv");
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
private void add(long length) {
avg = (count * avg + length) / (count + 1);
if (length < min) {
min = length;
}
if (length > max) {
max = length;
}
count++;
LOG.info("Received message with data size " + length + "; Min: " + min + "; Max: " + max + "; Avg: " + avg);
try {
file.write(length + "\n");
} catch (IOException e) {
e.printStackTrace();
}
}
}
} }

View File

@ -87,7 +87,7 @@ public class MemoryNodeRegistry implements NodeRegistry {
} }
} }
if (result.isEmpty()) { if (result.isEmpty()) {
if (stableNodes.isEmpty()) { if (stableNodes.isEmpty() || stableNodes.get(stream).isEmpty()) {
loadStableNodes(); loadStableNodes();
} }
Set<NetworkAddress> nodes = stableNodes.get(stream); Set<NetworkAddress> nodes = stableNodes.get(stream);

View File

@ -22,7 +22,7 @@ import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.payload.Pubkey; import ch.dissem.bitmessage.entity.payload.Pubkey;
import ch.dissem.bitmessage.entity.valueobject.Label; import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.networking.DefaultNetworkHandler; import ch.dissem.bitmessage.networking.nio.NioNetworkHandler;
import ch.dissem.bitmessage.ports.MemoryNodeRegistry; import ch.dissem.bitmessage.ports.MemoryNodeRegistry;
import ch.dissem.bitmessage.repository.*; import ch.dissem.bitmessage.repository.*;
import org.apache.commons.lang3.text.WordUtils; import org.apache.commons.lang3.text.WordUtils;
@ -53,7 +53,7 @@ public class Application {
.nodeRegistry(new MemoryNodeRegistry()) .nodeRegistry(new MemoryNodeRegistry())
.messageRepo(new JdbcMessageRepository(jdbcConfig)) .messageRepo(new JdbcMessageRepository(jdbcConfig))
.powRepo(new JdbcProofOfWorkRepository(jdbcConfig)) .powRepo(new JdbcProofOfWorkRepository(jdbcConfig))
.networkHandler(new DefaultNetworkHandler()) .networkHandler(new NioNetworkHandler())
.cryptography(new BouncyCryptography()) .cryptography(new BouncyCryptography())
.port(48444) .port(48444)
.listener(plaintext -> System.out.println("New Message from " + plaintext.getFrom() + ": " + plaintext.getSubject())) .listener(plaintext -> System.out.println("New Message from " + plaintext.getFrom() + ": " + plaintext.getSubject()))

View File

@ -18,7 +18,7 @@ package ch.dissem.bitmessage.demo;
import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.BitmessageContext;
import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography; import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography;
import ch.dissem.bitmessage.networking.DefaultNetworkHandler; import ch.dissem.bitmessage.networking.nio.NioNetworkHandler;
import ch.dissem.bitmessage.ports.MemoryNodeRegistry; import ch.dissem.bitmessage.ports.MemoryNodeRegistry;
import ch.dissem.bitmessage.repository.*; import ch.dissem.bitmessage.repository.*;
import ch.dissem.bitmessage.wif.WifExporter; import ch.dissem.bitmessage.wif.WifExporter;
@ -53,7 +53,7 @@ public class Main {
.nodeRegistry(new MemoryNodeRegistry()) .nodeRegistry(new MemoryNodeRegistry())
.messageRepo(new JdbcMessageRepository(jdbcConfig)) .messageRepo(new JdbcMessageRepository(jdbcConfig))
.powRepo(new JdbcProofOfWorkRepository(jdbcConfig)) .powRepo(new JdbcProofOfWorkRepository(jdbcConfig))
.networkHandler(new DefaultNetworkHandler()) .networkHandler(new NioNetworkHandler())
.cryptography(new BouncyCryptography()) .cryptography(new BouncyCryptography())
.port(48444) .port(48444)
.build(); .build();

View File

@ -73,20 +73,15 @@ public abstract class AbstractConnection {
NetworkAddress node, NetworkAddress node,
NetworkHandler.MessageListener listener, NetworkHandler.MessageListener listener,
Set<InventoryVector> commonRequestedObjects, Set<InventoryVector> commonRequestedObjects,
long syncTimeout, boolean threadsafe) { long syncTimeout) {
this.ctx = context; this.ctx = context;
this.mode = mode; this.mode = mode;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = node; this.node = node;
this.listener = listener; this.listener = listener;
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0);
if (threadsafe) {
this.ivCache = new ConcurrentHashMap<>();
this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)); this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000));
} else { this.ivCache = new ConcurrentHashMap<>();
this.ivCache = new HashMap<>();
this.requestedObjects = new HashSet<>();
}
this.sendingQueue = new ConcurrentLinkedDeque<>(); this.sendingQueue = new ConcurrentLinkedDeque<>();
this.state = CONNECTING; this.state = CONNECTING;
this.commonRequestedObjects = commonRequestedObjects; this.commonRequestedObjects = commonRequestedObjects;
@ -177,7 +172,7 @@ public abstract class AbstractConnection {
} catch (IOException e) { } catch (IOException e) {
LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e);
} finally { } finally {
if (commonRequestedObjects.remove(objectMessage.getInventoryVector())) { if (!commonRequestedObjects.remove(objectMessage.getInventoryVector())) {
LOG.debug("Received object that wasn't requested."); LOG.debug("Received object that wasn't requested.");
} }
} }

View File

@ -78,7 +78,7 @@ class Connection extends AbstractConnection {
private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket,
Set<InventoryVector> commonRequestedObjects, NetworkAddress node, long syncTimeout) { Set<InventoryVector> commonRequestedObjects, NetworkAddress node, long syncTimeout) {
super(context, mode, node, listener, commonRequestedObjects, syncTimeout, true); super(context, mode, node, listener, commonRequestedObjects, syncTimeout);
this.startTime = UnixTime.now(); this.startTime = UnixTime.now();
this.socket = socket; this.socket = socket;
} }

View File

@ -92,7 +92,7 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
} }
} }
} catch (IOException e) { } catch (IOException e) {
throw new ApplicationException(e); throw new NodeException(e.getMessage(), e);
} }
} }

View File

@ -17,11 +17,13 @@
package ch.dissem.bitmessage.networking.nio; package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.MessagePayload; import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version; import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.V3MessageReader; import ch.dissem.bitmessage.factory.V3MessageReader;
import ch.dissem.bitmessage.networking.AbstractConnection; import ch.dissem.bitmessage.networking.AbstractConnection;
import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.ports.NetworkHandler;
@ -39,15 +41,15 @@ import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_MESSAGE_SIZE;
* Represents the current state of a connection. * Represents the current state of a connection.
*/ */
public class ConnectionInfo extends AbstractConnection { public class ConnectionInfo extends AbstractConnection {
private ByteBuffer in = ByteBuffer.allocate(MAX_MESSAGE_SIZE);
private ByteBuffer out = ByteBuffer.allocate(MAX_MESSAGE_SIZE); private ByteBuffer out = ByteBuffer.allocate(MAX_MESSAGE_SIZE);
private V3MessageReader reader = new V3MessageReader(); private V3MessageReader reader = new V3MessageReader();
private boolean syncFinished; private boolean syncFinished;
private long lastUpdate = Long.MAX_VALUE;
public ConnectionInfo(InternalContext context, Mode mode, public ConnectionInfo(InternalContext context, Mode mode,
NetworkAddress node, NetworkHandler.MessageListener listener, NetworkAddress node, NetworkHandler.MessageListener listener,
Set<InventoryVector> commonRequestedObjects, long syncTimeout) { Set<InventoryVector> commonRequestedObjects, long syncTimeout) {
super(context, mode, node, listener, commonRequestedObjects, syncTimeout, false); super(context, mode, node, listener, commonRequestedObjects, syncTimeout);
out.flip(); out.flip();
if (mode == CLIENT || mode == SYNC) { if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build()); send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build());
@ -67,7 +69,20 @@ public class ConnectionInfo extends AbstractConnection {
} }
public ByteBuffer getInBuffer() { public ByteBuffer getInBuffer() {
return in; if (reader == null) {
throw new NodeException("Node is disconnected");
}
return reader.getActiveBuffer();
}
public void updateWriter() {
if ((out == null || !out.hasRemaining()) && !sendingQueue.isEmpty()) {
out.clear();
MessagePayload payload = sendingQueue.poll();
new NetworkMessage(payload).write(out);
out.flip();
lastUpdate = System.currentTimeMillis();
}
} }
public ByteBuffer getOutBuffer() { public ByteBuffer getOutBuffer() {
@ -75,7 +90,7 @@ public class ConnectionInfo extends AbstractConnection {
} }
public void updateReader() { public void updateReader() {
reader.update(in); reader.update();
if (!reader.getMessages().isEmpty()) { if (!reader.getMessages().isEmpty()) {
Iterator<NetworkMessage> iterator = reader.getMessages().iterator(); Iterator<NetworkMessage> iterator = reader.getMessages().iterator();
NetworkMessage msg = null; NetworkMessage msg = null;
@ -86,6 +101,7 @@ public class ConnectionInfo extends AbstractConnection {
} }
syncFinished = syncFinished(msg); syncFinished = syncFinished(msg);
} }
lastUpdate = System.currentTimeMillis();
} }
public void updateSyncStatus() { public void updateSyncStatus() {
@ -94,6 +110,28 @@ public class ConnectionInfo extends AbstractConnection {
} }
} }
public boolean isExpired() {
switch (state) {
case CONNECTING:
return lastUpdate < System.currentTimeMillis() - 30000;
case ACTIVE:
return lastUpdate < System.currentTimeMillis() - 30000;
case DISCONNECTED:
return true;
default:
throw new IllegalStateException("Unknown state: " + state);
}
}
@Override
public synchronized void disconnect() {
super.disconnect();
if (reader != null) {
reader.cleanup();
reader = null;
}
}
public boolean isSyncFinished() { public boolean isSyncFinished() {
return syncFinished; return syncFinished;
} }
@ -101,5 +139,9 @@ public class ConnectionInfo extends AbstractConnection {
@Override @Override
protected void send(MessagePayload payload) { protected void send(MessagePayload payload) {
sendingQueue.add(payload); sendingQueue.add(payload);
if (payload instanceof GetData) {
requestedObjects.addAll(((GetData) payload).getInventory());
commonRequestedObjects.addAll(((GetData) payload).getInventory());
}
} }
} }

View File

@ -19,7 +19,6 @@ package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.CustomMessage; import ch.dissem.bitmessage.entity.CustomMessage;
import ch.dissem.bitmessage.entity.GetData; import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
@ -50,8 +49,7 @@ import static ch.dissem.bitmessage.utils.DebugUtils.inc;
import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool;
import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE; import static java.nio.channels.SelectionKey.OP_WRITE;
import static java.util.Collections.newSetFromMap; import static java.util.Collections.synchronizedMap;
import static java.util.Collections.synchronizedSet;
/** /**
* Network handler using java.nio, resulting in less threads. * Network handler using java.nio, resulting in less threads.
@ -59,7 +57,7 @@ import static java.util.Collections.synchronizedSet;
public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder { public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder {
private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class); private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class);
private final ExecutorService pool = Executors.newCachedThreadPool( private final ExecutorService threadPool = Executors.newCachedThreadPool(
pool("network") pool("network")
.lowPrio() .lowPrio()
.daemon() .daemon()
@ -68,22 +66,24 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
private InternalContext ctx; private InternalContext ctx;
private Selector selector; private Selector selector;
private ServerSocketChannel serverChannel; private ServerSocketChannel serverChannel;
private Set<ConnectionInfo> connections = synchronizedSet(newSetFromMap(new WeakHashMap<ConnectionInfo, Boolean>())); private Map<ConnectionInfo, SelectionKey> connections = synchronizedMap(new WeakHashMap<ConnectionInfo, SelectionKey>());
private int requestedObjectsCount;
private Thread starter;
@Override @Override
public Future<Void> synchronize(final InetAddress server, final int port, final MessageListener listener, final long timeoutInSeconds) { public Future<Void> synchronize(final InetAddress server, final int port, final MessageListener listener, final long timeoutInSeconds) {
return pool.submit(new Callable<Void>() { return threadPool.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
Set<InventoryVector> requestedObjects = new HashSet<>();
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) { try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) {
channel.configureBlocking(false); channel.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, ConnectionInfo connection = new ConnectionInfo(ctx, SYNC,
new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), new NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
listener, new HashSet<InventoryVector>(), timeoutInSeconds); listener, new HashSet<InventoryVector>(), timeoutInSeconds);
connections.add(connection); connections.put(connection, null);
while (channel.isConnected() && !connection.isSyncFinished()) { while (channel.isConnected() && !connection.isSyncFinished()) {
write(requestedObjects, channel, connection); write(channel, connection);
read(channel, connection); read(channel, connection);
Thread.sleep(10); Thread.sleep(10);
} }
@ -109,10 +109,8 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
V3MessageReader reader = new V3MessageReader(); V3MessageReader reader = new V3MessageReader();
while (channel.isConnected() && reader.getMessages().isEmpty()) { while (channel.isConnected() && reader.getMessages().isEmpty()) {
if (channel.read(buffer) > 0) { if (channel.read(reader.getActiveBuffer()) > 0) {
buffer.flip(); reader.update();
reader.update(buffer);
buffer.compact();
} else { } else {
throw new NodeException("No response from node " + server); throw new NodeException("No response from node " + server);
} }
@ -168,8 +166,10 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
listener, listener,
requestedObjects, 0 requestedObjects, 0
); );
accepted.register(selector, OP_READ | OP_WRITE, connection); connections.put(
connections.add(connection); connection,
accepted.register(selector, OP_READ | OP_WRITE, connection)
);
} catch (AsynchronousCloseException ignore) { } catch (AsynchronousCloseException ignore) {
LOG.trace(ignore.getMessage()); LOG.trace(ignore.getMessage());
} catch (IOException e) { } catch (IOException e) {
@ -186,29 +186,55 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} }
}); });
thread("connection starter", new Runnable() { starter = thread("connection starter", new Runnable() {
@Override @Override
public void run() { public void run() {
while (selector.isOpen()) { while (selector.isOpen()) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses( int missing = NETWORK_MAGIC_NUMBER;
2, ctx.getStreams()); for (ConnectionInfo connectionInfo : connections.keySet()) {
if (connectionInfo.getState() == ACTIVE) {
missing--;
if (missing == 0) break;
}
}
if (missing > 0) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(missing, ctx.getStreams());
for (NetworkAddress address : addresses) { for (NetworkAddress address : addresses) {
try { try {
SocketChannel channel = SocketChannel.open( SocketChannel channel = SocketChannel.open();
new InetSocketAddress(address.toInetAddress(), address.getPort()));
channel.configureBlocking(false); channel.configureBlocking(false);
channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort()));
channel.finishConnect();
ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT, ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT,
address, address,
listener, listener,
requestedObjects, 0 requestedObjects, 0
); );
channel.register(selector, OP_READ | OP_WRITE, connection); connections.put(
connections.add(connection); connection,
channel.register(selector, OP_READ | OP_WRITE, connection)
);
} catch (AsynchronousCloseException ignore) { } catch (AsynchronousCloseException ignore) {
} catch (IOException e) { } catch (IOException e) {
LOG.error(e.getMessage(), e); LOG.error(e.getMessage(), e);
} }
} }
}
Iterator<Map.Entry<ConnectionInfo, SelectionKey>> it = connections.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<ConnectionInfo, SelectionKey> e = it.next();
if (!e.getValue().isValid() || e.getKey().isExpired()) {
try {
e.getValue().channel().close();
} catch (Exception ignore) {
}
e.getValue().cancel();
e.getValue().attach(null);
it.remove();
e.getKey().disconnect();
}
}
try { try {
Thread.sleep(30_000); Thread.sleep(30_000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -230,8 +256,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
if (key.attachment() instanceof ConnectionInfo) { if (key.attachment() instanceof ConnectionInfo) {
SocketChannel channel = (SocketChannel) key.channel(); SocketChannel channel = (SocketChannel) key.channel();
ConnectionInfo connection = (ConnectionInfo) key.attachment(); ConnectionInfo connection = (ConnectionInfo) key.attachment();
try {
if (key.isWritable()) { if (key.isWritable()) {
write(requestedObjects, channel, connection); write(channel, connection);
} }
if (key.isReadable()) { if (key.isReadable()) {
read(channel, connection); read(channel, connection);
@ -246,17 +273,20 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} else { } else {
key.interestOps(OP_READ | OP_WRITE); key.interestOps(OP_READ | OP_WRITE);
} }
} catch (NodeException | IOException e) {
connection.disconnect();
}
if (connection.getState() == DISCONNECTED) { if (connection.getState() == DISCONNECTED) {
connections.remove(connection); connections.remove(connection);
} }
} }
keyIterator.remove(); keyIterator.remove();
requestedObjectsCount = requestedObjects.size();
} }
for (SelectionKey key : selector.keys()) { for (Map.Entry<ConnectionInfo, SelectionKey> e : connections.entrySet()) {
if ((key.interestOps() & OP_WRITE) == 0) { if (e.getValue().isValid() && (e.getValue().interestOps() & OP_WRITE) == 0) {
if (key.attachment() instanceof ConnectionInfo && if (!e.getKey().getSendingQueue().isEmpty()) {
!((ConnectionInfo) key.attachment()).getSendingQueue().isEmpty()) { e.getValue().interestOps(OP_READ | OP_WRITE);
key.interestOps(OP_READ | OP_WRITE);
} }
} }
} }
@ -270,54 +300,44 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
}); });
} }
private static void write(Set<InventoryVector> requestedObjects, SocketChannel channel, ConnectionInfo connection) private static void write(SocketChannel channel, ConnectionInfo connection)
throws IOException { throws IOException {
if (!connection.getSendingQueue().isEmpty()) { writeBuffer(connection.getOutBuffer(), channel);
ByteBuffer buffer = connection.getOutBuffer();
if (buffer.hasRemaining()) { connection.updateWriter();
writeBuffer(connection.getOutBuffer(), channel);
}
private static void writeBuffer(ByteBuffer buffer, SocketChannel channel) throws IOException {
if (buffer != null && buffer.hasRemaining()) {
channel.write(buffer); channel.write(buffer);
} }
while (!buffer.hasRemaining()
&& !connection.getSendingQueue().isEmpty()) {
buffer.clear();
MessagePayload payload = connection.getSendingQueue().poll();
if (payload instanceof GetData) {
requestedObjects.addAll(((GetData) payload).getInventory());
}
new NetworkMessage(payload).write(buffer);
buffer.flip();
if (buffer.hasRemaining()) {
channel.write(buffer);
}
}
}
} }
private static void read(SocketChannel channel, ConnectionInfo connection) throws IOException { private static void read(SocketChannel channel, ConnectionInfo connection) throws IOException {
ByteBuffer buffer = connection.getInBuffer(); while (channel.read(connection.getInBuffer()) > 0) {
while (channel.read(buffer) > 0) {
buffer.flip();
connection.updateReader(); connection.updateReader();
buffer.compact();
} }
connection.updateSyncStatus(); connection.updateSyncStatus();
} }
private void thread(String threadName, Runnable runnable) { private Thread thread(String threadName, Runnable runnable) {
Thread thread = new Thread(runnable, threadName); Thread thread = new Thread(runnable, threadName);
thread.setDaemon(true); thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY); thread.setPriority(Thread.MIN_PRIORITY);
thread.start(); thread.start();
return thread;
} }
@Override @Override
public void stop() { public void stop() {
try { try {
serverChannel.socket().close(); serverChannel.socket().close();
for (SelectionKey selectionKey : selector.keys()) { selector.close();
for (SelectionKey selectionKey : connections.values()) {
selectionKey.channel().close(); selectionKey.channel().close();
} }
selector.close();
} catch (IOException e) { } catch (IOException e) {
throw new ApplicationException(e); throw new ApplicationException(e);
} }
@ -326,7 +346,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
@Override @Override
public void offer(InventoryVector iv) { public void offer(InventoryVector iv) {
List<ConnectionInfo> target = new LinkedList<>(); List<ConnectionInfo> target = new LinkedList<>();
for (ConnectionInfo connection : connections) { for (ConnectionInfo connection : connections.keySet()) {
if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) { if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) {
target.add(connection); target.add(connection);
} }
@ -346,7 +366,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} }
Map<ConnectionInfo, List<InventoryVector>> distribution = new HashMap<>(); Map<ConnectionInfo, List<InventoryVector>> distribution = new HashMap<>();
for (ConnectionInfo connection : connections) { for (ConnectionInfo connection : connections.keySet()) {
if (connection.getState() == ACTIVE) { if (connection.getState() == ACTIVE) {
distribution.put(connection, new LinkedList<InventoryVector>()); distribution.put(connection, new LinkedList<InventoryVector>());
} }
@ -391,7 +411,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
TreeMap<Long, Integer> incomingConnections = new TreeMap<>(); TreeMap<Long, Integer> incomingConnections = new TreeMap<>();
TreeMap<Long, Integer> outgoingConnections = new TreeMap<>(); TreeMap<Long, Integer> outgoingConnections = new TreeMap<>();
for (ConnectionInfo connection : connections) { for (ConnectionInfo connection : connections.keySet()) {
if (connection.getState() == ACTIVE) { if (connection.getState() == ACTIVE) {
long stream = connection.getNode().getStream(); long stream = connection.getNode().getStream();
streams.add(stream); streams.add(stream);
@ -417,13 +437,13 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
return new Property("network", null, return new Property("network", null,
new Property("connectionManager", isRunning() ? "running" : "stopped"), new Property("connectionManager", isRunning() ? "running" : "stopped"),
new Property("connections", null, streamProperties), new Property("connections", null, streamProperties),
new Property("requestedObjects", "requestedObjects.size()") // TODO new Property("requestedObjects", requestedObjectsCount)
); );
} }
@Override @Override
public boolean isRunning() { public boolean isRunning() {
return selector != null && selector.isOpen(); return selector != null && selector.isOpen() && starter.isAlive();
} }
@Override @Override