Make network handler more robust

This commit is contained in:
Christian Basler 2018-02-21 07:44:53 +01:00
parent c425298b67
commit e9acb0071e
2 changed files with 35 additions and 16 deletions

View File

@ -34,6 +34,7 @@ import ch.dissem.bitmessage.utils.Property
import ch.dissem.bitmessage.utils.ThreadFactoryBuilder.Companion.pool import ch.dissem.bitmessage.utils.ThreadFactoryBuilder.Companion.pool
import ch.dissem.bitmessage.utils.UnixTime.now import ch.dissem.bitmessage.utils.UnixTime.now
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.io.Closeable
import java.io.IOException import java.io.IOException
import java.net.InetAddress import java.net.InetAddress
import java.net.InetSocketAddress import java.net.InetSocketAddress
@ -53,7 +54,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
pool("network") pool("network")
.lowPrio() .lowPrio()
.daemon() .daemon()
.build()) .build()
)
private lateinit var ctx: InternalContext private lateinit var ctx: InternalContext
private var selector: Selector? = null private var selector: Selector? = null
@ -72,9 +74,11 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
return threadPool.submit(Callable<Void> { return threadPool.submit(Callable<Void> {
SocketChannel.open(InetSocketAddress(server, port)).use { channel -> SocketChannel.open(InetSocketAddress(server, port)).use { channel ->
channel.configureBlocking(false) channel.configureBlocking(false)
val connection = Connection(ctx, SYNC, val connection = Connection(
ctx, SYNC,
NetworkAddress.Builder().ip(server).port(port).stream(1).build(), NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
HashMap(), timeoutInSeconds) HashMap(), timeoutInSeconds
)
while (channel.isConnected && !connection.isSyncFinished) { while (channel.isConnected && !connection.isSyncFinished) {
write(channel, connection.io) write(channel, connection.io)
read(channel, connection.io) read(channel, connection.io)
@ -216,7 +220,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
try { try {
val accepted = (key.channel() as ServerSocketChannel).accept() val accepted = (key.channel() as ServerSocketChannel).accept()
accepted.configureBlocking(false) accepted.configureBlocking(false)
val connection = Connection(ctx, SERVER, val connection = Connection(
ctx, SERVER,
NetworkAddress( NetworkAddress(
time = now, time = now,
stream = 1L, stream = 1L,
@ -281,7 +286,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
if (selectionKey.isValid if (selectionKey.isValid
&& selectionKey.interestOps() and OP_WRITE == 0 && selectionKey.interestOps() and OP_WRITE == 0
&& selectionKey.interestOps() and OP_CONNECT == 0 && selectionKey.interestOps() and OP_CONNECT == 0
&& !connection.nothingToSend) { && !connection.nothingToSend
) {
selectionKey.interestOps(OP_READ or OP_WRITE) selectionKey.interestOps(OP_READ or OP_WRITE)
} }
} catch (x: CancelledKeyException) { } catch (x: CancelledKeyException) {
@ -338,10 +344,18 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
} }
override fun stop() { override fun stop() {
serverChannel?.socket()?.close() tryClose(serverChannel?.socket())
selector?.close() tryClose(selector)
for (selectionKey in connections.values) { for (selectionKey in connections.values) {
selectionKey.channel().close() tryClose(selectionKey.channel())
}
}
private fun tryClose(item: Closeable?) {
try {
item?.close()
} catch (e: IOException) {
LOG.debug(e.message, e)
} }
} }
@ -382,7 +396,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
} }
} }
if (connection.knowsOf(next) && !connection.requested(next)) { if (connection.knowsOf(next) && !connection.requested(next)) {
val ivs = distribution[connection] ?: throw IllegalStateException("distribution not available for $connection") val ivs = distribution[connection]
?: throw IllegalStateException("distribution not available for $connection")
if (ivs.size == GetData.MAX_INVENTORY_SIZE) { if (ivs.size == GetData.MAX_INVENTORY_SIZE) {
connection.send(GetData(ivs)) connection.send(GetData(ivs))
ivs.clear() ivs.clear()
@ -406,7 +421,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
} }
for (connection in distribution.keys) { for (connection in distribution.keys) {
val ivs = distribution[connection] ?: throw IllegalStateException("distribution not available for $connection") val ivs =
distribution[connection] ?: throw IllegalStateException("distribution not available for $connection")
if (!ivs.isEmpty()) { if (!ivs.isEmpty()) {
connection.send(GetData(ivs)) connection.send(GetData(ivs))
} }
@ -434,12 +450,16 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
for (stream in streams) { for (stream in streams) {
val incoming = incomingConnections[stream] ?: 0 val incoming = incomingConnections[stream] ?: 0
val outgoing = outgoingConnections[stream] ?: 0 val outgoing = outgoingConnections[stream] ?: 0
streamProperties.add(Property("stream " + stream, Property("nodes", incoming + outgoing), streamProperties.add(
Property("incoming", incoming), Property(
Property("outgoing", outgoing) "stream " + stream, Property("nodes", incoming + outgoing),
)) Property("incoming", incoming),
Property("outgoing", outgoing)
)
)
} }
return Property("network", return Property(
"network",
Property("connectionManager", if (isRunning) "running" else "stopped"), Property("connectionManager", if (isRunning) "running" else "stopped"),
Property("connections", streamProperties), Property("connections", streamProperties),
Property("requestedObjects", requestedObjects.size) Property("requestedObjects", requestedObjects.size)

View File

@ -17,7 +17,6 @@
package ch.dissem.bitmessage.repository package ch.dissem.bitmessage.repository
import org.h2.tools.Server import org.h2.tools.Server
import org.slf4j.LoggerFactory
/** /**
* JdbcConfig to be used for tests. Uses an in-memory database and adds a useful [.reset] method resetting * JdbcConfig to be used for tests. Uses an in-memory database and adds a useful [.reset] method resetting