Refactored use of the DefaultMessageListener so it's retrieved from the InternalContext

This commit is contained in:
Christian Basler 2016-09-12 08:18:30 +02:00
parent a240606909
commit 489b8968e0
15 changed files with 180 additions and 173 deletions

View File

@ -62,16 +62,16 @@ public class BitmessageContext {
private final InternalContext ctx; private final InternalContext ctx;
private final Labeler labeler; private final Labeler labeler;
private final NetworkHandler.MessageListener networkListener;
private final boolean sendPubkeyOnIdentityCreation; private final boolean sendPubkeyOnIdentityCreation;
private BitmessageContext(Builder builder) { private BitmessageContext(Builder builder) {
if (builder.listener instanceof Listener.WithContext) {
((Listener.WithContext) builder.listener).setContext(this);
}
ctx = new InternalContext(builder); ctx = new InternalContext(builder);
labeler = builder.labeler; labeler = builder.labeler;
ctx.getProofOfWorkService().doMissingProofOfWork(30_000); // TODO: this should be configurable ctx.getProofOfWorkService().doMissingProofOfWork(30_000); // TODO: this should be configurable
networkListener = new DefaultMessageListener(ctx, labeler, builder.listener);
sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation; sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation;
} }
@ -180,7 +180,7 @@ public class BitmessageContext {
} }
public void startup() { public void startup() {
ctx.getNetworkHandler().start(networkListener); ctx.getNetworkHandler().start();
} }
public void shutdown() { public void shutdown() {
@ -195,7 +195,7 @@ public class BitmessageContext {
* @param wait waits for the synchronization thread to finish * @param wait waits for the synchronization thread to finish
*/ */
public void synchronize(InetAddress host, int port, long timeoutInSeconds, boolean wait) { public void synchronize(InetAddress host, int port, long timeoutInSeconds, boolean wait) {
Future<?> future = ctx.getNetworkHandler().synchronize(host, port, networkListener, timeoutInSeconds); Future<?> future = ctx.getNetworkHandler().synchronize(host, port, timeoutInSeconds);
if (wait) { if (wait) {
try { try {
future.get(); future.get();
@ -271,7 +271,7 @@ public class BitmessageContext {
broadcast.decrypt(address); broadcast.decrypt(address);
// This decrypts it twice, but on the other hand it doesn't try to decrypt the objects with // This decrypts it twice, but on the other hand it doesn't try to decrypt the objects with
// other subscriptions and the interface stays as simple as possible. // other subscriptions and the interface stays as simple as possible.
networkListener.receive(object); ctx.getNetworkListener().receive(object);
} catch (DecryptionFailedException ignore) { } catch (DecryptionFailedException ignore) {
} catch (Exception e) { } catch (Exception e) {
LOG.debug(e.getMessage(), e); LOG.debug(e.getMessage(), e);
@ -296,6 +296,13 @@ public class BitmessageContext {
public interface Listener { public interface Listener {
void receive(Plaintext plaintext); void receive(Plaintext plaintext);
/**
* A message listener that needs a {@link BitmessageContext}, i.e. for implementing some sort of chat bot.
*/
interface WithContext extends Listener {
void setContext(BitmessageContext ctx);
}
} }
public static final class Builder { public static final class Builder {

View File

@ -24,7 +24,6 @@ import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.exception.DecryptionFailedException; import ch.dissem.bitmessage.exception.DecryptionFailedException;
import ch.dissem.bitmessage.ports.Labeler; import ch.dissem.bitmessage.ports.Labeler;
import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.TTL;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -32,21 +31,24 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import static ch.dissem.bitmessage.entity.Plaintext.Status.*; import static ch.dissem.bitmessage.entity.Plaintext.Status.PUBKEY_REQUESTED;
import static ch.dissem.bitmessage.utils.UnixTime.DAY;
class DefaultMessageListener implements NetworkHandler.MessageListener { class DefaultMessageListener implements NetworkHandler.MessageListener, InternalContext.ContextHolder {
private final static Logger LOG = LoggerFactory.getLogger(DefaultMessageListener.class); private final static Logger LOG = LoggerFactory.getLogger(DefaultMessageListener.class);
private final InternalContext ctx;
private final Labeler labeler; private final Labeler labeler;
private final BitmessageContext.Listener listener; private final BitmessageContext.Listener listener;
private InternalContext ctx;
public DefaultMessageListener(InternalContext context, Labeler labeler, BitmessageContext.Listener listener) { public DefaultMessageListener(Labeler labeler, BitmessageContext.Listener listener) {
this.ctx = context;
this.labeler = labeler; this.labeler = labeler;
this.listener = listener; this.listener = listener;
} }
@Override
public void setContext(InternalContext context) {
this.ctx = context;
}
@Override @Override
@SuppressWarnings("ConstantConditions") @SuppressWarnings("ConstantConditions")
public void receive(ObjectMessage object) throws IOException { public void receive(ObjectMessage object) throws IOException {

View File

@ -56,6 +56,7 @@ public class InternalContext {
private final CustomCommandHandler customCommandHandler; private final CustomCommandHandler customCommandHandler;
private final ProofOfWorkService proofOfWorkService; private final ProofOfWorkService proofOfWorkService;
private final Labeler labeler; private final Labeler labeler;
private final NetworkHandler.MessageListener networkListener;
private final TreeSet<Long> streams = new TreeSet<>(); private final TreeSet<Long> streams = new TreeSet<>();
private final int port; private final int port;
@ -79,6 +80,7 @@ public class InternalContext {
this.connectionLimit = builder.connectionLimit; this.connectionLimit = builder.connectionLimit;
this.connectionTTL = builder.connectionTTL; this.connectionTTL = builder.connectionTTL;
this.labeler = builder.labeler; this.labeler = builder.labeler;
this.networkListener = new DefaultMessageListener(labeler, builder.listener);
Singleton.initialize(cryptography); Singleton.initialize(cryptography);
@ -94,7 +96,8 @@ public class InternalContext {
} }
init(cryptography, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, init(cryptography, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository,
proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine, customCommandHandler, builder.labeler); proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine, customCommandHandler, builder.labeler,
networkListener);
for (BitmessageAddress identity : addressRepository.getIdentities()) { for (BitmessageAddress identity : addressRepository.getIdentities()) {
streams.add(identity.getStream()); streams.add(identity.getStream());
} }
@ -148,6 +151,10 @@ public class InternalContext {
return labeler; return labeler;
} }
public NetworkHandler.MessageListener getNetworkListener() {
return networkListener;
}
public long[] getStreams() { public long[] getStreams() {
long[] result = new long[streams.size()]; long[] result = new long[streams.size()];
int i = 0; int i = 0;

View File

@ -11,6 +11,7 @@ import ch.dissem.bitmessage.ports.ProofOfWorkRepository.Item;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
@ -89,6 +90,11 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
ctx.getLabeler().markAsSent(plaintext); ctx.getLabeler().markAsSent(plaintext);
messageRepo.save(plaintext); messageRepo.save(plaintext);
} }
try {
ctx.getNetworkListener().receive(object);
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
ctx.getInventory().storeObject(object); ctx.getInventory().storeObject(object);
ctx.getNetworkHandler().offer(object.getInventoryVector()); ctx.getNetworkHandler().offer(object.getInventoryVector());
} else { } else {

View File

@ -41,7 +41,7 @@ public interface NetworkHandler {
* An implementation should disconnect if either the timeout is reached or the returned thread is interrupted. * An implementation should disconnect if either the timeout is reached or the returned thread is interrupted.
* </p> * </p>
*/ */
Future<?> synchronize(InetAddress server, int port, MessageListener listener, long timeoutInSeconds); Future<?> synchronize(InetAddress server, int port, long timeoutInSeconds);
/** /**
* Send a custom message to a specific node (that should implement handling for this message type) and returns * Send a custom message to a specific node (that should implement handling for this message type) and returns
@ -57,7 +57,7 @@ public interface NetworkHandler {
/** /**
* Start a full network node, accepting incoming connections and relaying objects. * Start a full network node, accepting incoming connections and relaying objects.
*/ */
void start(MessageListener listener); void start();
/** /**
* Stop the full network node. * Stop the full network node.

View File

@ -68,7 +68,7 @@ public class DefaultMessageListenerTest extends TestBase {
when(ctx.getNetworkHandler()).thenReturn(networkHandler); when(ctx.getNetworkHandler()).thenReturn(networkHandler);
when(ctx.getLabeler()).thenReturn(mock(Labeler.class)); when(ctx.getLabeler()).thenReturn(mock(Labeler.class));
listener = new DefaultMessageListener(ctx, mock(Labeler.class), mock(BitmessageContext.Listener.class)); listener = new DefaultMessageListener(mock(Labeler.class), mock(BitmessageContext.Listener.class));
} }
@Test @Test

View File

@ -71,14 +71,13 @@ public abstract class AbstractConnection {
public AbstractConnection(InternalContext context, Mode mode, public AbstractConnection(InternalContext context, Mode mode,
NetworkAddress node, NetworkAddress node,
NetworkHandler.MessageListener listener,
Set<InventoryVector> commonRequestedObjects, Set<InventoryVector> commonRequestedObjects,
long syncTimeout) { long syncTimeout) {
this.ctx = context; this.ctx = context;
this.mode = mode; this.mode = mode;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = node; this.node = node;
this.listener = listener; this.listener = context.getNetworkListener();
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0);
this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)); this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000));
this.ivCache = new ConcurrentHashMap<>(); this.ivCache = new ConcurrentHashMap<>();

View File

@ -63,29 +63,29 @@ class Connection extends AbstractConnection {
private OutputStream out; private OutputStream out;
private boolean socketInitialized; private boolean socketInitialized;
public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, public Connection(InternalContext context, Mode mode, Socket socket,
Set<InventoryVector> requestedObjectsMap) throws IOException { Set<InventoryVector> requestedObjectsMap) throws IOException {
this(context, mode, listener, socket, requestedObjectsMap, this(context, mode, socket, requestedObjectsMap,
new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(),
0); 0);
} }
public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener, public Connection(InternalContext context, Mode mode, NetworkAddress node,
Set<InventoryVector> requestedObjectsMap) { Set<InventoryVector> requestedObjectsMap) {
this(context, mode, listener, new Socket(), requestedObjectsMap, this(context, mode, new Socket(), requestedObjectsMap,
node, 0); node, 0);
} }
private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, private Connection(InternalContext context, Mode mode, Socket socket,
Set<InventoryVector> commonRequestedObjects, NetworkAddress node, long syncTimeout) { Set<InventoryVector> commonRequestedObjects, NetworkAddress node, long syncTimeout) {
super(context, mode, node, listener, commonRequestedObjects, syncTimeout); super(context, mode, node, commonRequestedObjects, syncTimeout);
this.startTime = UnixTime.now(); this.startTime = UnixTime.now();
this.socket = socket; this.socket = socket;
} }
public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener,
long timeoutInSeconds) throws IOException { long timeoutInSeconds) throws IOException {
return new Connection(ctx, SYNC, listener, new Socket(address, port), return new Connection(ctx, SYNC, new Socket(address, port),
new HashSet<InventoryVector>(), new HashSet<InventoryVector>(),
new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), new NetworkAddress.Builder().ip(address).port(port).stream(1).build(),
timeoutInSeconds); timeoutInSeconds);

View File

@ -42,11 +42,10 @@ public class ConnectionOrganizer implements Runnable {
private Connection initialConnection; private Connection initialConnection;
public ConnectionOrganizer(InternalContext ctx, public ConnectionOrganizer(InternalContext ctx,
DefaultNetworkHandler networkHandler, DefaultNetworkHandler networkHandler) {
NetworkHandler.MessageListener listener) {
this.ctx = ctx; this.ctx = ctx;
this.networkHandler = networkHandler; this.networkHandler = networkHandler;
this.listener = listener; this.listener = ctx.getNetworkListener();
} }
@Override @Override
@ -91,8 +90,7 @@ public class ConnectionOrganizer implements Runnable {
NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
boolean first = active == 0 && initialConnection == null; boolean first = active == 0 && initialConnection == null;
for (NetworkAddress address : addresses) { for (NetworkAddress address : addresses) {
Connection c = new Connection(ctx, CLIENT, address, listener, Connection c = new Connection(ctx, CLIENT, address, networkHandler.requestedObjects);
networkHandler.requestedObjects);
if (first) { if (first) {
initialConnection = c; initialConnection = c;
first = false; first = false;

View File

@ -64,9 +64,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
} }
@Override @Override
public Future<?> synchronize(InetAddress server, int port, MessageListener listener, long timeoutInSeconds) { public Future<?> synchronize(InetAddress server, int port, long timeoutInSeconds) {
try { try {
Connection connection = Connection.sync(ctx, server, port, listener, timeoutInSeconds); Connection connection = Connection.sync(ctx, server, port, ctx.getNetworkListener(), timeoutInSeconds);
Future<?> reader = pool.submit(connection.getReader()); Future<?> reader = pool.submit(connection.getReader());
pool.execute(connection.getWriter()); pool.execute(connection.getWriter());
return reader; return reader;
@ -97,19 +97,16 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
} }
@Override @Override
public void start(final MessageListener listener) { public void start() {
if (listener == null) {
throw new IllegalStateException("Listener must be set at start");
}
if (running) { if (running) {
throw new IllegalStateException("Network already running - you need to stop first."); throw new IllegalStateException("Network already running - you need to stop first.");
} }
try { try {
running = true; running = true;
connections.clear(); connections.clear();
server = new ServerRunnable(ctx, this, listener); server = new ServerRunnable(ctx, this);
pool.execute(server); pool.execute(server);
pool.execute(new ConnectionOrganizer(ctx, this, listener)); pool.execute(new ConnectionOrganizer(ctx, this));
} catch (IOException e) { } catch (IOException e) {
throw new ApplicationException(e); throw new ApplicationException(e);
} }

View File

@ -38,11 +38,10 @@ public class ServerRunnable implements Runnable, Closeable {
private final DefaultNetworkHandler networkHandler; private final DefaultNetworkHandler networkHandler;
private final NetworkHandler.MessageListener listener; private final NetworkHandler.MessageListener listener;
public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler, public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler) throws IOException {
NetworkHandler.MessageListener listener) throws IOException {
this.ctx = ctx; this.ctx = ctx;
this.networkHandler = networkHandler; this.networkHandler = networkHandler;
this.listener = listener; this.listener = ctx.getNetworkListener();
this.serverSocket = new ServerSocket(ctx.getPort()); this.serverSocket = new ServerSocket(ctx.getPort());
} }
@ -52,8 +51,7 @@ public class ServerRunnable implements Runnable, Closeable {
try { try {
Socket socket = serverSocket.accept(); Socket socket = serverSocket.accept();
socket.setSoTimeout(Connection.READ_TIMEOUT); socket.setSoTimeout(Connection.READ_TIMEOUT);
networkHandler.startConnection(new Connection(ctx, SERVER, socket, listener, networkHandler.startConnection(new Connection(ctx, SERVER, socket, networkHandler.requestedObjects));
networkHandler.requestedObjects));
} catch (IOException e) { } catch (IOException e) {
LOG.debug(e.getMessage(), e); LOG.debug(e.getMessage(), e);
} }

View File

@ -17,13 +17,15 @@
package ch.dissem.bitmessage.networking.nio; package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.*; import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.V3MessageReader; import ch.dissem.bitmessage.factory.V3MessageReader;
import ch.dissem.bitmessage.networking.AbstractConnection; import ch.dissem.bitmessage.networking.AbstractConnection;
import ch.dissem.bitmessage.ports.NetworkHandler;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
@ -43,10 +45,9 @@ public class ConnectionInfo extends AbstractConnection {
private boolean syncFinished; private boolean syncFinished;
private long lastUpdate = System.currentTimeMillis(); private long lastUpdate = System.currentTimeMillis();
public ConnectionInfo(InternalContext context, Mode mode, public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node,
NetworkAddress node, NetworkHandler.MessageListener listener,
Set<InventoryVector> commonRequestedObjects, long syncTimeout) { Set<InventoryVector> commonRequestedObjects, long syncTimeout) {
super(context, mode, node, listener, commonRequestedObjects, syncTimeout); super(context, mode, node, commonRequestedObjects, syncTimeout);
headerOut.flip(); headerOut.flip();
if (mode == CLIENT || mode == SYNC) { if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build()); send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build());

View File

@ -26,7 +26,7 @@ import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.V3MessageReader; import ch.dissem.bitmessage.factory.V3MessageReader;
import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.*; import ch.dissem.bitmessage.utils.Property;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -37,7 +37,6 @@ import java.net.NoRouteToHostException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.Collections;
import java.util.concurrent.*; import java.util.concurrent.*;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.*; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.*;
@ -47,6 +46,7 @@ import static ch.dissem.bitmessage.utils.Collections.selectRandom;
import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.DebugUtils.inc;
import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool;
import static java.nio.channels.SelectionKey.*; import static java.nio.channels.SelectionKey.*;
import static java.util.Collections.newSetFromMap;
/** /**
* Network handler using java.nio, resulting in less threads. * Network handler using java.nio, resulting in less threads.
@ -63,13 +63,14 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
private InternalContext ctx; private InternalContext ctx;
private Selector selector; private Selector selector;
private ServerSocketChannel serverChannel; private ServerSocketChannel serverChannel;
private Queue<NetworkAddress> connectionQueue = new ConcurrentLinkedQueue<>();
private Map<ConnectionInfo, SelectionKey> connections = new ConcurrentHashMap<>(); private Map<ConnectionInfo, SelectionKey> connections = new ConcurrentHashMap<>();
private final Set<InventoryVector> requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)); private final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000));
private Thread starter; private Thread starter;
@Override @Override
public Future<Void> synchronize(final InetAddress server, final int port, final MessageListener listener, final long timeoutInSeconds) { public Future<Void> synchronize(final InetAddress server, final int port, final long timeoutInSeconds) {
return threadPool.submit(new Callable<Void>() { return threadPool.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
@ -77,7 +78,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
channel.configureBlocking(false); channel.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, ConnectionInfo connection = new ConnectionInfo(ctx, SYNC,
new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), new NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
listener, new HashSet<InventoryVector>(), timeoutInSeconds); new HashSet<InventoryVector>(), timeoutInSeconds);
while (channel.isConnected() && !connection.isSyncFinished()) { while (channel.isConnected() && !connection.isSyncFinished()) {
write(channel, connection); write(channel, connection);
read(channel, connection); read(channel, connection);
@ -135,10 +136,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} }
@Override @Override
public void start(final MessageListener listener) { public void start() {
if (listener == null) {
throw new IllegalStateException("Listener must be set at start");
}
if (selector != null && selector.isOpen()) { if (selector != null && selector.isOpen()) {
throw new IllegalStateException("Network already running - you need to stop first."); throw new IllegalStateException("Network already running - you need to stop first.");
} }
@ -147,42 +145,8 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} catch (IOException e) { } catch (IOException e) {
throw new ApplicationException(e); throw new ApplicationException(e);
} }
thread("connection listener", new Runnable() {
@Override
public void run() {
try {
serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(ctx.getPort()));
while (selector.isOpen() && serverChannel.isOpen()) {
try {
SocketChannel accepted = serverChannel.accept();
accepted.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SERVER,
new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(),
listener,
requestedObjects, 0
);
connections.put(
connection,
accepted.register(selector, OP_READ | OP_WRITE, connection)
);
} catch (AsynchronousCloseException ignore) {
LOG.trace(ignore.getMessage());
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
} catch (ClosedSelectorException | AsynchronousCloseException ignore) {
} catch (IOException e) {
throw new ApplicationException(e);
} catch (RuntimeException e) {
e.printStackTrace();
throw e;
}
}
});
starter = thread("connection starter", new Runnable() { starter = thread("connection manager", new Runnable() {
@Override @Override
public void run() { public void run() {
while (selector.isOpen()) { while (selector.isOpen()) {
@ -197,34 +161,8 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(100, ctx.getStreams()); List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(100, ctx.getStreams());
addresses = selectRandom(missing, addresses); addresses = selectRandom(missing, addresses);
for (NetworkAddress address : addresses) { for (NetworkAddress address : addresses) {
if (isConnectedTo(address)) { if (!isConnectedTo(address)) {
continue; connectionQueue.offer(address);
}
try {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort()));
ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT,
address,
listener,
requestedObjects, 0
);
connections.put(
connection,
channel.register(selector, OP_CONNECT, connection)
);
} catch (NoRouteToHostException ignore) {
// We'll try to connect to many offline nodes, so
// this is expected to happen quite a lot.
} catch (AsynchronousCloseException e) {
// The exception is expected if the network is being
// shut down, as we actually do asynchronously close
// the connections.
if (isRunning()) {
LOG.error(e.getMessage(), e);
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
} }
} }
} }
@ -252,17 +190,47 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} }
}); });
thread("processor", new Runnable() { thread("selector worker", new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(ctx.getPort()));
serverChannel.register(selector, OP_ACCEPT, null);
while (selector.isOpen()) { while (selector.isOpen()) {
selector.select(1000); selector.select(1000);
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next(); SelectionKey key = keyIterator.next();
keyIterator.remove(); keyIterator.remove();
if (key.attachment() instanceof ConnectionInfo) { if (key.attachment() == null) {
try {
if (key.isAcceptable()) {
// handle accept
try {
SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept();
accepted.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SERVER,
new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(),
requestedObjects, 0
);
connections.put(
connection,
accepted.register(selector, OP_READ | OP_WRITE, connection)
);
} catch (AsynchronousCloseException e) {
LOG.trace(e.getMessage());
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
} catch (CancelledKeyException e) {
LOG.error(e.getMessage(), e);
}
} else {
// handle read/write
SocketChannel channel = (SocketChannel) key.channel(); SocketChannel channel = (SocketChannel) key.channel();
ConnectionInfo connection = (ConnectionInfo) key.attachment(); ConnectionInfo connection = (ConnectionInfo) key.attachment();
try { try {
@ -290,6 +258,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
} }
} }
} }
// set interest ops
for (Map.Entry<ConnectionInfo, SelectionKey> e : connections.entrySet()) { for (Map.Entry<ConnectionInfo, SelectionKey> e : connections.entrySet()) {
if (e.getValue().isValid() if (e.getValue().isValid()
&& (e.getValue().interestOps() & OP_WRITE) == 0 && (e.getValue().interestOps() & OP_WRITE) == 0
@ -298,6 +267,35 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex
e.getValue().interestOps(OP_READ | OP_WRITE); e.getValue().interestOps(OP_READ | OP_WRITE);
} }
} }
// start new connections
if (!connectionQueue.isEmpty()) {
NetworkAddress address = connectionQueue.poll();
try {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort()));
ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT,
address,
requestedObjects, 0
);
connections.put(
connection,
channel.register(selector, OP_CONNECT, connection)
);
} catch (NoRouteToHostException ignore) {
// We'll try to connect to many offline nodes, so
// this is expected to happen quite a lot.
} catch (AsynchronousCloseException e) {
// The exception is expected if the network is being
// shut down, as we actually do asynchronously close
// the connections.
if (isRunning()) {
LOG.error(e.getMessage(), e);
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
} }
selector.close(); selector.close();
} catch (ClosedSelectorException ignore) { } catch (ClosedSelectorException ignore) {

View File

@ -230,9 +230,7 @@ public class NetworkHandlerTest {
"V4Pubkey.payload" "V4Pubkey.payload"
); );
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10);
mock(NetworkHandler.MessageListener.class),
10);
future.get(); future.get();
assertInventorySize(3, nodeInventory); assertInventorySize(3, nodeInventory);
assertInventorySize(3, peerInventory); assertInventorySize(3, peerInventory);
@ -247,9 +245,7 @@ public class NetworkHandlerTest {
nodeInventory.init(); nodeInventory.init();
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10);
mock(NetworkHandler.MessageListener.class),
10);
future.get(); future.get();
assertInventorySize(2, nodeInventory); assertInventorySize(2, nodeInventory);
assertInventorySize(2, peerInventory); assertInventorySize(2, peerInventory);
@ -263,9 +259,7 @@ public class NetworkHandlerTest {
"V1Msg.payload" "V1Msg.payload"
); );
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10);
mock(NetworkHandler.MessageListener.class),
10);
future.get(); future.get();
assertInventorySize(1, nodeInventory); assertInventorySize(1, nodeInventory);
assertInventorySize(1, peerInventory); assertInventorySize(1, peerInventory);

View File

@ -105,7 +105,7 @@ public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry {
} }
for (long stream : streams) { for (long stream : streams) {
Set<NetworkAddress> nodes = stableNodes.get(stream); Set<NetworkAddress> nodes = stableNodes.get(stream);
if (nodes != null) { if (nodes != null && !nodes.isEmpty()) {
result.add(Collections.selectRandom(nodes)); result.add(Collections.selectRandom(nodes));
} }
} }