🐘 Add settings to reduce memory usage

* NioNetworkHandler lets you tweak the minimum number of connections
* BufferPool can now set a limit to how many Buffers it retains.
  This one might still need some tweaking.
This commit is contained in:
Christian Basler 2018-06-12 07:00:52 +02:00
parent 37cda3df56
commit fe9fa0ba2f
5 changed files with 113 additions and 41 deletions

View File

@ -18,9 +18,11 @@ package ch.dissem.bitmessage.factory
import ch.dissem.bitmessage.constants.Network.HEADER_SIZE import ch.dissem.bitmessage.constants.Network.HEADER_SIZE
import ch.dissem.bitmessage.constants.Network.MAX_PAYLOAD_SIZE import ch.dissem.bitmessage.constants.Network.MAX_PAYLOAD_SIZE
import ch.dissem.bitmessage.exception.NodeException
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.* import java.util.*
import kotlin.math.max
/** /**
* A pool for [ByteBuffer]s. As they may use up a lot of memory, * A pool for [ByteBuffer]s. As they may use up a lot of memory,
@ -29,22 +31,44 @@ import java.util.*
object BufferPool { object BufferPool {
private val LOG = LoggerFactory.getLogger(BufferPool::class.java) private val LOG = LoggerFactory.getLogger(BufferPool::class.java)
private var limit: Int? = null
private var strictLimit = false
/**
* Sets a limit to how many buffers the pool handles. If strict is set to true, it will not issue any
* buffers once the limit is reached and will throw a NodeException instead. Otherwise, it will simply
* ignore returned buffers once the limit is reached (and therefore garbage collected)
*/
fun setLimit(limit: Int, strict: Boolean = false) {
this.limit = limit
this.strictLimit = strict
pools.values.forEach { it.limit = limit }
pools[HEADER_SIZE]!!.limit = 2 * limit
pools[MAX_PAYLOAD_SIZE]!!.limit = max(limit / 2, 1)
}
private val pools = mapOf( private val pools = mapOf(
HEADER_SIZE to Stack<ByteBuffer>(), HEADER_SIZE to Pool(),
54 to Stack<ByteBuffer>(), 54 to Pool(),
1000 to Stack<ByteBuffer>(), 1000 to Pool(),
60000 to Stack<ByteBuffer>(), 60000 to Pool(),
MAX_PAYLOAD_SIZE to Stack<ByteBuffer>() MAX_PAYLOAD_SIZE to Pool()
) )
@Synchronized fun allocate(capacity: Int): ByteBuffer { @Synchronized
fun allocate(capacity: Int): ByteBuffer {
val targetSize = getTargetSize(capacity) val targetSize = getTargetSize(capacity)
val pool = pools[targetSize] ?: throw IllegalStateException("No pool for size $targetSize available") val pool = pools[targetSize] ?: throw IllegalStateException("No pool for size $targetSize available")
if (pool.isEmpty()) {
LOG.trace("Creating new buffer of size $targetSize") return if (pool.isEmpty) {
return ByteBuffer.allocate(targetSize) if (pool.hasCapacity || !strictLimit) {
LOG.trace("Creating new buffer of size $targetSize")
ByteBuffer.allocate(targetSize)
} else {
throw NodeException("pool limit for capacity $capacity is reached")
}
} else { } else {
return pool.pop() pool.pop()
} }
} }
@ -53,18 +77,26 @@ object BufferPool {
* @return a buffer of size 24 * @return a buffer of size 24
*/ */
@Synchronized fun allocateHeaderBuffer(): ByteBuffer { @Synchronized
val pool = pools[HEADER_SIZE] fun allocateHeaderBuffer(): ByteBuffer {
if (pool == null || pool.isEmpty()) { val pool = pools[HEADER_SIZE] ?: throw IllegalStateException("No pool for header available")
return ByteBuffer.allocate(HEADER_SIZE) return if (pool.isEmpty) {
if (pool.hasCapacity || !strictLimit) {
LOG.trace("Creating new buffer of header")
ByteBuffer.allocate(HEADER_SIZE)
} else {
throw NodeException("pool limit for header buffer is reached")
}
} else { } else {
return pool.pop() pool.pop()
} }
} }
@Synchronized fun deallocate(buffer: ByteBuffer) { @Synchronized
fun deallocate(buffer: ByteBuffer) {
buffer.clear() buffer.clear()
val pool = pools[buffer.capacity()] ?: throw IllegalArgumentException("Illegal buffer capacity ${buffer.capacity()} one of ${pools.keys} expected.") val pool = pools[buffer.capacity()]
?: throw IllegalArgumentException("Illegal buffer capacity ${buffer.capacity()} one of ${pools.keys} expected.")
pool.push(buffer) pool.push(buffer)
} }
@ -74,4 +106,40 @@ object BufferPool {
} }
throw IllegalArgumentException("Requested capacity too large: requested=$capacity; max=$MAX_PAYLOAD_SIZE") throw IllegalArgumentException("Requested capacity too large: requested=$capacity; max=$MAX_PAYLOAD_SIZE")
} }
/**
* There is a race condition where the limit could be ignored for an allocation, but I think the consequences
* are benign.
*/
class Pool {
private val stack = Stack<ByteBuffer>()
private var capacity = 0
internal var limit: Int? = null
set(value) {
capacity = value ?: 0
field = value
}
val isEmpty
get() = stack.isEmpty()
val hasCapacity
@Synchronized
get() = limit == null || capacity > 0
@Synchronized
fun pop(): ByteBuffer {
capacity--
return stack.pop()
}
@Synchronized
fun push(buffer: ByteBuffer) {
if (hasCapacity) {
stack.push(buffer)
}
// else, let it be collected by the garbage collector
capacity++
}
}
} }

View File

@ -90,11 +90,11 @@ class V3MessageReader {
state = ReaderState.DATA state = ReaderState.DATA
this.headerBuffer = null this.headerBuffer = null
BufferPool.deallocate(headerBuffer) BufferPool.deallocate(headerBuffer)
val dataBuffer = BufferPool.allocate(length) this.dataBuffer = BufferPool.allocate(length).apply {
this.dataBuffer = dataBuffer clear()
dataBuffer.clear() limit(length)
dataBuffer.limit(length) data(this)
data(dataBuffer) }
} }
private fun data(dataBuffer: ByteBuffer) { private fun data(dataBuffer: ByteBuffer) {

View File

@ -58,7 +58,7 @@ class Connection(
private var lastObjectTime: Long = 0 private var lastObjectTime: Long = 0
lateinit var streams: LongArray lateinit var streams: LongArray
protected set private set
@Volatile var state = State.CONNECTING @Volatile var state = State.CONNECTING
private set private set

View File

@ -16,6 +16,7 @@
package ch.dissem.bitmessage.networking.nio package ch.dissem.bitmessage.networking.nio
import ch.dissem.bitmessage.constants.Network.HEADER_SIZE
import ch.dissem.bitmessage.entity.GetData import ch.dissem.bitmessage.entity.GetData
import ch.dissem.bitmessage.entity.MessagePayload import ch.dissem.bitmessage.entity.MessagePayload
import ch.dissem.bitmessage.entity.NetworkMessage import ch.dissem.bitmessage.entity.NetworkMessage
@ -39,7 +40,7 @@ class ConnectionIO(
private val getState: () -> Connection.State, private val getState: () -> Connection.State,
private val handleMessage: (MessagePayload) -> Unit private val handleMessage: (MessagePayload) -> Unit
) { ) {
private val headerOut: ByteBuffer = ByteBuffer.allocate(24) private val headerOut: ByteBuffer = ByteBuffer.allocate(HEADER_SIZE)
private var payloadOut: ByteBuffer? = null private var payloadOut: ByteBuffer? = null
private var reader: V3MessageReader? = V3MessageReader() private var reader: V3MessageReader? = V3MessageReader()
internal val sendingQueue: Deque<MessagePayload> = ConcurrentLinkedDeque<MessagePayload>() internal val sendingQueue: Deque<MessagePayload> = ConcurrentLinkedDeque<MessagePayload>()

View File

@ -17,7 +17,6 @@
package ch.dissem.bitmessage.networking.nio package ch.dissem.bitmessage.networking.nio
import ch.dissem.bitmessage.InternalContext import ch.dissem.bitmessage.InternalContext
import ch.dissem.bitmessage.constants.Network.HEADER_SIZE
import ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER import ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER
import ch.dissem.bitmessage.entity.CustomMessage import ch.dissem.bitmessage.entity.CustomMessage
import ch.dissem.bitmessage.entity.GetData import ch.dissem.bitmessage.entity.GetData
@ -25,6 +24,7 @@ import ch.dissem.bitmessage.entity.NetworkMessage
import ch.dissem.bitmessage.entity.valueobject.InventoryVector import ch.dissem.bitmessage.entity.valueobject.InventoryVector
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress import ch.dissem.bitmessage.entity.valueobject.NetworkAddress
import ch.dissem.bitmessage.exception.NodeException import ch.dissem.bitmessage.exception.NodeException
import ch.dissem.bitmessage.factory.BufferPool
import ch.dissem.bitmessage.factory.V3MessageReader import ch.dissem.bitmessage.factory.V3MessageReader
import ch.dissem.bitmessage.networking.nio.Connection.Mode.* import ch.dissem.bitmessage.networking.nio.Connection.Mode.*
import ch.dissem.bitmessage.ports.NetworkHandler import ch.dissem.bitmessage.ports.NetworkHandler
@ -48,7 +48,8 @@ import java.util.concurrent.*
/** /**
* Network handler using java.nio, resulting in less threads. * Network handler using java.nio, resulting in less threads.
*/ */
class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder { class NioNetworkHandler(private val magicNetworkNumber: Int = NETWORK_MAGIC_NUMBER) : NetworkHandler,
InternalContext.ContextHolder {
private val threadPool = Executors.newCachedThreadPool( private val threadPool = Executors.newCachedThreadPool(
pool("network") pool("network")
@ -93,12 +94,13 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
override fun send(server: InetAddress, port: Int, request: CustomMessage): CustomMessage { override fun send(server: InetAddress, port: Int, request: CustomMessage): CustomMessage {
SocketChannel.open(InetSocketAddress(server, port)).use { channel -> SocketChannel.open(InetSocketAddress(server, port)).use { channel ->
channel.configureBlocking(true) channel.configureBlocking(true)
val headerBuffer = ByteBuffer.allocate(HEADER_SIZE) val headerBuffer = BufferPool.allocateHeaderBuffer()
val payloadBuffer = NetworkMessage(request).writer().writeHeaderAndGetPayloadBuffer(headerBuffer) val payloadBuffer = NetworkMessage(request).writer().writeHeaderAndGetPayloadBuffer(headerBuffer)
headerBuffer.flip() headerBuffer.flip()
while (headerBuffer.hasRemaining()) { while (headerBuffer.hasRemaining()) {
channel.write(headerBuffer) channel.write(headerBuffer)
} }
BufferPool.deallocate(headerBuffer)
while (payloadBuffer.hasRemaining()) { while (payloadBuffer.hasRemaining()) {
channel.write(payloadBuffer) channel.write(payloadBuffer)
} }
@ -108,12 +110,14 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
if (channel.read(reader.getActiveBuffer()) > 0) { if (channel.read(reader.getActiveBuffer()) > 0) {
reader.update() reader.update()
} else { } else {
reader.cleanup()
throw NodeException("No response from node $server") throw NodeException("No response from node $server")
} }
} }
val networkMessage: NetworkMessage? val networkMessage: NetworkMessage?
if (reader.getMessages().isEmpty()) { if (reader.getMessages().isEmpty()) {
throw NodeException("No response from node " + server) reader.cleanup()
throw NodeException("No response from node $server")
} else { } else {
networkMessage = reader.getMessages().first() networkMessage = reader.getMessages().first()
} }
@ -121,13 +125,14 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
if (networkMessage.payload is CustomMessage) { if (networkMessage.payload is CustomMessage) {
return networkMessage.payload as CustomMessage return networkMessage.payload as CustomMessage
} else { } else {
reader.cleanup()
throw NodeException("Unexpected response from node $server: ${networkMessage.payload.javaClass}") throw NodeException("Unexpected response from node $server: ${networkMessage.payload.javaClass}")
} }
} }
} }
override fun start() { override fun start() {
if (selector?.isOpen ?: false) { if (selector?.isOpen == true) {
throw IllegalStateException("Network already running - you need to stop first.") throw IllegalStateException("Network already running - you need to stop first.")
} }
val selector = Selector.open() val selector = Selector.open()
@ -137,7 +142,7 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
starter = thread("connection manager") { starter = thread("connection manager") {
while (selector.isOpen) { while (selector.isOpen) {
var missing = NETWORK_MAGIC_NUMBER var missing = magicNetworkNumber
for ((connection, _) in connections) { for ((connection, _) in connections) {
if (connection.state == Connection.State.ACTIVE) { if (connection.state == Connection.State.ACTIVE) {
missing-- missing--
@ -229,10 +234,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
), ),
requestedObjects, 0 requestedObjects, 0
) )
connections.put( connections[connection] =
connection,
accepted.register(selector, OP_READ or OP_WRITE, connection) accepted.register(selector, OP_READ or OP_WRITE, connection)
)
} catch (e: AsynchronousCloseException) { } catch (e: AsynchronousCloseException) {
LOG.trace(e.message) LOG.trace(e.message)
} catch (e: IOException) { } catch (e: IOException) {
@ -260,13 +263,13 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
if (key.isReadable) { if (key.isReadable) {
read(channel, connection.io) read(channel, connection.io)
} }
if (connection.state == Connection.State.DISCONNECTED) { when {
key.interestOps(0) connection.state == Connection.State.DISCONNECTED -> {
channel.close() key.interestOps(0)
} else if (connection.io.isWritePending) { channel.close()
key.interestOps(OP_READ or OP_WRITE) }
} else { connection.io.isWritePending -> key.interestOps(OP_READ or OP_WRITE)
key.interestOps(OP_READ) else -> key.interestOps(OP_READ)
} }
} catch (e: CancelledKeyException) { } catch (e: CancelledKeyException) {
connection.disconnect() connection.disconnect()
@ -361,7 +364,7 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
override fun offer(iv: InventoryVector) { override fun offer(iv: InventoryVector) {
val targetConnections = connections.keys.filter { it.state == Connection.State.ACTIVE && !it.knowsOf(iv) } val targetConnections = connections.keys.filter { it.state == Connection.State.ACTIVE && !it.knowsOf(iv) }
selectRandom(NETWORK_MAGIC_NUMBER, targetConnections).forEach { it.offer(iv) } selectRandom(magicNetworkNumber, targetConnections).forEach { it.offer(iv) }
} }
override fun request(inventoryVectors: MutableCollection<InventoryVector>) { override fun request(inventoryVectors: MutableCollection<InventoryVector>) {