Better memory management for the out buffer

This commit is contained in:
Christian Basler 2016-07-27 07:38:39 +02:00
parent 48ff975ffd
commit 56ebb7b8fa
3 changed files with 52 additions and 24 deletions

View File

@ -19,7 +19,6 @@ package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.utils.Encode; import ch.dissem.bitmessage.utils.Encode;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
@ -93,8 +92,31 @@ public class NetworkMessage implements Streamable {
out.write(payloadBytes); out.write(payloadBytes);
} }
/**
* A more efficient implementation of the write method, writing header data to the provided buffer and returning
* a new buffer containing the payload.
*
* @param headerBuffer where the header data is written to (24 bytes)
* @return a buffer containing the payload, ready to be read.
*/
public ByteBuffer writeHeaderAndGetPayloadBuffer(ByteBuffer headerBuffer) {
return ByteBuffer.wrap(writeHeader(headerBuffer));
}
/**
* For improved memory efficiency, you should use {@link #writeHeaderAndGetPayloadBuffer(ByteBuffer)}
* and write the header buffer as well as the returned payload buffer into the channel.
*
* @param buffer where everything gets written to. Needs to be large enough for the whole message
* to be written.
*/
@Override @Override
public void write(ByteBuffer out) { public void write(ByteBuffer buffer) {
byte[] payloadBytes = writeHeader(buffer);
buffer.put(payloadBytes);
}
private byte[] writeHeader(ByteBuffer out) {
// magic // magic
Encode.int32(MAGIC, out); Encode.int32(MAGIC, out);
@ -124,6 +146,6 @@ public class NetworkMessage implements Streamable {
} }
// message payload // message payload
out.put(payloadBytes); return payloadBytes;
} }
} }

View File

@ -35,13 +35,13 @@ import java.util.Set;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_MESSAGE_SIZE;
/** /**
* Represents the current state of a connection. * Represents the current state of a connection.
*/ */
public class ConnectionInfo extends AbstractConnection { public class ConnectionInfo extends AbstractConnection {
private ByteBuffer out = ByteBuffer.allocate(MAX_MESSAGE_SIZE); private final ByteBuffer headerOut = ByteBuffer.allocate(24);
private ByteBuffer payloadOut;
private V3MessageReader reader = new V3MessageReader(); private V3MessageReader reader = new V3MessageReader();
private boolean syncFinished; private boolean syncFinished;
private long lastUpdate = Long.MAX_VALUE; private long lastUpdate = Long.MAX_VALUE;
@ -50,7 +50,7 @@ public class ConnectionInfo extends AbstractConnection {
NetworkAddress node, NetworkHandler.MessageListener listener, NetworkAddress node, NetworkHandler.MessageListener listener,
Set<InventoryVector> commonRequestedObjects, long syncTimeout) { Set<InventoryVector> commonRequestedObjects, long syncTimeout) {
super(context, mode, node, listener, commonRequestedObjects, syncTimeout); super(context, mode, node, listener, commonRequestedObjects, syncTimeout);
out.flip(); headerOut.flip();
if (mode == CLIENT || mode == SYNC) { if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build()); send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build());
} }
@ -76,17 +76,23 @@ public class ConnectionInfo extends AbstractConnection {
} }
public void updateWriter() { public void updateWriter() {
if ((out == null || !out.hasRemaining()) && !sendingQueue.isEmpty()) { if (!headerOut.hasRemaining() && !sendingQueue.isEmpty()) {
out.clear(); headerOut.clear();
MessagePayload payload = sendingQueue.poll(); MessagePayload payload = sendingQueue.poll();
new NetworkMessage(payload).write(out); payloadOut = new NetworkMessage(payload).writeHeaderAndGetPayloadBuffer(headerOut);
out.flip(); headerOut.flip();
lastUpdate = System.currentTimeMillis(); lastUpdate = System.currentTimeMillis();
} }
} }
public ByteBuffer getOutBuffer() { public ByteBuffer[] getOutBuffers() {
return out; return new ByteBuffer[]{headerOut, payloadOut};
}
public void cleanupBuffers() {
if (payloadOut != null && !payloadOut.hasRemaining()) {
payloadOut = null;
}
} }
public void updateReader() { public void updateReader() {

View File

@ -36,10 +36,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.Callable; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.*; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.*;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE;
@ -66,7 +63,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
private InternalContext ctx; private InternalContext ctx;
private Selector selector; private Selector selector;
private ServerSocketChannel serverChannel; private ServerSocketChannel serverChannel;
private Map<ConnectionInfo, SelectionKey> connections = synchronizedMap(new WeakHashMap<ConnectionInfo, SelectionKey>()); private Map<ConnectionInfo, SelectionKey> connections = new ConcurrentHashMap<>();
private int requestedObjectsCount; private int requestedObjectsCount;
private Thread starter; private Thread starter;
@ -81,13 +78,11 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, ConnectionInfo connection = new ConnectionInfo(ctx, SYNC,
new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), new NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
listener, new HashSet<InventoryVector>(), timeoutInSeconds); listener, new HashSet<InventoryVector>(), timeoutInSeconds);
connections.put(connection, null);
while (channel.isConnected() && !connection.isSyncFinished()) { while (channel.isConnected() && !connection.isSyncFinished()) {
write(channel, connection); write(channel, connection);
read(channel, connection); read(channel, connection);
Thread.sleep(10); Thread.sleep(10);
} }
connections.remove(connection);
LOG.info("Synchronization finished"); LOG.info("Synchronization finished");
} }
return null; return null;
@ -302,16 +297,21 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
private static void write(SocketChannel channel, ConnectionInfo connection) private static void write(SocketChannel channel, ConnectionInfo connection)
throws IOException { throws IOException {
writeBuffer(connection.getOutBuffer(), channel); writeBuffer(connection.getOutBuffers(), channel);
connection.updateWriter(); connection.updateWriter();
writeBuffer(connection.getOutBuffer(), channel); writeBuffer(connection.getOutBuffers(), channel);
connection.cleanupBuffers();
} }
private static void writeBuffer(ByteBuffer buffer, SocketChannel channel) throws IOException { private static void writeBuffer(ByteBuffer[] buffers, SocketChannel channel) throws IOException {
if (buffer != null && buffer.hasRemaining()) { if (buffers[1] == null) {
channel.write(buffer); if (buffers[0].hasRemaining()) {
channel.write(buffers[0]);
}
} else if (buffers[1].hasRemaining() || buffers[0].hasRemaining()) {
channel.write(buffers);
} }
} }