From e9acb0071e23502a44dcba90d8a1acb3c976e7fa Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Wed, 21 Feb 2018 07:44:53 +0100 Subject: [PATCH] Make network handler more robust --- .../networking/nio/NioNetworkHandler.kt | 50 +++++++++++++------ .../bitmessage/repository/TestJdbcConfig.kt | 1 - 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt index fff2d2b..01ef19f 100644 --- a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt +++ b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt @@ -34,6 +34,7 @@ import ch.dissem.bitmessage.utils.Property import ch.dissem.bitmessage.utils.ThreadFactoryBuilder.Companion.pool import ch.dissem.bitmessage.utils.UnixTime.now import org.slf4j.LoggerFactory +import java.io.Closeable import java.io.IOException import java.net.InetAddress import java.net.InetSocketAddress @@ -53,7 +54,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder { pool("network") .lowPrio() .daemon() - .build()) + .build() + ) private lateinit var ctx: InternalContext private var selector: Selector? = null @@ -72,9 +74,11 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder { return threadPool.submit(Callable { SocketChannel.open(InetSocketAddress(server, port)).use { channel -> channel.configureBlocking(false) - val connection = Connection(ctx, SYNC, + val connection = Connection( + ctx, SYNC, NetworkAddress.Builder().ip(server).port(port).stream(1).build(), - HashMap(), timeoutInSeconds) + HashMap(), timeoutInSeconds + ) while (channel.isConnected && !connection.isSyncFinished) { write(channel, connection.io) read(channel, connection.io) @@ -216,7 +220,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder { try { val accepted = (key.channel() as ServerSocketChannel).accept() accepted.configureBlocking(false) - val connection = Connection(ctx, SERVER, + val connection = Connection( + ctx, SERVER, NetworkAddress( time = now, stream = 1L, @@ -281,7 +286,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder { if (selectionKey.isValid && selectionKey.interestOps() and OP_WRITE == 0 && selectionKey.interestOps() and OP_CONNECT == 0 - && !connection.nothingToSend) { + && !connection.nothingToSend + ) { selectionKey.interestOps(OP_READ or OP_WRITE) } } catch (x: CancelledKeyException) { @@ -338,10 +344,18 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder { } override fun stop() { - serverChannel?.socket()?.close() - selector?.close() + tryClose(serverChannel?.socket()) + tryClose(selector) for (selectionKey in connections.values) { - selectionKey.channel().close() + tryClose(selectionKey.channel()) + } + } + + private fun tryClose(item: Closeable?) { + try { + item?.close() + } catch (e: IOException) { + LOG.debug(e.message, e) } } @@ -382,7 +396,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder { } } if (connection.knowsOf(next) && !connection.requested(next)) { - val ivs = distribution[connection] ?: throw IllegalStateException("distribution not available for $connection") + val ivs = distribution[connection] + ?: throw IllegalStateException("distribution not available for $connection") if (ivs.size == GetData.MAX_INVENTORY_SIZE) { connection.send(GetData(ivs)) ivs.clear() @@ -406,7 +421,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder { } for (connection in distribution.keys) { - val ivs = distribution[connection] ?: throw IllegalStateException("distribution not available for $connection") + val ivs = + distribution[connection] ?: throw IllegalStateException("distribution not available for $connection") if (!ivs.isEmpty()) { connection.send(GetData(ivs)) } @@ -434,12 +450,16 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder { for (stream in streams) { val incoming = incomingConnections[stream] ?: 0 val outgoing = outgoingConnections[stream] ?: 0 - streamProperties.add(Property("stream " + stream, Property("nodes", incoming + outgoing), - Property("incoming", incoming), - Property("outgoing", outgoing) - )) + streamProperties.add( + Property( + "stream " + stream, Property("nodes", incoming + outgoing), + Property("incoming", incoming), + Property("outgoing", outgoing) + ) + ) } - return Property("network", + return Property( + "network", Property("connectionManager", if (isRunning) "running" else "stopped"), Property("connections", streamProperties), Property("requestedObjects", requestedObjects.size) diff --git a/repositories/src/test/kotlin/ch/dissem/bitmessage/repository/TestJdbcConfig.kt b/repositories/src/test/kotlin/ch/dissem/bitmessage/repository/TestJdbcConfig.kt index c5c9ec6..1fa4501 100644 --- a/repositories/src/test/kotlin/ch/dissem/bitmessage/repository/TestJdbcConfig.kt +++ b/repositories/src/test/kotlin/ch/dissem/bitmessage/repository/TestJdbcConfig.kt @@ -17,7 +17,6 @@ package ch.dissem.bitmessage.repository import org.h2.tools.Server -import org.slf4j.LoggerFactory /** * JdbcConfig to be used for tests. Uses an in-memory database and adds a useful [.reset] method resetting