diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java index d26cabe..89677cd 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java +++ b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.java @@ -21,17 +21,13 @@ import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.utils.Decode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; -import java.io.FileWriter; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; -import java.util.UUID; import static ch.dissem.bitmessage.entity.NetworkMessage.MAGIC_BYTES; import static ch.dissem.bitmessage.factory.BufferPool.bufferPool; @@ -42,8 +38,6 @@ import static ch.dissem.bitmessage.utils.Singleton.cryptography; * Similar to the {@link V3MessageFactory}, but used for NIO buffers which may or may not contain a whole message. */ public class V3MessageReader { - private static final Logger LOG = LoggerFactory.getLogger(V3MessageReader.class); - private ByteBuffer headerBuffer; private ByteBuffer dataBuffer; @@ -53,7 +47,6 @@ public class V3MessageReader { private byte[] checksum; private List messages = new LinkedList<>(); - private SizeInfo sizeInfo = new SizeInfo(); public ByteBuffer getActiveBuffer() { if (state != null && state != ReaderState.DATA) { @@ -88,7 +81,6 @@ public class V3MessageReader { throw new NodeException("Payload of " + length + " bytes received, no more than " + MAX_PAYLOAD_SIZE + " was expected."); } - sizeInfo.add(length); // FIXME: remove this once we have some values to work with checksum = new byte[4]; headerBuffer.get(checksum); state = ReaderState.DATA; @@ -194,37 +186,4 @@ public class V3MessageReader { } private enum ReaderState {MAGIC, HEADER, DATA} - - private class SizeInfo { - private FileWriter file; - private long min = Long.MAX_VALUE; - private long avg = 0; - private long max = Long.MIN_VALUE; - private long count = 0; - - private SizeInfo() { - try { - file = new FileWriter("D:/message_size_info-" + UUID.randomUUID() + ".csv"); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } - - private void add(long length) { - avg = (count * avg + length) / (count + 1); - if (length < min) { - min = length; - } - if (length > max) { - max = length; - } - count++; - LOG.info("Received message with data size " + length + "; Min: " + min + "; Max: " + max + "; Avg: " + avg); - try { - file.write(length + "\n"); - } catch (IOException e) { - e.printStackTrace(); - } - } - } } diff --git a/core/src/main/resources/nodes.txt b/core/src/main/resources/nodes.txt index 9466a85..bce982e 100644 --- a/core/src/main/resources/nodes.txt +++ b/core/src/main/resources/nodes.txt @@ -5,4 +5,4 @@ bootstrap8080.bitmessage.org:8080 bootstrap8444.bitmessage.org:8444 [stream 2] -# none yet \ No newline at end of file +# none yet diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java index f3900b1..66f7dca 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java @@ -228,10 +228,11 @@ public abstract class AbstractConnection { break; case CUSTOM: MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload); - if (response != null) { + if (response == null) { + disconnect(); + } else { send(response); } - disconnect(); break; default: throw new NodeException("Command 'version' or 'verack' expected, but was '" diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java index e416da5..fb4cf58 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java @@ -17,10 +17,7 @@ package ch.dissem.bitmessage.networking.nio; import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.entity.GetData; -import ch.dissem.bitmessage.entity.MessagePayload; -import ch.dissem.bitmessage.entity.NetworkMessage; -import ch.dissem.bitmessage.entity.Version; +import ch.dissem.bitmessage.entity.*; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.exception.NodeException; @@ -112,16 +109,18 @@ public class ConnectionInfo extends AbstractConnection { public void updateSyncStatus() { if (!syncFinished) { - syncFinished = reader.getMessages().isEmpty() && syncFinished(null); + syncFinished = (reader == null || reader.getMessages().isEmpty()) && syncFinished(null); } } public boolean isExpired() { switch (state) { case CONNECTING: - return lastUpdate < System.currentTimeMillis() - 30000; + // the TCP timeout starts out at 20 seconds + return lastUpdate < System.currentTimeMillis() - 20_000; case ACTIVE: - return lastUpdate < System.currentTimeMillis() - 30000; + // after verack messages are exchanged, the timeout is raised to 10 minutes + return lastUpdate < System.currentTimeMillis() - 600_000; case DISCONNECTED: return true; default: @@ -150,4 +149,10 @@ public class ConnectionInfo extends AbstractConnection { commonRequestedObjects.addAll(((GetData) payload).getInventory()); } } + + public boolean isWritePending() { + return !sendingQueue.isEmpty() + || headerOut != null && headerOut.hasRemaining() + || payloadOut != null && payloadOut.hasRemaining(); + } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index 6c4ae10..b1d1cb6 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -64,7 +64,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex private Selector selector; private ServerSocketChannel serverChannel; private Map connections = new ConcurrentHashMap<>(); - private int requestedObjectsCount; + private volatile int requestedObjectsCount; private Thread starter; @@ -94,13 +94,15 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex public CustomMessage send(InetAddress server, int port, CustomMessage request) { try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) { channel.configureBlocking(true); - ByteBuffer buffer = ByteBuffer.allocate(MAX_MESSAGE_SIZE); - new NetworkMessage(request).write(buffer); - buffer.flip(); - while (buffer.hasRemaining()) { - channel.write(buffer); + ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE); + ByteBuffer payloadBuffer = new NetworkMessage(request).writeHeaderAndGetPayloadBuffer(headerBuffer); + headerBuffer.flip(); + while (headerBuffer.hasRemaining()) { + channel.write(headerBuffer); + } + while (payloadBuffer.hasRemaining()) { + channel.write(payloadBuffer); } - buffer.clear(); V3MessageReader reader = new V3MessageReader(); while (channel.isConnected() && reader.getMessages().isEmpty()) { @@ -195,11 +197,25 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex if (missing > 0) { List addresses = ctx.getNodeRegistry().getKnownAddresses(missing, ctx.getStreams()); for (NetworkAddress address : addresses) { + if (isConnectedTo(address)) { + continue; + } try { SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort())); - channel.finishConnect(); + long timeout = System.currentTimeMillis() + 20_000; + while (!channel.finishConnect() && System.currentTimeMillis() < timeout) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + if (!channel.finishConnect()) { + channel.close(); + continue; + } ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT, address, listener, @@ -248,6 +264,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex Iterator keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); + keyIterator.remove(); if (key.attachment() instanceof ConnectionInfo) { SocketChannel channel = (SocketChannel) key.channel(); ConnectionInfo connection = (ConnectionInfo) key.attachment(); @@ -258,24 +275,18 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex if (key.isReadable()) { read(channel, connection); } - if (connection.getSendingQueue().isEmpty()) { - if (connection.getState() == DISCONNECTED) { - key.interestOps(0); - key.channel().close(); - } else { - key.interestOps(OP_READ); - } - } else { + if (connection.getState() == DISCONNECTED) { + key.interestOps(0); + channel.close(); + } else if (connection.isWritePending()) { key.interestOps(OP_READ | OP_WRITE); + } else { + key.interestOps(OP_READ); } } catch (CancelledKeyException | NodeException | IOException e) { connection.disconnect(); } - if (connection.getState() == DISCONNECTED) { - connections.remove(connection); - } } - keyIterator.remove(); requestedObjectsCount = requestedObjects.size(); } for (Map.Entry e : connections.entrySet()) { @@ -316,7 +327,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } private static void read(SocketChannel channel, ConnectionInfo connection) throws IOException { - while (channel.read(connection.getInBuffer()) > 0) { + if (channel.read(connection.getInBuffer()) > 0) { connection.updateReader(); } connection.updateSyncStatus(); @@ -442,6 +453,15 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex ); } + private boolean isConnectedTo(NetworkAddress address) { + for (ConnectionInfo c : connections.keySet()) { + if (c.getNode().equals(address)) { + return true; + } + } + return false; + } + @Override public boolean isRunning() { return selector != null && selector.isOpen() && starter.isAlive(); diff --git a/repositories/build.gradle b/repositories/build.gradle index 2ab092c..032ffbc 100644 --- a/repositories/build.gradle +++ b/repositories/build.gradle @@ -14,10 +14,10 @@ sourceCompatibility = 1.8 dependencies { compile project(':core') - compile 'org.flywaydb:flyway-core:3.2.1' + compile 'org.flywaydb:flyway-core:4.0.3' testCompile 'junit:junit:4.12' testCompile 'com.h2database:h2:1.4.190' testCompile 'org.mockito:mockito-core:1.10.19' testCompile project(path: ':core', configuration: 'testArtifacts') testCompile project(':cryptography-bc') -} \ No newline at end of file +}