Improved tests and fixed some

This commit is contained in:
2016-06-16 19:47:59 +02:00
parent ed4fd1002b
commit 0fadb40c6c
16 changed files with 234 additions and 105 deletions

View File

@ -12,7 +12,7 @@ uploadArchives {
dependencies {
compile project(':core')
testCompile 'junit:junit:4.11'
testCompile 'junit:junit:4.12'
testCompile 'org.slf4j:slf4j-simple:1.7.12'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile project(path: ':core', configuration: 'testArtifacts')

View File

@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES;
import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.*;
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
@ -62,6 +63,8 @@ public abstract class AbstractConnection {
protected long peerNonce;
protected int version;
protected long[] streams;
private boolean verackSent;
private boolean verackReceived;
public AbstractConnection(InternalContext context, Mode mode,
NetworkAddress node,
@ -198,7 +201,7 @@ public abstract class AbstractConnection {
return ivCache.containsKey(iv);
}
protected void cleanupIvCache() {
private void cleanupIvCache() {
Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE);
for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) {
if (entry.getValue() < fiveMinutesAgo) {
@ -213,16 +216,10 @@ public abstract class AbstractConnection {
handleVersion((Version) payload);
break;
case VERACK:
switch (mode) {
case SERVER:
activateConnection();
break;
case CLIENT:
case SYNC:
default:
// NO OP
break;
if (verackSent) {
activateConnection();
}
verackReceived = true;
break;
case CUSTOM:
MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload);
@ -237,7 +234,7 @@ public abstract class AbstractConnection {
}
}
protected void activateConnection() {
private void activateConnection() {
LOG.info("Successfully established connection with node " + node);
state = ACTIVE;
node.setTime(UnixTime.now());
@ -272,17 +269,13 @@ public abstract class AbstractConnection {
this.version = version.getVersion();
this.streams = version.getStreams();
verackSent = true;
send(new VerAck());
switch (mode) {
case SERVER:
send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build());
break;
case CLIENT:
case SYNC:
activateConnection();
break;
default:
// NO OP
if (mode == SERVER) {
send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build());
}
if (verackReceived) {
activateConnection();
}
} else {
LOG.info("Received unsupported version " + version.getVersion() + ", disconnecting.");

View File

@ -19,6 +19,7 @@ package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.factory.V3MessageReader;
@ -26,9 +27,12 @@ import ch.dissem.bitmessage.networking.AbstractConnection;
import ch.dissem.bitmessage.ports.NetworkHandler;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.ports.NetworkHandler.MAX_MESSAGE_SIZE;
/**
@ -43,6 +47,10 @@ public class ConnectionInfo extends AbstractConnection {
NetworkAddress node, NetworkHandler.MessageListener listener,
Set<InventoryVector> commonRequestedObjects) {
super(context, mode, node, listener, commonRequestedObjects, false);
out.flip();
if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults(peerNonce).addrFrom(host).addrRecv(node).build());
}
}
public State getState() {
@ -77,12 +85,8 @@ public class ConnectionInfo extends AbstractConnection {
}
}
public List<NetworkMessage> getMessages() {
return reader.getMessages();
}
@Override
protected void send(MessagePayload payload) {
sendingQueue.addFirst(payload);
sendingQueue.add(payload);
}
}

View File

@ -37,11 +37,14 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE;
import static ch.dissem.bitmessage.utils.DebugUtils.inc;
import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool;
import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE;
@ -51,13 +54,40 @@ import static java.nio.channels.SelectionKey.OP_WRITE;
public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder {
private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class);
private final ExecutorService pool = Executors.newCachedThreadPool(
pool("network")
.lowPrio()
.daemon()
.build());
private InternalContext ctx;
private Selector selector;
private ServerSocketChannel serverChannel;
@Override
public Future<?> synchronize(InetAddress server, int port, MessageListener listener, long timeoutInSeconds) {
return null;
public Future<Void> synchronize(final InetAddress server, final int port, final MessageListener listener, long timeoutInSeconds) {
return pool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Set<InventoryVector> requestedObjects = new HashSet<>();
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) {
channel.finishConnect();
channel.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SYNC,
new NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
listener, new HashSet<InventoryVector>());
while (channel.isConnected() &&
(connection.getState() != ACTIVE
|| connection.getSendingQueue().isEmpty()
|| requestedObjects.isEmpty())) {
write(requestedObjects, channel, connection);
read(channel, connection);
Thread.sleep(10);
}
}
return null;
}
});
}
@Override
@ -66,7 +96,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
channel.configureBlocking(true);
ByteBuffer buffer = ByteBuffer.allocate(MAX_MESSAGE_SIZE);
new NetworkMessage(request).write(buffer);
channel.write(buffer);
while (buffer.hasRemaining()) {
channel.write(buffer);
}
buffer.clear();
V3MessageReader reader = new V3MessageReader();
@ -106,34 +138,74 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
throw new ApplicationException(e);
}
final Set<InventoryVector> requestedObjects = new HashSet<>();
new Thread(new Runnable() {
start("connection listener", new Runnable() {
@Override
public void run() {
try {
serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(ctx.getPort()));
SocketChannel accepted = serverChannel.accept();
accepted.configureBlocking(false);
// FIXME: apparently it isn't good practice to generally listen for OP_WRITE
accepted.register(selector, OP_READ | OP_WRITE).attach(
new ConnectionInfo(ctx, SERVER,
new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(),
listener,
requestedObjects
));
serverChannel.socket().bind(new InetSocketAddress(ctx.getPort()));
while (selector.isOpen() && serverChannel.isOpen()) {
try {
SocketChannel accepted = serverChannel.accept();
accepted.configureBlocking(false);
accepted.register(selector, OP_READ | OP_WRITE,
new ConnectionInfo(ctx, SERVER,
new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(),
listener,
requestedObjects
));
} catch (AsynchronousCloseException ignore) {
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
} catch (ClosedSelectorException | AsynchronousCloseException ignore) {
} catch (IOException e) {
throw new ApplicationException(e);
} catch (RuntimeException e) {
e.printStackTrace();
throw e;
}
}
}, "Server").start();
new Thread(new Runnable() {
});
start("connection starter", new Runnable() {
@Override
public void run() {
while (selector.isOpen()) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(
2, ctx.getStreams());
for (NetworkAddress address : addresses) {
try {
SocketChannel channel = SocketChannel.open(
new InetSocketAddress(address.toInetAddress(), address.getPort()));
channel.configureBlocking(false);
channel.register(selector, OP_READ | OP_WRITE,
new ConnectionInfo(ctx, CLIENT,
address,
listener,
requestedObjects
));
} catch (AsynchronousCloseException ignore) {
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
try {
Thread.sleep(30_000);
} catch (InterruptedException e) {
return;
}
}
}
});
start("processor", new Runnable() {
@Override
public void run() {
try {
while (selector.isOpen()) {
// TODO: establish outgoing connections
selector.select(1000);
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
@ -141,22 +213,16 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
if (key.attachment() instanceof ConnectionInfo) {
SocketChannel channel = (SocketChannel) key.channel();
ConnectionInfo connection = (ConnectionInfo) key.attachment();
if (key.isWritable()) {
if (connection.getOutBuffer().hasRemaining()) {
channel.write(connection.getOutBuffer());
}
while (!connection.getOutBuffer().hasRemaining() && !connection.getSendingQueue().isEmpty()) {
MessagePayload payload = connection.getSendingQueue().poll();
if (payload instanceof GetData) {
requestedObjects.addAll(((GetData) payload).getInventory());
}
new NetworkMessage(payload).write(connection.getOutBuffer());
}
write(requestedObjects, channel, connection);
}
if (key.isReadable()) {
channel.read(connection.getInBuffer());
connection.updateReader();
read(channel, connection);
}
if (connection.getSendingQueue().isEmpty()) {
key.interestOps(OP_READ);
} else {
key.interestOps(OP_READ | OP_WRITE);
}
}
keyIterator.remove();
@ -168,13 +234,52 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
throw new ApplicationException(e);
}
}
}, "Connections").start();
});
}
private static void write(Set<InventoryVector> requestedObjects, SocketChannel channel, ConnectionInfo connection)
throws IOException {
if (!connection.getSendingQueue().isEmpty()) {
ByteBuffer buffer = connection.getOutBuffer();
if (buffer.hasRemaining()) {
channel.write(buffer);
}
while (!buffer.hasRemaining()
&& !connection.getSendingQueue().isEmpty()) {
buffer.clear();
MessagePayload payload = connection.getSendingQueue().poll();
if (payload instanceof GetData) {
requestedObjects.addAll(((GetData) payload).getInventory());
}
new NetworkMessage(payload).write(buffer);
buffer.flip();
if (buffer.hasRemaining()) {
channel.write(buffer);
}
}
}
}
private static void read(SocketChannel channel, ConnectionInfo connection) throws IOException {
ByteBuffer buffer = connection.getInBuffer();
while (channel.read(buffer) > 0) {
buffer.flip();
connection.updateReader();
buffer.compact();
}
}
private void start(String threadName, Runnable runnable) {
Thread thread = new Thread(runnable, threadName);
thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY);
thread.start();
}
@Override
public void stop() {
try {
serverChannel.close();
serverChannel.socket().close();
for (SelectionKey key : selector.keys()) {
key.channel().close();
}

View File

@ -22,14 +22,23 @@ import ch.dissem.bitmessage.entity.CustomMessage;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.networking.nio.NioNetworkHandler;
import ch.dissem.bitmessage.ports.*;
import ch.dissem.bitmessage.utils.Property;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
@ -42,6 +51,7 @@ import static org.mockito.Mockito.mock;
/**
* FIXME: there really should be sensible tests for the network handler
*/
@RunWith(Parameterized.class)
public class NetworkHandlerTest {
private static final Logger LOG = LoggerFactory.getLogger(NetworkHandlerTest.class);
private static NetworkAddress peerAddress = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(6001).build();
@ -51,7 +61,27 @@ public class NetworkHandlerTest {
private BitmessageContext peer;
private BitmessageContext node;
private NetworkHandler networkHandler;
private final NetworkHandler peerNetworkHandler;
private final NetworkHandler nodeNetworkHandler;
@Rule
public final TestRule timeout = new DisableOnDebug(Timeout.seconds(5));
public NetworkHandlerTest(NetworkHandler peer, NetworkHandler node) {
this.peerNetworkHandler = peer;
this.nodeNetworkHandler = node;
}
@Parameterized.Parameters
public static List<Object[]> parameters() {
return Arrays.asList(new Object[][]{
{new DefaultNetworkHandler(), new DefaultNetworkHandler()},
{new DefaultNetworkHandler(), new NioNetworkHandler()},
{new NioNetworkHandler(), new DefaultNetworkHandler()},
{new NioNetworkHandler(), new NioNetworkHandler()}
});
}
@Before
public void setUp() {
@ -63,7 +93,7 @@ public class NetworkHandlerTest {
.powRepo(mock(ProofOfWorkRepository.class))
.port(peerAddress.getPort())
.nodeRegistry(new TestNodeRegistry())
.networkHandler(new DefaultNetworkHandler())
.networkHandler(peerNetworkHandler)
.cryptography(new BouncyCryptography())
.listener(mock(BitmessageContext.Listener.class))
.customCommandHandler(new CustomCommandHandler() {
@ -90,7 +120,6 @@ public class NetworkHandlerTest {
peer.startup();
nodeInventory = new TestInventory();
networkHandler = new DefaultNetworkHandler();
node = new BitmessageContext.Builder()
.addressRepo(mock(AddressRepository.class))
.inventory(nodeInventory)
@ -98,7 +127,7 @@ public class NetworkHandlerTest {
.powRepo(mock(ProofOfWorkRepository.class))
.port(6002)
.nodeRegistry(new TestNodeRegistry(peerAddress))
.networkHandler(networkHandler)
.networkHandler(nodeNetworkHandler)
.cryptography(new BouncyCryptography())
.listener(mock(BitmessageContext.Listener.class))
.build();
@ -108,7 +137,7 @@ public class NetworkHandlerTest {
public void cleanUp() {
shutdown(peer);
shutdown(node);
shutdown(networkHandler);
shutdown(nodeNetworkHandler);
}
private static void shutdown(BitmessageContext ctx) {
@ -140,7 +169,7 @@ public class NetworkHandlerTest {
} while (networkHandler.isRunning());
}
@Test(timeout = 5_000)
@Test
public void ensureNodesAreConnecting() throws Exception {
node.startup();
Property status;
@ -151,14 +180,14 @@ public class NetworkHandlerTest {
assertEquals(1, status.getProperty("outgoing").getValue());
}
@Test(timeout = 5_000)
@Test
public void ensureCustomMessageIsSentAndResponseRetrieved() throws Exception {
byte[] data = cryptography().randomBytes(8);
data[0] = (byte) 1;
CustomMessage request = new CustomMessage("test request", data);
node.startup();
CustomMessage response = networkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request);
CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request);
assertThat(response, notNullValue());
assertThat(response.getCustomCommand(), is("test response"));
@ -172,14 +201,14 @@ public class NetworkHandlerTest {
CustomMessage request = new CustomMessage("test request", data);
node.startup();
CustomMessage response = networkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request);
CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request);
assertThat(response, notNullValue());
assertThat(response.getCustomCommand(), is("test response"));
assertThat(response.getData(), is(request.getData()));
}
@Test(timeout = 5_000)
@Test
public void ensureObjectsAreSynchronizedIfBothHaveObjects() throws Exception {
peerInventory.init(
"V4Pubkey.payload",
@ -191,7 +220,7 @@ public class NetworkHandlerTest {
"V4Pubkey.payload"
);
Future<?> future = networkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(),
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(),
mock(NetworkHandler.MessageListener.class),
10);
future.get();
@ -199,7 +228,7 @@ public class NetworkHandlerTest {
assertInventorySize(3, peerInventory);
}
@Test(timeout = 5_000)
@Test
public void ensureObjectsAreSynchronizedIfOnlyPeerHasObjects() throws Exception {
peerInventory.init(
"V4Pubkey.payload",
@ -208,7 +237,7 @@ public class NetworkHandlerTest {
nodeInventory.init();
Future<?> future = networkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(),
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(),
mock(NetworkHandler.MessageListener.class),
10);
future.get();
@ -216,7 +245,7 @@ public class NetworkHandlerTest {
assertInventorySize(2, peerInventory);
}
@Test(timeout = 5_000)
@Test
public void ensureObjectsAreSynchronizedIfOnlyNodeHasObjects() throws Exception {
peerInventory.init();
@ -224,7 +253,7 @@ public class NetworkHandlerTest {
"V1Msg.payload"
);
Future<?> future = networkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(),
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(),
mock(NetworkHandler.MessageListener.class),
10);
future.get();