Minor improvements and fixes
This commit is contained in:
parent
dad05d835b
commit
a240606909
@ -47,14 +47,14 @@ class BufferPool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public synchronized ByteBuffer allocate(int capacity) {
|
public synchronized ByteBuffer allocate(int capacity) {
|
||||||
for (Map.Entry<Integer, Stack<ByteBuffer>> e : pools.entrySet()) {
|
|
||||||
if (e.getKey() >= capacity && !e.getValue().isEmpty()) {
|
|
||||||
return e.getValue().pop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Integer targetSize = getTargetSize(capacity);
|
Integer targetSize = getTargetSize(capacity);
|
||||||
LOG.debug("Creating new buffer of size " + targetSize);
|
Stack<ByteBuffer> pool = pools.get(targetSize);
|
||||||
return ByteBuffer.allocate(targetSize);
|
if (pool.isEmpty()) {
|
||||||
|
LOG.trace("Creating new buffer of size " + targetSize);
|
||||||
|
return ByteBuffer.allocate(targetSize);
|
||||||
|
} else {
|
||||||
|
return pool.pop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -64,21 +64,22 @@ class BufferPool {
|
|||||||
*/
|
*/
|
||||||
public synchronized ByteBuffer allocateHeaderBuffer() {
|
public synchronized ByteBuffer allocateHeaderBuffer() {
|
||||||
Stack<ByteBuffer> pool = pools.get(HEADER_SIZE);
|
Stack<ByteBuffer> pool = pools.get(HEADER_SIZE);
|
||||||
if (!pool.isEmpty()) {
|
if (pool.isEmpty()) {
|
||||||
return pool.pop();
|
|
||||||
} else {
|
|
||||||
return ByteBuffer.allocate(HEADER_SIZE);
|
return ByteBuffer.allocate(HEADER_SIZE);
|
||||||
|
} else {
|
||||||
|
return pool.pop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void deallocate(ByteBuffer buffer) {
|
public synchronized void deallocate(ByteBuffer buffer) {
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
|
Stack<ByteBuffer> pool = pools.get(buffer.capacity());
|
||||||
if (!pools.keySet().contains(buffer.capacity())) {
|
if (pool == null) {
|
||||||
throw new IllegalArgumentException("Illegal buffer capacity " + buffer.capacity() +
|
throw new IllegalArgumentException("Illegal buffer capacity " + buffer.capacity() +
|
||||||
" one of " + pools.keySet() + " expected.");
|
" one of " + pools.keySet() + " expected.");
|
||||||
|
} else {
|
||||||
|
pool.push(buffer);
|
||||||
}
|
}
|
||||||
pools.get(buffer.capacity()).push(buffer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Integer getTargetSize(int capacity) {
|
private Integer getTargetSize(int capacity) {
|
||||||
|
@ -145,7 +145,7 @@ public abstract class AbstractConnection {
|
|||||||
updateIvCache(inv.getInventory());
|
updateIvCache(inv.getInventory());
|
||||||
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
|
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
|
||||||
missing.removeAll(commonRequestedObjects);
|
missing.removeAll(commonRequestedObjects);
|
||||||
LOG.debug("Received inventory with " + originalSize + " elements, of which are "
|
LOG.trace("Received inventory with " + originalSize + " elements, of which are "
|
||||||
+ missing.size() + " missing.");
|
+ missing.size() + " missing.");
|
||||||
send(new GetData.Builder().inventory(missing).build());
|
send(new GetData.Builder().inventory(missing).build());
|
||||||
}
|
}
|
||||||
|
@ -135,6 +135,7 @@ public class ConnectionInfo extends AbstractConnection {
|
|||||||
reader.cleanup();
|
reader.cleanup();
|
||||||
reader = null;
|
reader = null;
|
||||||
}
|
}
|
||||||
|
payloadOut = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isSyncFinished() {
|
public boolean isSyncFinished() {
|
||||||
|
@ -239,8 +239,8 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
|
|||||||
}
|
}
|
||||||
e.getValue().cancel();
|
e.getValue().cancel();
|
||||||
e.getValue().attach(null);
|
e.getValue().attach(null);
|
||||||
it.remove();
|
|
||||||
e.getKey().disconnect();
|
e.getKey().disconnect();
|
||||||
|
it.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
@ -387,12 +387,20 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
|
|||||||
distribution.put(connection, new LinkedList<InventoryVector>());
|
distribution.put(connection, new LinkedList<InventoryVector>());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (distribution.isEmpty()){
|
||||||
|
return;
|
||||||
|
}
|
||||||
InventoryVector next = iterator.next();
|
InventoryVector next = iterator.next();
|
||||||
ConnectionInfo previous = null;
|
ConnectionInfo previous = null;
|
||||||
do {
|
do {
|
||||||
for (ConnectionInfo connection : distribution.keySet()) {
|
for (ConnectionInfo connection : distribution.keySet()) {
|
||||||
if (connection == previous || previous == null) {
|
if (connection == previous || previous == null) {
|
||||||
next = iterator.next();
|
if (iterator.hasNext()) {
|
||||||
|
previous = connection;
|
||||||
|
next = iterator.next();
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (connection.knowsOf(next)) {
|
if (connection.knowsOf(next)) {
|
||||||
List<InventoryVector> ivs = distribution.get(connection);
|
List<InventoryVector> ivs = distribution.get(connection);
|
||||||
|
@ -117,7 +117,7 @@ public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry {
|
|||||||
public void offerAddresses(List<NetworkAddress> nodes) {
|
public void offerAddresses(List<NetworkAddress> nodes) {
|
||||||
cleanUp();
|
cleanUp();
|
||||||
nodes.stream()
|
nodes.stream()
|
||||||
.filter(node -> node.getTime() < now(+24 * HOUR) && node.getTime() > now(-28 * DAY))
|
.filter(node -> node.getTime() < now(+2 * MINUTE) && node.getTime() > now(-28 * DAY))
|
||||||
.forEach(node -> {
|
.forEach(node -> {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
NetworkAddress existing = loadExisting(node);
|
NetworkAddress existing = loadExisting(node);
|
||||||
|
Loading…
Reference in New Issue
Block a user