It looks like the NIO network handler works now - some testing needed to see how reliably

This commit is contained in:
Christian Basler 2016-08-24 22:17:02 +02:00
parent 3a92bab9ba
commit caa2219a63
6 changed files with 59 additions and 74 deletions

View File

@ -21,17 +21,13 @@ import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.utils.Decode; import ch.dissem.bitmessage.utils.Decode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.UUID;
import static ch.dissem.bitmessage.entity.NetworkMessage.MAGIC_BYTES; import static ch.dissem.bitmessage.entity.NetworkMessage.MAGIC_BYTES;
import static ch.dissem.bitmessage.factory.BufferPool.bufferPool; import static ch.dissem.bitmessage.factory.BufferPool.bufferPool;
@ -42,8 +38,6 @@ import static ch.dissem.bitmessage.utils.Singleton.cryptography;
* Similar to the {@link V3MessageFactory}, but used for NIO buffers which may or may not contain a whole message. * Similar to the {@link V3MessageFactory}, but used for NIO buffers which may or may not contain a whole message.
*/ */
public class V3MessageReader { public class V3MessageReader {
private static final Logger LOG = LoggerFactory.getLogger(V3MessageReader.class);
private ByteBuffer headerBuffer; private ByteBuffer headerBuffer;
private ByteBuffer dataBuffer; private ByteBuffer dataBuffer;
@ -53,7 +47,6 @@ public class V3MessageReader {
private byte[] checksum; private byte[] checksum;
private List<NetworkMessage> messages = new LinkedList<>(); private List<NetworkMessage> messages = new LinkedList<>();
private SizeInfo sizeInfo = new SizeInfo();
public ByteBuffer getActiveBuffer() { public ByteBuffer getActiveBuffer() {
if (state != null && state != ReaderState.DATA) { if (state != null && state != ReaderState.DATA) {
@ -88,7 +81,6 @@ public class V3MessageReader {
throw new NodeException("Payload of " + length + " bytes received, no more than " + throw new NodeException("Payload of " + length + " bytes received, no more than " +
MAX_PAYLOAD_SIZE + " was expected."); MAX_PAYLOAD_SIZE + " was expected.");
} }
sizeInfo.add(length); // FIXME: remove this once we have some values to work with
checksum = new byte[4]; checksum = new byte[4];
headerBuffer.get(checksum); headerBuffer.get(checksum);
state = ReaderState.DATA; state = ReaderState.DATA;
@ -194,37 +186,4 @@ public class V3MessageReader {
} }
private enum ReaderState {MAGIC, HEADER, DATA} private enum ReaderState {MAGIC, HEADER, DATA}
private class SizeInfo {
private FileWriter file;
private long min = Long.MAX_VALUE;
private long avg = 0;
private long max = Long.MIN_VALUE;
private long count = 0;
private SizeInfo() {
try {
file = new FileWriter("D:/message_size_info-" + UUID.randomUUID() + ".csv");
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
private void add(long length) {
avg = (count * avg + length) / (count + 1);
if (length < min) {
min = length;
}
if (length > max) {
max = length;
}
count++;
LOG.info("Received message with data size " + length + "; Min: " + min + "; Max: " + max + "; Avg: " + avg);
try {
file.write(length + "\n");
} catch (IOException e) {
e.printStackTrace();
}
}
}
} }

View File

@ -228,10 +228,11 @@ public abstract class AbstractConnection {
break; break;
case CUSTOM: case CUSTOM:
MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload); MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload);
if (response != null) { if (response == null) {
disconnect();
} else {
send(response); send(response);
} }
disconnect();
break; break;
default: default:
throw new NodeException("Command 'version' or 'verack' expected, but was '" throw new NodeException("Command 'version' or 'verack' expected, but was '"

View File

@ -17,10 +17,7 @@
package ch.dissem.bitmessage.networking.nio; package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.GetData; import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.exception.NodeException;
@ -112,16 +109,18 @@ public class ConnectionInfo extends AbstractConnection {
public void updateSyncStatus() { public void updateSyncStatus() {
if (!syncFinished) { if (!syncFinished) {
syncFinished = reader.getMessages().isEmpty() && syncFinished(null); syncFinished = (reader == null || reader.getMessages().isEmpty()) && syncFinished(null);
} }
} }
public boolean isExpired() { public boolean isExpired() {
switch (state) { switch (state) {
case CONNECTING: case CONNECTING:
return lastUpdate < System.currentTimeMillis() - 30000; // the TCP timeout starts out at 20 seconds
return lastUpdate < System.currentTimeMillis() - 20_000;
case ACTIVE: case ACTIVE:
return lastUpdate < System.currentTimeMillis() - 30000; // after verack messages are exchanged, the timeout is raised to 10 minutes
return lastUpdate < System.currentTimeMillis() - 600_000;
case DISCONNECTED: case DISCONNECTED:
return true; return true;
default: default:
@ -150,4 +149,10 @@ public class ConnectionInfo extends AbstractConnection {
commonRequestedObjects.addAll(((GetData) payload).getInventory()); commonRequestedObjects.addAll(((GetData) payload).getInventory());
} }
} }
public boolean isWritePending() {
return !sendingQueue.isEmpty()
|| headerOut != null && headerOut.hasRemaining()
|| payloadOut != null && payloadOut.hasRemaining();
}
} }

View File

@ -64,7 +64,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
private Selector selector; private Selector selector;
private ServerSocketChannel serverChannel; private ServerSocketChannel serverChannel;
private Map<ConnectionInfo, SelectionKey> connections = new ConcurrentHashMap<>(); private Map<ConnectionInfo, SelectionKey> connections = new ConcurrentHashMap<>();
private int requestedObjectsCount; private volatile int requestedObjectsCount;
private Thread starter; private Thread starter;
@ -94,13 +94,15 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
public CustomMessage send(InetAddress server, int port, CustomMessage request) { public CustomMessage send(InetAddress server, int port, CustomMessage request) {
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) { try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) {
channel.configureBlocking(true); channel.configureBlocking(true);
ByteBuffer buffer = ByteBuffer.allocate(MAX_MESSAGE_SIZE); ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
new NetworkMessage(request).write(buffer); ByteBuffer payloadBuffer = new NetworkMessage(request).writeHeaderAndGetPayloadBuffer(headerBuffer);
buffer.flip(); headerBuffer.flip();
while (buffer.hasRemaining()) { while (headerBuffer.hasRemaining()) {
channel.write(buffer); channel.write(headerBuffer);
}
while (payloadBuffer.hasRemaining()) {
channel.write(payloadBuffer);
} }
buffer.clear();
V3MessageReader reader = new V3MessageReader(); V3MessageReader reader = new V3MessageReader();
while (channel.isConnected() && reader.getMessages().isEmpty()) { while (channel.isConnected() && reader.getMessages().isEmpty()) {
@ -195,11 +197,25 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
if (missing > 0) { if (missing > 0) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(missing, ctx.getStreams()); List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(missing, ctx.getStreams());
for (NetworkAddress address : addresses) { for (NetworkAddress address : addresses) {
if (isConnectedTo(address)) {
continue;
}
try { try {
SocketChannel channel = SocketChannel.open(); SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false); channel.configureBlocking(false);
channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort())); channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort()));
channel.finishConnect(); long timeout = System.currentTimeMillis() + 20_000;
while (!channel.finishConnect() && System.currentTimeMillis() < timeout) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
if (!channel.finishConnect()) {
channel.close();
continue;
}
ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT, ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT,
address, address,
listener, listener,
@ -248,6 +264,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next(); SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.attachment() instanceof ConnectionInfo) { if (key.attachment() instanceof ConnectionInfo) {
SocketChannel channel = (SocketChannel) key.channel(); SocketChannel channel = (SocketChannel) key.channel();
ConnectionInfo connection = (ConnectionInfo) key.attachment(); ConnectionInfo connection = (ConnectionInfo) key.attachment();
@ -258,24 +275,18 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
if (key.isReadable()) { if (key.isReadable()) {
read(channel, connection); read(channel, connection);
} }
if (connection.getSendingQueue().isEmpty()) {
if (connection.getState() == DISCONNECTED) { if (connection.getState() == DISCONNECTED) {
key.interestOps(0); key.interestOps(0);
key.channel().close(); channel.close();
} else if (connection.isWritePending()) {
key.interestOps(OP_READ | OP_WRITE);
} else { } else {
key.interestOps(OP_READ); key.interestOps(OP_READ);
} }
} else {
key.interestOps(OP_READ | OP_WRITE);
}
} catch (CancelledKeyException | NodeException | IOException e) { } catch (CancelledKeyException | NodeException | IOException e) {
connection.disconnect(); connection.disconnect();
} }
if (connection.getState() == DISCONNECTED) {
connections.remove(connection);
} }
}
keyIterator.remove();
requestedObjectsCount = requestedObjects.size(); requestedObjectsCount = requestedObjects.size();
} }
for (Map.Entry<ConnectionInfo, SelectionKey> e : connections.entrySet()) { for (Map.Entry<ConnectionInfo, SelectionKey> e : connections.entrySet()) {
@ -316,7 +327,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} }
private static void read(SocketChannel channel, ConnectionInfo connection) throws IOException { private static void read(SocketChannel channel, ConnectionInfo connection) throws IOException {
while (channel.read(connection.getInBuffer()) > 0) { if (channel.read(connection.getInBuffer()) > 0) {
connection.updateReader(); connection.updateReader();
} }
connection.updateSyncStatus(); connection.updateSyncStatus();
@ -442,6 +453,15 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
); );
} }
private boolean isConnectedTo(NetworkAddress address) {
for (ConnectionInfo c : connections.keySet()) {
if (c.getNode().equals(address)) {
return true;
}
}
return false;
}
@Override @Override
public boolean isRunning() { public boolean isRunning() {
return selector != null && selector.isOpen() && starter.isAlive(); return selector != null && selector.isOpen() && starter.isAlive();

View File

@ -14,7 +14,7 @@ sourceCompatibility = 1.8
dependencies { dependencies {
compile project(':core') compile project(':core')
compile 'org.flywaydb:flyway-core:3.2.1' compile 'org.flywaydb:flyway-core:4.0.3'
testCompile 'junit:junit:4.12' testCompile 'junit:junit:4.12'
testCompile 'com.h2database:h2:1.4.190' testCompile 'com.h2database:h2:1.4.190'
testCompile 'org.mockito:mockito-core:1.10.19' testCompile 'org.mockito:mockito-core:1.10.19'