diff --git a/build.gradle b/build.gradle index 02fa2b4..89325d4 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ buildscript { - ext.kotlin_version = '1.1.2-2' + ext.kotlin_version = '1.1.3' repositories { mavenCentral() } diff --git a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.kt b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.kt index f3ffa6c..ff3a58d 100644 --- a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.kt +++ b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.kt @@ -78,7 +78,7 @@ class BitmessageContext( port: Int = 8444, connectionTTL: Long = 30 * MINUTE, connectionLimit: Int = 150, - sendPubkeyOnIdentityCreation: Boolean, + sendPubkeyOnIdentityCreation: Boolean = true, doMissingProofOfWorkDelayInSeconds: Int = 30 ) { @@ -333,7 +333,7 @@ class BitmessageContext( fun status(): Property { return Property("status", - internals.networkHandler.networkStatus, + internals.networkHandler.getNetworkStatus(), Property("unacknowledged", internals.messageRepository.findMessagesToResend().size) ) } diff --git a/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.kt b/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.kt index 37c2a75..597f5a3 100644 --- a/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.kt +++ b/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.kt @@ -32,8 +32,12 @@ import java.util.* internal open class DefaultMessageListener( private val labeler: Labeler, private val listener: BitmessageContext.Listener -) : NetworkHandler.MessageListener { - private var ctx by InternalContext.lateinit +) : NetworkHandler.MessageListener, InternalContext.ContextHolder { + private lateinit var ctx: InternalContext + + override fun setContext(context: InternalContext) { + ctx = context + } override fun receive(objectMessage: ObjectMessage) { val payload = objectMessage.payload @@ -43,7 +47,7 @@ internal open class DefaultMessageListener( receive(objectMessage, payload as GetPubkey) } ObjectType.PUBKEY -> { - receive(objectMessage, payload as Pubkey) + receive(payload as Pubkey) } ObjectType.MSG -> { receive(objectMessage, payload as Msg) @@ -56,9 +60,6 @@ internal open class DefaultMessageListener( receive(payload) } } - else -> { - throw IllegalArgumentException("Unknown payload type " + payload.type!!) - } } } @@ -71,7 +72,7 @@ internal open class DefaultMessageListener( } } - protected fun receive(objectMessage: ObjectMessage, pubkey: Pubkey) { + protected fun receive(pubkey: Pubkey) { try { if (pubkey is V4Pubkey) { ctx.addressRepository.findContact(pubkey.tag)?.let { diff --git a/core/src/main/java/ch/dissem/bitmessage/InternalContext.kt b/core/src/main/java/ch/dissem/bitmessage/InternalContext.kt index 62bba56..a1c7f16 100644 --- a/core/src/main/java/ch/dissem/bitmessage/InternalContext.kt +++ b/core/src/main/java/ch/dissem/bitmessage/InternalContext.kt @@ -28,7 +28,6 @@ import ch.dissem.bitmessage.utils.UnixTime import org.slf4j.LoggerFactory import java.util.* import java.util.concurrent.Executors -import kotlin.reflect.KProperty /** * The internal context should normally only be used for port implementations. If you need it in your client @@ -67,8 +66,6 @@ class InternalContext( get() = _streams.toLongArray() init { - lateinit.instance = this - lateinit = ContextDelegate() Singleton.initialize(cryptography) // TODO: streams of new identities and subscriptions should also be added. This works only after a restart. @@ -218,19 +215,10 @@ class InternalContext( fun setContext(context: InternalContext) } - class ContextDelegate { - internal lateinit var instance: InternalContext - operator fun getValue(thisRef: Any?, property: KProperty<*>) = instance - operator fun setValue(thisRef: Any?, property: KProperty<*>, value: InternalContext) {} - } - companion object { private val LOG = LoggerFactory.getLogger(InternalContext::class.java) @JvmField val NETWORK_NONCE_TRIALS_PER_BYTE: Long = 1000 @JvmField val NETWORK_EXTRA_BYTES: Long = 1000 - - var lateinit = ContextDelegate() - private set } } diff --git a/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.kt b/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.kt index 67d6637..5d177a6 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.kt +++ b/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.kt @@ -22,21 +22,23 @@ import ch.dissem.bitmessage.entity.* import ch.dissem.bitmessage.entity.payload.Msg import ch.dissem.bitmessage.ports.ProofOfWorkEngine import ch.dissem.bitmessage.ports.ProofOfWorkRepository.Item -import ch.dissem.bitmessage.utils.Strings import org.slf4j.LoggerFactory -import java.io.IOException import java.util.* /** * @author Christian Basler */ -class ProofOfWorkService : ProofOfWorkEngine.Callback { +class ProofOfWorkService : ProofOfWorkEngine.Callback, InternalContext.ContextHolder { - private val ctx by InternalContext.lateinit + private lateinit var ctx: InternalContext private val cryptography by lazy { ctx.cryptography } private val powRepo by lazy { ctx.proofOfWorkRepository } private val messageRepo by lazy { ctx.messageRepository } + override fun setContext(context: InternalContext) { + ctx = context + } + fun doMissingProofOfWork(delayInMilliseconds: Long) { val items = powRepo.getItems() if (items.isEmpty()) return diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.kt b/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.kt index 01be7f6..30480a8 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.kt +++ b/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.kt @@ -23,10 +23,28 @@ import ch.dissem.bitmessage.utils.UnixTime import java.io.OutputStream import java.net.InetAddress import java.net.InetSocketAddress +import java.net.Socket import java.net.SocketAddress import java.nio.ByteBuffer import java.util.* +fun ip6(inetAddress: InetAddress): ByteArray { + val address = inetAddress.address + when (address.size) { + 16 -> { + return address + } + 4 -> { + val ip6 = ByteArray(16) + ip6[10] = 0xff.toByte() + ip6[11] = 0xff.toByte() + System.arraycopy(address, 0, ip6, 12, 4) + return ip6 + } + else -> throw IllegalArgumentException("Weird address " + inetAddress) + } +} + /** * A node's address. It's written in IPv6 format. */ @@ -51,6 +69,9 @@ data class NetworkAddress( val port: Int ) : Streamable { + constructor(time: Long, stream: Long, services: Long = 1, socket: Socket) + : this(time, stream, services, ip6(socket.inetAddress), socket.port) + fun provides(service: Version.Service?): Boolean = service?.isEnabled(services) ?: false fun toInetAddress(): InetAddress { @@ -125,18 +146,7 @@ data class NetworkAddress( } fun ip(inetAddress: InetAddress): Builder { - val addr = inetAddress.address - if (addr.size == 16) { - this.ipv6 = addr - } else if (addr.size == 4) { - val ipv6 = ByteArray(16) - ipv6[10] = 0xff.toByte() - ipv6[11] = 0xff.toByte() - System.arraycopy(addr, 0, ipv6, 12, 4) - this.ipv6 = ipv6 - } else { - throw IllegalArgumentException("Weird address " + inetAddress) - } + ipv6 = ip6(inetAddress) return this } @@ -165,9 +175,8 @@ data class NetworkAddress( fun address(address: SocketAddress): Builder { if (address is InetSocketAddress) { - val inetAddress = address - ip(inetAddress.address) - port(inetAddress.port) + ip(address.address) + port(address.port) } else { throw IllegalArgumentException("Unknown type of address: " + address.javaClass) } @@ -180,4 +189,8 @@ data class NetworkAddress( ) } } + + companion object { + @JvmField val ANY = NetworkAddress(time = 0, stream = 0, services = 0, IPv6 = ByteArray(16), port = 0) + } } diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.kt b/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.kt index a301d37..8b1d031 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.kt +++ b/core/src/main/java/ch/dissem/bitmessage/factory/BufferPool.kt @@ -39,9 +39,9 @@ object BufferPool { @Synchronized fun allocate(capacity: Int): ByteBuffer { val targetSize = getTargetSize(capacity) - val pool = pools[targetSize] - if (pool == null || pool.isEmpty()) { - LOG.trace("Creating new buffer of size " + targetSize!!) + val pool = pools[targetSize] ?: throw IllegalStateException("No pool for size $targetSize available") + if (pool.isEmpty()) { + LOG.trace("Creating new buffer of size $targetSize") return ByteBuffer.allocate(targetSize) } else { return pool.pop() @@ -64,16 +64,14 @@ object BufferPool { @Synchronized fun deallocate(buffer: ByteBuffer) { buffer.clear() - val pool = pools[buffer.capacity()] - pool?.push(buffer) ?: throw IllegalArgumentException("Illegal buffer capacity " + buffer.capacity() + - " one of " + pools.keys + " expected.") + val pool = pools[buffer.capacity()] ?: throw IllegalArgumentException("Illegal buffer capacity ${buffer.capacity()} one of ${pools.keys} expected.") + pool.push(buffer) } - private fun getTargetSize(capacity: Int): Int? { + private fun getTargetSize(capacity: Int): Int { for (size in pools.keys) { if (size >= capacity) return size } - throw IllegalArgumentException("Requested capacity too large: " + - "requested=" + capacity + "; max=" + MAX_PAYLOAD_SIZE) + throw IllegalArgumentException("Requested capacity too large: requested=$capacity; max=$MAX_PAYLOAD_SIZE") } } diff --git a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.kt b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.kt index 694e8c0..5b81621 100644 --- a/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.kt +++ b/core/src/main/java/ch/dissem/bitmessage/factory/V3MessageReader.kt @@ -40,23 +40,22 @@ class V3MessageReader { private val messages = LinkedList() - val activeBuffer: ByteBuffer - get() { - if (state != null && state != ReaderState.DATA) { - if (headerBuffer == null) { - headerBuffer = BufferPool.allocateHeaderBuffer() - } + fun getActiveBuffer(): ByteBuffer { + if (state != null && state != ReaderState.DATA) { + if (headerBuffer == null) { + headerBuffer = BufferPool.allocateHeaderBuffer() } - return if (state == ReaderState.DATA) - dataBuffer ?: throw IllegalStateException("data buffer is null") - else - headerBuffer ?: throw IllegalStateException("header buffer is null") } + return if (state == ReaderState.DATA) + dataBuffer ?: throw IllegalStateException("data buffer is null") + else + headerBuffer ?: throw IllegalStateException("header buffer is null") + } fun update() { if (state != ReaderState.DATA) { - activeBuffer - headerBuffer!!.flip() + getActiveBuffer() // in order to initialize + headerBuffer?.flip() ?: throw IllegalStateException("header buffer is null") } when (state) { V3MessageReader.ReaderState.MAGIC -> magic(headerBuffer ?: throw IllegalStateException("header buffer is null")) @@ -93,12 +92,12 @@ class V3MessageReader { BufferPool.deallocate(headerBuffer) val dataBuffer = BufferPool.allocate(length) this.dataBuffer = dataBuffer + dataBuffer.clear() + dataBuffer.limit(length) data(dataBuffer) } private fun data(dataBuffer: ByteBuffer) { - dataBuffer.clear() - dataBuffer.limit(length) if (dataBuffer.position() < length) { return } else { @@ -126,7 +125,7 @@ class V3MessageReader { } } - fun getMessages(): List { + fun getMessages(): MutableList { return messages } diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/AbstractCryptography.kt b/core/src/main/java/ch/dissem/bitmessage/ports/AbstractCryptography.kt index e30f943..b655bd1 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/AbstractCryptography.kt +++ b/core/src/main/java/ch/dissem/bitmessage/ports/AbstractCryptography.kt @@ -36,13 +36,17 @@ import javax.crypto.spec.SecretKeySpec /** * Implements everything that isn't directly dependent on either Spongy- or Bouncycastle. */ -abstract class AbstractCryptography protected constructor(@JvmField protected val provider: Provider) : Cryptography { - private val context by InternalContext.lateinit +abstract class AbstractCryptography protected constructor(@JvmField protected val provider: Provider) : Cryptography, InternalContext.ContextHolder { + private lateinit var ctx: InternalContext @JvmField protected val ALGORITHM_ECDSA = "ECDSA" @JvmField protected val ALGORITHM_ECDSA_SHA1 = "SHA1withECDSA" @JvmField protected val ALGORITHM_EVP_SHA256 = "SHA256withECDSA" + override fun setContext(context: InternalContext) { + ctx = context + } + override fun sha512(data: ByteArray, offset: Int, length: Int): ByteArray { val mda = md("SHA-512") mda.update(data, offset, length) @@ -95,7 +99,7 @@ abstract class AbstractCryptography protected constructor(@JvmField protected va val target = getProofOfWorkTarget(objectMessage, max(nonceTrialsPerByte, NETWORK_NONCE_TRIALS_PER_BYTE), max(extraBytes, NETWORK_EXTRA_BYTES)) - context.proofOfWorkEngine.calculateNonce(initialHash, target, callback) + ctx.proofOfWorkEngine.calculateNonce(initialHash, target, callback) } @Throws(InsufficientProofOfWorkException::class) diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/AbstractMessageRepository.kt b/core/src/main/java/ch/dissem/bitmessage/ports/AbstractMessageRepository.kt index c951ad2..b6d2108 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/AbstractMessageRepository.kt +++ b/core/src/main/java/ch/dissem/bitmessage/ports/AbstractMessageRepository.kt @@ -27,8 +27,12 @@ import ch.dissem.bitmessage.utils.Strings import ch.dissem.bitmessage.utils.UnixTime import java.util.* -abstract class AbstractMessageRepository : MessageRepository { - protected var ctx by InternalContext.lateinit +abstract class AbstractMessageRepository : MessageRepository, InternalContext.ContextHolder { + protected lateinit var ctx: InternalContext + + override fun setContext(context: InternalContext) { + ctx = context + } protected fun saveContactIfNecessary(contact: BitmessageAddress?) { contact?.let { diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/DefaultLabeler.kt b/core/src/main/java/ch/dissem/bitmessage/ports/DefaultLabeler.kt index 4df9048..cc55db9 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/DefaultLabeler.kt +++ b/core/src/main/java/ch/dissem/bitmessage/ports/DefaultLabeler.kt @@ -22,8 +22,12 @@ import ch.dissem.bitmessage.entity.Plaintext.Status.* import ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST import ch.dissem.bitmessage.entity.valueobject.Label -open class DefaultLabeler : Labeler { - private var ctx by InternalContext.lateinit +open class DefaultLabeler : Labeler, InternalContext.ContextHolder { + private lateinit var ctx: InternalContext + + override fun setContext(context: InternalContext) { + ctx = context + } override fun setLabels(msg: Plaintext) { msg.status = RECEIVED diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.kt b/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.kt index de7f24c..6425fe3 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.kt +++ b/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.kt @@ -73,9 +73,9 @@ interface NetworkHandler { * @param inventoryVectors of the objects to be requested */ - fun request(inventoryVectors: Collection) + fun request(inventoryVectors: MutableCollection) - val networkStatus: Property + fun getNetworkStatus(): Property val isRunning: Boolean diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/NodeRegistry.kt b/core/src/main/java/ch/dissem/bitmessage/ports/NodeRegistry.kt index 59e3567..922370b 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/NodeRegistry.kt +++ b/core/src/main/java/ch/dissem/bitmessage/ports/NodeRegistry.kt @@ -30,5 +30,5 @@ interface NodeRegistry { fun getKnownAddresses(limit: Int, vararg streams: Long): List - fun offerAddresses(addresses: List) + fun offerAddresses(nodes: List) } diff --git a/cryptography-bc/src/main/java/ch/dissem/bitmessage/cryptography/bc/BouncyCryptography.kt b/cryptography-bc/src/main/kotlin/ch/dissem/bitmessage/cryptography/bc/BouncyCryptography.kt similarity index 100% rename from cryptography-bc/src/main/java/ch/dissem/bitmessage/cryptography/bc/BouncyCryptography.kt rename to cryptography-bc/src/main/kotlin/ch/dissem/bitmessage/cryptography/bc/BouncyCryptography.kt diff --git a/cryptography-bc/src/test/java/ch/dissem/bitmessage/security/CryptographyTest.kt b/cryptography-bc/src/test/kotlin/ch/dissem/bitmessage/security/CryptographyTest.kt similarity index 100% rename from cryptography-bc/src/test/java/ch/dissem/bitmessage/security/CryptographyTest.kt rename to cryptography-bc/src/test/kotlin/ch/dissem/bitmessage/security/CryptographyTest.kt diff --git a/cryptography-sc/src/main/java/ch/dissem/bitmessage/cryptography/sc/SpongyCryptography.kt b/cryptography-sc/src/main/kotlin/ch/dissem/bitmessage/cryptography/sc/SpongyCryptography.kt similarity index 100% rename from cryptography-sc/src/main/java/ch/dissem/bitmessage/cryptography/sc/SpongyCryptography.kt rename to cryptography-sc/src/main/kotlin/ch/dissem/bitmessage/cryptography/sc/SpongyCryptography.kt diff --git a/cryptography-sc/src/test/java/ch/dissem/bitmessage/security/CryptographyTest.kt b/cryptography-sc/src/test/kotlin/ch/dissem/bitmessage/security/CryptographyTest.kt similarity index 100% rename from cryptography-sc/src/test/java/ch/dissem/bitmessage/security/CryptographyTest.kt rename to cryptography-sc/src/test/kotlin/ch/dissem/bitmessage/security/CryptographyTest.kt diff --git a/networking/build.gradle b/networking/build.gradle index b01eb40..6ec5920 100644 --- a/networking/build.gradle +++ b/networking/build.gradle @@ -14,7 +14,7 @@ dependencies { compile project(':core') testCompile 'junit:junit:4.12' testCompile 'org.slf4j:slf4j-simple:1.7.25' - testCompile 'org.mockito:mockito-core:2.7.21' + testCompile 'com.nhaarman:mockito-kotlin:1.4.0' testCompile project(path: ':core', configuration: 'testArtifacts') testCompile project(':cryptography-bc') } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java deleted file mode 100644 index 8bcd2b0..0000000 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Copyright 2016 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.networking; - -import ch.dissem.bitmessage.BitmessageContext; -import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.entity.*; -import ch.dissem.bitmessage.entity.valueobject.InventoryVector; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException; -import ch.dissem.bitmessage.exception.NodeException; -import ch.dissem.bitmessage.ports.NetworkHandler; -import ch.dissem.bitmessage.utils.UnixTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; - -import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES; -import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; -import static ch.dissem.bitmessage.networking.AbstractConnection.State.*; -import static ch.dissem.bitmessage.utils.Singleton.cryptography; -import static ch.dissem.bitmessage.utils.UnixTime.MINUTE; - -/** - * Contains everything used by both the old streams-oriented NetworkHandler and the new NioNetworkHandler, - * respectively their connection objects. - */ -public abstract class AbstractConnection { - private static final Logger LOG = LoggerFactory.getLogger(AbstractConnection.class); - protected final InternalContext ctx; - protected final Mode mode; - protected final NetworkAddress host; - protected final NetworkAddress node; - protected final NetworkHandler.MessageListener listener; - protected final Map ivCache; - protected final Deque sendingQueue; - protected final Map commonRequestedObjects; - protected final Set requestedObjects; - - protected volatile State state; - protected long lastObjectTime; - - private final long syncTimeout; - private long syncReadTimeout = Long.MAX_VALUE; - - protected long peerNonce; - protected int version; - protected long[] streams; - private boolean verackSent; - private boolean verackReceived; - - public AbstractConnection(InternalContext context, Mode mode, - NetworkAddress node, - Map commonRequestedObjects, - long syncTimeout) { - this.ctx = context; - this.mode = mode; - this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); - this.node = node; - this.listener = context.getNetworkListener(); - this.syncTimeout = (syncTimeout > 0 ? UnixTime.now() + syncTimeout : 0); - this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap(10_000)); - this.ivCache = new ConcurrentHashMap<>(); - this.sendingQueue = new ConcurrentLinkedDeque<>(); - this.state = CONNECTING; - this.commonRequestedObjects = commonRequestedObjects; - } - - public Mode getMode() { - return mode; - } - - public NetworkAddress getNode() { - return node; - } - - public State getState() { - return state; - } - - public long[] getStreams() { - return streams; - } - - protected void handleMessage(MessagePayload payload) { - switch (state) { - case ACTIVE: - receiveMessage(payload); - break; - - case DISCONNECTED: - break; - - default: - handleCommand(payload); - break; - } - } - - private void receiveMessage(MessagePayload messagePayload) { - switch (messagePayload.getCommand()) { - case INV: - receiveMessage((Inv) messagePayload); - break; - case GETDATA: - receiveMessage((GetData) messagePayload); - break; - case OBJECT: - receiveMessage((ObjectMessage) messagePayload); - break; - case ADDR: - receiveMessage((Addr) messagePayload); - break; - case CUSTOM: - case VERACK: - case VERSION: - default: - throw new IllegalStateException("Unexpectedly received '" + messagePayload.getCommand() + "' command"); - } - } - - private void receiveMessage(Inv inv) { - int originalSize = inv.getInventory().size(); - updateIvCache(inv.getInventory()); - List missing = ctx.getInventory().getMissing(inv.getInventory(), streams); - missing.removeAll(commonRequestedObjects.keySet()); - LOG.trace("Received inventory with " + originalSize + " elements, of which are " - + missing.size() + " missing."); - send(new GetData(missing)); - } - - private void receiveMessage(GetData getData) { - for (InventoryVector iv : getData.getInventory()) { - ObjectMessage om = ctx.getInventory().getObject(iv); - if (om != null) sendingQueue.offer(om); - } - } - - private void receiveMessage(ObjectMessage objectMessage) { - requestedObjects.remove(objectMessage.getInventoryVector()); - if (ctx.getInventory().contains(objectMessage)) { - LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory"); - return; - } - try { - listener.receive(objectMessage); - cryptography().checkProofOfWork(objectMessage, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES); - ctx.getInventory().storeObject(objectMessage); - // offer object to some random nodes so it gets distributed throughout the network: - ctx.getNetworkHandler().offer(objectMessage.getInventoryVector()); - lastObjectTime = UnixTime.now(); - } catch (InsufficientProofOfWorkException e) { - LOG.warn(e.getMessage()); - // DebugUtils.saveToFile(objectMessage); // this line must not be committed active - } catch (IOException e) { - LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); - } finally { - if (commonRequestedObjects.remove(objectMessage.getInventoryVector()) == null) { - LOG.debug("Received object that wasn't requested."); - } - } - } - - private void receiveMessage(Addr addr) { - LOG.trace("Received " + addr.getAddresses().size() + " addresses."); - ctx.getNodeRegistry().offerAddresses(addr.getAddresses()); - } - - private void updateIvCache(List inventory) { - cleanupIvCache(); - Long now = UnixTime.now(); - for (InventoryVector iv : inventory) { - ivCache.put(iv, now); - } - } - - public void offer(InventoryVector iv) { - sendingQueue.offer(new Inv(Collections.singletonList(iv))); - updateIvCache(Collections.singletonList(iv)); - } - - public boolean knowsOf(InventoryVector iv) { - return ivCache.containsKey(iv); - } - - public boolean requested(InventoryVector iv) { - return requestedObjects.contains(iv); - } - - private void cleanupIvCache() { - long fiveMinutesAgo = UnixTime.now() - 5 * MINUTE; - for (Map.Entry entry : ivCache.entrySet()) { - if (entry.getValue() < fiveMinutesAgo) { - ivCache.remove(entry.getKey()); - } - } - } - - private void handleCommand(MessagePayload payload) { - switch (payload.getCommand()) { - case VERSION: - handleVersion((Version) payload); - break; - case VERACK: - if (verackSent) { - activateConnection(); - } - verackReceived = true; - break; - case CUSTOM: - MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload); - if (response == null) { - disconnect(); - } else { - send(response); - } - break; - default: - throw new NodeException("Command 'version' or 'verack' expected, but was '" - + payload.getCommand() + "'"); - } - } - - private void activateConnection() { - LOG.info("Successfully established connection with node " + node); - state = ACTIVE; - node.setTime(UnixTime.now()); - if (mode != SYNC) { - sendAddresses(); - ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node)); - } - sendInventory(); - } - - private void sendAddresses() { - List addresses = ctx.getNodeRegistry().getKnownAddresses(1000, streams); - sendingQueue.offer(new Addr(addresses)); - } - - private void sendInventory() { - List inventory = ctx.getInventory().getInventory(streams); - for (int i = 0; i < inventory.size(); i += 50000) { - sendingQueue.offer(new Inv(inventory.subList(i, Math.min(inventory.size(), i + 50000)))); - } - } - - private void handleVersion(Version version) { - if (version.getNonce() == ctx.getClientNonce()) { - LOG.info("Tried to connect to self, disconnecting."); - disconnect(); - } else if (version.getVersion() >= BitmessageContext.CURRENT_VERSION) { - this.peerNonce = version.getNonce(); - if (peerNonce == ctx.getClientNonce()) disconnect(); - - this.version = version.getVersion(); - this.streams = version.getStreams(); - verackSent = true; - send(new VerAck()); - if (mode == SERVER) { - send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build()); - } - if (verackReceived) { - activateConnection(); - } - } else { - LOG.info("Received unsupported version " + version.getVersion() + ", disconnecting."); - disconnect(); - } - } - - @SuppressWarnings("RedundantIfStatement") - protected boolean syncFinished(NetworkMessage msg) { - if (mode != SYNC) { - return false; - } - if (Thread.interrupted()) { - return true; - } - if (state != ACTIVE) { - return false; - } - if (syncTimeout < UnixTime.now()) { - LOG.info("Synchronization timed out"); - return true; - } - if (!sendingQueue.isEmpty()) { - syncReadTimeout = System.currentTimeMillis() + 1000; - return false; - } - if (msg == null) { - return syncReadTimeout < System.currentTimeMillis(); - } else { - syncReadTimeout = System.currentTimeMillis() + 1000; - return false; - } - } - - public void disconnect() { - state = DISCONNECTED; - - // Make sure objects that are still missing are requested from other nodes - ctx.getNetworkHandler().request(requestedObjects); - } - - protected abstract void send(MessagePayload payload); - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - AbstractConnection that = (AbstractConnection) o; - return Objects.equals(node, that.node); - } - - @Override - public int hashCode() { - return Objects.hash(node); - } - - public enum Mode {SERVER, CLIENT, SYNC} - - public enum State {CONNECTING, ACTIVE, DISCONNECTED} -} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java deleted file mode 100644 index 4dcf29c..0000000 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.networking; - -import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.entity.GetData; -import ch.dissem.bitmessage.entity.MessagePayload; -import ch.dissem.bitmessage.entity.NetworkMessage; -import ch.dissem.bitmessage.entity.Version; -import ch.dissem.bitmessage.entity.valueobject.InventoryVector; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.factory.Factory; -import ch.dissem.bitmessage.ports.NetworkHandler.MessageListener; -import ch.dissem.bitmessage.utils.UnixTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; -import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; -import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED; -import static ch.dissem.bitmessage.utils.UnixTime.MINUTE; - -/** - * A connection to a specific node - */ -class Connection extends AbstractConnection { - public static final int READ_TIMEOUT = 2000; - private static final Logger LOG = LoggerFactory.getLogger(Connection.class); - private static final int CONNECT_TIMEOUT = 5000; - - private final long startTime; - private final Socket socket; - private final ReaderRunnable reader = new ReaderRunnable(); - private final WriterRunnable writer = new WriterRunnable(); - - private InputStream in; - private OutputStream out; - private boolean socketInitialized; - - public Connection(InternalContext context, Mode mode, Socket socket, - Map requestedObjectsMap) throws IOException { - this(context, mode, socket, requestedObjectsMap, - new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), - 0); - } - - public Connection(InternalContext context, Mode mode, NetworkAddress node, - Map requestedObjectsMap) { - this(context, mode, new Socket(), requestedObjectsMap, - node, 0); - } - - private Connection(InternalContext context, Mode mode, Socket socket, - Map commonRequestedObjects, NetworkAddress node, long syncTimeout) { - super(context, mode, node, commonRequestedObjects, syncTimeout); - this.startTime = UnixTime.now(); - this.socket = socket; - } - - public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, - long timeoutInSeconds) throws IOException { - return new Connection(ctx, SYNC, new Socket(address, port), - new HashMap(), - new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), - timeoutInSeconds); - } - - public long getStartTime() { - return startTime; - } - - public Mode getMode() { - return mode; - } - - public State getState() { - return state; - } - - public NetworkAddress getNode() { - return node; - } - - @Override - protected void send(MessagePayload payload) { - try { - if (payload instanceof GetData) { - requestedObjects.addAll(((GetData) payload).getInventory()); - } - synchronized (this) { - new NetworkMessage(payload).write(out); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - disconnect(); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Connection that = (Connection) o; - return Objects.equals(node, that.node); - } - - @Override - public int hashCode() { - return Objects.hash(node); - } - - private synchronized void initSocket(Socket socket) throws IOException { - if (!socketInitialized) { - if (!socket.isConnected()) { - LOG.trace("Trying to connect to node " + node); - socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT); - } - socket.setSoTimeout(READ_TIMEOUT); - in = socket.getInputStream(); - out = socket.getOutputStream(); - socketInitialized = true; - } - } - - public ReaderRunnable getReader() { - return reader; - } - - public WriterRunnable getWriter() { - return writer; - } - - public class ReaderRunnable implements Runnable { - @Override - public void run() { - try (Socket socket = Connection.this.socket) { - initSocket(socket); - if (mode == CLIENT || mode == SYNC) { - send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build()); - } - while (state != DISCONNECTED) { - if (mode != SYNC) { - if (state == ACTIVE && requestedObjects.isEmpty() && sendingQueue.isEmpty()) { - Thread.sleep(1000); - } else { - Thread.sleep(100); - } - } - receive(); - } - } catch (Exception e) { - LOG.trace("Reader disconnected from node " + node + ": " + e.getMessage()); - } finally { - disconnect(); - try { - socket.close(); - } catch (Exception e) { - LOG.debug(e.getMessage(), e); - } - } - } - - private void receive() throws InterruptedException { - try { - NetworkMessage msg = Factory.getNetworkMessage(version, in); - if (msg == null) - return; - handleMessage(msg.getPayload()); - if (socket.isClosed() || syncFinished(msg) || checkOpenRequests()) disconnect(); - } catch (SocketTimeoutException ignore) { - if (state == ACTIVE && syncFinished(null)) disconnect(); - } - } - - } - - private boolean checkOpenRequests() { - return !requestedObjects.isEmpty() && lastObjectTime > 0 && (UnixTime.now() - lastObjectTime) > 2 * MINUTE; - } - - public class WriterRunnable implements Runnable { - @Override - public void run() { - try (Socket socket = Connection.this.socket) { - initSocket(socket); - while (state != DISCONNECTED) { - if (sendingQueue.isEmpty()) { - Thread.sleep(1000); - } else { - send(sendingQueue.poll()); - } - } - } catch (IOException | InterruptedException e) { - LOG.trace("Writer disconnected from node " + node + ": " + e.getMessage()); - disconnect(); - } - } - } -} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java deleted file mode 100644 index 7e0c96e..0000000 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2016 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.networking; - -import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.utils.UnixTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.List; - -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; -import static ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER; - -/** - * @author Christian Basler - */ -@Deprecated -@SuppressWarnings("deprecation") -public class ConnectionOrganizer implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(ConnectionOrganizer.class); - - private final InternalContext ctx; - private final DefaultNetworkHandler networkHandler; - - private Connection initialConnection; - - public ConnectionOrganizer(InternalContext ctx, - DefaultNetworkHandler networkHandler) { - this.ctx = ctx; - this.networkHandler = networkHandler; - } - - @Override - public void run() { - try { - while (networkHandler.isRunning()) { - try { - int active = 0; - long now = UnixTime.now(); - - int diff = networkHandler.connections.size() - ctx.getConnectionLimit(); - if (diff > 0) { - for (Connection c : networkHandler.connections) { - c.disconnect(); - diff--; - if (diff == 0) break; - } - } - boolean forcedDisconnect = false; - for (Iterator iterator = networkHandler.connections.iterator(); iterator.hasNext(); ) { - Connection c = iterator.next(); - // Just in case they were all created at the same time, don't disconnect - // all at once. - if (!forcedDisconnect && now - c.getStartTime() > ctx.getConnectionTTL()) { - c.disconnect(); - forcedDisconnect = true; - } - switch (c.getState()) { - case DISCONNECTED: - iterator.remove(); - break; - case ACTIVE: - active++; - break; - default: - // nothing to do - } - } - - if (active < NETWORK_MAGIC_NUMBER) { - List addresses = ctx.getNodeRegistry().getKnownAddresses( - NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); - boolean first = active == 0 && initialConnection == null; - for (NetworkAddress address : addresses) { - Connection c = new Connection(ctx, CLIENT, address, networkHandler.requestedObjects); - if (first) { - initialConnection = c; - first = false; - } - networkHandler.startConnection(c); - } - Thread.sleep(10000); - } else if (initialConnection == null) { - Thread.sleep(30000); - } else { - initialConnection.disconnect(); - initialConnection = null; - Thread.sleep(10000); - } - } catch (InterruptedException e) { - networkHandler.stop(); - } catch (Exception e) { - LOG.error("Error in connection manager. Ignored.", e); - } - } - } finally { - LOG.debug("Connection manager shutting down."); - networkHandler.stop(); - } - } -} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java deleted file mode 100644 index b3e51e8..0000000 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.networking; - -import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.InternalContext.ContextHolder; -import ch.dissem.bitmessage.entity.CustomMessage; -import ch.dissem.bitmessage.entity.GetData; -import ch.dissem.bitmessage.entity.NetworkMessage; -import ch.dissem.bitmessage.entity.valueobject.InventoryVector; -import ch.dissem.bitmessage.exception.ApplicationException; -import ch.dissem.bitmessage.exception.NodeException; -import ch.dissem.bitmessage.factory.Factory; -import ch.dissem.bitmessage.ports.NetworkHandler; -import ch.dissem.bitmessage.utils.Collections; -import ch.dissem.bitmessage.utils.Property; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.Socket; -import java.util.*; -import java.util.concurrent.*; - -import static ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; -import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; -import static ch.dissem.bitmessage.utils.DebugUtils.inc; -import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; - -/** - * Handles all the networky stuff. - * - * @deprecated use {@link ch.dissem.bitmessage.networking.nio.NioNetworkHandler NioNetworkHandler} instead. - */ -@Deprecated -public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { - - final Collection connections = new ConcurrentLinkedQueue<>(); - private final ExecutorService pool = Executors.newCachedThreadPool( - pool("network") - .lowPrio() - .daemon() - .build()); - private InternalContext ctx; - private ServerRunnable server; - private volatile boolean running; - - final Map requestedObjects = new ConcurrentHashMap<>(50_000); - - @Override - public void setContext(InternalContext context) { - this.ctx = context; - } - - @Override - public Future synchronize(InetAddress server, int port, long timeoutInSeconds) { - try { - Connection connection = Connection.sync(ctx, server, port, ctx.getNetworkListener(), timeoutInSeconds); - Future reader = pool.submit(connection.getReader()); - pool.execute(connection.getWriter()); - return reader; - } catch (IOException e) { - throw new ApplicationException(e); - } - } - - @Override - public CustomMessage send(InetAddress server, int port, CustomMessage request) { - try (Socket socket = new Socket(server, port)) { - socket.setSoTimeout(Connection.READ_TIMEOUT); - new NetworkMessage(request).write(socket.getOutputStream()); - NetworkMessage networkMessage = Factory.getNetworkMessage(3, socket.getInputStream()); - if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) { - return (CustomMessage) networkMessage.getPayload(); - } else { - if (networkMessage == null) { - throw new NodeException("No response from node " + server); - } else { - throw new NodeException("Unexpected response from node " + - server + ": " + networkMessage.getPayload().getCommand()); - } - } - } catch (IOException e) { - throw new NodeException(e.getMessage(), e); - } - } - - @Override - public void start() { - if (running) { - throw new IllegalStateException("Network already running - you need to stop first."); - } - try { - running = true; - connections.clear(); - server = new ServerRunnable(ctx, this); - pool.execute(server); - pool.execute(new ConnectionOrganizer(ctx, this)); - } catch (IOException e) { - throw new ApplicationException(e); - } - } - - @Override - public boolean isRunning() { - return running; - } - - @Override - public void stop() { - server.close(); - synchronized (connections) { - running = false; - for (Connection c : connections) { - c.disconnect(); - } - } - requestedObjects.clear(); - } - - void startConnection(Connection c) { - if (!running) return; - - synchronized (connections) { - if (!running) return; - - // prevent connecting twice to the same node - if (connections.contains(c)) { - return; - } - connections.add(c); - } - pool.execute(c.getReader()); - pool.execute(c.getWriter()); - } - - @Override - public void offer(final InventoryVector iv) { - List target = new LinkedList<>(); - for (Connection connection : connections) { - if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) { - target.add(connection); - } - } - List randomSubset = Collections.selectRandom(NETWORK_MAGIC_NUMBER, target); - for (Connection connection : randomSubset) { - connection.offer(iv); - } - } - - @Override - public Property getNetworkStatus() { - TreeSet streams = new TreeSet<>(); - TreeMap incomingConnections = new TreeMap<>(); - TreeMap outgoingConnections = new TreeMap<>(); - - for (Connection connection : connections) { - if (connection.getState() == ACTIVE) { - for (long stream : connection.getStreams()) { - streams.add(stream); - if (connection.getMode() == SERVER) { - inc(incomingConnections, stream); - } else { - inc(outgoingConnections, stream); - } - } - } - } - Property[] streamProperties = new Property[streams.size()]; - int i = 0; - for (Long stream : streams) { - int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0; - int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0; - streamProperties[i] = new Property("stream " + stream, - null, new Property("nodes", incoming + outgoing), - new Property("incoming", incoming), - new Property("outgoing", outgoing) - ); - i++; - } - return new Property("network", null, - new Property("connectionManager", running ? "running" : "stopped"), - new Property("connections", streamProperties), - new Property("requestedObjects", requestedObjects.size()) - ); - } - - @Override - public void request(Collection inventoryVectors) { - if (!running || inventoryVectors.isEmpty()) return; - - Map> distribution = new HashMap<>(); - for (Connection connection : connections) { - if (connection.getState() == ACTIVE) { - distribution.put(connection, new LinkedList()); - } - } - Iterator iterator = inventoryVectors.iterator(); - if (!iterator.hasNext()) { - return; - } - InventoryVector next = iterator.next(); - Connection previous = null; - do { - for (Connection connection : distribution.keySet()) { - if (connection == previous) { - next = iterator.next(); - } - if (connection.knowsOf(next)) { - List ivs = distribution.get(connection); - if (ivs.size() == GetData.MAX_INVENTORY_SIZE) { - connection.send(new GetData(ivs)); - ivs.clear(); - } - ivs.add(next); - iterator.remove(); - - if (iterator.hasNext()) { - next = iterator.next(); - previous = connection; - } else { - break; - } - } - } - } while (iterator.hasNext()); - - for (Connection connection : distribution.keySet()) { - List ivs = distribution.get(connection); - if (!ivs.isEmpty()) { - connection.send(new GetData(ivs)); - } - } - } -} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java deleted file mode 100644 index eca4d5d..0000000 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2016 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.networking; - -import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.ports.NetworkHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; - -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; - -/** - * @author Christian Basler - */ -@Deprecated -public class ServerRunnable implements Runnable, Closeable { - private static final Logger LOG = LoggerFactory.getLogger(ServerRunnable.class); - private final InternalContext ctx; - private final ServerSocket serverSocket; - private final DefaultNetworkHandler networkHandler; - - public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler) throws IOException { - this.ctx = ctx; - this.networkHandler = networkHandler; - this.serverSocket = new ServerSocket(ctx.getPort()); - } - - @Override - public void run() { - while (!serverSocket.isClosed()) { - try { - Socket socket = serverSocket.accept(); - socket.setSoTimeout(Connection.READ_TIMEOUT); - networkHandler.startConnection(new Connection(ctx, SERVER, socket, networkHandler.requestedObjects)); - } catch (IOException e) { - LOG.debug(e.getMessage(), e); - } - } - } - - @Override - public void close() { - try { - serverSocket.close(); - } catch (IOException e) { - LOG.debug(e.getMessage(), e); - } - } -} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java deleted file mode 100644 index 012b43f..0000000 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright 2016 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.networking.nio; - -import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.entity.GetData; -import ch.dissem.bitmessage.entity.MessagePayload; -import ch.dissem.bitmessage.entity.NetworkMessage; -import ch.dissem.bitmessage.entity.Version; -import ch.dissem.bitmessage.entity.valueobject.InventoryVector; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.exception.NodeException; -import ch.dissem.bitmessage.factory.V3MessageReader; -import ch.dissem.bitmessage.networking.AbstractConnection; -import ch.dissem.bitmessage.utils.UnixTime; - -import java.nio.ByteBuffer; -import java.util.*; - -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; - -/** - * Represents the current state of a connection. - */ -public class ConnectionInfo extends AbstractConnection { - private final ByteBuffer headerOut = ByteBuffer.allocate(24); - private ByteBuffer payloadOut; - private V3MessageReader reader = new V3MessageReader(); - private boolean syncFinished; - private long lastUpdate = System.currentTimeMillis(); - - public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node, - Map commonRequestedObjects, long syncTimeout) { - super(context, mode, node, commonRequestedObjects, syncTimeout); - headerOut.flip(); - if (mode == CLIENT || mode == SYNC) { - send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build()); - } - } - - public State getState() { - return state; - } - - public boolean knowsOf(InventoryVector iv) { - return ivCache.containsKey(iv); - } - - public Queue getSendingQueue() { - return sendingQueue; - } - - public ByteBuffer getInBuffer() { - if (reader == null) { - throw new NodeException("Node is disconnected"); - } - return reader.getActiveBuffer(); - } - - public void updateWriter() { - if (!headerOut.hasRemaining() && !sendingQueue.isEmpty()) { - headerOut.clear(); - MessagePayload payload = sendingQueue.poll(); - payloadOut = new NetworkMessage(payload).writeHeaderAndGetPayloadBuffer(headerOut); - headerOut.flip(); - lastUpdate = System.currentTimeMillis(); - } - } - - public ByteBuffer[] getOutBuffers() { - return new ByteBuffer[]{headerOut, payloadOut}; - } - - public void cleanupBuffers() { - if (payloadOut != null && !payloadOut.hasRemaining()) { - payloadOut = null; - } - } - - public void updateReader() { - reader.update(); - if (!reader.getMessages().isEmpty()) { - Iterator iterator = reader.getMessages().iterator(); - NetworkMessage msg = null; - while (iterator.hasNext()) { - msg = iterator.next(); - handleMessage(msg.getPayload()); - iterator.remove(); - } - syncFinished = syncFinished(msg); - } - lastUpdate = System.currentTimeMillis(); - } - - public void updateSyncStatus() { - if (!syncFinished) { - syncFinished = (reader == null || reader.getMessages().isEmpty()) && syncFinished(null); - } - } - - public boolean isExpired() { - switch (state) { - case CONNECTING: - // the TCP timeout starts out at 20 seconds - return lastUpdate < System.currentTimeMillis() - 20_000; - case ACTIVE: - // after verack messages are exchanged, the timeout is raised to 10 minutes - return lastUpdate < System.currentTimeMillis() - 600_000; - case DISCONNECTED: - return true; - default: - throw new IllegalStateException("Unknown state: " + state); - } - } - - @Override - public void disconnect() { - super.disconnect(); - if (reader != null) { - reader.cleanup(); - reader = null; - } - payloadOut = null; - } - - public boolean isSyncFinished() { - return syncFinished; - } - - @Override - protected void send(MessagePayload payload) { - sendingQueue.add(payload); - if (payload instanceof GetData) { - Long now = UnixTime.now(); - List inventory = ((GetData) payload).getInventory(); - requestedObjects.addAll(inventory); - for (InventoryVector iv : inventory) { - commonRequestedObjects.put(iv, now); - } - } - } - - public boolean isWritePending() { - return !sendingQueue.isEmpty() - || headerOut != null && headerOut.hasRemaining() - || payloadOut != null && payloadOut.hasRemaining(); - } -} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java deleted file mode 100644 index 40e79d0..0000000 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ /dev/null @@ -1,529 +0,0 @@ -/* - * Copyright 2016 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.networking.nio; - -import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.entity.CustomMessage; -import ch.dissem.bitmessage.entity.GetData; -import ch.dissem.bitmessage.entity.NetworkMessage; -import ch.dissem.bitmessage.entity.valueobject.InventoryVector; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.exception.ApplicationException; -import ch.dissem.bitmessage.exception.NodeException; -import ch.dissem.bitmessage.factory.V3MessageReader; -import ch.dissem.bitmessage.ports.NetworkHandler; -import ch.dissem.bitmessage.utils.DebugUtils; -import ch.dissem.bitmessage.utils.Property; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.NoRouteToHostException; -import java.nio.ByteBuffer; -import java.nio.channels.*; -import java.util.*; -import java.util.concurrent.*; - -import static ch.dissem.bitmessage.constants.Network.HEADER_SIZE; -import static ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; -import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; -import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; -import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED; -import static ch.dissem.bitmessage.utils.Collections.selectRandom; -import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; -import static java.nio.channels.SelectionKey.*; - -/** - * Network handler using java.nio, resulting in less threads. - */ -public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder { - private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class); - private static final long REQUESTED_OBJECTS_MAX_TIME = 2 * 60_000; // 2 minutes - private static final Long DELAYED = Long.MIN_VALUE; - - private final ExecutorService threadPool = Executors.newCachedThreadPool( - pool("network") - .lowPrio() - .daemon() - .build()); - - private InternalContext ctx; - private Selector selector; - private ServerSocketChannel serverChannel; - private Queue connectionQueue = new ConcurrentLinkedQueue<>(); - private Map connections = new ConcurrentHashMap<>(); - private final Map requestedObjects = new ConcurrentHashMap<>(10_000); - - private Thread starter; - - @NotNull - @Override - public Future synchronize(@NotNull final InetAddress server, final int port, final long timeoutInSeconds) { - return threadPool.submit(new Callable() { - @Override - public Void call() throws Exception { - try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) { - channel.configureBlocking(false); - ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, - new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), - new HashMap(), timeoutInSeconds); - while (channel.isConnected() && !connection.isSyncFinished()) { - write(channel, connection); - read(channel, connection); - Thread.sleep(10); - } - LOG.info("Synchronization finished"); - } - return null; - } - }); - } - - @NotNull - @Override - public CustomMessage send(@NotNull InetAddress server, int port, @NotNull CustomMessage request) { - try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) { - channel.configureBlocking(true); - ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE); - ByteBuffer payloadBuffer = new NetworkMessage(request).writeHeaderAndGetPayloadBuffer(headerBuffer); - headerBuffer.flip(); - while (headerBuffer.hasRemaining()) { - channel.write(headerBuffer); - } - while (payloadBuffer.hasRemaining()) { - channel.write(payloadBuffer); - } - - V3MessageReader reader = new V3MessageReader(); - while (channel.isConnected() && reader.getMessages().isEmpty()) { - if (channel.read(reader.getActiveBuffer()) > 0) { - reader.update(); - } else { - throw new NodeException("No response from node " + server); - } - } - NetworkMessage networkMessage; - if (reader.getMessages().isEmpty()) { - throw new NodeException("No response from node " + server); - } else { - networkMessage = reader.getMessages().get(0); - } - - if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) { - return (CustomMessage) networkMessage.getPayload(); - } else { - if (networkMessage == null) { - throw new NodeException("Empty response from node " + server); - } else { - throw new NodeException("Unexpected response from node " + server + ": " - + networkMessage.getPayload().getClass()); - } - } - } catch (IOException e) { - throw new ApplicationException(e); - } - } - - @Override - public void start() { - if (selector != null && selector.isOpen()) { - throw new IllegalStateException("Network already running - you need to stop first."); - } - try { - selector = Selector.open(); - } catch (IOException e) { - throw new ApplicationException(e); - } - requestedObjects.clear(); - - starter = thread("connection manager", new Runnable() { - @Override - public void run() { - while (selector.isOpen()) { - int missing = NETWORK_MAGIC_NUMBER; - for (ConnectionInfo connectionInfo : connections.keySet()) { - if (connectionInfo.getState() == ACTIVE) { - missing--; - if (missing == 0) break; - } - } - if (missing > 0) { - List addresses = ctx.getNodeRegistry().getKnownAddresses(100, ctx.getStreams()); - addresses = selectRandom(missing, addresses); - for (NetworkAddress address : addresses) { - if (!isConnectedTo(address)) { - connectionQueue.offer(address); - } - } - } - - Iterator> it = connections.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry e = it.next(); - if (!e.getValue().isValid() || e.getKey().isExpired()) { - try { - e.getValue().channel().close(); - } catch (Exception ignore) { - } - e.getValue().cancel(); - e.getValue().attach(null); - e.getKey().disconnect(); - it.remove(); - } - } - - // The list 'requested objects' helps to prevent downloading an object - // twice. From time to time there is an error though, and an object is - // never downloaded. To prevent a large list of failed objects and give - // them a chance to get downloaded again, we will attempt to download an - // object from another node after some time out. - long timedOut = System.currentTimeMillis() - REQUESTED_OBJECTS_MAX_TIME; - List delayed = new LinkedList<>(); - Iterator> iterator = requestedObjects.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry e = iterator.next(); - //noinspection NumberEquality - if (e.getValue() == DELAYED) { - iterator.remove(); - } else if (e.getValue() < timedOut) { - delayed.add(e.getKey()); - e.setValue(DELAYED); - } - } - request(delayed); - - try { - Thread.sleep(30_000); - } catch (InterruptedException e) { - return; - } - } - } - }); - - thread("selector worker", new Runnable() { - @Override - public void run() { - try { - serverChannel = ServerSocketChannel.open(); - serverChannel.configureBlocking(false); - serverChannel.socket().bind(new InetSocketAddress(ctx.getPort())); - serverChannel.register(selector, OP_ACCEPT, null); - - while (selector.isOpen()) { - selector.select(1000); - Iterator keyIterator = selector.selectedKeys().iterator(); - while (keyIterator.hasNext()) { - SelectionKey key = keyIterator.next(); - keyIterator.remove(); - if (key.attachment() == null) { - try { - if (key.isAcceptable()) { - // handle accept - try { - SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept(); - accepted.configureBlocking(false); - ConnectionInfo connection = new ConnectionInfo(ctx, SERVER, - new NetworkAddress.Builder() - .ip(accepted.socket().getInetAddress()) - .port(accepted.socket().getPort()) - .stream(1) - .build(), - requestedObjects, 0 - ); - connections.put( - connection, - accepted.register(selector, OP_READ | OP_WRITE, connection) - ); - } catch (AsynchronousCloseException e) { - LOG.trace(e.getMessage()); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } - } catch (CancelledKeyException e) { - LOG.debug(e.getMessage(), e); - } - } else { - // handle read/write - SocketChannel channel = (SocketChannel) key.channel(); - ConnectionInfo connection = (ConnectionInfo) key.attachment(); - try { - if (key.isConnectable()) { - if (!channel.finishConnect()) { - continue; - } - } - if (key.isWritable()) { - write(channel, connection); - } - if (key.isReadable()) { - read(channel, connection); - } - if (connection.getState() == DISCONNECTED) { - key.interestOps(0); - channel.close(); - } else if (connection.isWritePending()) { - key.interestOps(OP_READ | OP_WRITE); - } else { - key.interestOps(OP_READ); - } - } catch (CancelledKeyException | NodeException | IOException e) { - connection.disconnect(); - } - } - } - // set interest ops - for (Map.Entry e : connections.entrySet()) { - try { - if (e.getValue().isValid() - && (e.getValue().interestOps() & OP_WRITE) == 0 - && (e.getValue().interestOps() & OP_CONNECT) == 0 - && !e.getKey().getSendingQueue().isEmpty()) { - e.getValue().interestOps(OP_READ | OP_WRITE); - } - } catch (CancelledKeyException x) { - e.getKey().disconnect(); - } - } - // start new connections - if (!connectionQueue.isEmpty()) { - NetworkAddress address = connectionQueue.poll(); - try { - SocketChannel channel = SocketChannel.open(); - channel.configureBlocking(false); - channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort())); - ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT, - address, - requestedObjects, 0 - ); - connections.put( - connection, - channel.register(selector, OP_CONNECT, connection) - ); - } catch (NoRouteToHostException ignore) { - // We'll try to connect to many offline nodes, so - // this is expected to happen quite a lot. - } catch (AsynchronousCloseException e) { - // The exception is expected if the network is being - // shut down, as we actually do asynchronously close - // the connections. - if (isRunning()) { - LOG.error(e.getMessage(), e); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } - } - selector.close(); - } catch (ClosedSelectorException ignore) { - } catch (IOException e) { - throw new ApplicationException(e); - } - } - }); - } - - private static void write(SocketChannel channel, ConnectionInfo connection) - throws IOException { - writeBuffer(connection.getOutBuffers(), channel); - - connection.updateWriter(); - - writeBuffer(connection.getOutBuffers(), channel); - connection.cleanupBuffers(); - } - - private static void writeBuffer(ByteBuffer[] buffers, SocketChannel channel) throws IOException { - if (buffers[1] == null) { - if (buffers[0].hasRemaining()) { - channel.write(buffers[0]); - } - } else if (buffers[1].hasRemaining() || buffers[0].hasRemaining()) { - channel.write(buffers); - } - } - - private static void read(SocketChannel channel, ConnectionInfo connection) throws IOException { - if (channel.read(connection.getInBuffer()) > 0) { - connection.updateReader(); - } - connection.updateSyncStatus(); - } - - private Thread thread(String threadName, Runnable runnable) { - Thread thread = new Thread(runnable, threadName); - thread.setDaemon(true); - thread.setPriority(Thread.MIN_PRIORITY); - thread.start(); - return thread; - } - - @Override - public void stop() { - try { - serverChannel.socket().close(); - selector.close(); - for (SelectionKey selectionKey : connections.values()) { - selectionKey.channel().close(); - } - } catch (IOException e) { - throw new ApplicationException(e); - } - } - - @Override - public void offer(@NotNull InventoryVector iv) { - List target = new LinkedList<>(); - for (ConnectionInfo connection : connections.keySet()) { - if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) { - target.add(connection); - } - } - List randomSubset = selectRandom(NETWORK_MAGIC_NUMBER, target); - for (ConnectionInfo connection : randomSubset) { - connection.offer(iv); - } - } - - @Override - public void request(@NotNull Collection inventoryVectors) { - if (!isRunning()) { - requestedObjects.clear(); - return; - } - Iterator iterator = inventoryVectors.iterator(); - if (!iterator.hasNext()) { - return; - } - - Map> distribution = new HashMap<>(); - for (ConnectionInfo connection : connections.keySet()) { - if (connection.getState() == ACTIVE) { - distribution.put(connection, new LinkedList()); - } - } - if (distribution.isEmpty()) { - return; - } - InventoryVector next = iterator.next(); - ConnectionInfo previous = null; - do { - for (ConnectionInfo connection : distribution.keySet()) { - if (connection == previous || previous == null) { - if (iterator.hasNext()) { - previous = connection; - next = iterator.next(); - } else { - break; - } - } - if (connection.knowsOf(next) && !connection.requested(next)) { - List ivs = distribution.get(connection); - if (ivs.size() == GetData.MAX_INVENTORY_SIZE) { - connection.send(new GetData(ivs)); - ivs.clear(); - } - ivs.add(next); - iterator.remove(); - - if (iterator.hasNext()) { - next = iterator.next(); - previous = connection; - } else { - break; - } - } - } - } while (iterator.hasNext()); - - // remove objects nobody knows of - for (InventoryVector iv : inventoryVectors) { - requestedObjects.remove(iv); - } - - for (ConnectionInfo connection : distribution.keySet()) { - List ivs = distribution.get(connection); - if (!ivs.isEmpty()) { - connection.send(new GetData(ivs)); - } - } - } - - @NotNull - @Override - public Property getNetworkStatus() { - TreeSet streams = new TreeSet<>(); - TreeMap incomingConnections = new TreeMap<>(); - TreeMap outgoingConnections = new TreeMap<>(); - - for (ConnectionInfo connection : connections.keySet()) { - if (connection.getState() == ACTIVE) { - for (long stream : connection.getStreams()) { - streams.add(stream); - if (connection.getMode() == SERVER) { - DebugUtils.inc(incomingConnections, stream); - } else { - DebugUtils.inc(outgoingConnections, stream); - } - } - } - } - Property[] streamProperties = new Property[streams.size()]; - int i = 0; - for (Long stream : streams) { - int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0; - int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0; - streamProperties[i] = new Property("stream " + stream, - null, new Property("nodes", incoming + outgoing), - new Property("incoming", incoming), - new Property("outgoing", outgoing) - ); - i++; - } - return new Property("network", null, - new Property("connectionManager", isRunning() ? "running" : "stopped"), - new Property("connections", streamProperties), - new Property("requestedObjects", requestedObjects.size()) - ); - } - - private boolean isConnectedTo(NetworkAddress address) { - for (ConnectionInfo c : connections.keySet()) { - if (c.getNode().equals(address)) { - return true; - } - } - return false; - } - - @Override - public boolean isRunning() { - return selector != null && selector.isOpen() && starter.isAlive(); - } - - @Override - public void setContext(@NotNull InternalContext context) { - this.ctx = context; - } -} diff --git a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/Connection.kt b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/Connection.kt new file mode 100644 index 0000000..03552b6 --- /dev/null +++ b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/Connection.kt @@ -0,0 +1,202 @@ +/* + * Copyright 2017 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking.nio + +import ch.dissem.bitmessage.InternalContext +import ch.dissem.bitmessage.InternalContext.Companion.NETWORK_EXTRA_BYTES +import ch.dissem.bitmessage.InternalContext.Companion.NETWORK_NONCE_TRIALS_PER_BYTE +import ch.dissem.bitmessage.entity.* +import ch.dissem.bitmessage.entity.valueobject.InventoryVector +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress +import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException +import ch.dissem.bitmessage.ports.NetworkHandler +import ch.dissem.bitmessage.utils.Singleton.cryptography +import ch.dissem.bitmessage.utils.UnixTime +import ch.dissem.bitmessage.utils.UnixTime.MINUTE +import org.slf4j.LoggerFactory +import java.io.IOException +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +/** + * Contains everything used by both the old streams-oriented NetworkHandler and the new NioNetworkHandler, + * respectively their connection objects. + */ +class Connection( + private val ctx: InternalContext, + val mode: Mode, + val node: NetworkAddress, + private val commonRequestedObjects: MutableMap, + syncTimeout: Long +) { + private val requestedObjects: MutableSet = Collections.newSetFromMap(ConcurrentHashMap(10000)) + + internal val io = ConnectionIO(mode, syncTimeout, commonRequestedObjects, requestedObjects, { state }, this::handleMessage) + private var initializer: NetworkConnectionInitializer? = NetworkConnectionInitializer(ctx, node, mode, io::send) { s -> + state = State.ACTIVE + streams = s + initializer = null + } + + private val listener: NetworkHandler.MessageListener = ctx.networkListener + private val ivCache: MutableMap = ConcurrentHashMap() + + private var lastObjectTime: Long = 0 + + lateinit var streams: LongArray + protected set + + @Volatile var state = State.CONNECTING + private set + + val isSyncFinished + get() = io.isSyncFinished + + val nothingToSend + get() = io.sendingQueue.isEmpty() + + init { + initializer!!.start() + } + + fun send(payload: MessagePayload) = io.send(payload) + + protected fun handleMessage(payload: MessagePayload) { + when (state) { + State.CONNECTING -> initializer!!.handleCommand(payload) + State.ACTIVE -> receiveMessage(payload) + State.DISCONNECTED -> disconnect() + } + } + + private fun receiveMessage(messagePayload: MessagePayload) { + when (messagePayload.command) { + MessagePayload.Command.INV -> receiveMessage(messagePayload as Inv) + MessagePayload.Command.GETDATA -> receiveMessage(messagePayload as GetData) + MessagePayload.Command.OBJECT -> receiveMessage(messagePayload as ObjectMessage) + MessagePayload.Command.ADDR -> receiveMessage(messagePayload as Addr) + else -> throw IllegalStateException("Unexpectedly received '${messagePayload.command}' command") + } + } + + private fun receiveMessage(inv: Inv) { + val originalSize = inv.inventory.size + updateIvCache(inv.inventory) + val missing = ctx.inventory.getMissing(inv.inventory, *streams) + LOG.trace("Received inventory with $originalSize elements, of which are ${missing.size} missing.") + io.send(GetData(missing - commonRequestedObjects.keys)) + } + + private fun receiveMessage(getData: GetData) { + getData.inventory.forEach { iv -> ctx.inventory.getObject(iv)?.let { obj -> io.send(obj) } } + } + + private fun receiveMessage(objectMessage: ObjectMessage) { + requestedObjects.remove(objectMessage.inventoryVector) + if (ctx.inventory.contains(objectMessage)) { + LOG.trace("Received object " + objectMessage.inventoryVector + " - already in inventory") + return + } + try { + listener.receive(objectMessage) + cryptography().checkProofOfWork(objectMessage, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES) + ctx.inventory.storeObject(objectMessage) + // offer object to some random nodes so it gets distributed throughout the network: + ctx.networkHandler.offer(objectMessage.inventoryVector) + lastObjectTime = UnixTime.now + } catch (e: InsufficientProofOfWorkException) { + LOG.warn(e.message) + // DebugUtils.saveToFile(objectMessage); // this line must not be committed active + } catch (e: IOException) { + LOG.error("Stream " + objectMessage.stream + ", object type " + objectMessage.type + ": " + e.message, e) + } finally { + if (commonRequestedObjects.remove(objectMessage.inventoryVector) == null) { + LOG.debug("Received object that wasn't requested.") + } + } + } + + private fun receiveMessage(addr: Addr) { + LOG.trace("Received " + addr.addresses.size + " addresses.") + ctx.nodeRegistry.offerAddresses(addr.addresses) + } + + private fun updateIvCache(inventory: List) { + cleanupIvCache() + val now = UnixTime.now + for (iv in inventory) { + ivCache.put(iv, now) + } + } + + fun offer(iv: InventoryVector) { + io.send(Inv(listOf(iv))) + updateIvCache(listOf(iv)) + } + + fun knowsOf(iv: InventoryVector): Boolean { + return ivCache.containsKey(iv) + } + + fun requested(iv: InventoryVector): Boolean { + return requestedObjects.contains(iv) + } + + private fun cleanupIvCache() { + val fiveMinutesAgo = UnixTime.now - 5 * MINUTE + for ((key, value) in ivCache) { + if (value < fiveMinutesAgo) { + ivCache.remove(key) + } + } + } + + // the TCP timeout starts out at 20 seconds + // after verack messages are exchanged, the timeout is raised to 10 minutes + fun isExpired(): Boolean = when (state) { + State.CONNECTING -> io.lastUpdate < System.currentTimeMillis() - 20000 + State.ACTIVE -> io.lastUpdate < System.currentTimeMillis() - 600000 + State.DISCONNECTED -> true + } + + fun disconnect() { + state = State.DISCONNECTED + io.disconnect() + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is Connection) return false + return node == other.node + } + + override fun hashCode(): Int { + return Objects.hash(node) + } + + enum class State { + CONNECTING, ACTIVE, DISCONNECTED + } + + enum class Mode { + SERVER, CLIENT, SYNC + } + + companion object { + private val LOG = LoggerFactory.getLogger(Connection::class.java) + } +} diff --git a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/ConnectionIO.kt b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/ConnectionIO.kt new file mode 100644 index 0000000..ec926e3 --- /dev/null +++ b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/ConnectionIO.kt @@ -0,0 +1,158 @@ +/* + * Copyright 2016 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking.nio + +import ch.dissem.bitmessage.entity.GetData +import ch.dissem.bitmessage.entity.MessagePayload +import ch.dissem.bitmessage.entity.NetworkMessage +import ch.dissem.bitmessage.entity.valueobject.InventoryVector +import ch.dissem.bitmessage.exception.NodeException +import ch.dissem.bitmessage.factory.V3MessageReader +import ch.dissem.bitmessage.utils.UnixTime +import org.slf4j.LoggerFactory +import java.nio.ByteBuffer +import java.util.* +import java.util.concurrent.ConcurrentLinkedDeque + +/** + * Represents the current state of a connection. + */ +class ConnectionIO( + private val mode: Connection.Mode, + syncTimeout: Long, + private val commonRequestedObjects: MutableMap, + private val requestedObjects: MutableSet, + private val getState: () -> Connection.State, + private val handleMessage: (MessagePayload) -> Unit +) { + private val headerOut: ByteBuffer = ByteBuffer.allocate(24) + private var payloadOut: ByteBuffer? = null + private var reader: V3MessageReader? = V3MessageReader() + internal val sendingQueue: Deque = ConcurrentLinkedDeque() + + internal var lastUpdate = System.currentTimeMillis() + private set + + private val syncTimeout: Long = if (syncTimeout > 0) UnixTime.now + syncTimeout else 0 + private var syncReadTimeout = java.lang.Long.MAX_VALUE + + init { + headerOut.flip() + } + + val inBuffer: ByteBuffer + get() = reader?.getActiveBuffer() ?: throw NodeException("Node is disconnected") + + fun updateWriter() { + if (!headerOut.hasRemaining() && !sendingQueue.isEmpty()) { + headerOut.clear() + val payload = sendingQueue.poll() + payloadOut = NetworkMessage(payload).writeHeaderAndGetPayloadBuffer(headerOut) + headerOut.flip() + lastUpdate = System.currentTimeMillis() + } + } + + val outBuffers: Array + get() = payloadOut?.let { arrayOf(headerOut, it) } ?: arrayOf(headerOut) + + fun cleanupBuffers() { + payloadOut?.let { + if (!it.hasRemaining()) payloadOut = null + } + } + + fun updateReader() { + reader?.let { reader -> + reader.update() + if (!reader.getMessages().isEmpty()) { + val iterator = reader.getMessages().iterator() + var msg: NetworkMessage? = null + while (iterator.hasNext()) { + msg = iterator.next() + handleMessage(msg.payload) + iterator.remove() + } + isSyncFinished = syncFinished(msg) + } + lastUpdate = System.currentTimeMillis() + } + } + + fun updateSyncStatus() { + if (!isSyncFinished) { + isSyncFinished = reader?.getMessages()?.isEmpty() ?: true && syncFinished(null) + } + } + + protected fun syncFinished(msg: NetworkMessage?): Boolean { + if (mode != Connection.Mode.SYNC) { + return false + } + if (Thread.interrupted() || getState() == Connection.State.DISCONNECTED) { + return true + } + if (getState() == Connection.State.CONNECTING) { + return false + } + if (syncTimeout < UnixTime.now) { + LOG.info("Synchronization timed out") + return true + } + if (!nothingToSend()) { + syncReadTimeout = System.currentTimeMillis() + 1000 + return false + } + if (msg == null) { + return syncReadTimeout < System.currentTimeMillis() + } else { + syncReadTimeout = System.currentTimeMillis() + 1000 + return false + } + } + + fun disconnect() { + reader?.let { + it.cleanup() + reader = null + } + payloadOut = null + } + + fun send(payload: MessagePayload) { + sendingQueue.add(payload) + if (payload is GetData) { + val now = UnixTime.now + val inventory = payload.inventory + requestedObjects.addAll(inventory) + inventory.forEach { iv -> commonRequestedObjects.put(iv, now) } + } + } + + var isSyncFinished = false + + val isWritePending: Boolean + get() = !sendingQueue.isEmpty() + || headerOut.hasRemaining() + || payloadOut?.hasRemaining() ?: false + + fun nothingToSend() = sendingQueue.isEmpty() + + companion object { + val LOG = LoggerFactory.getLogger(ConnectionIO::class.java) + } +} diff --git a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NetworkConnectionInitializer.kt b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NetworkConnectionInitializer.kt new file mode 100644 index 0000000..4fd6d7c --- /dev/null +++ b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NetworkConnectionInitializer.kt @@ -0,0 +1,113 @@ +/* + * Copyright 2017 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking.nio + +import ch.dissem.bitmessage.BitmessageContext +import ch.dissem.bitmessage.InternalContext +import ch.dissem.bitmessage.entity.* +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress +import ch.dissem.bitmessage.exception.NodeException +import ch.dissem.bitmessage.utils.UnixTime +import org.slf4j.LoggerFactory + +/** + * Handles the initialization phase of connection and, due to their design, custom commands. + */ +class NetworkConnectionInitializer( + private val ctx: InternalContext, + val node: NetworkAddress, + val mode: Connection.Mode, + val send: (MessagePayload) -> Unit, + val markActive: (LongArray) -> Unit +) { + private lateinit var version: Version + + private var verackSent: Boolean = false + private var verackReceived: Boolean = false + + fun start() { + if (mode == Connection.Mode.CLIENT || mode == Connection.Mode.SYNC) { + send(Version(nonce = ctx.clientNonce, addrFrom = NetworkAddress.ANY, addrRecv = node)) + } + } + + fun handleCommand(payload: MessagePayload) { + when (payload.command) { + MessagePayload.Command.VERSION -> handleVersion(payload as Version) + MessagePayload.Command.VERACK -> { + if (verackSent) { + activateConnection() + } + verackReceived = true + } + MessagePayload.Command.CUSTOM -> { + ctx.customCommandHandler.handle(payload as CustomMessage)?.let { response -> + send(response) + } ?: throw NodeException("No response for custom command available") + } + else -> throw NodeException("Command 'version' or 'verack' expected, but was '${payload.command}'") + } + } + + private fun handleVersion(version: Version) { + if (version.nonce == ctx.clientNonce) { + throw NodeException("Tried to connect to self, disconnecting.") + } else if (version.version >= BitmessageContext.CURRENT_VERSION) { + this.version = version + verackSent = true + send(VerAck()) + if (mode == Connection.Mode.SERVER) { + send(Version.Builder().defaults(ctx.clientNonce).addrFrom(NetworkAddress.ANY).addrRecv(node).build()) + } + if (verackReceived) { + activateConnection() + } + } else { + throw NodeException("Received unsupported version " + version.version + ", disconnecting.") + } + } + + private fun activateConnection() { + LOG.info("Successfully established connection with node " + node) + markActive(version.streams) + node.time = UnixTime.now + if (mode != Connection.Mode.SYNC) { + sendAddresses() + ctx.nodeRegistry.offerAddresses(listOf(node)) + } + sendInventory() + } + + + private fun sendAddresses() { + val addresses = ctx.nodeRegistry.getKnownAddresses(1000, *version.streams) + send(Addr(addresses)) + } + + private fun sendInventory() { + val inventory = ctx.inventory.getInventory(*version.streams) + var i = 0 + while (i < inventory.size) { + send(Inv(inventory.subList(i, Math.min(inventory.size, i + 50000)))) + i += 50000 + } + } + + companion object { + val LOG = LoggerFactory.getLogger(NetworkConnectionInitializer::class.java)!! + } +} diff --git a/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt new file mode 100644 index 0000000..8900b49 --- /dev/null +++ b/networking/src/main/kotlin/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.kt @@ -0,0 +1,476 @@ +/* + * Copyright 2016 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking.nio + +import ch.dissem.bitmessage.InternalContext +import ch.dissem.bitmessage.constants.Network.HEADER_SIZE +import ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER +import ch.dissem.bitmessage.entity.CustomMessage +import ch.dissem.bitmessage.entity.GetData +import ch.dissem.bitmessage.entity.NetworkMessage +import ch.dissem.bitmessage.entity.valueobject.InventoryVector +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress +import ch.dissem.bitmessage.exception.NodeException +import ch.dissem.bitmessage.factory.V3MessageReader +import ch.dissem.bitmessage.networking.nio.Connection.Mode.* +import ch.dissem.bitmessage.ports.NetworkHandler +import ch.dissem.bitmessage.utils.Collections.selectRandom +import ch.dissem.bitmessage.utils.DebugUtils +import ch.dissem.bitmessage.utils.Property +import ch.dissem.bitmessage.utils.ThreadFactoryBuilder.Companion.pool +import ch.dissem.bitmessage.utils.UnixTime.now +import org.slf4j.LoggerFactory +import java.io.IOException +import java.net.InetAddress +import java.net.InetSocketAddress +import java.net.NoRouteToHostException +import java.nio.ByteBuffer +import java.nio.channels.* +import java.nio.channels.SelectionKey.* +import java.util.* +import java.util.concurrent.* + +/** + * Network handler using java.nio, resulting in less threads. + */ +class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder { + + private val threadPool = Executors.newCachedThreadPool( + pool("network") + .lowPrio() + .daemon() + .build()) + + private lateinit var ctx: InternalContext + private var selector: Selector? = null + private var serverChannel: ServerSocketChannel? = null + private val connectionQueue = ConcurrentLinkedQueue() + private val connections = ConcurrentHashMap() + private val requestedObjects = ConcurrentHashMap(10000) + + private var starter: Thread? = null + + override fun setContext(context: InternalContext) { + ctx = context + } + + override fun synchronize(server: InetAddress, port: Int, timeoutInSeconds: Long): Future { + return threadPool.submit(Callable { + SocketChannel.open(InetSocketAddress(server, port)).use { channel -> + channel.configureBlocking(false) + val connection = Connection(ctx, SYNC, + NetworkAddress.Builder().ip(server).port(port).stream(1).build(), + HashMap(), timeoutInSeconds) + while (channel.isConnected && !connection.isSyncFinished) { + write(channel, connection.io) + read(channel, connection.io) + Thread.sleep(10) + } + LOG.info("Synchronization finished") + } + null + }) + } + + override fun send(server: InetAddress, port: Int, request: CustomMessage): CustomMessage { + SocketChannel.open(InetSocketAddress(server, port)).use { channel -> + channel.configureBlocking(true) + val headerBuffer = ByteBuffer.allocate(HEADER_SIZE) + val payloadBuffer = NetworkMessage(request).writeHeaderAndGetPayloadBuffer(headerBuffer) + headerBuffer.flip() + while (headerBuffer.hasRemaining()) { + channel.write(headerBuffer) + } + while (payloadBuffer.hasRemaining()) { + channel.write(payloadBuffer) + } + + val reader = V3MessageReader() + while (channel.isConnected && reader.getMessages().isEmpty()) { + if (channel.read(reader.getActiveBuffer()) > 0) { + reader.update() + } else { + throw NodeException("No response from node $server") + } + } + val networkMessage: NetworkMessage? + if (reader.getMessages().isEmpty()) { + throw NodeException("No response from node " + server) + } else { + networkMessage = reader.getMessages().first() + } + + if (networkMessage.payload is CustomMessage) { + return networkMessage.payload as CustomMessage + } else { + throw NodeException("Unexpected response from node $server: ${networkMessage.payload.javaClass}") + } + } + } + + override fun start() { + if (selector?.isOpen ?: false) { + throw IllegalStateException("Network already running - you need to stop first.") + } + val selector = Selector.open() + this.selector = selector + + requestedObjects.clear() + + starter = thread("connection manager") { + while (selector.isOpen) { + var missing = NETWORK_MAGIC_NUMBER + for (connection in connections.keys) { + if (connection.state == Connection.State.ACTIVE) { + missing-- + if (missing == 0) break + } + } + if (missing > 0) { + var addresses = ctx.nodeRegistry.getKnownAddresses(100, *ctx.streams) + addresses = selectRandom(missing, addresses) + for (address in addresses) { + if (!isConnectedTo(address)) { + connectionQueue.offer(address) + } + } + } + + val it = connections.entries.iterator() + while (it.hasNext()) { + val e = it.next() + if (!e.value.isValid || e.key.isExpired()) { + try { + e.value.channel().close() + } catch (ignore: Exception) { + } + + e.value.cancel() + e.value.attach(null) + e.key.disconnect() + it.remove() + } + } + + // The list 'requested objects' helps to prevent downloading an object + // twice. From time to time there is an error though, and an object is + // never downloaded. To prevent a large list of failed objects and give + // them a chance to get downloaded again, we will attempt to download an + // object from another node after some time out. + val timedOut = System.currentTimeMillis() - REQUESTED_OBJECTS_MAX_TIME + val delayed = mutableListOf() + val iterator = requestedObjects.entries.iterator() + while (iterator.hasNext()) { + val e = iterator.next() + + if (e.value == DELAYED) { + iterator.remove() + } else if (e.value < timedOut) { + delayed.add(e.key) + e.setValue(DELAYED) + } + } + request(delayed) + + try { + Thread.sleep(30000) + } catch (e: InterruptedException) { + return@thread + } + } + } + + thread("selector worker", { + try { + val serverChannel = ServerSocketChannel.open() + this.serverChannel = serverChannel + serverChannel.configureBlocking(false) + serverChannel.socket().bind(InetSocketAddress(ctx.port)) + serverChannel.register(selector, OP_ACCEPT, null) + + while (selector.isOpen) { + selector.select(1000) + val keyIterator = selector.selectedKeys().iterator() + while (keyIterator.hasNext()) { + val key = keyIterator.next() + keyIterator.remove() + if (key.attachment() == null) { + try { + if (key.isAcceptable) { + // handle accept + try { + val accepted = (key.channel() as ServerSocketChannel).accept() + accepted.configureBlocking(false) + val connection = Connection(ctx, SERVER, + NetworkAddress( + time = now, + stream = 1L, + socket = accepted.socket()!! + ), + requestedObjects, 0 + ) + connections.put( + connection, + accepted.register(selector, OP_READ or OP_WRITE, connection) + ) + } catch (e: AsynchronousCloseException) { + LOG.trace(e.message) + } catch (e: IOException) { + LOG.error(e.message, e) + } + + } + } catch (e: CancelledKeyException) { + LOG.debug(e.message, e) + } + + } else { + // handle read/write + val channel = key.channel() as SocketChannel + val connection = key.attachment() as Connection + try { + if (key.isConnectable) { + if (!channel.finishConnect()) { + continue + } + } + if (key.isWritable) { + write(channel, connection.io) + } + if (key.isReadable) { + read(channel, connection.io) + } + if (connection.state == Connection.State.DISCONNECTED) { + key.interestOps(0) + channel.close() + } else if (connection.io.isWritePending) { + key.interestOps(OP_READ or OP_WRITE) + } else { + key.interestOps(OP_READ) + } + } catch (e: CancelledKeyException) { + connection.disconnect() + } catch (e: NodeException) { + connection.disconnect() + } catch (e: IOException) { + connection.disconnect() + } + } + } + // set interest ops + for ((connection, selectionKey) in connections) { + try { + if (selectionKey.isValid + && selectionKey.interestOps() and OP_WRITE == 0 + && selectionKey.interestOps() and OP_CONNECT == 0 + && !connection.nothingToSend) { + selectionKey.interestOps(OP_READ or OP_WRITE) + } + } catch (x: CancelledKeyException) { + connection.disconnect() + } + + } + // start new connections + if (!connectionQueue.isEmpty()) { + val address = connectionQueue.poll() + try { + val channel = SocketChannel.open() + channel.configureBlocking(false) + channel.connect(InetSocketAddress(address.toInetAddress(), address.port)) + val connection = Connection(ctx, CLIENT, address, requestedObjects, 0) + connections.put( + connection, + channel.register(selector, OP_CONNECT, connection) + ) + } catch (ignore: NoRouteToHostException) { + // We'll try to connect to many offline nodes, so + // this is expected to happen quite a lot. + } catch (e: AsynchronousCloseException) { + // The exception is expected if the network is being + // shut down, as we actually do asynchronously close + // the connections. + if (isRunning) { + LOG.error(e.message, e) + } + } catch (e: IOException) { + LOG.error(e.message, e) + } + + } + } + selector.close() + } catch (_: ClosedSelectorException) { + } + }) + } + + private fun thread(threadName: String, runnable: () -> Unit): Thread { + val thread = Thread(runnable, threadName) + thread.isDaemon = true + thread.priority = Thread.MIN_PRIORITY + thread.start() + return thread + } + + override fun stop() { + serverChannel?.socket()?.close() + selector?.close() + for (selectionKey in connections.values) { + selectionKey.channel().close() + } + } + + override fun offer(iv: InventoryVector) { + val targetConnections = connections.keys.filter { it.state == Connection.State.ACTIVE && !it.knowsOf(iv) } + selectRandom(NETWORK_MAGIC_NUMBER, targetConnections).forEach { it.offer(iv) } + } + + override fun request(inventoryVectors: MutableCollection) { + if (!isRunning) { + requestedObjects.clear() + return + } + val iterator = inventoryVectors.iterator() + if (!iterator.hasNext()) { + return + } + + val distribution = HashMap>() + for (connection in connections.keys) { + if (connection.state == Connection.State.ACTIVE) { + distribution.put(connection, mutableListOf()) + } + } + if (distribution.isEmpty()) { + return + } + var next = iterator.next() + var previous: Connection? = null + do { + for (connection in distribution.keys) { + if (connection === previous || previous == null) { + if (iterator.hasNext()) { + previous = connection + next = iterator.next() + } else { + break + } + } + if (connection.knowsOf(next) && !connection.requested(next)) { + val ivs = distribution[connection] ?: throw IllegalStateException("distribution not available for $connection") + if (ivs.size == GetData.MAX_INVENTORY_SIZE) { + connection.send(GetData(ivs)) + ivs.clear() + } + ivs.add(next) + iterator.remove() + + if (iterator.hasNext()) { + next = iterator.next() + previous = connection + } else { + break + } + } + } + } while (iterator.hasNext()) + + // remove objects nobody knows of + for (iv in inventoryVectors) { + requestedObjects.remove(iv) + } + + for (connection in distribution.keys) { + val ivs = distribution[connection] ?: throw IllegalStateException("distribution not available for $connection") + if (!ivs.isEmpty()) { + connection.send(GetData(ivs)) + } + } + } + + override fun getNetworkStatus(): Property { + val streams = TreeSet() + val incomingConnections = TreeMap() + val outgoingConnections = TreeMap() + + for (connection in connections.keys) { + if (connection.state == Connection.State.ACTIVE) { + for (stream in connection.streams) { + streams.add(stream) + if (connection.mode == SERVER) { + DebugUtils.inc(incomingConnections, stream) + } else { + DebugUtils.inc(outgoingConnections, stream) + } + } + } + } + val streamProperties = mutableListOf() + for (stream in streams) { + val incoming = incomingConnections[stream] ?: 0 + val outgoing = outgoingConnections[stream] ?: 0 + streamProperties.add(Property("stream " + stream, Property("nodes", incoming + outgoing), + Property("incoming", incoming), + Property("outgoing", outgoing) + )) + } + return Property("network", + Property("connectionManager", if (isRunning) "running" else "stopped"), + Property("connections", *streamProperties.toTypedArray()), + Property("requestedObjects", requestedObjects.size) + ) + } + + private fun isConnectedTo(address: NetworkAddress): Boolean { + for (c in connections.keys) { + if (c.node == address) { + return true + } + } + return false + } + + override val isRunning: Boolean + get() = selector?.isOpen ?: false && starter?.isAlive ?: false + + companion object { + private val LOG = LoggerFactory.getLogger(NioNetworkHandler::class.java) + private val REQUESTED_OBJECTS_MAX_TIME = (2 * 60000).toLong() // 2 minutes in ms + private val DELAYED = java.lang.Long.MIN_VALUE + + private fun write(channel: SocketChannel, connection: ConnectionIO) { + writeBuffer(connection.outBuffers, channel) + + connection.updateWriter() + + writeBuffer(connection.outBuffers, channel) + connection.cleanupBuffers() + } + + private fun writeBuffer(buffers: Array, channel: SocketChannel) { + if (buffers.any { buf -> buf.hasRemaining() }) channel.write(buffers) + } + + private fun read(channel: SocketChannel, connection: ConnectionIO) { + if (channel.read(connection.inBuffer) > 0) { + connection.updateReader() + } + connection.updateSyncStatus() + } + } +} diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java deleted file mode 100644 index f079ede..0000000 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.networking; - -import ch.dissem.bitmessage.BitmessageContext; -import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography; -import ch.dissem.bitmessage.entity.CustomMessage; -import ch.dissem.bitmessage.entity.MessagePayload; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.exception.NodeException; -import ch.dissem.bitmessage.networking.nio.NioNetworkHandler; -import ch.dissem.bitmessage.ports.*; -import ch.dissem.bitmessage.testutils.TestInventory; -import ch.dissem.bitmessage.utils.Property; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.DisableOnDebug; -import org.junit.rules.TestRule; -import org.junit.rules.Timeout; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Future; - -import static ch.dissem.bitmessage.utils.Singleton.cryptography; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; - -/** - * Tests network handlers. This test is parametrized, so it can test both the nio and classic implementation - * as well as their combinations. It might be slightly over the top and will most probably be cleaned up once - * the nio implementation is deemed stable. - */ -@RunWith(Parameterized.class) -public class NetworkHandlerTest { - private static final Logger LOG = LoggerFactory.getLogger(NetworkHandlerTest.class); - private static NetworkAddress peerAddress = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(6001).build(); - - private TestInventory peerInventory; - private TestInventory nodeInventory; - - private BitmessageContext peer; - private BitmessageContext node; - - private final NetworkHandler peerNetworkHandler; - private final NetworkHandler nodeNetworkHandler; - - @Rule - public final TestRule timeout = new DisableOnDebug(Timeout.seconds(60)); - - public NetworkHandlerTest(NetworkHandler peer, NetworkHandler node) { - this.peerNetworkHandler = peer; - this.nodeNetworkHandler = node; - } - - @Parameterized.Parameters - @SuppressWarnings("deprecation") - public static List parameters() { - return Arrays.asList(new Object[][]{ - {new DefaultNetworkHandler(), new DefaultNetworkHandler()}, - {new DefaultNetworkHandler(), new NioNetworkHandler()}, - {new NioNetworkHandler(), new DefaultNetworkHandler()}, - {new NioNetworkHandler(), new NioNetworkHandler()} - }); - } - - @Before - public void setUp() throws InterruptedException { - peerInventory = new TestInventory(); - peer = new BitmessageContext.Builder() - .addressRepo(mock(AddressRepository.class)) - .inventory(peerInventory) - .messageRepo(mock(MessageRepository.class)) - .powRepo(mock(ProofOfWorkRepository.class)) - .port(peerAddress.getPort()) - .nodeRegistry(new TestNodeRegistry()) - .networkHandler(peerNetworkHandler) - .cryptography(new BouncyCryptography()) - .listener(mock(BitmessageContext.Listener.class)) - .customCommandHandler(new CustomCommandHandler() { - @Override - public MessagePayload handle(CustomMessage request) { - byte[] data = request.getData(); - if (data.length > 0) { - switch (data[0]) { - case 0: - return null; - case 1: - break; - case 3: - data[0] = 0; - break; - default: - break; - } - } - return new CustomMessage("test response", request.getData()); - } - }) - .build(); - peer.startup(); - Thread.sleep(100); - - nodeInventory = new TestInventory(); - node = new BitmessageContext.Builder() - .addressRepo(mock(AddressRepository.class)) - .inventory(nodeInventory) - .messageRepo(mock(MessageRepository.class)) - .powRepo(mock(ProofOfWorkRepository.class)) - .port(6002) - .nodeRegistry(new TestNodeRegistry(peerAddress)) - .networkHandler(nodeNetworkHandler) - .cryptography(new BouncyCryptography()) - .listener(mock(BitmessageContext.Listener.class)) - .build(); - } - - @After - public void cleanUp() { - shutdown(peer); - shutdown(node); - shutdown(nodeNetworkHandler); - } - - private static void shutdown(BitmessageContext ctx) { - if (!ctx.isRunning()) return; - - ctx.shutdown(); - do { - try { - Thread.sleep(100); - } catch (InterruptedException ignore) { - } - } while (ctx.isRunning()); - } - - private static void shutdown(NetworkHandler networkHandler) { - if (!networkHandler.isRunning()) return; - - networkHandler.stop(); - do { - try { - Thread.sleep(100); - } catch (InterruptedException ignore) { - if (networkHandler.isRunning()) { - LOG.warn("Thread interrupted while waiting for network shutdown - " + - "this could cause problems in subsequent tests."); - } - return; - } - } while (networkHandler.isRunning()); - } - - private Property waitForNetworkStatus(BitmessageContext ctx) throws InterruptedException { - Property status; - do { - Thread.sleep(100); - status = ctx.status().getProperty("network", "connections", "stream 1"); - } while (status == null); - return status; - } - - @Test - public void ensureNodesAreConnecting() throws Exception { - node.startup(); - - Property nodeStatus = waitForNetworkStatus(node); - Property peerStatus = waitForNetworkStatus(peer); - - assertEquals(1, nodeStatus.getProperty("outgoing").getValue()); - assertEquals(1, peerStatus.getProperty("incoming").getValue()); - } - - @Test - public void ensureCustomMessageIsSentAndResponseRetrieved() throws Exception { - byte[] data = cryptography().randomBytes(8); - data[0] = (byte) 1; - CustomMessage request = new CustomMessage("test request", data); - node.startup(); - - CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request); - - assertThat(response, notNullValue()); - assertThat(response.getCustomCommand(), is("test response")); - assertThat(response.getData(), is(data)); - } - - @Test(expected = NodeException.class) - public void ensureCustomMessageWithoutResponseYieldsException() throws Exception { - byte[] data = cryptography().randomBytes(8); - data[0] = (byte) 0; - CustomMessage request = new CustomMessage("test request", data); - - CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request); - - assertThat(response, notNullValue()); - assertThat(response.getCustomCommand(), is("test response")); - assertThat(response.getData(), is(request.getData())); - } - - @Test - public void ensureObjectsAreSynchronizedIfBothHaveObjects() throws Exception { - peerInventory.init( - "V4Pubkey.payload", - "V5Broadcast.payload" - ); - - nodeInventory.init( - "V1Msg.payload", - "V4Pubkey.payload" - ); - - Future future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10); - future.get(); - assertInventorySize(3, nodeInventory); - assertInventorySize(3, peerInventory); - } - - @Test - public void ensureObjectsAreSynchronizedIfOnlyPeerHasObjects() throws Exception { - peerInventory.init( - "V4Pubkey.payload", - "V5Broadcast.payload" - ); - - nodeInventory.init(); - - Future future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10); - future.get(); - assertInventorySize(2, nodeInventory); - assertInventorySize(2, peerInventory); - } - - @Test - public void ensureObjectsAreSynchronizedIfOnlyNodeHasObjects() throws Exception { - peerInventory.init(); - - nodeInventory.init( - "V1Msg.payload" - ); - - Future future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10); - future.get(); - assertInventorySize(1, nodeInventory); - assertInventorySize(1, peerInventory); - } - - private void assertInventorySize(int expected, TestInventory inventory) throws InterruptedException { - long timeout = System.currentTimeMillis() + 1000; - while (expected != inventory.getInventory().size() && System.currentTimeMillis() < timeout) { - Thread.sleep(10); - } - assertEquals(expected, inventory.getInventory().size()); - } - -} diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.kt b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.kt new file mode 100644 index 0000000..4eaface --- /dev/null +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.kt @@ -0,0 +1,266 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking + +import ch.dissem.bitmessage.BitmessageContext +import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography +import ch.dissem.bitmessage.entity.CustomMessage +import ch.dissem.bitmessage.entity.MessagePayload +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress +import ch.dissem.bitmessage.exception.NodeException +import ch.dissem.bitmessage.networking.nio.NioNetworkHandler +import ch.dissem.bitmessage.ports.* +import ch.dissem.bitmessage.testutils.TestInventory +import ch.dissem.bitmessage.utils.Property +import ch.dissem.bitmessage.utils.Singleton.cryptography +import com.nhaarman.mockito_kotlin.mock +import org.hamcrest.Matchers.`is` +import org.hamcrest.Matchers.notNullValue +import org.junit.After +import org.junit.Assert.* +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.rules.DisableOnDebug +import org.junit.rules.TestRule +import org.junit.rules.Timeout +import org.slf4j.LoggerFactory + +/** + * Tests network handlers. This test is parametrized, so it can test both the nio and classic implementation + * as well as their combinations. It might be slightly over the top and will most probably be cleaned up once + * the nio implementation is deemed stable. + */ +class NetworkHandlerTest { + + private lateinit var peerInventory: TestInventory + private lateinit var nodeInventory: TestInventory + + private lateinit var peer: BitmessageContext + private lateinit var node: BitmessageContext + + private lateinit var peerNetworkHandler: NetworkHandler + private lateinit var nodeNetworkHandler: NetworkHandler + + @JvmField @Rule val timeout: TestRule = DisableOnDebug(Timeout.seconds(60)) + + @Before + fun setUp() { + peerInventory = TestInventory() + peerNetworkHandler = NioNetworkHandler() + peer = BitmessageContext( + cryptography = BouncyCryptography(), + inventory = peerInventory, + nodeRegistry = TestNodeRegistry(), + networkHandler = peerNetworkHandler, + addressRepository = mock(), + messageRepository = mock(), + proofOfWorkRepository = mock(), + customCommandHandler = object : CustomCommandHandler { + override fun handle(request: CustomMessage): MessagePayload? { + val data = request.getData() + if (data.isNotEmpty()) { + when (data[0]) { + 0.toByte() -> return null + 1.toByte() -> { + } + 3.toByte() -> data[0] = 0 + } + } + return CustomMessage("test response", request.getData()) + } + }, + listener = mock(), + port = peerAddress.port + ) + peer.startup() + Thread.sleep(100) + + nodeInventory = TestInventory() + nodeNetworkHandler = NioNetworkHandler() + node = BitmessageContext( + cryptography = BouncyCryptography(), + inventory = nodeInventory, + nodeRegistry = TestNodeRegistry(peerAddress), + networkHandler = nodeNetworkHandler, + addressRepository = mock(), + messageRepository = mock(), + proofOfWorkRepository = mock(), + customCommandHandler = object : CustomCommandHandler { + override fun handle(request: CustomMessage): MessagePayload? { + val data = request.getData() + if (data.isNotEmpty()) { + when (data[0]) { + 0.toByte() -> return null + 1.toByte() -> { + } + 3.toByte() -> data[0] = 0 + } + } + return CustomMessage("test response", request.getData()) + } + }, + listener = mock(), + port = 6002 + ) + } + + @After + fun cleanUp() { + shutdown(peer) + shutdown(node) + shutdown(nodeNetworkHandler) + } + + private fun waitForNetworkStatus(ctx: BitmessageContext): Property { + var status: Property? + do { + Thread.sleep(100) + status = ctx.status().getProperty("network", "connections", "stream 1") + } while (status == null) + return status + } + + @Test + fun `ensure nodes are connecting`() { + node.startup() + + val nodeStatus = waitForNetworkStatus(node) + val peerStatus = waitForNetworkStatus(peer) + + assertEquals(1, nodeStatus.getProperty("outgoing")!!.value) + assertEquals(1, peerStatus.getProperty("incoming")!!.value) + } + + @Test + fun `ensure CustomMessage is sent and response retrieved`() { + val data = cryptography().randomBytes(8) + data[0] = 1.toByte() + val request = CustomMessage("test request", data) + node.startup() + + val response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.port, request) + + assertThat(response, notNullValue()) + assertThat(response.customCommand, `is`("test response")) + assertThat(response.getData(), `is`(data)) + } + + @Test(expected = NodeException::class) + fun `ensure CustomMessage without response yields exception`() { + val data = cryptography().randomBytes(8) + data[0] = 0.toByte() + val request = CustomMessage("test request", data) + + val response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.port, request) + + assertThat(response, notNullValue()) + assertThat(response.customCommand, `is`("test response")) + assertThat(response.getData(), `is`(request.getData())) + } + + @Test + fun `ensure objects are synchronized if both have objects`() { + peerInventory.init( + "V4Pubkey.payload", + "V5Broadcast.payload" + ) + + nodeInventory.init( + "V1Msg.payload", + "V4Pubkey.payload" + ) + + val future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.port, 10) + future.get() + assertInventorySize(3, nodeInventory) + assertInventorySize(3, peerInventory) + } + + @Test + fun `ensure objects are synchronized if only peer has objects`() { + peerInventory.init( + "V4Pubkey.payload", + "V5Broadcast.payload" + ) + + nodeInventory.init() + + val future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.port, 10) + future.get() + assertInventorySize(2, nodeInventory) + assertInventorySize(2, peerInventory) + } + + @Test + fun `ensure objects are synchronized if only node has objects`() { + peerInventory.init() + + nodeInventory.init( + "V1Msg.payload" + ) + + val future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.port, 10) + future.get() + assertInventorySize(1, nodeInventory) + assertInventorySize(1, peerInventory) + } + + private fun assertInventorySize(expected: Int, inventory: TestInventory) { + val timeout = System.currentTimeMillis() + 1000 + while (expected != inventory.getInventory().size && System.currentTimeMillis() < timeout) { + Thread.sleep(10) + } + assertEquals(expected.toLong(), inventory.getInventory().size.toLong()) + } + + companion object { + private val LOG = LoggerFactory.getLogger(NetworkHandlerTest::class.java) + private val peerAddress = NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(6001).build() + + private fun shutdown(ctx: BitmessageContext) { + if (!ctx.isRunning) return + + ctx.shutdown() + do { + try { + Thread.sleep(100) + } catch (ignore: InterruptedException) { + } + + } while (ctx.isRunning) + } + + private fun shutdown(networkHandler: NetworkHandler) { + if (!networkHandler.isRunning) return + + networkHandler.stop() + do { + try { + Thread.sleep(100) + } catch (ignore: InterruptedException) { + if (networkHandler.isRunning) { + LOG.warn("Thread interrupted while waiting for network shutdown - " + "this could cause problems in subsequent tests.") + } + return + } + + } while (networkHandler.isRunning) + } + } + +} diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java deleted file mode 100644 index 2f1e107..0000000 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.repository; - -import ch.dissem.bitmessage.entity.BitmessageAddress; -import ch.dissem.bitmessage.entity.payload.Pubkey; -import ch.dissem.bitmessage.entity.payload.V3Pubkey; -import ch.dissem.bitmessage.entity.payload.V4Pubkey; -import ch.dissem.bitmessage.entity.valueobject.PrivateKey; -import ch.dissem.bitmessage.exception.ApplicationException; -import ch.dissem.bitmessage.factory.Factory; -import ch.dissem.bitmessage.ports.AddressRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.sql.*; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - -public class JdbcAddressRepository extends JdbcHelper implements AddressRepository { - private static final Logger LOG = LoggerFactory.getLogger(JdbcAddressRepository.class); - - public JdbcAddressRepository(JdbcConfig config) { - super(config); - } - - @Override - public BitmessageAddress findContact(byte[] ripeOrTag) { - for (BitmessageAddress address : find("public_key is null")) { - if (address.getVersion() > 3) { - if (Arrays.equals(ripeOrTag, address.getTag())) return address; - } else { - if (Arrays.equals(ripeOrTag, address.getRipe())) return address; - } - } - return null; - } - - @Override - public BitmessageAddress findIdentity(byte[] ripeOrTag) { - for (BitmessageAddress address : find("private_key is not null")) { - if (address.getVersion() > 3) { - if (Arrays.equals(ripeOrTag, address.getTag())) return address; - } else { - if (Arrays.equals(ripeOrTag, address.getRipe())) return address; - } - } - return null; - } - - @Override - public List getIdentities() { - return find("private_key IS NOT NULL"); - } - - @Override - public List getChans() { - return find("chan = '1'"); - } - - @Override - public List getSubscriptions() { - return find("subscribed = '1'"); - } - - @Override - public List getSubscriptions(long broadcastVersion) { - if (broadcastVersion > 4) { - return find("subscribed = '1' AND version > 3"); - } else { - return find("subscribed = '1' AND version <= 3"); - } - } - - @Override - public List getContacts() { - return find("private_key IS NULL OR chan = '1'"); - } - - private List find(String where) { - List result = new LinkedList<>(); - try ( - Connection connection = config.getConnection(); - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT address, alias, public_key, private_key, subscribed, chan " + - "FROM Address WHERE " + where) - ) { - while (rs.next()) { - BitmessageAddress address; - - InputStream privateKeyStream = rs.getBinaryStream("private_key"); - if (privateKeyStream == null) { - address = new BitmessageAddress(rs.getString("address")); - Blob publicKeyBlob = rs.getBlob("public_key"); - if (publicKeyBlob != null) { - Pubkey pubkey = Factory.readPubkey(address.getVersion(), address.getStream(), - publicKeyBlob.getBinaryStream(), (int) publicKeyBlob.length(), false); - if (address.getVersion() == 4 && pubkey instanceof V3Pubkey) { - pubkey = new V4Pubkey((V3Pubkey) pubkey); - } - address.setPubkey(pubkey); - } - } else { - PrivateKey privateKey = PrivateKey.read(privateKeyStream); - address = new BitmessageAddress(privateKey); - } - address.setAlias(rs.getString("alias")); - address.setSubscribed(rs.getBoolean("subscribed")); - address.setChan(rs.getBoolean("chan")); - - result.add(address); - } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - return result; - } - - private boolean exists(BitmessageAddress address) { - try ( - Connection connection = config.getConnection(); - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT '1' FROM Address " + - "WHERE address='" + address.getAddress() + "'") - ) { - return rs.next(); - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - throw new ApplicationException(e); - } - } - - @Override - public void save(BitmessageAddress address) { - try { - if (exists(address)) { - update(address); - } else { - insert(address); - } - } catch (IOException | SQLException e) { - LOG.error(e.getMessage(), e); - } - } - - private void update(BitmessageAddress address) throws IOException, SQLException { - StringBuilder statement = new StringBuilder("UPDATE Address SET alias=?"); - if (address.getPubkey() != null) { - statement.append(", public_key=?"); - } - if (address.getPrivateKey() != null) { - statement.append(", private_key=?"); - } - statement.append(", subscribed=?, chan=? WHERE address=?"); - try ( - Connection connection = config.getConnection(); - PreparedStatement ps = connection.prepareStatement(statement.toString()) - ) { - int i = 0; - ps.setString(++i, address.getAlias()); - if (address.getPubkey() != null) { - writePubkey(ps, ++i, address.getPubkey()); - } - if (address.getPrivateKey() != null) { - writeBlob(ps, ++i, address.getPrivateKey()); - } - ps.setBoolean(++i, address.isSubscribed()); - ps.setBoolean(++i, address.isChan()); - ps.setString(++i, address.getAddress()); - ps.executeUpdate(); - } - } - - private void insert(BitmessageAddress address) throws IOException, SQLException { - try ( - Connection connection = config.getConnection(); - PreparedStatement ps = connection.prepareStatement( - "INSERT INTO Address (address, version, alias, public_key, private_key, subscribed, chan) " + - "VALUES (?, ?, ?, ?, ?, ?, ?)") - ) { - ps.setString(1, address.getAddress()); - ps.setLong(2, address.getVersion()); - ps.setString(3, address.getAlias()); - writePubkey(ps, 4, address.getPubkey()); - writeBlob(ps, 5, address.getPrivateKey()); - ps.setBoolean(6, address.isSubscribed()); - ps.setBoolean(7, address.isChan()); - ps.executeUpdate(); - } - } - - protected void writePubkey(PreparedStatement ps, int parameterIndex, Pubkey data) throws SQLException, IOException { - if (data != null) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - data.writeUnencrypted(out); - ps.setBytes(parameterIndex, out.toByteArray()); - } else { - ps.setBytes(parameterIndex, null); - } - } - - @Override - public void remove(BitmessageAddress address) { - try ( - Connection connection = config.getConnection(); - Statement stmt = connection.createStatement() - ) { - stmt.executeUpdate("DELETE FROM Address WHERE address = '" + address.getAddress() + "'"); - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - } - - @Override - public BitmessageAddress getAddress(String address) { - List result = find("address = '" + address + "'"); - if (result.size() > 0) return result.get(0); - return null; - } -} diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcConfig.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcConfig.java deleted file mode 100644 index 7448b19..0000000 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcConfig.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.repository; - -import org.flywaydb.core.Flyway; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -/** - * The base configuration for all JDBC based repositories. You should only make one instance, - * as flyway initializes/updates the database at object creation. - */ -public class JdbcConfig { - protected final Flyway flyway; - protected final String dbUrl; - protected final String dbUser; - protected final String dbPassword; - - public JdbcConfig(String dbUrl, String dbUser, String dbPassword) { - this.dbUrl = dbUrl; - this.dbUser = dbUser; - this.dbPassword = dbPassword; - this.flyway = new Flyway(); - flyway.setDataSource(dbUrl, dbUser, dbPassword); - flyway.migrate(); - } - - public JdbcConfig() { - this("jdbc:h2:~/jabit;AUTO_SERVER=TRUE", "sa", null); - } - - public Connection getConnection() throws SQLException { - return DriverManager.getConnection(dbUrl, dbUser, dbPassword); - } -} diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java deleted file mode 100644 index 8548267..0000000 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.repository; - -import ch.dissem.bitmessage.entity.Streamable; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -/** - * Helper class that does Flyway migration, provides JDBC connections and some helper methods. - */ -public abstract class JdbcHelper { - - protected final JdbcConfig config; - - protected JdbcHelper(JdbcConfig config) { - this.config = config; - } - - public static void writeBlob(PreparedStatement ps, int parameterIndex, Streamable data) throws SQLException, IOException { - if (data == null) { - ps.setBytes(parameterIndex, null); - } else { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - data.write(os); - ps.setBytes(parameterIndex, os.toByteArray()); - } - } -} diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcInventory.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcInventory.java deleted file mode 100644 index 65ea15f..0000000 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcInventory.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.repository; - -import ch.dissem.bitmessage.entity.ObjectMessage; -import ch.dissem.bitmessage.entity.payload.ObjectType; -import ch.dissem.bitmessage.entity.valueobject.InventoryVector; -import ch.dissem.bitmessage.exception.ApplicationException; -import ch.dissem.bitmessage.factory.Factory; -import ch.dissem.bitmessage.ports.Inventory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.*; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import static ch.dissem.bitmessage.utils.SqlStrings.join; -import static ch.dissem.bitmessage.utils.UnixTime.MINUTE; -import static ch.dissem.bitmessage.utils.UnixTime.now; - -public class JdbcInventory extends JdbcHelper implements Inventory { - private static final Logger LOG = LoggerFactory.getLogger(JdbcInventory.class); - - private final Map> cache = new ConcurrentHashMap<>(); - - public JdbcInventory(JdbcConfig config) { - super(config); - } - - @Override - public List getInventory(long... streams) { - List result = new LinkedList<>(); - for (long stream : streams) { - getCache(stream).entrySet().stream() - .filter(e -> e.getValue() > now()) - .forEach(e -> result.add(e.getKey())); - } - return result; - } - - private Map getCache(long stream) { - Map result = cache.get(stream); - if (result == null) { - synchronized (cache) { - if (cache.get(stream) == null) { - result = new ConcurrentHashMap<>(); - cache.put(stream, result); - try ( - Connection connection = config.getConnection(); - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT hash, expires FROM Inventory " + - "WHERE expires > " + (now() - 5 * MINUTE) + " AND stream = " + stream) - ) { - while (rs.next()) { - result.put(InventoryVector.fromHash(rs.getBytes("hash")), rs.getLong("expires")); - } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - } - } - } - return result; - } - - @Override - public List getMissing(List offer, long... streams) { - for (long stream : streams) { - offer.removeAll(getCache(stream).keySet()); - } - return offer; - } - - @Override - public ObjectMessage getObject(InventoryVector vector) { - try ( - Connection connection = config.getConnection(); - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT data, version FROM Inventory WHERE hash = X'" + vector + "'") - ) { - if (rs.next()) { - Blob data = rs.getBlob("data"); - return Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length()); - } else { - LOG.info("Object requested that we don't have. IV: " + vector); - return null; - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new ApplicationException(e); - } - } - - @Override - public List getObjects(long stream, long version, ObjectType... types) { - StringBuilder query = new StringBuilder("SELECT data, version FROM Inventory WHERE 1=1"); - if (stream > 0) { - query.append(" AND stream = ").append(stream); - } - if (version > 0) { - query.append(" AND version = ").append(version); - } - if (types.length > 0) { - query.append(" AND type IN (").append(join(types)).append(')'); - } - try ( - Connection connection = config.getConnection(); - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery(query.toString()) - ) { - List result = new LinkedList<>(); - while (rs.next()) { - Blob data = rs.getBlob("data"); - result.add(Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length())); - } - return result; - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new ApplicationException(e); - } - } - - @Override - public void storeObject(ObjectMessage object) { - if (getCache(object.getStream()).containsKey(object.getInventoryVector())) - return; - - try ( - Connection connection = config.getConnection(); - PreparedStatement ps = connection.prepareStatement("INSERT INTO Inventory " + - "(hash, stream, expires, data, type, version) VALUES (?, ?, ?, ?, ?, ?)") - ) { - InventoryVector iv = object.getInventoryVector(); - LOG.trace("Storing object " + iv); - ps.setBytes(1, iv.getHash()); - ps.setLong(2, object.getStream()); - ps.setLong(3, object.getExpiresTime()); - writeBlob(ps, 4, object); - ps.setLong(5, object.getType()); - ps.setLong(6, object.getVersion()); - ps.executeUpdate(); - getCache(object.getStream()).put(iv, object.getExpiresTime()); - } catch (SQLException e) { - LOG.debug("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - - @Override - public boolean contains(ObjectMessage object) { - return getCache(object.getStream()).entrySet().stream() - .anyMatch(x -> x.getKey().equals(object.getInventoryVector())); - } - - @Override - public void cleanup() { - try ( - Connection connection = config.getConnection(); - Statement stmt = connection.createStatement() - ) { - stmt.executeUpdate("DELETE FROM Inventory WHERE expires < " + (now() - 5 * MINUTE)); - } catch (SQLException e) { - LOG.debug(e.getMessage(), e); - } - for (Map c : cache.values()) { - c.entrySet().removeIf(e -> e.getValue() < (now() - 5 * MINUTE)); - } - } -} diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java deleted file mode 100644 index acecd33..0000000 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Copyright 2016 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.repository; - -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.exception.ApplicationException; -import ch.dissem.bitmessage.ports.NodeRegistry; -import ch.dissem.bitmessage.utils.Collections; -import ch.dissem.bitmessage.utils.SqlStrings; -import ch.dissem.bitmessage.utils.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.*; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static ch.dissem.bitmessage.ports.NodeRegistryHelper.loadStableNodes; -import static ch.dissem.bitmessage.utils.UnixTime.*; - -public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry { - private static final Logger LOG = LoggerFactory.getLogger(JdbcNodeRegistry.class); - private Map> stableNodes; - - public JdbcNodeRegistry(JdbcConfig config) { - super(config); - cleanUp(); - } - - private void cleanUp() { - try ( - Connection connection = config.getConnection(); - PreparedStatement ps = connection.prepareStatement( - "DELETE FROM Node WHERE time getKnownAddresses(int limit, long... streams) { - List result = new LinkedList<>(); - String query = - "SELECT stream, address, port, services, time" + - " FROM Node WHERE stream IN (" + SqlStrings.join(streams) + ")" + - " ORDER BY TIME DESC" + - " LIMIT " + limit; - try ( - Connection connection = config.getConnection(); - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery(query) - ) { - while (rs.next()) { - result.add( - new NetworkAddress.Builder() - .stream(rs.getLong("stream")) - .ipv6(rs.getBytes("address")) - .port(rs.getInt("port")) - .services(rs.getLong("services")) - .time(rs.getLong("time")) - .build() - ); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new ApplicationException(e); - } - if (result.isEmpty()) { - synchronized (this) { - if (stableNodes == null) { - stableNodes = loadStableNodes(); - } - } - for (long stream : streams) { - Set nodes = stableNodes.get(stream); - if (nodes != null && !nodes.isEmpty()) { - result.add(Collections.selectRandom(nodes)); - } - } - if (result.isEmpty()) { - // There might have been an error resolving domain names due to a missing internet exception. - // Try to load the stable nodes again next time. - stableNodes = null; - } - } - return result; - } - - @Override - public void offerAddresses(List nodes) { - cleanUp(); - nodes.stream() - .filter(node -> node.getTime() < now() + 2 * MINUTE && node.getTime() > now() - 28 * DAY) - .forEach(node -> { - synchronized (this) { - NetworkAddress existing = loadExisting(node); - if (existing == null) { - insert(node); - } else if (node.getTime() > existing.getTime()) { - update(node); - } - } - }); - } - - private void insert(NetworkAddress node) { - try ( - Connection connection = config.getConnection(); - PreparedStatement ps = connection.prepareStatement( - "INSERT INTO Node (stream, address, port, services, time) " + - "VALUES (?, ?, ?, ?, ?)") - ) { - ps.setLong(1, node.getStream()); - ps.setBytes(2, node.getIPv6()); - ps.setInt(3, node.getPort()); - ps.setLong(4, node.getServices()); - ps.setLong(5, node.getTime()); - ps.executeUpdate(); - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - } - - private void update(NetworkAddress node) { - try ( - Connection connection = config.getConnection(); - PreparedStatement ps = connection.prepareStatement( - "UPDATE Node SET services=?, time=? WHERE stream=? AND address=? AND port=?") - ) { - ps.setLong(1, node.getServices()); - ps.setLong(2, node.getTime()); - ps.setLong(3, node.getStream()); - ps.setBytes(4, node.getIPv6()); - ps.setInt(5, node.getPort()); - ps.executeUpdate(); - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - } -} diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcProofOfWorkRepository.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcProofOfWorkRepository.java deleted file mode 100644 index c509e4a..0000000 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcProofOfWorkRepository.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright 2016 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.repository; - -import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.entity.ObjectMessage; -import ch.dissem.bitmessage.exception.ApplicationException; -import ch.dissem.bitmessage.factory.Factory; -import ch.dissem.bitmessage.ports.ProofOfWorkRepository; -import ch.dissem.bitmessage.utils.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.sql.*; -import java.util.LinkedList; -import java.util.List; - -import static ch.dissem.bitmessage.utils.Singleton.cryptography; - -/** - * @author Christian Basler - */ -public class JdbcProofOfWorkRepository extends JdbcHelper implements ProofOfWorkRepository, InternalContext.ContextHolder { - private static final Logger LOG = LoggerFactory.getLogger(JdbcProofOfWorkRepository.class); - private InternalContext ctx; - - public JdbcProofOfWorkRepository(JdbcConfig config) { - super(config); - } - - @Override - public Item getItem(byte[] initialHash) { - try ( - Connection connection = config.getConnection(); - PreparedStatement ps = connection.prepareStatement("SELECT data, version, nonce_trials_per_byte, " + - "extra_bytes, expiration_time, message_id FROM POW WHERE initial_hash=?") - ) { - ps.setBytes(1, initialHash); - try (ResultSet rs = ps.executeQuery()) { - if (rs.next()) { - Blob data = rs.getBlob("data"); - if (rs.getObject("message_id") == null) { - return new Item( - Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length()), - rs.getLong("nonce_trials_per_byte"), - rs.getLong("extra_bytes") - ); - } else { - return new Item( - Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length()), - rs.getLong("nonce_trials_per_byte"), - rs.getLong("extra_bytes"), - rs.getLong("expiration_time"), - ctx.getMessageRepository().getMessage(rs.getLong("message_id")) - ); - } - } else { - throw new IllegalArgumentException("Object requested that we don't have. Initial hash: " + Strings.hex(initialHash)); - } - } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - throw new ApplicationException(e); - } - } - - @Override - public List getItems() { - try ( - Connection connection = config.getConnection(); - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT initial_hash FROM POW") - ) { - List result = new LinkedList<>(); - while (rs.next()) { - result.add(rs.getBytes("initial_hash")); - } - return result; - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - throw new ApplicationException(e); - } - } - - @Override - public void putObject(Item item) { - try ( - Connection connection = config.getConnection(); - PreparedStatement ps = connection.prepareStatement("INSERT INTO POW (initial_hash, data, version, " + - "nonce_trials_per_byte, extra_bytes, expiration_time, message_id) " + - "VALUES (?, ?, ?, ?, ?, ?, ?)") - ) { - ps.setBytes(1, cryptography().getInitialHash(item.getObjectMessage())); - writeBlob(ps, 2, item.getObjectMessage()); - ps.setLong(3, item.getObjectMessage().getVersion()); - ps.setLong(4, item.getNonceTrialsPerByte()); - ps.setLong(5, item.getExtraBytes()); - - if (item.getMessage() == null) { - ps.setObject(6, null); - ps.setObject(7, null); - } else { - ps.setLong(6, item.getExpirationTime()); - ps.setLong(7, (Long) item.getMessage().getId()); - } - ps.executeUpdate(); - } catch (IOException | SQLException e) { - LOG.debug("Error storing object of type " + item.getObjectMessage().getPayload().getClass().getSimpleName(), e); - throw new ApplicationException(e); - } - } - - @Override - public void putObject(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) { - putObject(new Item(object, nonceTrialsPerByte, extraBytes)); - } - - @Override - public void removeObject(byte[] initialHash) { - try ( - Connection connection = config.getConnection(); - PreparedStatement ps = connection.prepareStatement("DELETE FROM POW WHERE initial_hash=?") - ) { - ps.setBytes(1, initialHash); - ps.executeUpdate(); - } catch (SQLException e) { - LOG.debug(e.getMessage(), e); - } - } - - @Override - public void setContext(InternalContext context) { - this.ctx = context; - } -} diff --git a/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcAddressRepository.kt b/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcAddressRepository.kt new file mode 100644 index 0000000..29a0f87 --- /dev/null +++ b/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcAddressRepository.kt @@ -0,0 +1,203 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.repository + +import ch.dissem.bitmessage.entity.BitmessageAddress +import ch.dissem.bitmessage.entity.payload.Pubkey +import ch.dissem.bitmessage.entity.payload.V3Pubkey +import ch.dissem.bitmessage.entity.payload.V4Pubkey +import ch.dissem.bitmessage.entity.valueobject.PrivateKey +import ch.dissem.bitmessage.factory.Factory +import ch.dissem.bitmessage.ports.AddressRepository +import org.slf4j.LoggerFactory +import java.io.ByteArrayOutputStream +import java.io.IOException +import java.sql.PreparedStatement +import java.sql.SQLException +import java.util.* + +class JdbcAddressRepository(config: JdbcConfig) : JdbcHelper(config), AddressRepository { + + override fun findContact(ripeOrTag: ByteArray) = find("private_key is null").firstOrNull { + if (it.version > 3) { + Arrays.equals(ripeOrTag, it.tag) + } else { + Arrays.equals(ripeOrTag, it.ripe) + } + } + + override fun findIdentity(ripeOrTag: ByteArray) = find("private_key is not null").firstOrNull { + if (it.version > 3) { + Arrays.equals(ripeOrTag, it.tag) + } else { + Arrays.equals(ripeOrTag, it.ripe) + } + } + + override fun getIdentities() = find("private_key IS NOT NULL") + + override fun getChans() = find("chan = '1'") + + override fun getSubscriptions() = find("subscribed = '1'") + + override fun getSubscriptions(broadcastVersion: Long): List = if (broadcastVersion > 4) { + find("subscribed = '1' AND version > 3") + } else { + find("subscribed = '1' AND version <= 3") + } + + override fun getContacts() = find("private_key IS NULL OR chan = '1'") + + private fun find(where: String): List { + val result = LinkedList() + try { + config.getConnection().use { connection -> + connection.createStatement().use { stmt -> + stmt.executeQuery(""" + SELECT address, alias, public_key, private_key, subscribed, chan + FROM Address + WHERE $where + """).use { rs -> + while (rs.next()) { + val address: BitmessageAddress + + val privateKeyStream = rs.getBinaryStream("private_key") + if (privateKeyStream == null) { + address = BitmessageAddress(rs.getString("address")) + rs.getBlob("public_key")?.let { publicKeyBlob -> + var pubkey: Pubkey = Factory.readPubkey(address.version, address.stream, + publicKeyBlob.binaryStream, publicKeyBlob.length().toInt(), false)!! + if (address.version == 4L && pubkey is V3Pubkey) { + pubkey = V4Pubkey(pubkey) + } + address.pubkey = pubkey + } + } else { + val privateKey = PrivateKey.read(privateKeyStream) + address = BitmessageAddress(privateKey) + } + address.alias = rs.getString("alias") + address.isSubscribed = rs.getBoolean("subscribed") + address.isChan = rs.getBoolean("chan") + + result.add(address) + } + } + } + } + } catch (e: SQLException) { + LOG.error(e.message, e) + } + + return result + } + + private fun exists(address: BitmessageAddress): Boolean { + config.getConnection().use { connection -> + connection.createStatement().use { stmt -> + stmt.executeQuery("SELECT '1' FROM Address " + + "WHERE address='" + address.address + "'").use { rs -> return rs.next() } + } + } + } + + override fun save(address: BitmessageAddress) { + try { + if (exists(address)) { + update(address) + } else { + insert(address) + } + } catch (e: IOException) { + LOG.error(e.message, e) + } catch (e: SQLException) { + LOG.error(e.message, e) + } + } + + private fun update(address: BitmessageAddress) { + val statement = StringBuilder("UPDATE Address SET alias=?") + if (address.pubkey != null) { + statement.append(", public_key=?") + } + if (address.privateKey != null) { + statement.append(", private_key=?") + } + statement.append(", subscribed=?, chan=? WHERE address=?") + config.getConnection().use { connection -> + connection.prepareStatement(statement.toString()).use { ps -> + var i = 0 + ps.setString(++i, address.alias) + if (address.pubkey != null) { + writePubkey(ps, ++i, address.pubkey) + } + if (address.privateKey != null) { + JdbcHelper.writeBlob(ps, ++i, address.privateKey) + } + ps.setBoolean(++i, address.isSubscribed) + ps.setBoolean(++i, address.isChan) + ps.setString(++i, address.address) + ps.executeUpdate() + } + } + } + + private fun insert(address: BitmessageAddress) { + config.getConnection().use { connection -> + connection.prepareStatement( + "INSERT INTO Address (address, version, alias, public_key, private_key, subscribed, chan) " + "VALUES (?, ?, ?, ?, ?, ?, ?)").use { ps -> + ps.setString(1, address.address) + ps.setLong(2, address.version) + ps.setString(3, address.alias) + writePubkey(ps, 4, address.pubkey) + JdbcHelper.writeBlob(ps, 5, address.privateKey) + ps.setBoolean(6, address.isSubscribed) + ps.setBoolean(7, address.isChan) + ps.executeUpdate() + } + } + } + + private fun writePubkey(ps: PreparedStatement, parameterIndex: Int, data: Pubkey?) { + if (data != null) { + val out = ByteArrayOutputStream() + data.writeUnencrypted(out) + ps.setBytes(parameterIndex, out.toByteArray()) + } else { + ps.setBytes(parameterIndex, null) + } + } + + override fun remove(address: BitmessageAddress) { + try { + config.getConnection().use { connection -> connection.createStatement().use { stmt -> stmt.executeUpdate("DELETE FROM Address WHERE address = '" + address.address + "'") } } + } catch (e: SQLException) { + LOG.error(e.message, e) + } + + } + + override fun getAddress(address: String): BitmessageAddress? { + val result = find("address = '$address'") + if (result.isNotEmpty()) return result[0] + return null + } + + companion object { + private val LOG = LoggerFactory.getLogger(JdbcAddressRepository::class.java) + } +} diff --git a/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcConfig.kt b/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcConfig.kt new file mode 100644 index 0000000..311bf70 --- /dev/null +++ b/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcConfig.kt @@ -0,0 +1,39 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.repository + +import org.flywaydb.core.Flyway +import java.sql.DriverManager + +/** + * The base configuration for all JDBC based repositories. You should only make one instance, + * as flyway initializes/updates the database at object creation. + */ +open class JdbcConfig @JvmOverloads constructor( + protected val dbUrl: String = "jdbc:h2:~/jabit;AUTO_SERVER=TRUE", + protected val dbUser: String = "sa", + protected val dbPassword: String? = null +) { + protected val flyway = Flyway() + + init { + flyway.setDataSource(dbUrl, dbUser, dbPassword) + flyway.migrate() + } + + fun getConnection() = DriverManager.getConnection(dbUrl, dbUser, dbPassword) +} diff --git a/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcHelper.kt b/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcHelper.kt new file mode 100644 index 0000000..ce8ca1f --- /dev/null +++ b/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcHelper.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.repository + +import ch.dissem.bitmessage.entity.Streamable +import java.io.ByteArrayOutputStream +import java.sql.PreparedStatement + +/** + * Helper class that does Flyway migration, provides JDBC connections and some helper methods. + */ +abstract class JdbcHelper protected constructor(protected val config: JdbcConfig) { + companion object { + @JvmStatic fun writeBlob(ps: PreparedStatement, parameterIndex: Int, data: Streamable?) { + if (data == null) { + ps.setBytes(parameterIndex, null) + } else { + val os = ByteArrayOutputStream() + data.write(os) + ps.setBytes(parameterIndex, os.toByteArray()) + } + } + } +} diff --git a/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcInventory.kt b/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcInventory.kt new file mode 100644 index 0000000..637c5dd --- /dev/null +++ b/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcInventory.kt @@ -0,0 +1,166 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.repository + +import ch.dissem.bitmessage.entity.ObjectMessage +import ch.dissem.bitmessage.entity.payload.ObjectType +import ch.dissem.bitmessage.entity.valueobject.InventoryVector +import ch.dissem.bitmessage.factory.Factory +import ch.dissem.bitmessage.ports.Inventory +import ch.dissem.bitmessage.utils.SqlStrings.join +import ch.dissem.bitmessage.utils.UnixTime.MINUTE +import ch.dissem.bitmessage.utils.UnixTime.now +import org.slf4j.LoggerFactory +import java.sql.SQLException +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +class JdbcInventory(config: JdbcConfig) : JdbcHelper(config), Inventory { + + private val cache = ConcurrentHashMap>() + + override fun getInventory(vararg streams: Long): List { + val result = LinkedList() + for (stream in streams) { + getCache(stream).entries.stream() + .filter { e -> e.value > now } + .forEach { e -> result.add(e.key) } + } + return result + } + + private fun getCache(stream: Long): MutableMap { + var result: MutableMap? = cache[stream] + if (result == null) { + synchronized(cache) { + if (cache[stream] == null) { + val map = ConcurrentHashMap() + cache.put(stream, map) + result = map + try { + config.getConnection().use { connection -> + connection.createStatement().use { stmt -> + stmt.executeQuery("SELECT hash, expires FROM Inventory " + + "WHERE expires > " + (now - 5 * MINUTE) + " AND stream = " + stream).use { rs -> + while (rs.next()) { + map.put(InventoryVector(rs.getBytes("hash")), rs.getLong("expires")) + } + } + } + } + } catch (e: SQLException) { + LOG.error(e.message, e) + } + } + } + } + return result!! + } + + override fun getMissing(offer: List, vararg streams: Long): List = offer - streams.flatMap { getCache(it).keys } + + override fun getObject(vector: InventoryVector): ObjectMessage? { + config.getConnection().use { connection -> + connection.createStatement().use { stmt -> + stmt.executeQuery("SELECT data, version FROM Inventory WHERE hash = X'$vector'").use { rs -> + if (rs.next()) { + val data = rs.getBlob("data") + return Factory.getObjectMessage(rs.getInt("version"), data.binaryStream, data.length().toInt()) + } else { + LOG.info("Object requested that we don't have. IV: " + vector) + return null + } + } + } + } + } + + override fun getObjects(stream: Long, version: Long, vararg types: ObjectType): List { + val query = StringBuilder("SELECT data, version FROM Inventory WHERE 1=1") + if (stream > 0) { + query.append(" AND stream = ").append(stream) + } + if (version > 0) { + query.append(" AND version = ").append(version) + } + if (types.isNotEmpty()) { + query.append(" AND type IN (").append(join(*types)).append(')') + } + config.getConnection().use { connection -> + connection.createStatement().use { stmt -> + stmt.executeQuery(query.toString()).use { rs -> + val result = LinkedList() + while (rs.next()) { + val data = rs.getBlob("data") + result.add(Factory.getObjectMessage(rs.getInt("version"), data.binaryStream, data.length().toInt())!!) + } + return result + } + } + } + } + + override fun storeObject(objectMessage: ObjectMessage) { + if (getCache(objectMessage.stream).containsKey(objectMessage.inventoryVector)) + return + + try { + config.getConnection().use { connection -> + connection.prepareStatement("INSERT INTO Inventory " + "(hash, stream, expires, data, type, version) VALUES (?, ?, ?, ?, ?, ?)").use { ps -> + val iv = objectMessage.inventoryVector + LOG.trace("Storing object " + iv) + ps.setBytes(1, iv.hash) + ps.setLong(2, objectMessage.stream) + ps.setLong(3, objectMessage.expiresTime) + JdbcHelper.Companion.writeBlob(ps, 4, objectMessage) + ps.setLong(5, objectMessage.type) + ps.setLong(6, objectMessage.version) + ps.executeUpdate() + getCache(objectMessage.stream).put(iv, objectMessage.expiresTime) + } + } + } catch (e: SQLException) { + LOG.debug("Error storing object of type " + objectMessage.payload.javaClass.simpleName, e) + } catch (e: Exception) { + LOG.error(e.message, e) + } + } + + override fun contains(objectMessage: ObjectMessage): Boolean { + return getCache(objectMessage.stream).any { (key, _) -> key == objectMessage.inventoryVector } + } + + override fun cleanup() { + try { + config.getConnection().use { connection -> + connection.createStatement().use { stmt -> + stmt.executeUpdate("DELETE FROM Inventory WHERE expires < " + (now - 5 * MINUTE)) + } + } + } catch (e: SQLException) { + LOG.debug(e.message, e) + } + + for (c in cache.values) { + c.entries.removeIf { e -> e.value < now - 5 * MINUTE } + } + } + + companion object { + private val LOG = LoggerFactory.getLogger(JdbcInventory::class.java) + } +} diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.kt b/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcMessageRepository.kt similarity index 96% rename from repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.kt rename to repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcMessageRepository.kt index 2f4f851..6daf32c 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.kt +++ b/repositories/src/main/kotlin/ch/dissem/bitmessage/repository/JdbcMessageRepository.kt @@ -21,7 +21,7 @@ import ch.dissem.bitmessage.entity.valueobject.InventoryVector import ch.dissem.bitmessage.entity.valueobject.Label import ch.dissem.bitmessage.ports.AbstractMessageRepository import ch.dissem.bitmessage.ports.MessageRepository -import ch.dissem.bitmessage.repository.JdbcHelper.writeBlob +import ch.dissem.bitmessage.repository.JdbcHelper.Companion.writeBlob import org.slf4j.LoggerFactory import java.io.IOException import java.sql.Connection @@ -34,7 +34,7 @@ class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRep override fun findLabels(where: String): List