From 519f457476bba1aeb6c933b6194ed8049a66d15a Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Mon, 15 Oct 2018 10:49:04 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Fix=20connectivity=20issues=20an?= =?UTF-8?q?d=20improve=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 22 +-- .../dissem/bitmessage/entity/ObjectMessage.kt | 32 ++-- .../dissem/bitmessage/factory/BufferPool.kt | 145 ------------------ .../bitmessage/factory/V3MessageReader.kt | 125 +++++++-------- .../bitmessage/security/CryptographyTest.kt | 36 +++-- demo/build.gradle | 22 +-- .../dissem/bitmessage/demo/Application.java | 4 + .../java/ch/dissem/bitmessage/demo/Main.java | 1 + .../java/ch/dissem/bitmessage/SystemTest.kt | 6 +- .../bitmessage/networking/nio/Connection.kt | 8 +- .../bitmessage/networking/nio/ConnectionIO.kt | 18 +-- .../nio/NetworkConnectionInitializer.kt | 4 +- .../networking/nio/NioNetworkHandler.kt | 42 ++--- 13 files changed, 152 insertions(+), 313 deletions(-) delete mode 100644 core/src/main/kotlin/ch/dissem/bitmessage/factory/BufferPool.kt diff --git a/build.gradle b/build.gradle index 7b25efd..b5258c9 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ buildscript { - ext.kotlin_version = '1.2.41' + ext.kotlin_version = '1.2.71' repositories { mavenCentral() } @@ -31,8 +31,8 @@ subprojects { jcenter() } dependencies { - compile "org.jetbrains.kotlin:kotlin-stdlib-jdk7" - compile "org.jetbrains.kotlin:kotlin-reflect" + implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7" + implementation "org.jetbrains.kotlin:kotlin-reflect" } test { @@ -139,21 +139,21 @@ subprojects { } dependency 'ch.dissem.msgpack:msgpack:2.0.1' - dependency 'org.bouncycastle:bcprov-jdk15on:1.59' + dependency 'org.bouncycastle:bcprov-jdk15on:1.60' dependency 'com.madgag.spongycastle:prov:1.58.0.0' - dependency 'org.apache.commons:commons-text:1.2' - dependency 'org.flywaydb:flyway-core:5.0.7' - dependency 'com.beust:klaxon:2.1.7' + dependency 'org.apache.commons:commons-text:1.5' + dependency 'org.flywaydb:flyway-core:5.2.0' + dependency 'com.beust:klaxon:3.0.8' dependency 'args4j:args4j:2.33' dependency 'org.ini4j:ini4j:0.5.4' - dependency 'com.h2database:h2:1.4.196' + dependency 'com.h2database:h2:1.4.197' dependency 'org.hamcrest:java-hamcrest:2.0.0.0' - dependency 'com.nhaarman:mockito-kotlin:1.5.0' + dependency 'com.nhaarman:mockito-kotlin:1.6.0' - dependency 'org.junit.jupiter:junit-jupiter-api:5.2.0' - dependency 'org.junit.jupiter:junit-jupiter-engine:5.2.0' + dependency 'org.junit.jupiter:junit-jupiter-api:5.3.1' + dependency 'org.junit.jupiter:junit-jupiter-engine:5.3.1' } } } diff --git a/core/src/main/kotlin/ch/dissem/bitmessage/entity/ObjectMessage.kt b/core/src/main/kotlin/ch/dissem/bitmessage/entity/ObjectMessage.kt index 3ff438c..b218d87 100644 --- a/core/src/main/kotlin/ch/dissem/bitmessage/entity/ObjectMessage.kt +++ b/core/src/main/kotlin/ch/dissem/bitmessage/entity/ObjectMessage.kt @@ -39,36 +39,26 @@ data class ObjectMessage( var nonce: ByteArray? = null, val expiresTime: Long, val payload: ObjectPayload, - val type: Long, + val type: Long = payload.type?.number ?: throw IllegalArgumentException("payload must have type defined"), /** * The object's version */ - val version: Long, - val stream: Long + val version: Long = payload.version, + val stream: Long = payload.stream ) : MessagePayload { override val command: MessagePayload.Command = MessagePayload.Command.OBJECT - constructor( - nonce: ByteArray? = null, - expiresTime: Long, - payload: ObjectPayload, - stream: Long - ) : this( - nonce, - expiresTime, - payload, - payload.type?.number ?: throw IllegalArgumentException("payload must have type defined"), - payload.version, - stream - ) - val inventoryVector: InventoryVector get() { - return InventoryVector(Bytes.truncate(cryptography().doubleSha512( - nonce ?: throw IllegalStateException("nonce must be set"), - payloadBytesWithoutNonce - ), 32)) + return InventoryVector( + Bytes.truncate( + cryptography().doubleSha512( + nonce ?: throw IllegalStateException("nonce must be set"), + payloadBytesWithoutNonce + ), 32 + ) + ) } private val isEncrypted: Boolean diff --git a/core/src/main/kotlin/ch/dissem/bitmessage/factory/BufferPool.kt b/core/src/main/kotlin/ch/dissem/bitmessage/factory/BufferPool.kt deleted file mode 100644 index 548ec2e..0000000 --- a/core/src/main/kotlin/ch/dissem/bitmessage/factory/BufferPool.kt +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright 2017 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.factory - -import ch.dissem.bitmessage.constants.Network.HEADER_SIZE -import ch.dissem.bitmessage.constants.Network.MAX_PAYLOAD_SIZE -import ch.dissem.bitmessage.exception.NodeException -import org.slf4j.LoggerFactory -import java.nio.ByteBuffer -import java.util.* -import kotlin.math.max - -/** - * A pool for [ByteBuffer]s. As they may use up a lot of memory, - * they should be reused as efficiently as possible. - */ -object BufferPool { - private val LOG = LoggerFactory.getLogger(BufferPool::class.java) - - private var limit: Int? = null - private var strictLimit = false - - /** - * Sets a limit to how many buffers the pool handles. If strict is set to true, it will not issue any - * buffers once the limit is reached and will throw a NodeException instead. Otherwise, it will simply - * ignore returned buffers once the limit is reached (and therefore garbage collected) - */ - fun setLimit(limit: Int, strict: Boolean = false) { - this.limit = limit - this.strictLimit = strict - pools.values.forEach { it.limit = limit } - pools[HEADER_SIZE]!!.limit = 2 * limit - pools[MAX_PAYLOAD_SIZE]!!.limit = max(limit / 2, 1) - } - - private val pools = mapOf( - HEADER_SIZE to Pool(), - 54 to Pool(), - 1000 to Pool(), - 60000 to Pool(), - MAX_PAYLOAD_SIZE to Pool() - ) - - @Synchronized - fun allocate(capacity: Int): ByteBuffer { - val targetSize = getTargetSize(capacity) - val pool = pools[targetSize] ?: throw IllegalStateException("No pool for size $targetSize available") - - return if (pool.isEmpty) { - if (pool.hasCapacity || !strictLimit) { - LOG.trace("Creating new buffer of size $targetSize") - ByteBuffer.allocate(targetSize) - } else { - throw NodeException("pool limit for capacity $capacity is reached") - } - } else { - pool.pop() - } - } - - /** - * Returns a buffer that has the size of the Bitmessage network message header, 24 bytes. - - * @return a buffer of size 24 - */ - @Synchronized - fun allocateHeaderBuffer(): ByteBuffer { - val pool = pools[HEADER_SIZE] ?: throw IllegalStateException("No pool for header available") - return if (pool.isEmpty) { - if (pool.hasCapacity || !strictLimit) { - LOG.trace("Creating new buffer of header") - ByteBuffer.allocate(HEADER_SIZE) - } else { - throw NodeException("pool limit for header buffer is reached") - } - } else { - pool.pop() - } - } - - @Synchronized - fun deallocate(buffer: ByteBuffer) { - buffer.clear() - val pool = pools[buffer.capacity()] - ?: throw IllegalArgumentException("Illegal buffer capacity ${buffer.capacity()} one of ${pools.keys} expected.") - pool.push(buffer) - } - - private fun getTargetSize(capacity: Int): Int { - for (size in pools.keys) { - if (size >= capacity) return size - } - throw IllegalArgumentException("Requested capacity too large: requested=$capacity; max=$MAX_PAYLOAD_SIZE") - } - - /** - * There is a race condition where the limit could be ignored for an allocation, but I think the consequences - * are benign. - */ - class Pool { - private val stack = Stack() - private var capacity = 0 - internal var limit: Int? = null - set(value) { - capacity = value ?: 0 - field = value - } - - val isEmpty - get() = stack.isEmpty() - - val hasCapacity - @Synchronized - get() = limit == null || capacity > 0 - - @Synchronized - fun pop(): ByteBuffer { - capacity-- - return stack.pop() - } - - @Synchronized - fun push(buffer: ByteBuffer) { - if (hasCapacity) { - stack.push(buffer) - } - // else, let it be collected by the garbage collector - capacity++ - } - } -} diff --git a/core/src/main/kotlin/ch/dissem/bitmessage/factory/V3MessageReader.kt b/core/src/main/kotlin/ch/dissem/bitmessage/factory/V3MessageReader.kt index ce152bf..80e0baf 100644 --- a/core/src/main/kotlin/ch/dissem/bitmessage/factory/V3MessageReader.kt +++ b/core/src/main/kotlin/ch/dissem/bitmessage/factory/V3MessageReader.kt @@ -30,8 +30,7 @@ import java.util.* * Similar to the [V3MessageFactory], but used for NIO buffers which may or may not contain a whole message. */ class V3MessageReader { - private var headerBuffer: ByteBuffer? = null - private var dataBuffer: ByteBuffer? = null + val buffer: ByteBuffer = ByteBuffer.allocate(MAX_PAYLOAD_SIZE) private var state: ReaderState? = ReaderState.MAGIC private var command: String? = null @@ -40,89 +39,83 @@ class V3MessageReader { private val messages = LinkedList() - fun getActiveBuffer(): ByteBuffer { - if (state != null && state != ReaderState.DATA) { - if (headerBuffer == null) { - headerBuffer = BufferPool.allocateHeaderBuffer() - } - } - return if (state == ReaderState.DATA) - dataBuffer ?: throw IllegalStateException("data buffer is null") - else - headerBuffer ?: throw IllegalStateException("header buffer is null") - } - fun update() { if (state != ReaderState.DATA) { - getActiveBuffer() // in order to initialize - headerBuffer?.flip() ?: throw IllegalStateException("header buffer is null") + buffer.flip() } - when (state) { - V3MessageReader.ReaderState.MAGIC -> magic(headerBuffer ?: throw IllegalStateException("header buffer is null")) - V3MessageReader.ReaderState.HEADER -> header(headerBuffer ?: throw IllegalStateException("header buffer is null")) - V3MessageReader.ReaderState.DATA -> data(dataBuffer ?: throw IllegalStateException("data buffer is null")) + var s = when (state) { + ReaderState.MAGIC -> magic() + ReaderState.HEADER -> header() + ReaderState.DATA -> data() + else -> ReaderState.WAIT_FOR_DATA + } + while (s != ReaderState.WAIT_FOR_DATA) { + s = when (state) { + ReaderState.MAGIC -> magic() + ReaderState.HEADER -> header() + ReaderState.DATA -> data(flip = false) + else -> ReaderState.WAIT_FOR_DATA + } } } - private fun magic(headerBuffer: ByteBuffer) { - if (!findMagicBytes(headerBuffer)) { - headerBuffer.compact() - return - } else { - state = ReaderState.HEADER - header(headerBuffer) - } + private fun magic(): ReaderState = if (!findMagicBytes(buffer)) { + buffer.compact() + ReaderState.WAIT_FOR_DATA + } else { + state = ReaderState.HEADER + ReaderState.HEADER } - private fun header(headerBuffer: ByteBuffer) { - if (headerBuffer.remaining() < 20) { - headerBuffer.compact() - headerBuffer.limit(20) - return + private fun header(): ReaderState { + if (buffer.remaining() < 20) { + buffer.compact() + return ReaderState.WAIT_FOR_DATA } - command = getCommand(headerBuffer) - length = Decode.uint32(headerBuffer).toInt() + command = getCommand(buffer) + length = Decode.uint32(buffer).toInt() if (length > MAX_PAYLOAD_SIZE) { - throw NodeException("Payload of " + length + " bytes received, no more than " + - MAX_PAYLOAD_SIZE + " was expected.") + throw NodeException( + "Payload of " + length + " bytes received, no more than " + + MAX_PAYLOAD_SIZE + " was expected." + ) } - headerBuffer.get(checksum) + buffer.get(checksum) state = ReaderState.DATA - this.headerBuffer = null - BufferPool.deallocate(headerBuffer) - this.dataBuffer = BufferPool.allocate(length).apply { - clear() - limit(length) - data(this) - } + return ReaderState.DATA } - private fun data(dataBuffer: ByteBuffer) { - if (dataBuffer.position() < length) { - return - } else { - dataBuffer.flip() + private fun data(flip: Boolean = true): ReaderState { + if (flip) { + if (buffer.position() < length) { + return ReaderState.WAIT_FOR_DATA + } else { + buffer.flip() + } + } else if (buffer.remaining() < length) { + buffer.compact() + return ReaderState.WAIT_FOR_DATA } - if (!testChecksum(dataBuffer)) { + if (!testChecksum(buffer)) { state = ReaderState.MAGIC - this.dataBuffer = null - BufferPool.deallocate(dataBuffer) + buffer.clear() throw NodeException("Checksum failed for message '$command'") } try { V3MessageFactory.getPayload( command ?: throw IllegalStateException("command is null"), - ByteArrayInputStream(dataBuffer.array(), - dataBuffer.arrayOffset() + dataBuffer.position(), length), + ByteArrayInputStream( + buffer.array(), + buffer.arrayOffset() + buffer.position(), length + ), length )?.let { messages.add(NetworkMessage(it)) } } catch (e: IOException) { throw NodeException(e.message) } finally { state = ReaderState.MAGIC - this.dataBuffer = null - BufferPool.deallocate(dataBuffer) } + return ReaderState.MAGIC } fun getMessages(): MutableList { @@ -163,8 +156,10 @@ class V3MessageReader { } private fun testChecksum(buffer: ByteBuffer): Boolean { - val payloadChecksum = cryptography().sha512(buffer.array(), - buffer.arrayOffset() + buffer.position(), length) + val payloadChecksum = cryptography().sha512( + buffer.array(), + buffer.arrayOffset() + buffer.position(), length + ) for (i in checksum.indices) { if (checksum[i] != payloadChecksum[i]) { return false @@ -173,17 +168,7 @@ class V3MessageReader { return true } - /** - * De-allocates all buffers. This method should be called iff the reader isn't used anymore, i.e. when its - * connection is severed. - */ - fun cleanup() { - state = null - headerBuffer?.let { BufferPool.deallocate(it) } - dataBuffer?.let { BufferPool.deallocate(it) } - } - private enum class ReaderState { - MAGIC, HEADER, DATA + MAGIC, HEADER, DATA, WAIT_FOR_DATA } } diff --git a/cryptography-sc/src/test/kotlin/ch/dissem/bitmessage/security/CryptographyTest.kt b/cryptography-sc/src/test/kotlin/ch/dissem/bitmessage/security/CryptographyTest.kt index 15a9313..8db8728 100644 --- a/cryptography-sc/src/test/kotlin/ch/dissem/bitmessage/security/CryptographyTest.kt +++ b/cryptography-sc/src/test/kotlin/ch/dissem/bitmessage/security/CryptographyTest.kt @@ -68,12 +68,12 @@ class CryptographyTest { @Test(expected = IOException::class) fun ensureExceptionForInsufficientProofOfWork() { - val objectMessage = ObjectMessage.Builder() - .nonce(ByteArray(8)) - .expiresTime(UnixTime.now + 28 * DAY) - .objectType(0) - .payload(GenericPayload.read(0, 1, ByteArrayInputStream(ByteArray(0)), 0)) - .build() + val objectMessage = ObjectMessage( + nonce = ByteArray(8), + expiresTime = UnixTime.now + 28 * DAY, + payload = GenericPayload.read(0, 1, ByteArrayInputStream(ByteArray(0)), 0), + type = 0 + ) crypto.checkProofOfWork(objectMessage, 1000, 1000) } @@ -86,10 +86,8 @@ class CryptographyTest { val objectMessage = ObjectMessage( nonce = ByteArray(8), expiresTime = UnixTime.now + 2 * MINUTE, - type = 0, payload = GenericPayload.read(0, 1, ByteArrayInputStream(ByteArray(0)), 0), - version = 0, - stream = 1 + type = 0 ) val waiter = CallbackWaiter() crypto.doProofOfWork(objectMessage, 1000, 1000, @@ -154,13 +152,19 @@ class CryptographyTest { companion object { val TEST_VALUE = "teststring".toByteArray() - val TEST_SHA1 = DatatypeConverter.parseHexBinary("" - + "b8473b86d4c2072ca9b08bd28e373e8253e865c4") - val TEST_SHA512 = DatatypeConverter.parseHexBinary("" - + "6253b39071e5df8b5098f59202d414c37a17d6a38a875ef5f8c7d89b0212b028" - + "692d3d2090ce03ae1de66c862fa8a561e57ed9eb7935ce627344f742c0931d72") - val TEST_RIPEMD160 = DatatypeConverter.parseHexBinary("" - + "cd566972b5e50104011a92b59fa8e0b1234851ae") + val TEST_SHA1 = DatatypeConverter.parseHexBinary( + "" + + "b8473b86d4c2072ca9b08bd28e373e8253e865c4" + ) + val TEST_SHA512 = DatatypeConverter.parseHexBinary( + "" + + "6253b39071e5df8b5098f59202d414c37a17d6a38a875ef5f8c7d89b0212b028" + + "692d3d2090ce03ae1de66c862fa8a561e57ed9eb7935ce627344f742c0931d72" + ) + val TEST_RIPEMD160 = DatatypeConverter.parseHexBinary( + "" + + "cd566972b5e50104011a92b59fa8e0b1234851ae" + ) private val crypto = SpongyCryptography() diff --git a/demo/build.gradle b/demo/build.gradle index 37eaa34..bb945c5 100644 --- a/demo/build.gradle +++ b/demo/build.gradle @@ -24,16 +24,16 @@ task fatCapsule(type: FatCapsule) { } dependencies { - compile project(':core') - compile project(':networking') - compile project(':repositories') - compile project(':cryptography-bc') - compile project(':wif') - compile 'org.slf4j:slf4j-simple' - compile 'args4j:args4j' - compile 'com.h2database:h2' - compile 'org.apache.commons:commons-text' - testCompile 'com.nhaarman:mockito-kotlin' - testCompile 'org.junit.jupiter:junit-jupiter-api' + implementation project(':core') + implementation project(':networking') + implementation project(':repositories') + implementation project(':cryptography-bc') + implementation project(':wif') + implementation 'org.slf4j:slf4j-simple' + implementation 'args4j:args4j' + implementation 'com.h2database:h2' + implementation 'org.apache.commons:commons-text' + testImplementation 'com.nhaarman:mockito-kotlin' + testImplementation 'org.junit.jupiter:junit-jupiter-api' testRuntime 'org.junit.jupiter:junit-jupiter-engine' } diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java index 8ad5617..920d47e 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java @@ -115,6 +115,7 @@ public class Application { System.out.println(ctx.status()); System.out.println(); System.out.println("c) cleanup inventory"); + System.out.println("n) remove known nodes"); System.out.println("r) resend unacknowledged messages"); System.out.println(COMMAND_BACK); @@ -123,6 +124,9 @@ public class Application { case "c": ctx.cleanup(); break; + case "n": + ctx.internals().getNodeRegistry().cleanup(); + break; case "r": ctx.resendUnacknowledgedMessages(); break; diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java index db13951..e7ad2b2 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java @@ -63,6 +63,7 @@ public class Main { .inventory(new JdbcInventory(jdbcConfig)) .messageRepo(new JdbcMessageRepository(jdbcConfig)) .powRepo(new JdbcProofOfWorkRepository(jdbcConfig)) + .labelRepo(new JdbcLabelRepository(jdbcConfig)) .networkHandler(new NioNetworkHandler()) .cryptography(new BouncyCryptography()); ctxBuilder.getPreferences().setPort(48444); diff --git a/demo/src/test/java/ch/dissem/bitmessage/SystemTest.kt b/demo/src/test/java/ch/dissem/bitmessage/SystemTest.kt index e5ad40d..bb4f1e5 100644 --- a/demo/src/test/java/ch/dissem/bitmessage/SystemTest.kt +++ b/demo/src/test/java/ch/dissem/bitmessage/SystemTest.kt @@ -26,6 +26,7 @@ import ch.dissem.bitmessage.ports.Labeler import ch.dissem.bitmessage.repository.* import ch.dissem.bitmessage.utils.TTL import ch.dissem.bitmessage.utils.UnixTime.MINUTE +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.spy import com.nhaarman.mockito_kotlin.timeout import com.nhaarman.mockito_kotlin.verify @@ -34,7 +35,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTimeoutPreemptively import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.mockito.ArgumentMatchers.any import org.slf4j.LoggerFactory import java.time.Duration.ofMinutes import java.time.Duration.ofSeconds @@ -126,7 +126,7 @@ class SystemTest { verify( aliceLabeler, timeout(TimeUnit.MINUTES.toMillis(2)).atLeastOnce() - ).markAsAcknowledged(any()) + ).markAsAcknowledged(any()) } } @@ -145,7 +145,7 @@ class SystemTest { } } - internal class DebugLabeler internal constructor(private val name: String) : DefaultLabeler() { + internal open class DebugLabeler internal constructor(private val name: String) : DefaultLabeler() { private val LOG = LoggerFactory.getLogger("Labeler") private lateinit var alice: String private lateinit var bob: String diff --git a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/Connection.kt b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/Connection.kt index 57040db..e5f70b3 100644 --- a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/Connection.kt +++ b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/Connection.kt @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory import java.io.IOException import java.util.* import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit /** * Contains everything used by both the old streams-oriented NetworkHandler and the new NioNetworkHandler, @@ -165,11 +166,12 @@ class Connection( } } - // the TCP timeout starts out at 20 seconds + // According to the specification, the TCP timeout starts out at 20 seconds // after verack messages are exchanged, the timeout is raised to 10 minutes + // Let's tweak these numbers a bit: fun isExpired(): Boolean = when (state) { - State.CONNECTING -> io.lastUpdate < System.currentTimeMillis() - 20000 - State.ACTIVE -> io.lastUpdate < System.currentTimeMillis() - 600000 + State.CONNECTING -> io.lastUpdate < System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(9) + State.ACTIVE -> io.lastUpdate < System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(3) State.DISCONNECTED -> true } diff --git a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/ConnectionIO.kt b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/ConnectionIO.kt index 63ca774..4e1fcab 100644 --- a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/ConnectionIO.kt +++ b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/ConnectionIO.kt @@ -21,7 +21,6 @@ import ch.dissem.bitmessage.entity.GetData import ch.dissem.bitmessage.entity.MessagePayload import ch.dissem.bitmessage.entity.NetworkMessage import ch.dissem.bitmessage.entity.valueobject.InventoryVector -import ch.dissem.bitmessage.exception.NodeException import ch.dissem.bitmessage.factory.V3MessageReader import ch.dissem.bitmessage.utils.UnixTime import org.slf4j.LoggerFactory @@ -42,7 +41,7 @@ class ConnectionIO( ) { private val headerOut: ByteBuffer = ByteBuffer.allocate(HEADER_SIZE) private var payloadOut: ByteBuffer? = null - private var reader: V3MessageReader? = V3MessageReader() + private val reader = V3MessageReader() internal val sendingQueue: Deque<MessagePayload> = ConcurrentLinkedDeque<MessagePayload>() internal var lastUpdate = System.currentTimeMillis() @@ -55,8 +54,7 @@ class ConnectionIO( headerOut.flip() } - val inBuffer: ByteBuffer - get() = reader?.getActiveBuffer() ?: throw NodeException("Node is disconnected") + val inBuffer: ByteBuffer = reader.buffer fun updateWriter() { if (!headerOut.hasRemaining() && !sendingQueue.isEmpty()) { @@ -78,7 +76,7 @@ class ConnectionIO( } fun updateReader() { - reader?.let { reader -> + reader.let { reader -> reader.update() if (!reader.getMessages().isEmpty()) { val iterator = reader.getMessages().iterator() @@ -96,11 +94,11 @@ class ConnectionIO( fun updateSyncStatus() { if (!isSyncFinished) { - isSyncFinished = reader?.getMessages()?.isEmpty() ?: true && syncFinished(null) + isSyncFinished = reader.getMessages().isEmpty() && syncFinished(null) } } - protected fun syncFinished(msg: NetworkMessage?): Boolean { + private fun syncFinished(msg: NetworkMessage?): Boolean { if (mode != Connection.Mode.SYNC) { return false } @@ -127,10 +125,6 @@ class ConnectionIO( } fun disconnect() { - reader?.let { - it.cleanup() - reader = null - } payloadOut = null } @@ -151,7 +145,7 @@ class ConnectionIO( || headerOut.hasRemaining() || payloadOut?.hasRemaining() ?: false - fun nothingToSend() = sendingQueue.isEmpty() + private fun nothingToSend() = sendingQueue.isEmpty() companion object { val LOG = LoggerFactory.getLogger(ConnectionIO::class.java) diff --git a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NetworkConnectionInitializer.kt b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NetworkConnectionInitializer.kt index 9fabcda..2cf9845 100644 --- a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NetworkConnectionInitializer.kt +++ b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NetworkConnectionInitializer.kt @@ -77,12 +77,12 @@ class NetworkConnectionInitializer( activateConnection() } } else { - throw NodeException("Received unsupported version " + version.version + ", disconnecting.") + throw NodeException("Received unsupported version ${version.version}, disconnecting.") } } private fun activateConnection() { - LOG.info("Successfully established connection with node " + node) + LOG.info("Successfully established connection with node $node") markActive(version.streams) node.time = UnixTime.now if (mode != Connection.Mode.SYNC) { diff --git a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt index b483d9b..8a172d0 100644 --- a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt +++ b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt @@ -17,6 +17,7 @@ package ch.dissem.bitmessage.networking.nio import ch.dissem.bitmessage.InternalContext +import ch.dissem.bitmessage.constants.Network.HEADER_SIZE import ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER import ch.dissem.bitmessage.entity.CustomMessage import ch.dissem.bitmessage.entity.GetData @@ -24,7 +25,6 @@ import ch.dissem.bitmessage.entity.NetworkMessage import ch.dissem.bitmessage.entity.valueobject.InventoryVector import ch.dissem.bitmessage.entity.valueobject.NetworkAddress import ch.dissem.bitmessage.exception.NodeException -import ch.dissem.bitmessage.factory.BufferPool import ch.dissem.bitmessage.factory.V3MessageReader import ch.dissem.bitmessage.networking.nio.Connection.Mode.* import ch.dissem.bitmessage.ports.NetworkHandler @@ -94,29 +94,26 @@ class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMB override fun send(server: InetAddress, port: Int, request: CustomMessage): CustomMessage { SocketChannel.open(InetSocketAddress(server, port)).use { channel -> channel.configureBlocking(true) - val headerBuffer = BufferPool.allocateHeaderBuffer() + val headerBuffer = ByteBuffer.allocate(HEADER_SIZE) val payloadBuffer = NetworkMessage(request).writer().writeHeaderAndGetPayloadBuffer(headerBuffer) headerBuffer.flip() while (headerBuffer.hasRemaining()) { channel.write(headerBuffer) } - BufferPool.deallocate(headerBuffer) while (payloadBuffer.hasRemaining()) { channel.write(payloadBuffer) } val reader = V3MessageReader() while (channel.isConnected && reader.getMessages().isEmpty()) { - if (channel.read(reader.getActiveBuffer()) > 0) { + if (channel.read(reader.buffer) > 0) { reader.update() } else { - reader.cleanup() throw NodeException("No response from node $server") } } val networkMessage: NetworkMessage? if (reader.getMessages().isEmpty()) { - reader.cleanup() throw NodeException("No response from node $server") } else { networkMessage = reader.getMessages().first() @@ -125,7 +122,6 @@ class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMB if (networkMessage.payload is CustomMessage) { return networkMessage.payload as CustomMessage } else { - reader.cleanup() throw NodeException("Unexpected response from node $server: ${networkMessage.payload.javaClass}") } } @@ -196,14 +192,14 @@ class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMB request(delayed) try { - Thread.sleep(30000) + Thread.sleep(10000) } catch (e: InterruptedException) { return@thread } } } - thread("selector worker", { + thread("selector worker") { try { val serverChannel = ServerSocketChannel.open() this.serverChannel = serverChannel @@ -272,10 +268,13 @@ class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMB else -> key.interestOps(OP_READ) } } catch (e: CancelledKeyException) { + LOG.debug("${e.message}: ${connection.node}", e) connection.disconnect() } catch (e: NodeException) { + LOG.debug("${e.message}: ${connection.node}", e) connection.disconnect() } catch (e: IOException) { + LOG.debug("${e.message}: ${connection.node}", e) connection.disconnect() } } @@ -306,10 +305,11 @@ class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMB channel.configureBlocking(false) channel.connect(InetSocketAddress(address.toInetAddress(), address.port)) val connection = Connection(ctx, CLIENT, address, requestedObjects, 0) - connections.put( - connection, - channel.register(selector, OP_CONNECT, connection) - ) + + connections[connection] = channel.register(selector, OP_CONNECT, connection) + + LOG.debug("Connection registered to $address") + } catch (ignore: NoRouteToHostException) { // We'll try to connect to many offline nodes, so // this is expected to happen quite a lot. @@ -321,7 +321,11 @@ class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMB LOG.error(e.message, e) } } catch (e: IOException) { - LOG.error(e.message, e) + if (e.message == "Network is unreachable") { + LOG.debug("Network is unreachable: $address") + } else { + LOG.error("${e.message}: $address", e) + } } } @@ -335,7 +339,7 @@ class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMB // isn't nice though. LOG.error(e.message, e) } - }) + } } private fun thread(threadName: String, runnable: () -> Unit): Thread { @@ -380,7 +384,7 @@ class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMB val distribution = HashMap<Connection, MutableList<InventoryVector>>() for ((connection, _) in connections) { if (connection.state == Connection.State.ACTIVE) { - distribution.put(connection, mutableListOf<InventoryVector>()) + distribution[connection] = mutableListOf() } } if (distribution.isEmpty()) { @@ -455,7 +459,7 @@ class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMB val outgoing = outgoingConnections[stream] ?: 0 streamProperties.add( Property( - "stream " + stream, Property("nodes", incoming + outgoing), + "stream $stream", Property("nodes", incoming + outgoing), Property("incoming", incoming), Property("outgoing", outgoing) ) @@ -476,8 +480,8 @@ class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMB companion object { private val LOG = LoggerFactory.getLogger(NioNetworkHandler::class.java) - private val REQUESTED_OBJECTS_MAX_TIME = (2 * 60000).toLong() // 2 minutes in ms - private val DELAYED = java.lang.Long.MIN_VALUE + private const val REQUESTED_OBJECTS_MAX_TIME = 2 * 60000L // 2 minutes in ms + private const val DELAYED = java.lang.Long.MIN_VALUE private fun write(channel: SocketChannel, connection: ConnectionIO) { writeBuffer(connection.outBuffers, channel)