Implemented methods offer and request, system test works now but synchronization is still broken.

This commit is contained in:
Christian Basler 2016-07-08 18:14:41 +02:00
parent abc2f63aa6
commit d130080df2
4 changed files with 100 additions and 38 deletions

View File

@ -82,13 +82,12 @@ public abstract class AbstractConnection {
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0);
if (threadsafe) { if (threadsafe) {
this.ivCache = new ConcurrentHashMap<>(); this.ivCache = new ConcurrentHashMap<>();
this.sendingQueue = new ConcurrentLinkedDeque<>();
this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)); this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000));
} else { } else {
this.ivCache = new HashMap<>(); this.ivCache = new HashMap<>();
this.sendingQueue = new LinkedList<>();
this.requestedObjects = new HashSet<>(); this.requestedObjects = new HashSet<>();
} }
this.sendingQueue = new ConcurrentLinkedDeque<>();
this.state = CONNECTING; this.state = CONNECTING;
this.commonRequestedObjects = commonRequestedObjects; this.commonRequestedObjects = commonRequestedObjects;
} }

View File

@ -37,17 +37,21 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.*;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED; import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED;
import static ch.dissem.bitmessage.utils.Collections.selectRandom;
import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.DebugUtils.inc;
import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool;
import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE; import static java.nio.channels.SelectionKey.OP_WRITE;
import static java.util.Collections.newSetFromMap;
import static java.util.Collections.synchronizedSet;
/** /**
* Network handler using java.nio, resulting in less threads. * Network handler using java.nio, resulting in less threads.
@ -64,6 +68,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
private InternalContext ctx; private InternalContext ctx;
private Selector selector; private Selector selector;
private ServerSocketChannel serverChannel; private ServerSocketChannel serverChannel;
private Set<ConnectionInfo> connections = synchronizedSet(newSetFromMap(new WeakHashMap<ConnectionInfo, Boolean>()));
@Override @Override
public Future<Void> synchronize(final InetAddress server, final int port, final MessageListener listener, final long timeoutInSeconds) { public Future<Void> synchronize(final InetAddress server, final int port, final MessageListener listener, final long timeoutInSeconds) {
@ -77,6 +82,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, ConnectionInfo connection = new ConnectionInfo(ctx, SYNC,
new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), new NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
listener, new HashSet<InventoryVector>(), timeoutInSeconds); listener, new HashSet<InventoryVector>(), timeoutInSeconds);
connections.add(connection);
while (channel.isConnected() && !connection.isSyncFinished()) { while (channel.isConnected() && !connection.isSyncFinished()) {
write(requestedObjects, channel, connection); write(requestedObjects, channel, connection);
read(channel, connection); read(channel, connection);
@ -121,8 +127,12 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) { if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) {
return (CustomMessage) networkMessage.getPayload(); return (CustomMessage) networkMessage.getPayload();
} else { } else {
throw new NodeException("Unexpected response from node " + if (networkMessage == null || networkMessage.getPayload() == null) {
server + ": " + networkMessage.getPayload().getCommand()); throw new NodeException("Empty response from node " + server);
} else {
throw new NodeException("Unexpected response from node " + server + ": "
+ networkMessage.getPayload().getClass());
}
} }
} catch (IOException e) { } catch (IOException e) {
throw new ApplicationException(e); throw new ApplicationException(e);
@ -153,12 +163,13 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
try { try {
SocketChannel accepted = serverChannel.accept(); SocketChannel accepted = serverChannel.accept();
accepted.configureBlocking(false); accepted.configureBlocking(false);
accepted.register(selector, OP_READ | OP_WRITE, ConnectionInfo connection = new ConnectionInfo(ctx, SERVER,
new ConnectionInfo(ctx, SERVER, new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(),
new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(), listener,
listener, requestedObjects, 0
requestedObjects, 0 );
)); accepted.register(selector, OP_READ | OP_WRITE, connection);
connections.add(connection);
} catch (AsynchronousCloseException ignore) { } catch (AsynchronousCloseException ignore) {
LOG.trace(ignore.getMessage()); LOG.trace(ignore.getMessage());
} catch (IOException e) { } catch (IOException e) {
@ -186,12 +197,13 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
SocketChannel channel = SocketChannel.open( SocketChannel channel = SocketChannel.open(
new InetSocketAddress(address.toInetAddress(), address.getPort())); new InetSocketAddress(address.toInetAddress(), address.getPort()));
channel.configureBlocking(false); channel.configureBlocking(false);
channel.register(selector, OP_READ | OP_WRITE, ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT,
new ConnectionInfo(ctx, CLIENT, address,
address, listener,
listener, requestedObjects, 0
requestedObjects, 0 );
)); channel.register(selector, OP_READ | OP_WRITE, connection);
connections.add(connection);
} catch (AsynchronousCloseException ignore) { } catch (AsynchronousCloseException ignore) {
} catch (IOException e) { } catch (IOException e) {
LOG.error(e.getMessage(), e); LOG.error(e.getMessage(), e);
@ -235,6 +247,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} else { } else {
key.interestOps(OP_READ | OP_WRITE); key.interestOps(OP_READ | OP_WRITE);
} }
if (connection.getState() == DISCONNECTED) {
connections.remove(connection);
}
} }
keyIterator.remove(); keyIterator.remove();
} }
@ -292,9 +307,8 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
public void stop() { public void stop() {
try { try {
serverChannel.socket().close(); serverChannel.socket().close();
Iterator<SelectionKey> iterator = selector.keys().iterator(); for (SelectionKey selectionKey : selector.keys()) {
while (iterator.hasNext()) { selectionKey.channel().close();
iterator.next().channel().close();
} }
selector.close(); selector.close();
} catch (IOException e) { } catch (IOException e) {
@ -304,12 +318,64 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
@Override @Override
public void offer(InventoryVector iv) { public void offer(InventoryVector iv) {
// TODO List<ConnectionInfo> target = new LinkedList<>();
for (ConnectionInfo connection : connections) {
if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) {
target.add(connection);
}
}
List<ConnectionInfo> randomSubset = selectRandom(NETWORK_MAGIC_NUMBER, target);
for (ConnectionInfo connection : randomSubset) {
connection.offer(iv);
}
} }
@Override @Override
public void request(Collection<InventoryVector> inventoryVectors) { public void request(Collection<InventoryVector> inventoryVectors) {
// TODO if (!isRunning()) return;
Iterator<InventoryVector> iterator = inventoryVectors.iterator();
if (!iterator.hasNext()) {
return;
}
Map<ConnectionInfo, List<InventoryVector>> distribution = new HashMap<>();
for (ConnectionInfo connection : connections) {
if (connection.getState() == ACTIVE) {
distribution.put(connection, new LinkedList<InventoryVector>());
}
}
InventoryVector next = iterator.next();
ConnectionInfo previous = null;
do {
for (ConnectionInfo connection : distribution.keySet()) {
if (connection == previous) {
next = iterator.next();
}
if (connection.knowsOf(next)) {
List<InventoryVector> ivs = distribution.get(connection);
if (ivs.size() == GetData.MAX_INVENTORY_SIZE) {
connection.send(new GetData.Builder().inventory(ivs).build());
ivs.clear();
}
ivs.add(next);
iterator.remove();
if (iterator.hasNext()) {
next = iterator.next();
previous = connection;
} else {
break;
}
}
}
} while (iterator.hasNext());
for (ConnectionInfo connection : distribution.keySet()) {
List<InventoryVector> ivs = distribution.get(connection);
if (!ivs.isEmpty()) {
connection.send(new GetData.Builder().inventory(ivs).build());
}
}
} }
@Override @Override
@ -318,17 +384,14 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
TreeMap<Long, Integer> incomingConnections = new TreeMap<>(); TreeMap<Long, Integer> incomingConnections = new TreeMap<>();
TreeMap<Long, Integer> outgoingConnections = new TreeMap<>(); TreeMap<Long, Integer> outgoingConnections = new TreeMap<>();
for (SelectionKey key : selector.keys()) { for (ConnectionInfo connection : connections) {
if (key.attachment() instanceof ConnectionInfo) { if (connection.getState() == ACTIVE) {
ConnectionInfo connection = (ConnectionInfo) key.attachment(); long stream = connection.getNode().getStream();
if (connection.getState() == ACTIVE) { streams.add(stream);
long stream = connection.getNode().getStream(); if (connection.getMode() == SERVER) {
streams.add(stream); inc(incomingConnections, stream);
if (connection.getMode() == SERVER) { } else {
inc(incomingConnections, stream); inc(outgoingConnections, stream);
} else {
inc(outgoingConnections, stream);
}
} }
} }
} }

View File

@ -66,7 +66,7 @@ public class NetworkHandlerTest {
private final NetworkHandler nodeNetworkHandler; private final NetworkHandler nodeNetworkHandler;
@Rule @Rule
public final TestRule timeout = new DisableOnDebug(Timeout.seconds(5)); public final TestRule timeout = new DisableOnDebug(Timeout.seconds(10));
public NetworkHandlerTest(NetworkHandler peer, NetworkHandler node) { public NetworkHandlerTest(NetworkHandler peer, NetworkHandler node) {
this.peerNetworkHandler = peer; this.peerNetworkHandler = peer;

View File

@ -121,7 +121,7 @@ public class JdbcMessageRepository extends AbstractMessageRepository implements
builder.retries(rs.getInt("retries")); builder.retries(rs.getInt("retries"));
builder.nextTry(rs.getLong("next_try")); builder.nextTry(rs.getLong("next_try"));
builder.labels(findLabels(connection, builder.labels(findLabels(connection,
"WHERE id IN (SELECT label_id FROM Message_Label WHERE message_id=" + id + ") ORDER BY ord")); "id IN (SELECT label_id FROM Message_Label WHERE message_id=" + id + ") ORDER BY ord"));
Plaintext message = builder.build(); Plaintext message = builder.build();
message.setInitialHash(rs.getBytes("initial_hash")); message.setInitialHash(rs.getBytes("initial_hash"));
result.add(message); result.add(message);