diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/NetworkMessage.java b/core/src/main/java/ch/dissem/bitmessage/entity/NetworkMessage.java index e549101..f27384e 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/NetworkMessage.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/NetworkMessage.java @@ -19,7 +19,6 @@ package ch.dissem.bitmessage.entity; import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.utils.Encode; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; @@ -93,8 +92,31 @@ public class NetworkMessage implements Streamable { out.write(payloadBytes); } + /** + * A more efficient implementation of the write method, writing header data to the provided buffer and returning + * a new buffer containing the payload. + * + * @param headerBuffer where the header data is written to (24 bytes) + * @return a buffer containing the payload, ready to be read. + */ + public ByteBuffer writeHeaderAndGetPayloadBuffer(ByteBuffer headerBuffer) { + return ByteBuffer.wrap(writeHeader(headerBuffer)); + } + + /** + * For improved memory efficiency, you should use {@link #writeHeaderAndGetPayloadBuffer(ByteBuffer)} + * and write the header buffer as well as the returned payload buffer into the channel. + * + * @param buffer where everything gets written to. Needs to be large enough for the whole message + * to be written. + */ @Override - public void write(ByteBuffer out) { + public void write(ByteBuffer buffer) { + byte[] payloadBytes = writeHeader(buffer); + buffer.put(payloadBytes); + } + + private byte[] writeHeader(ByteBuffer out) { // magic Encode.int32(MAGIC, out); @@ -124,6 +146,6 @@ public class NetworkMessage implements Streamable { } // message payload - out.put(payloadBytes); + return payloadBytes; } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java index ed0462d..647bc2c 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java @@ -35,13 +35,13 @@ import java.util.Set; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; -import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_MESSAGE_SIZE; /** * Represents the current state of a connection. */ public class ConnectionInfo extends AbstractConnection { - private ByteBuffer out = ByteBuffer.allocate(MAX_MESSAGE_SIZE); + private final ByteBuffer headerOut = ByteBuffer.allocate(24); + private ByteBuffer payloadOut; private V3MessageReader reader = new V3MessageReader(); private boolean syncFinished; private long lastUpdate = Long.MAX_VALUE; @@ -50,7 +50,7 @@ public class ConnectionInfo extends AbstractConnection { NetworkAddress node, NetworkHandler.MessageListener listener, Set commonRequestedObjects, long syncTimeout) { super(context, mode, node, listener, commonRequestedObjects, syncTimeout); - out.flip(); + headerOut.flip(); if (mode == CLIENT || mode == SYNC) { send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build()); } @@ -76,17 +76,23 @@ public class ConnectionInfo extends AbstractConnection { } public void updateWriter() { - if ((out == null || !out.hasRemaining()) && !sendingQueue.isEmpty()) { - out.clear(); + if (!headerOut.hasRemaining() && !sendingQueue.isEmpty()) { + headerOut.clear(); MessagePayload payload = sendingQueue.poll(); - new NetworkMessage(payload).write(out); - out.flip(); + payloadOut = new NetworkMessage(payload).writeHeaderAndGetPayloadBuffer(headerOut); + headerOut.flip(); lastUpdate = System.currentTimeMillis(); } } - public ByteBuffer getOutBuffer() { - return out; + public ByteBuffer[] getOutBuffers() { + return new ByteBuffer[]{headerOut, payloadOut}; + } + + public void cleanupBuffers() { + if (payloadOut != null && !payloadOut.hasRemaining()) { + payloadOut = null; + } } public void updateReader() { diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index 7ccea71..069662b 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -36,10 +36,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.*; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.*; import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; @@ -66,7 +63,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex private InternalContext ctx; private Selector selector; private ServerSocketChannel serverChannel; - private Map connections = synchronizedMap(new WeakHashMap()); + private Map connections = new ConcurrentHashMap<>(); private int requestedObjectsCount; private Thread starter; @@ -81,13 +78,11 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), listener, new HashSet(), timeoutInSeconds); - connections.put(connection, null); while (channel.isConnected() && !connection.isSyncFinished()) { write(channel, connection); read(channel, connection); Thread.sleep(10); } - connections.remove(connection); LOG.info("Synchronization finished"); } return null; @@ -302,16 +297,21 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex private static void write(SocketChannel channel, ConnectionInfo connection) throws IOException { - writeBuffer(connection.getOutBuffer(), channel); + writeBuffer(connection.getOutBuffers(), channel); connection.updateWriter(); - writeBuffer(connection.getOutBuffer(), channel); + writeBuffer(connection.getOutBuffers(), channel); + connection.cleanupBuffers(); } - private static void writeBuffer(ByteBuffer buffer, SocketChannel channel) throws IOException { - if (buffer != null && buffer.hasRemaining()) { - channel.write(buffer); + private static void writeBuffer(ByteBuffer[] buffers, SocketChannel channel) throws IOException { + if (buffers[1] == null) { + if (buffers[0].hasRemaining()) { + channel.write(buffers[0]); + } + } else if (buffers[1].hasRemaining() || buffers[0].hasRemaining()) { + channel.write(buffers); } }