diff --git a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index b06cf9a..9e155ad 100644 --- a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -62,16 +62,16 @@ public class BitmessageContext { private final InternalContext ctx; private final Labeler labeler; - private final NetworkHandler.MessageListener networkListener; private final boolean sendPubkeyOnIdentityCreation; private BitmessageContext(Builder builder) { + if (builder.listener instanceof Listener.WithContext) { + ((Listener.WithContext) builder.listener).setContext(this); + } ctx = new InternalContext(builder); labeler = builder.labeler; ctx.getProofOfWorkService().doMissingProofOfWork(30_000); // TODO: this should be configurable - - networkListener = new DefaultMessageListener(ctx, labeler, builder.listener); sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation; } @@ -89,11 +89,11 @@ public class BitmessageContext { public BitmessageAddress createIdentity(boolean shorter, Feature... features) { final BitmessageAddress identity = new BitmessageAddress(new PrivateKey( - shorter, - ctx.getStreams()[0], - NETWORK_NONCE_TRIALS_PER_BYTE, - NETWORK_EXTRA_BYTES, - features + shorter, + ctx.getStreams()[0], + NETWORK_NONCE_TRIALS_PER_BYTE, + NETWORK_EXTRA_BYTES, + features )); ctx.getAddressRepository().save(identity); if (sendPubkeyOnIdentityCreation) { @@ -117,9 +117,9 @@ public class BitmessageContext { } public List createDeterministicAddresses( - String passphrase, int numberOfAddresses, long version, long stream, boolean shorter) { + String passphrase, int numberOfAddresses, long version, long stream, boolean shorter) { List result = BitmessageAddress.deterministic( - passphrase, numberOfAddresses, version, stream, shorter); + passphrase, numberOfAddresses, version, stream, shorter); for (int i = 0; i < result.size(); i++) { BitmessageAddress address = result.get(i); address.setAlias("deterministic (" + (i + 1) + ")"); @@ -130,9 +130,9 @@ public class BitmessageContext { public void broadcast(final BitmessageAddress from, final String subject, final String message) { Plaintext msg = new Plaintext.Builder(BROADCAST) - .from(from) - .message(subject, message) - .build(); + .from(from) + .message(subject, message) + .build(); send(msg); } @@ -141,10 +141,10 @@ public class BitmessageContext { throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key."); } Plaintext msg = new Plaintext.Builder(MSG) - .from(from) - .to(to) - .message(subject, message) - .build(); + .from(from) + .to(to) + .message(subject, message) + .build(); send(msg); } @@ -170,17 +170,17 @@ public class BitmessageContext { ctx.send(msg); } else { ctx.send( - msg.getFrom(), - to, - Factory.getBroadcast(msg), - msg.getTTL() + msg.getFrom(), + to, + Factory.getBroadcast(msg), + msg.getTTL() ); } } } public void startup() { - ctx.getNetworkHandler().start(networkListener); + ctx.getNetworkHandler().start(); } public void shutdown() { @@ -195,7 +195,7 @@ public class BitmessageContext { * @param wait waits for the synchronization thread to finish */ public void synchronize(InetAddress host, int port, long timeoutInSeconds, boolean wait) { - Future future = ctx.getNetworkHandler().synchronize(host, port, networkListener, timeoutInSeconds); + Future future = ctx.getNetworkHandler().synchronize(host, port, timeoutInSeconds); if (wait) { try { future.get(); @@ -271,7 +271,7 @@ public class BitmessageContext { broadcast.decrypt(address); // This decrypts it twice, but on the other hand it doesn't try to decrypt the objects with // other subscriptions and the interface stays as simple as possible. - networkListener.receive(object); + ctx.getNetworkListener().receive(object); } catch (DecryptionFailedException ignore) { } catch (Exception e) { LOG.debug(e.getMessage(), e); @@ -281,8 +281,8 @@ public class BitmessageContext { public Property status() { return new Property("status", null, - ctx.getNetworkHandler().getNetworkStatus(), - new Property("unacknowledged", ctx.getMessageRepository().findMessagesToResend().size()) + ctx.getNetworkHandler().getNetworkStatus(), + new Property("unacknowledged", ctx.getMessageRepository().findMessagesToResend().size()) ); } @@ -296,6 +296,13 @@ public class BitmessageContext { public interface Listener { void receive(Plaintext plaintext); + + /** + * A message listener that needs a {@link BitmessageContext}, i.e. for implementing some sort of chat bot. + */ + interface WithContext extends Listener { + void setContext(BitmessageContext ctx); + } } public static final class Builder { @@ -429,7 +436,7 @@ public class BitmessageContext { @Override public MessagePayload handle(CustomMessage request) { throw new IllegalStateException( - "Received custom request, but no custom command handler configured."); + "Received custom request, but no custom command handler configured."); } }; } diff --git a/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java b/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java index b5d70d1..b61f747 100644 --- a/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java +++ b/core/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java @@ -24,7 +24,6 @@ import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.exception.DecryptionFailedException; import ch.dissem.bitmessage.ports.Labeler; import ch.dissem.bitmessage.ports.NetworkHandler; -import ch.dissem.bitmessage.utils.TTL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,21 +31,24 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import static ch.dissem.bitmessage.entity.Plaintext.Status.*; -import static ch.dissem.bitmessage.utils.UnixTime.DAY; +import static ch.dissem.bitmessage.entity.Plaintext.Status.PUBKEY_REQUESTED; -class DefaultMessageListener implements NetworkHandler.MessageListener { +class DefaultMessageListener implements NetworkHandler.MessageListener, InternalContext.ContextHolder { private final static Logger LOG = LoggerFactory.getLogger(DefaultMessageListener.class); - private final InternalContext ctx; private final Labeler labeler; private final BitmessageContext.Listener listener; + private InternalContext ctx; - public DefaultMessageListener(InternalContext context, Labeler labeler, BitmessageContext.Listener listener) { - this.ctx = context; + public DefaultMessageListener(Labeler labeler, BitmessageContext.Listener listener) { this.labeler = labeler; this.listener = listener; } + @Override + public void setContext(InternalContext context) { + this.ctx = context; + } + @Override @SuppressWarnings("ConstantConditions") public void receive(ObjectMessage object) throws IOException { diff --git a/core/src/main/java/ch/dissem/bitmessage/InternalContext.java b/core/src/main/java/ch/dissem/bitmessage/InternalContext.java index 057bfef..e01a90a 100644 --- a/core/src/main/java/ch/dissem/bitmessage/InternalContext.java +++ b/core/src/main/java/ch/dissem/bitmessage/InternalContext.java @@ -56,6 +56,7 @@ public class InternalContext { private final CustomCommandHandler customCommandHandler; private final ProofOfWorkService proofOfWorkService; private final Labeler labeler; + private final NetworkHandler.MessageListener networkListener; private final TreeSet streams = new TreeSet<>(); private final int port; @@ -79,6 +80,7 @@ public class InternalContext { this.connectionLimit = builder.connectionLimit; this.connectionTTL = builder.connectionTTL; this.labeler = builder.labeler; + this.networkListener = new DefaultMessageListener(labeler, builder.listener); Singleton.initialize(cryptography); @@ -94,7 +96,8 @@ public class InternalContext { } init(cryptography, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, - proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine, customCommandHandler, builder.labeler); + proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine, customCommandHandler, builder.labeler, + networkListener); for (BitmessageAddress identity : addressRepository.getIdentities()) { streams.add(identity.getStream()); } @@ -148,6 +151,10 @@ public class InternalContext { return labeler; } + public NetworkHandler.MessageListener getNetworkListener() { + return networkListener; + } + public long[] getStreams() { long[] result = new long[streams.size()]; int i = 0; @@ -178,10 +185,10 @@ public class InternalContext { long expires = UnixTime.now(+timeToLive); LOG.info("Expires at " + expires); final ObjectMessage object = new ObjectMessage.Builder() - .stream(recipient.getStream()) - .expiresTime(expires) - .payload(payload) - .build(); + .stream(recipient.getStream()) + .expiresTime(expires) + .payload(payload) + .build(); if (object.isSigned()) { object.sign(from.getPrivateKey()); } @@ -201,10 +208,10 @@ public class InternalContext { long expires = UnixTime.now(TTL.pubkey()); LOG.info("Expires at " + expires); final ObjectMessage response = new ObjectMessage.Builder() - .stream(targetStream) - .expiresTime(expires) - .payload(identity.getPubkey()) - .build(); + .stream(targetStream) + .expiresTime(expires) + .payload(identity.getPubkey()) + .build(); response.sign(identity.getPrivateKey()); response.encrypt(cryptography.createPublicKey(identity.getPublicDecryptionKey())); // TODO: remember that the pubkey is just about to be sent, and on which stream! @@ -239,10 +246,10 @@ public class InternalContext { long expires = UnixTime.now(TTL.getpubkey()); LOG.info("Expires at " + expires); final ObjectMessage request = new ObjectMessage.Builder() - .stream(contact.getStream()) - .expiresTime(expires) - .payload(new GetPubkey(contact)) - .build(); + .stream(contact.getStream()) + .expiresTime(expires) + .payload(new GetPubkey(contact)) + .build(); proofOfWorkService.doProofOfWork(request); } diff --git a/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java b/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java index e9a27db..7c72bea 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java +++ b/core/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java @@ -11,6 +11,7 @@ import ch.dissem.bitmessage.ports.ProofOfWorkRepository.Item; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.List; import java.util.Timer; import java.util.TimerTask; @@ -42,7 +43,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC for (byte[] initialHash : items) { Item item = powRepo.getItem(initialHash); cryptography.doProofOfWork(item.object, item.nonceTrialsPerByte, item.extraBytes, - ProofOfWorkService.this); + ProofOfWorkService.this); } } }, delayInMilliseconds); @@ -71,7 +72,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC final ObjectMessage ack = plaintext.getAckMessage(); messageRepo.save(plaintext); Item item = new Item(ack, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES, - expirationTime, plaintext); + expirationTime, plaintext); powRepo.putObject(item); cryptography.doProofOfWork(ack, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES, this); } @@ -89,15 +90,20 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC ctx.getLabeler().markAsSent(plaintext); messageRepo.save(plaintext); } + try { + ctx.getNetworkListener().receive(object); + } catch (IOException e) { + LOG.debug(e.getMessage(), e); + } ctx.getInventory().storeObject(object); ctx.getNetworkHandler().offer(object.getInventoryVector()); } else { item.message.getAckMessage().setNonce(nonce); final ObjectMessage object = new ObjectMessage.Builder() - .stream(item.message.getStream()) - .expiresTime(item.expirationTime) - .payload(new Msg(item.message)) - .build(); + .stream(item.message.getStream()) + .expiresTime(item.expirationTime) + .payload(new Msg(item.message)) + .build(); if (object.isSigned()) { object.sign(item.message.getFrom().getPrivateKey()); } diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java b/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java index 9597625..e9c164e 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java +++ b/core/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java @@ -41,7 +41,7 @@ public interface NetworkHandler { * An implementation should disconnect if either the timeout is reached or the returned thread is interrupted. *

*/ - Future synchronize(InetAddress server, int port, MessageListener listener, long timeoutInSeconds); + Future synchronize(InetAddress server, int port, long timeoutInSeconds); /** * Send a custom message to a specific node (that should implement handling for this message type) and returns @@ -57,7 +57,7 @@ public interface NetworkHandler { /** * Start a full network node, accepting incoming connections and relaying objects. */ - void start(MessageListener listener); + void start(); /** * Stop the full network node. diff --git a/core/src/test/java/ch/dissem/bitmessage/DefaultMessageListenerTest.java b/core/src/test/java/ch/dissem/bitmessage/DefaultMessageListenerTest.java index 170cb93..a9bbd8c 100644 --- a/core/src/test/java/ch/dissem/bitmessage/DefaultMessageListenerTest.java +++ b/core/src/test/java/ch/dissem/bitmessage/DefaultMessageListenerTest.java @@ -68,7 +68,7 @@ public class DefaultMessageListenerTest extends TestBase { when(ctx.getNetworkHandler()).thenReturn(networkHandler); when(ctx.getLabeler()).thenReturn(mock(Labeler.class)); - listener = new DefaultMessageListener(ctx, mock(Labeler.class), mock(BitmessageContext.Listener.class)); + listener = new DefaultMessageListener(mock(Labeler.class), mock(BitmessageContext.Listener.class)); } @Test diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java index 5d0f1c5..73dce63 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/AbstractConnection.java @@ -71,14 +71,13 @@ public abstract class AbstractConnection { public AbstractConnection(InternalContext context, Mode mode, NetworkAddress node, - NetworkHandler.MessageListener listener, Set commonRequestedObjects, long syncTimeout) { this.ctx = context; this.mode = mode; this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); this.node = node; - this.listener = listener; + this.listener = context.getNetworkListener(); this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap(10_000)); this.ivCache = new ConcurrentHashMap<>(); diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index e2c1856..64772e5 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -63,29 +63,29 @@ class Connection extends AbstractConnection { private OutputStream out; private boolean socketInitialized; - public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, + public Connection(InternalContext context, Mode mode, Socket socket, Set requestedObjectsMap) throws IOException { - this(context, mode, listener, socket, requestedObjectsMap, + this(context, mode, socket, requestedObjectsMap, new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), 0); } - public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener, + public Connection(InternalContext context, Mode mode, NetworkAddress node, Set requestedObjectsMap) { - this(context, mode, listener, new Socket(), requestedObjectsMap, + this(context, mode, new Socket(), requestedObjectsMap, node, 0); } - private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, + private Connection(InternalContext context, Mode mode, Socket socket, Set commonRequestedObjects, NetworkAddress node, long syncTimeout) { - super(context, mode, node, listener, commonRequestedObjects, syncTimeout); + super(context, mode, node, commonRequestedObjects, syncTimeout); this.startTime = UnixTime.now(); this.socket = socket; } public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, long timeoutInSeconds) throws IOException { - return new Connection(ctx, SYNC, listener, new Socket(address, port), + return new Connection(ctx, SYNC, new Socket(address, port), new HashSet(), new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), timeoutInSeconds); diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java index 5c976e2..42021f6 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java @@ -42,11 +42,10 @@ public class ConnectionOrganizer implements Runnable { private Connection initialConnection; public ConnectionOrganizer(InternalContext ctx, - DefaultNetworkHandler networkHandler, - NetworkHandler.MessageListener listener) { + DefaultNetworkHandler networkHandler) { this.ctx = ctx; this.networkHandler = networkHandler; - this.listener = listener; + this.listener = ctx.getNetworkListener(); } @Override @@ -91,8 +90,7 @@ public class ConnectionOrganizer implements Runnable { NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); boolean first = active == 0 && initialConnection == null; for (NetworkAddress address : addresses) { - Connection c = new Connection(ctx, CLIENT, address, listener, - networkHandler.requestedObjects); + Connection c = new Connection(ctx, CLIENT, address, networkHandler.requestedObjects); if (first) { initialConnection = c; first = false; diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index f5bfd47..762d1be 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -64,9 +64,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { } @Override - public Future synchronize(InetAddress server, int port, MessageListener listener, long timeoutInSeconds) { + public Future synchronize(InetAddress server, int port, long timeoutInSeconds) { try { - Connection connection = Connection.sync(ctx, server, port, listener, timeoutInSeconds); + Connection connection = Connection.sync(ctx, server, port, ctx.getNetworkListener(), timeoutInSeconds); Future reader = pool.submit(connection.getReader()); pool.execute(connection.getWriter()); return reader; @@ -97,19 +97,16 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { } @Override - public void start(final MessageListener listener) { - if (listener == null) { - throw new IllegalStateException("Listener must be set at start"); - } + public void start() { if (running) { throw new IllegalStateException("Network already running - you need to stop first."); } try { running = true; connections.clear(); - server = new ServerRunnable(ctx, this, listener); + server = new ServerRunnable(ctx, this); pool.execute(server); - pool.execute(new ConnectionOrganizer(ctx, this, listener)); + pool.execute(new ConnectionOrganizer(ctx, this)); } catch (IOException e) { throw new ApplicationException(e); } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java index 99b6a88..3c67b95 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java @@ -38,11 +38,10 @@ public class ServerRunnable implements Runnable, Closeable { private final DefaultNetworkHandler networkHandler; private final NetworkHandler.MessageListener listener; - public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler, - NetworkHandler.MessageListener listener) throws IOException { + public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler) throws IOException { this.ctx = ctx; this.networkHandler = networkHandler; - this.listener = listener; + this.listener = ctx.getNetworkListener(); this.serverSocket = new ServerSocket(ctx.getPort()); } @@ -52,8 +51,7 @@ public class ServerRunnable implements Runnable, Closeable { try { Socket socket = serverSocket.accept(); socket.setSoTimeout(Connection.READ_TIMEOUT); - networkHandler.startConnection(new Connection(ctx, SERVER, socket, listener, - networkHandler.requestedObjects)); + networkHandler.startConnection(new Connection(ctx, SERVER, socket, networkHandler.requestedObjects)); } catch (IOException e) { LOG.debug(e.getMessage(), e); } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java index 3e53eb4..2e24883 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/ConnectionInfo.java @@ -17,13 +17,15 @@ package ch.dissem.bitmessage.networking.nio; import ch.dissem.bitmessage.InternalContext; -import ch.dissem.bitmessage.entity.*; +import ch.dissem.bitmessage.entity.GetData; +import ch.dissem.bitmessage.entity.MessagePayload; +import ch.dissem.bitmessage.entity.NetworkMessage; +import ch.dissem.bitmessage.entity.Version; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.factory.V3MessageReader; import ch.dissem.bitmessage.networking.AbstractConnection; -import ch.dissem.bitmessage.ports.NetworkHandler; import java.nio.ByteBuffer; import java.util.Iterator; @@ -43,10 +45,9 @@ public class ConnectionInfo extends AbstractConnection { private boolean syncFinished; private long lastUpdate = System.currentTimeMillis(); - public ConnectionInfo(InternalContext context, Mode mode, - NetworkAddress node, NetworkHandler.MessageListener listener, + public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node, Set commonRequestedObjects, long syncTimeout) { - super(context, mode, node, listener, commonRequestedObjects, syncTimeout); + super(context, mode, node, commonRequestedObjects, syncTimeout); headerOut.flip(); if (mode == CLIENT || mode == SYNC) { send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build()); diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java index a17f378..6baa68b 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/nio/NioNetworkHandler.java @@ -26,7 +26,7 @@ import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.factory.V3MessageReader; import ch.dissem.bitmessage.ports.NetworkHandler; -import ch.dissem.bitmessage.utils.*; +import ch.dissem.bitmessage.utils.Property; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +37,6 @@ import java.net.NoRouteToHostException; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.Collections; import java.util.concurrent.*; import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.*; @@ -47,6 +46,7 @@ import static ch.dissem.bitmessage.utils.Collections.selectRandom; import static ch.dissem.bitmessage.utils.DebugUtils.inc; import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static java.nio.channels.SelectionKey.*; +import static java.util.Collections.newSetFromMap; /** * Network handler using java.nio, resulting in less threads. @@ -63,13 +63,14 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex private InternalContext ctx; private Selector selector; private ServerSocketChannel serverChannel; + private Queue connectionQueue = new ConcurrentLinkedQueue<>(); private Map connections = new ConcurrentHashMap<>(); - private final Set requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap(10_000)); + private final Set requestedObjects = newSetFromMap(new ConcurrentHashMap(10_000)); private Thread starter; @Override - public Future synchronize(final InetAddress server, final int port, final MessageListener listener, final long timeoutInSeconds) { + public Future synchronize(final InetAddress server, final int port, final long timeoutInSeconds) { return threadPool.submit(new Callable() { @Override public Void call() throws Exception { @@ -77,7 +78,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex channel.configureBlocking(false); ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), - listener, new HashSet(), timeoutInSeconds); + new HashSet(), timeoutInSeconds); while (channel.isConnected() && !connection.isSyncFinished()) { write(channel, connection); read(channel, connection); @@ -135,10 +136,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } @Override - public void start(final MessageListener listener) { - if (listener == null) { - throw new IllegalStateException("Listener must be set at start"); - } + public void start() { if (selector != null && selector.isOpen()) { throw new IllegalStateException("Network already running - you need to stop first."); } @@ -147,42 +145,8 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } catch (IOException e) { throw new ApplicationException(e); } - thread("connection listener", new Runnable() { - @Override - public void run() { - try { - serverChannel = ServerSocketChannel.open(); - serverChannel.socket().bind(new InetSocketAddress(ctx.getPort())); - while (selector.isOpen() && serverChannel.isOpen()) { - try { - SocketChannel accepted = serverChannel.accept(); - accepted.configureBlocking(false); - ConnectionInfo connection = new ConnectionInfo(ctx, SERVER, - new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(), - listener, - requestedObjects, 0 - ); - connections.put( - connection, - accepted.register(selector, OP_READ | OP_WRITE, connection) - ); - } catch (AsynchronousCloseException ignore) { - LOG.trace(ignore.getMessage()); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } - } catch (ClosedSelectorException | AsynchronousCloseException ignore) { - } catch (IOException e) { - throw new ApplicationException(e); - } catch (RuntimeException e) { - e.printStackTrace(); - throw e; - } - } - }); - starter = thread("connection starter", new Runnable() { + starter = thread("connection manager", new Runnable() { @Override public void run() { while (selector.isOpen()) { @@ -197,34 +161,8 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex List addresses = ctx.getNodeRegistry().getKnownAddresses(100, ctx.getStreams()); addresses = selectRandom(missing, addresses); for (NetworkAddress address : addresses) { - if (isConnectedTo(address)) { - continue; - } - try { - SocketChannel channel = SocketChannel.open(); - channel.configureBlocking(false); - channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort())); - ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT, - address, - listener, - requestedObjects, 0 - ); - connections.put( - connection, - channel.register(selector, OP_CONNECT, connection) - ); - } catch (NoRouteToHostException ignore) { - // We'll try to connect to many offline nodes, so - // this is expected to happen quite a lot. - } catch (AsynchronousCloseException e) { - // The exception is expected if the network is being - // shut down, as we actually do asynchronously close - // the connections. - if (isRunning()) { - LOG.error(e.getMessage(), e); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); + if (!isConnectedTo(address)) { + connectionQueue.offer(address); } } } @@ -252,17 +190,47 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } }); - thread("processor", new Runnable() { + thread("selector worker", new Runnable() { @Override public void run() { try { + serverChannel = ServerSocketChannel.open(); + serverChannel.configureBlocking(false); + serverChannel.socket().bind(new InetSocketAddress(ctx.getPort())); + serverChannel.register(selector, OP_ACCEPT, null); + while (selector.isOpen()) { selector.select(1000); Iterator keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); - if (key.attachment() instanceof ConnectionInfo) { + if (key.attachment() == null) { + try { + if (key.isAcceptable()) { + // handle accept + try { + SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept(); + accepted.configureBlocking(false); + ConnectionInfo connection = new ConnectionInfo(ctx, SERVER, + new NetworkAddress.Builder().address(accepted.getRemoteAddress()).stream(1).build(), + requestedObjects, 0 + ); + connections.put( + connection, + accepted.register(selector, OP_READ | OP_WRITE, connection) + ); + } catch (AsynchronousCloseException e) { + LOG.trace(e.getMessage()); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } + } catch (CancelledKeyException e) { + LOG.error(e.getMessage(), e); + } + } else { + // handle read/write SocketChannel channel = (SocketChannel) key.channel(); ConnectionInfo connection = (ConnectionInfo) key.attachment(); try { @@ -290,6 +258,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex } } } + // set interest ops for (Map.Entry e : connections.entrySet()) { if (e.getValue().isValid() && (e.getValue().interestOps() & OP_WRITE) == 0 @@ -298,6 +267,35 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex e.getValue().interestOps(OP_READ | OP_WRITE); } } + // start new connections + if (!connectionQueue.isEmpty()) { + NetworkAddress address = connectionQueue.poll(); + try { + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); + channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort())); + ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT, + address, + requestedObjects, 0 + ); + connections.put( + connection, + channel.register(selector, OP_CONNECT, connection) + ); + } catch (NoRouteToHostException ignore) { + // We'll try to connect to many offline nodes, so + // this is expected to happen quite a lot. + } catch (AsynchronousCloseException e) { + // The exception is expected if the network is being + // shut down, as we actually do asynchronously close + // the connections. + if (isRunning()) { + LOG.error(e.getMessage(), e); + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } } selector.close(); } catch (ClosedSelectorException ignore) { @@ -387,7 +385,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex distribution.put(connection, new LinkedList()); } } - if (distribution.isEmpty()){ + if (distribution.isEmpty()) { return; } InventoryVector next = iterator.next(); diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java index ba24bbd..48c8cd1 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java @@ -230,9 +230,7 @@ public class NetworkHandlerTest { "V4Pubkey.payload" ); - Future future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), - mock(NetworkHandler.MessageListener.class), - 10); + Future future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10); future.get(); assertInventorySize(3, nodeInventory); assertInventorySize(3, peerInventory); @@ -247,9 +245,7 @@ public class NetworkHandlerTest { nodeInventory.init(); - Future future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), - mock(NetworkHandler.MessageListener.class), - 10); + Future future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10); future.get(); assertInventorySize(2, nodeInventory); assertInventorySize(2, peerInventory); @@ -263,9 +259,7 @@ public class NetworkHandlerTest { "V1Msg.payload" ); - Future future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), - mock(NetworkHandler.MessageListener.class), - 10); + Future future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10); future.get(); assertInventorySize(1, nodeInventory); assertInventorySize(1, peerInventory); diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java index ae5c432..07d343a 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcNodeRegistry.java @@ -105,7 +105,7 @@ public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry { } for (long stream : streams) { Set nodes = stableNodes.get(stream); - if (nodes != null) { + if (nodes != null && !nodes.isEmpty()) { result.add(Collections.selectRandom(nodes)); } }