Some further fixes and improvements, not all tests working yet

This commit is contained in:
Christian Basler 2016-06-20 16:33:47 +02:00
parent ae2120675f
commit abc2f63aa6
5 changed files with 58 additions and 19 deletions

View File

@ -4,17 +4,23 @@ import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography;
import ch.dissem.bitmessage.entity.BitmessageAddress; import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.networking.DefaultNetworkHandler; import ch.dissem.bitmessage.networking.DefaultNetworkHandler;
import ch.dissem.bitmessage.networking.nio.NioNetworkHandler;
import ch.dissem.bitmessage.ports.DefaultLabeler; import ch.dissem.bitmessage.ports.DefaultLabeler;
import ch.dissem.bitmessage.ports.Labeler; import ch.dissem.bitmessage.ports.Labeler;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.repository.*; import ch.dissem.bitmessage.repository.*;
import ch.dissem.bitmessage.utils.TTL; import ch.dissem.bitmessage.utils.TTL;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -27,8 +33,11 @@ import static org.mockito.Matchers.any;
/** /**
* @author Christian Basler * @author Christian Basler
*/ */
@RunWith(Parameterized.class)
public class SystemTest { public class SystemTest {
private static int port = 6000; private static int port = 6000;
private final NetworkHandler aliceNetworkHandler;
private final NetworkHandler bobNetworkHandler;
private BitmessageContext alice; private BitmessageContext alice;
private TestListener aliceListener = new TestListener(); private TestListener aliceListener = new TestListener();
@ -39,6 +48,19 @@ public class SystemTest {
private TestListener bobListener = new TestListener(); private TestListener bobListener = new TestListener();
private BitmessageAddress bobIdentity; private BitmessageAddress bobIdentity;
public SystemTest(NetworkHandler peer, NetworkHandler node) {
this.aliceNetworkHandler = peer;
this.bobNetworkHandler = node;
}
@Parameterized.Parameters
public static List<Object[]> parameters() {
return Arrays.asList(new Object[][]{
{new NioNetworkHandler(), new DefaultNetworkHandler()},
{new NioNetworkHandler(), new NioNetworkHandler()}
});
}
@Before @Before
public void setUp() { public void setUp() {
int alicePort = port++; int alicePort = port++;
@ -54,7 +76,7 @@ public class SystemTest {
.powRepo(new JdbcProofOfWorkRepository(aliceDB)) .powRepo(new JdbcProofOfWorkRepository(aliceDB))
.port(alicePort) .port(alicePort)
.nodeRegistry(new TestNodeRegistry(bobPort)) .nodeRegistry(new TestNodeRegistry(bobPort))
.networkHandler(new DefaultNetworkHandler()) .networkHandler(aliceNetworkHandler)
.cryptography(new BouncyCryptography()) .cryptography(new BouncyCryptography())
.listener(aliceListener) .listener(aliceListener)
.labeler(aliceLabeler) .labeler(aliceLabeler)
@ -70,7 +92,7 @@ public class SystemTest {
.powRepo(new JdbcProofOfWorkRepository(bobDB)) .powRepo(new JdbcProofOfWorkRepository(bobDB))
.port(bobPort) .port(bobPort)
.nodeRegistry(new TestNodeRegistry(alicePort)) .nodeRegistry(new TestNodeRegistry(alicePort))
.networkHandler(new DefaultNetworkHandler()) .networkHandler(bobNetworkHandler)
.cryptography(new BouncyCryptography()) .cryptography(new BouncyCryptography())
.listener(bobListener) .listener(bobListener)
.labeler(new DebugLabeler("Bob")) .labeler(new DebugLabeler("Bob"))

View File

@ -305,8 +305,11 @@ public abstract class AbstractConnection {
LOG.info("Synchronization timed out"); LOG.info("Synchronization timed out");
return true; return true;
} }
if (!sendingQueue.isEmpty()) {
return false;
}
if (msg == null) { if (msg == null) {
if (requestedObjects.isEmpty() && sendingQueue.isEmpty()) if (requestedObjects.isEmpty())
return true; return true;
readTimeoutCounter++; readTimeoutCounter++;

View File

@ -88,6 +88,12 @@ public class ConnectionInfo extends AbstractConnection {
} }
} }
public void updateSyncStatus() {
if (!syncFinished) {
syncFinished = reader.getMessages().isEmpty() && syncFinished(null);
}
}
public boolean isSyncFinished() { public boolean isSyncFinished() {
return syncFinished; return syncFinished;
} }

View File

@ -77,12 +77,12 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, ConnectionInfo connection = new ConnectionInfo(ctx, SYNC,
new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), new NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
listener, new HashSet<InventoryVector>(), timeoutInSeconds); listener, new HashSet<InventoryVector>(), timeoutInSeconds);
while (channel.isConnected() && while (channel.isConnected() && !connection.isSyncFinished()) {
(connection.getState() != ACTIVE || connection.isSyncFinished())) {
write(requestedObjects, channel, connection); write(requestedObjects, channel, connection);
read(channel, connection); read(channel, connection);
Thread.sleep(10); Thread.sleep(10);
} }
LOG.info("Synchronization finished");
} }
return null; return null;
} }
@ -95,29 +95,35 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
channel.configureBlocking(true); channel.configureBlocking(true);
ByteBuffer buffer = ByteBuffer.allocate(MAX_MESSAGE_SIZE); ByteBuffer buffer = ByteBuffer.allocate(MAX_MESSAGE_SIZE);
new NetworkMessage(request).write(buffer); new NetworkMessage(request).write(buffer);
buffer.flip();
while (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
channel.write(buffer); channel.write(buffer);
} }
buffer.clear(); buffer.clear();
V3MessageReader reader = new V3MessageReader(); V3MessageReader reader = new V3MessageReader();
while (reader.getMessages().isEmpty()) { while (channel.isConnected() && reader.getMessages().isEmpty()) {
channel.read(buffer); if (channel.read(buffer) > 0) {
buffer.flip(); buffer.flip();
reader.update(buffer); reader.update(buffer);
buffer.compact();
} else {
throw new NodeException("No response from node " + server);
}
}
NetworkMessage networkMessage;
if (reader.getMessages().isEmpty()) {
throw new NodeException("No response from node " + server);
} else {
networkMessage = reader.getMessages().get(0);
} }
NetworkMessage networkMessage = reader.getMessages().get(0);
if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) { if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) {
return (CustomMessage) networkMessage.getPayload(); return (CustomMessage) networkMessage.getPayload();
} else {
if (networkMessage == null) {
throw new NodeException("No response from node " + server);
} else { } else {
throw new NodeException("Unexpected response from node " + throw new NodeException("Unexpected response from node " +
server + ": " + networkMessage.getPayload().getCommand()); server + ": " + networkMessage.getPayload().getCommand());
} }
}
} catch (IOException e) { } catch (IOException e) {
throw new ApplicationException(e); throw new ApplicationException(e);
} }
@ -272,6 +278,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
connection.updateReader(); connection.updateReader();
buffer.compact(); buffer.compact();
} }
connection.updateSyncStatus();
} }
private void thread(String threadName, Runnable runnable) { private void thread(String threadName, Runnable runnable) {
@ -285,8 +292,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
public void stop() { public void stop() {
try { try {
serverChannel.socket().close(); serverChannel.socket().close();
for (SelectionKey key : selector.keys()) { Iterator<SelectionKey> iterator = selector.keys().iterator();
key.channel().close(); while (iterator.hasNext()) {
iterator.next().channel().close();
} }
selector.close(); selector.close();
} catch (IOException e) { } catch (IOException e) {

View File

@ -254,7 +254,7 @@ public class NetworkHandlerTest {
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(),
mock(NetworkHandler.MessageListener.class), mock(NetworkHandler.MessageListener.class),
10); 100);
future.get(); future.get();
assertInventorySize(1, nodeInventory); assertInventorySize(1, nodeInventory);
assertInventorySize(1, peerInventory); assertInventorySize(1, peerInventory);