Tests with NioNetworkHandler as peer work now

This commit is contained in:
Christian Basler 2016-06-18 23:09:23 +02:00
parent 0fadb40c6c
commit ae2120675f
5 changed files with 64 additions and 49 deletions

View File

@ -60,6 +60,9 @@ public abstract class AbstractConnection {
protected volatile State state; protected volatile State state;
protected long lastObjectTime; protected long lastObjectTime;
private final long syncTimeout;
private int readTimeoutCounter;
protected long peerNonce; protected long peerNonce;
protected int version; protected int version;
protected long[] streams; protected long[] streams;
@ -70,12 +73,13 @@ public abstract class AbstractConnection {
NetworkAddress node, NetworkAddress node,
NetworkHandler.MessageListener listener, NetworkHandler.MessageListener listener,
Set<InventoryVector> commonRequestedObjects, Set<InventoryVector> commonRequestedObjects,
boolean threadsafe) { long syncTimeout, boolean threadsafe) {
this.ctx = context; this.ctx = context;
this.mode = mode; this.mode = mode;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = node; this.node = node;
this.listener = listener; this.listener = listener;
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0);
if (threadsafe) { if (threadsafe) {
this.ivCache = new ConcurrentHashMap<>(); this.ivCache = new ConcurrentHashMap<>();
this.sendingQueue = new ConcurrentLinkedDeque<>(); this.sendingQueue = new ConcurrentLinkedDeque<>();
@ -107,6 +111,9 @@ public abstract class AbstractConnection {
receiveMessage(payload); receiveMessage(payload);
break; break;
case DISCONNECTED:
break;
default: default:
handleCommand(payload); handleCommand(payload);
break; break;
@ -283,6 +290,33 @@ public abstract class AbstractConnection {
} }
} }
@SuppressWarnings("RedundantIfStatement")
protected boolean syncFinished(NetworkMessage msg) {
if (mode != SYNC) {
return false;
}
if (Thread.interrupted()) {
return true;
}
if (state != ACTIVE) {
return false;
}
if (syncTimeout < UnixTime.now()) {
LOG.info("Synchronization timed out");
return true;
}
if (msg == null) {
if (requestedObjects.isEmpty() && sendingQueue.isEmpty())
return true;
readTimeoutCounter++;
return readTimeoutCounter > 1;
} else {
readTimeoutCounter = 0;
return false;
}
}
public void disconnect() { public void disconnect() {
state = DISCONNECTED; state = DISCONNECTED;

View File

@ -56,13 +56,11 @@ class Connection extends AbstractConnection {
private final long startTime; private final long startTime;
private final Socket socket; private final Socket socket;
private final long syncTimeout;
private final ReaderRunnable reader = new ReaderRunnable(); private final ReaderRunnable reader = new ReaderRunnable();
private final WriterRunnable writer = new WriterRunnable(); private final WriterRunnable writer = new WriterRunnable();
private InputStream in; private InputStream in;
private OutputStream out; private OutputStream out;
private int readTimeoutCounter;
private boolean socketInitialized; private boolean socketInitialized;
public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener,
@ -80,10 +78,9 @@ class Connection extends AbstractConnection {
private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket,
Set<InventoryVector> commonRequestedObjects, NetworkAddress node, long syncTimeout) { Set<InventoryVector> commonRequestedObjects, NetworkAddress node, long syncTimeout) {
super(context, mode, node, listener, commonRequestedObjects, true); super(context, mode, node, listener, commonRequestedObjects, syncTimeout, true);
this.startTime = UnixTime.now(); this.startTime = UnixTime.now();
this.socket = socket; this.socket = socket;
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0);
} }
public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener,
@ -110,33 +107,6 @@ class Connection extends AbstractConnection {
return node; return node;
} }
@SuppressWarnings("RedundantIfStatement")
private boolean syncFinished(NetworkMessage msg) {
if (mode != SYNC) {
return false;
}
if (Thread.interrupted()) {
return true;
}
if (state != ACTIVE) {
return false;
}
if (syncTimeout < UnixTime.now()) {
LOG.info("Synchronization timed out");
return true;
}
if (msg == null) {
if (requestedObjects.isEmpty() && sendingQueue.isEmpty())
return true;
readTimeoutCounter++;
return readTimeoutCounter > 1;
} else {
readTimeoutCounter = 0;
return false;
}
}
@Override @Override
protected void send(MessagePayload payload) { protected void send(MessagePayload payload) {
try { try {

View File

@ -42,11 +42,12 @@ public class ConnectionInfo extends AbstractConnection {
private ByteBuffer in = ByteBuffer.allocate(MAX_MESSAGE_SIZE); private ByteBuffer in = ByteBuffer.allocate(MAX_MESSAGE_SIZE);
private ByteBuffer out = ByteBuffer.allocate(MAX_MESSAGE_SIZE); private ByteBuffer out = ByteBuffer.allocate(MAX_MESSAGE_SIZE);
private V3MessageReader reader = new V3MessageReader(); private V3MessageReader reader = new V3MessageReader();
private boolean syncFinished;
public ConnectionInfo(InternalContext context, Mode mode, public ConnectionInfo(InternalContext context, Mode mode,
NetworkAddress node, NetworkHandler.MessageListener listener, NetworkAddress node, NetworkHandler.MessageListener listener,
Set<InventoryVector> commonRequestedObjects) { Set<InventoryVector> commonRequestedObjects, long syncTimeout) {
super(context, mode, node, listener, commonRequestedObjects, false); super(context, mode, node, listener, commonRequestedObjects, syncTimeout, false);
out.flip(); out.flip();
if (mode == CLIENT || mode == SYNC) { if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build()); send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build());
@ -77,14 +78,20 @@ public class ConnectionInfo extends AbstractConnection {
reader.update(in); reader.update(in);
if (!reader.getMessages().isEmpty()) { if (!reader.getMessages().isEmpty()) {
Iterator<NetworkMessage> iterator = reader.getMessages().iterator(); Iterator<NetworkMessage> iterator = reader.getMessages().iterator();
NetworkMessage msg = null;
while (iterator.hasNext()) { while (iterator.hasNext()) {
NetworkMessage msg = iterator.next(); msg = iterator.next();
handleMessage(msg.getPayload()); handleMessage(msg.getPayload());
iterator.remove(); iterator.remove();
} }
syncFinished = syncFinished(msg);
} }
} }
public boolean isSyncFinished() {
return syncFinished;
}
@Override @Override
protected void send(MessagePayload payload) { protected void send(MessagePayload payload) {
sendingQueue.add(payload); sendingQueue.add(payload);

View File

@ -43,6 +43,7 @@ import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED;
import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.DebugUtils.inc;
import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool;
import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_READ;
@ -65,7 +66,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
private ServerSocketChannel serverChannel; private ServerSocketChannel serverChannel;
@Override @Override
public Future<Void> synchronize(final InetAddress server, final int port, final MessageListener listener, long timeoutInSeconds) { public Future<Void> synchronize(final InetAddress server, final int port, final MessageListener listener, final long timeoutInSeconds) {
return pool.submit(new Callable<Void>() { return pool.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
@ -75,11 +76,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
channel.configureBlocking(false); channel.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, ConnectionInfo connection = new ConnectionInfo(ctx, SYNC,
new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), new NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
listener, new HashSet<InventoryVector>()); listener, new HashSet<InventoryVector>(), timeoutInSeconds);
while (channel.isConnected() && while (channel.isConnected() &&
(connection.getState() != ACTIVE (connection.getState() != ACTIVE || connection.isSyncFinished())) {
|| connection.getSendingQueue().isEmpty()
|| requestedObjects.isEmpty())) {
write(requestedObjects, channel, connection); write(requestedObjects, channel, connection);
read(channel, connection); read(channel, connection);
Thread.sleep(10); Thread.sleep(10);
@ -138,7 +137,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
throw new ApplicationException(e); throw new ApplicationException(e);
} }
final Set<InventoryVector> requestedObjects = new HashSet<>(); final Set<InventoryVector> requestedObjects = new HashSet<>();
start("connection listener", new Runnable() { thread("connection listener", new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
@ -152,9 +151,10 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
new ConnectionInfo(ctx, SERVER, new ConnectionInfo(ctx, SERVER,
new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(), new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(),
listener, listener,
requestedObjects requestedObjects, 0
)); ));
} catch (AsynchronousCloseException ignore) { } catch (AsynchronousCloseException ignore) {
LOG.trace(ignore.getMessage());
} catch (IOException e) { } catch (IOException e) {
LOG.error(e.getMessage(), e); LOG.error(e.getMessage(), e);
} }
@ -169,7 +169,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} }
}); });
start("connection starter", new Runnable() { thread("connection starter", new Runnable() {
@Override @Override
public void run() { public void run() {
while (selector.isOpen()) { while (selector.isOpen()) {
@ -184,7 +184,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
new ConnectionInfo(ctx, CLIENT, new ConnectionInfo(ctx, CLIENT,
address, address,
listener, listener,
requestedObjects requestedObjects, 0
)); ));
} catch (AsynchronousCloseException ignore) { } catch (AsynchronousCloseException ignore) {
} catch (IOException e) { } catch (IOException e) {
@ -200,7 +200,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} }
}); });
start("processor", new Runnable() { thread("processor", new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
@ -220,7 +220,12 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
read(channel, connection); read(channel, connection);
} }
if (connection.getSendingQueue().isEmpty()) { if (connection.getSendingQueue().isEmpty()) {
if (connection.getState() == DISCONNECTED) {
key.interestOps(0);
key.channel().close();
} else {
key.interestOps(OP_READ); key.interestOps(OP_READ);
}
} else { } else {
key.interestOps(OP_READ | OP_WRITE); key.interestOps(OP_READ | OP_WRITE);
} }
@ -269,7 +274,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} }
} }
private void start(String threadName, Runnable runnable) { private void thread(String threadName, Runnable runnable) {
Thread thread = new Thread(runnable, threadName); Thread thread = new Thread(runnable, threadName);
thread.setDaemon(true); thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY); thread.setPriority(Thread.MIN_PRIORITY);

View File

@ -194,12 +194,11 @@ public class NetworkHandlerTest {
assertThat(response.getData(), is(data)); assertThat(response.getData(), is(data));
} }
@Test(timeout = 5_000, expected = NodeException.class) @Test(expected = NodeException.class)
public void ensureCustomMessageWithoutResponseYieldsException() throws Exception { public void ensureCustomMessageWithoutResponseYieldsException() throws Exception {
byte[] data = cryptography().randomBytes(8); byte[] data = cryptography().randomBytes(8);
data[0] = (byte) 0; data[0] = (byte) 0;
CustomMessage request = new CustomMessage("test request", data); CustomMessage request = new CustomMessage("test request", data);
node.startup();
CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request); CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request);