Fixed synchronisation

This commit is contained in:
Christian Basler 2016-07-09 16:37:12 +02:00
parent d130080df2
commit 50f2c7e080
3 changed files with 14 additions and 10 deletions

View File

@ -61,7 +61,7 @@ public abstract class AbstractConnection {
protected long lastObjectTime; protected long lastObjectTime;
private final long syncTimeout; private final long syncTimeout;
private int readTimeoutCounter; private long syncReadTimeout = Long.MAX_VALUE;
protected long peerNonce; protected long peerNonce;
protected int version; protected int version;
@ -305,16 +305,13 @@ public abstract class AbstractConnection {
return true; return true;
} }
if (!sendingQueue.isEmpty()) { if (!sendingQueue.isEmpty()) {
syncReadTimeout = System.currentTimeMillis() + 1000;
return false; return false;
} }
if (msg == null) { if (msg == null) {
if (requestedObjects.isEmpty()) return syncReadTimeout < System.currentTimeMillis();
return true;
readTimeoutCounter++;
return readTimeoutCounter > 1;
} else { } else {
readTimeoutCounter = 0; syncReadTimeout = System.currentTimeMillis() + 1000;
return false; return false;
} }
} }

View File

@ -77,7 +77,6 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
public Void call() throws Exception { public Void call() throws Exception {
Set<InventoryVector> requestedObjects = new HashSet<>(); Set<InventoryVector> requestedObjects = new HashSet<>();
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) { try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) {
channel.finishConnect();
channel.configureBlocking(false); channel.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, ConnectionInfo connection = new ConnectionInfo(ctx, SYNC,
new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), new NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
@ -88,6 +87,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
read(channel, connection); read(channel, connection);
Thread.sleep(10); Thread.sleep(10);
} }
connections.remove(connection);
LOG.info("Synchronization finished"); LOG.info("Synchronization finished");
} }
return null; return null;
@ -225,7 +225,6 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
while (selector.isOpen()) { while (selector.isOpen()) {
selector.select(1000); selector.select(1000);
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next(); SelectionKey key = keyIterator.next();
if (key.attachment() instanceof ConnectionInfo) { if (key.attachment() instanceof ConnectionInfo) {
@ -253,6 +252,14 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} }
keyIterator.remove(); keyIterator.remove();
} }
for (SelectionKey key : selector.keys()) {
if ((key.interestOps() & OP_WRITE) == 0) {
if (key.attachment() instanceof ConnectionInfo &&
!((ConnectionInfo) key.attachment()).getSendingQueue().isEmpty()) {
key.interestOps(OP_READ | OP_WRITE);
}
}
}
} }
selector.close(); selector.close();
} catch (ClosedSelectorException ignore) { } catch (ClosedSelectorException ignore) {

View File

@ -254,7 +254,7 @@ public class NetworkHandlerTest {
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(),
mock(NetworkHandler.MessageListener.class), mock(NetworkHandler.MessageListener.class),
100); 10);
future.get(); future.get();
assertInventorySize(1, nodeInventory); assertInventorySize(1, nodeInventory);
assertInventorySize(1, peerInventory); assertInventorySize(1, peerInventory);