From 511b3c1754d7dda5a584756987f1d25338ec8fc3 Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Wed, 14 Oct 2015 18:37:43 +0200 Subject: [PATCH] Connections now use two separate threads for writing and listening - this should avoid dead locks, specifically when connecting to Jabit :/ - also, Java 8 features are now allowed in modules not needed by Android clients --- demo/build.gradle | 2 + .../dissem/bitmessage/demo/Application.java | 2 +- .../ch/dissem/bitmessage/factory/Factory.java | 3 +- .../ch/dissem/bitmessage/ports/Inventory.java | 15 ++ domain/src/main/resources/nodes.txt | 1 - .../bitmessage/networking/Connection.java | 234 ++++++++++-------- .../networking/DefaultNetworkHandler.java | 95 +++---- repositories/build.gradle | 2 + .../bitmessage/repository/JdbcInventory.java | 72 +++--- 9 files changed, 246 insertions(+), 180 deletions(-) diff --git a/demo/build.gradle b/demo/build.gradle index b7f621d..e4c51a7 100644 --- a/demo/build.gradle +++ b/demo/build.gradle @@ -14,6 +14,8 @@ uploadArchives { } } +sourceCompatibility = 1.8 + task fatCapsule(type: FatCapsule) { applicationClass 'ch.dissem.bitmessage.demo.Main' } diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java index 7644ed8..a065de9 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java @@ -52,7 +52,7 @@ public class Application { .messageRepo(new JdbcMessageRepository(jdbcConfig)) .networkHandler(new DefaultNetworkHandler()) .security(new BouncySecurity()) -// .port(48444) + .port(48444) .listener(new BitmessageContext.Listener() { @Override public void receive(Plaintext plaintext) { diff --git a/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java b/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java index 5584ec7..33604ab 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/factory/Factory.java @@ -23,7 +23,6 @@ import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.payload.*; import ch.dissem.bitmessage.entity.valueobject.PrivateKey; import ch.dissem.bitmessage.exception.NodeException; -import ch.dissem.bitmessage.ports.Security; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -155,7 +154,7 @@ public class Factory { } } // fallback: just store the message - we don't really care what it is -// LOG.info("Unexpected object type: " + objectType); + LOG.trace("Unexpected object type: " + objectType); return GenericPayload.read(version, stream, streamNumber, length); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java b/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java index a10d50f..6af65c1 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java @@ -26,17 +26,32 @@ import java.util.List; * The Inventory stores and retrieves objects, cleans up outdated objects and can tell which objects are still missing. */ public interface Inventory { + /** + * Returns the IVs of all valid objects we have for the given streams + */ List getInventory(long... streams); + /** + * Returns the IVs of all objects in the offer that we don't have already. Implementations are allowed to + * ignore the streams parameter, but it must be set when calling this method. + */ List getMissing(List offer, long... streams); ObjectMessage getObject(InventoryVector vector); + /** + * This method is mainly used to search for public keys to newly added addresses or broadcasts from new + * subscriptions. + */ List getObjects(long stream, long version, ObjectType... types); void storeObject(ObjectMessage object); boolean contains(ObjectMessage object); + /** + * Deletes all objects that expired 5 minutes ago or earlier + * (so we don't accidentally request objects we just deleted) + */ void cleanup(); } diff --git a/domain/src/main/resources/nodes.txt b/domain/src/main/resources/nodes.txt index ab2b69c..e0a334e 100644 --- a/domain/src/main/resources/nodes.txt +++ b/domain/src/main/resources/nodes.txt @@ -12,7 +12,6 @@ 109.147.204.113:1195 178.11.46.221:8444 -# Add named nodes at the end, as resolving them might take time dissem.ch:8444 [stream 2] diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index 6aa4210..6312557 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -49,7 +49,7 @@ import static ch.dissem.bitmessage.utils.UnixTime.MINUTE; /** * A connection to a specific node */ -public class Connection implements Runnable { +public class Connection { public static final int READ_TIMEOUT = 2000; private final static Logger LOG = LoggerFactory.getLogger(Connection.class); private static final int CONNECT_TIMEOUT = 5000; @@ -63,13 +63,16 @@ public class Connection implements Runnable { private final Queue sendingQueue = new ConcurrentLinkedDeque<>(); private final Map requestedObjects; private final long syncTimeout; + private final ReaderRunnable reader = new ReaderRunnable(); + private final WriterRunnable writer = new WriterRunnable(); - private State state; + private volatile State state; private InputStream in; private OutputStream out; private int version; private long[] streams; private int readTimeoutCounter; + private boolean socketInitialized; public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, ConcurrentMap requestedObjectsMap) throws IOException { @@ -118,86 +121,6 @@ public class Connection implements Runnable { return node; } - @Override - public void run() { - try (Socket socket = this.socket) { - if (!socket.isConnected()) { - LOG.debug("Trying to connect to node " + node); - socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT); - } - socket.setSoTimeout(READ_TIMEOUT); - this.in = socket.getInputStream(); - this.out = socket.getOutputStream(); - if (mode == CLIENT) { - send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); - } - while (state != DISCONNECTED) { - try { - NetworkMessage msg = Factory.getNetworkMessage(version, in); - if (msg == null) - continue; - switch (state) { - case ACTIVE: - receiveMessage(msg.getPayload()); - sendQueue(); - break; - - default: - switch (msg.getPayload().getCommand()) { - case VERSION: - Version payload = (Version) msg.getPayload(); - if (payload.getNonce() == ctx.getClientNonce()) { - LOG.info("Tried to connect to self, disconnecting."); - disconnect(); - } else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) { - this.version = payload.getVersion(); - this.streams = payload.getStreams(); - send(new VerAck()); - switch (mode) { - case SERVER: - send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); - break; - case CLIENT: - activateConnection(); - break; - } - } else { - LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting."); - disconnect(); - } - break; - case VERACK: - switch (mode) { - case SERVER: - activateConnection(); - break; - case CLIENT: - // NO OP - break; - } - break; - default: - throw new NodeException("Command 'version' or 'verack' expected, but was '" - + msg.getPayload().getCommand() + "'"); - } - } - if (socket.isClosed() || syncFinished(msg)) disconnect(); - } catch (SocketTimeoutException ignore) { - if (state == ACTIVE) { - sendQueue(); - if (syncFinished(null)) disconnect(); - } - } - } - } catch (IOException | NodeException e) { - disconnect(); - LOG.debug("Disconnected from node " + node + ": " + e.getMessage()); - } catch (RuntimeException e) { - disconnect(); - throw e; - } - } - @SuppressWarnings("RedundantIfStatement") private boolean syncFinished(NetworkMessage msg) { if (syncTimeout == 0 || state != ACTIVE) { @@ -229,15 +152,6 @@ public class Connection implements Runnable { ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node)); } - private void sendQueue() { - if (sendingQueue.size() > 0) { - LOG.debug("Sending " + sendingQueue.size() + " messages to node " + node); - } - for (MessagePayload msg = sendingQueue.poll(); msg != null; msg = sendingQueue.poll()) { - send(msg); - } - } - private void cleanupIvCache() { Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE); for (Map.Entry entry : ivCache.entrySet()) { @@ -310,10 +224,12 @@ public class Connection implements Runnable { case OBJECT: ObjectMessage objectMessage = (ObjectMessage) messagePayload; try { + if (ctx.getInventory().contains(objectMessage)) { + LOG.debug("Received object " + objectMessage.getInventoryVector() + " - already in inventory"); + break; + } LOG.debug("Received object " + objectMessage.getInventoryVector()); security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); - if (ctx.getInventory().contains(objectMessage)) - break; listener.receive(objectMessage); ctx.getInventory().storeObject(objectMessage); // offer object to some random nodes so it gets distributed throughout the network: @@ -392,14 +308,136 @@ public class Connection implements Runnable { return Objects.hash(node); } - public void request(InventoryVector key) { - sendingQueue.offer(new GetData.Builder() - .addInventoryVector(key) - .build() - ); + private synchronized void initSocket(Socket socket) throws IOException { + if (!socketInitialized) { + if (!socket.isConnected()) { + LOG.debug("Trying to connect to node " + node); + socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT); + } + socket.setSoTimeout(READ_TIMEOUT); + in = socket.getInputStream(); + out = socket.getOutputStream(); + if (!socket.isConnected()) { + LOG.debug("Trying to connect to node " + node); + socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT); + } + socket.setSoTimeout(READ_TIMEOUT); + in = socket.getInputStream(); + out = socket.getOutputStream(); + socketInitialized = true; + } + } + + public ReaderRunnable getReader() { + return reader; + } + + public WriterRunnable getWriter() { + return writer; } public enum Mode {SERVER, CLIENT} public enum State {CONNECTING, ACTIVE, DISCONNECTED} + + public class ReaderRunnable implements Runnable { + @Override + public void run() { + try (Socket socket = Connection.this.socket) { + initSocket(socket); + if (mode == CLIENT) { + send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); + } + while (state != DISCONNECTED) { + try { + NetworkMessage msg = Factory.getNetworkMessage(version, in); + if (msg == null) + continue; + switch (state) { + case ACTIVE: + receiveMessage(msg.getPayload()); + break; + + default: + switch (msg.getPayload().getCommand()) { + case VERSION: + Version payload = (Version) msg.getPayload(); + if (payload.getNonce() == ctx.getClientNonce()) { + LOG.info("Tried to connect to self, disconnecting."); + disconnect(); + } else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) { + version = payload.getVersion(); + streams = payload.getStreams(); + send(new VerAck()); + switch (mode) { + case SERVER: + send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); + break; + case CLIENT: + activateConnection(); + break; + } + } else { + LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting."); + disconnect(); + } + break; + case VERACK: + switch (mode) { + case SERVER: + activateConnection(); + break; + case CLIENT: + // NO OP + break; + } + break; + default: + throw new NodeException("Command 'version' or 'verack' expected, but was '" + + msg.getPayload().getCommand() + "'"); + } + } + if (socket.isClosed() || syncFinished(msg)) disconnect(); + } catch (SocketTimeoutException ignore) { + if (state == ACTIVE) { + if (syncFinished(null)) disconnect(); + } + } + Thread.yield(); + } + } catch (IOException | NodeException e) { + disconnect(); + LOG.debug("Reader disconnected from node " + node + ": " + e.getMessage()); + } catch (RuntimeException e) { + LOG.debug("Reader disconnecting from node " + node + " due to error: " + e.getMessage(), e); + disconnect(); + } finally { + try { + socket.close(); + } catch (Exception e) { + LOG.debug(e.getMessage(), e); + } + } + } + } + + public class WriterRunnable implements Runnable { + @Override + public void run() { + try (Socket socket = Connection.this.socket) { + initSocket(socket); + while (state != DISCONNECTED) { + if (sendingQueue.size() > 0) { + LOG.debug("Sending " + sendingQueue.size() + " messages to node " + node); + send(sendingQueue.poll()); + } else { + Thread.sleep(100); + } + } + } catch (IOException | InterruptedException e) { + LOG.debug("Writer disconnected from node " + node + ": " + e.getMessage()); + disconnect(); + } + } + } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index 69e9e16..e563152 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -49,14 +49,17 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { public final static int NETWORK_MAGIC_NUMBER = 8; private final static Logger LOG = LoggerFactory.getLogger(DefaultNetworkHandler.class); private final List connections = new LinkedList<>(); + private final ExecutorService pool; private InternalContext ctx; private ServerSocket serverSocket; - private ExecutorService pool; - private Thread serverThread; - private Thread connectionManager; + private volatile boolean running; private ConcurrentMap requestedObjects = new ConcurrentHashMap<>(); + public DefaultNetworkHandler() { + pool = Executors.newCachedThreadPool(); + } + @Override public void setContext(InternalContext context) { this.ctx = context; @@ -65,9 +68,12 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { @Override public Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds) { try { - Thread t = new Thread(Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds)); - t.start(); - return t; + Connection connection = Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds); + Thread tr = new Thread(connection.getReader()); + Thread tw = new Thread(connection.getWriter()); + tr.start(); + tw.start(); + return tr; } catch (IOException e) { throw new RuntimeException(e); } @@ -78,14 +84,14 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { if (listener == null) { throw new IllegalStateException("Listener must be set at start"); } - if (pool != null && !pool.isShutdown()) { + if (running) { throw new IllegalStateException("Network already running - you need to stop first."); } try { - pool = Executors.newCachedThreadPool(); + running = true; connections.clear(); serverSocket = new ServerSocket(ctx.getPort()); - serverThread = new Thread(new Runnable() { + pool.execute(new Runnable() { @Override public void run() { while (!serverSocket.isClosed()) { @@ -98,44 +104,46 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { } } } - }, "server"); - serverThread.start(); - connectionManager = new Thread(new Runnable() { + }); + pool.execute(new Runnable() { @Override public void run() { - while (!Thread.interrupted()) { - try { - int active = 0; - synchronized (connections) { - for (Iterator iterator = connections.iterator(); iterator.hasNext(); ) { - Connection c = iterator.next(); - if (c.getState() == DISCONNECTED) { - // Remove the current element from the iterator and the list. - iterator.remove(); - } - if (c.getState() == ACTIVE) { - active++; + try { + while (running) { + try { + int active = 0; + synchronized (connections) { + for (Iterator iterator = connections.iterator(); iterator.hasNext(); ) { + Connection c = iterator.next(); + if (c.getState() == DISCONNECTED) { + // Remove the current element from the iterator and the list. + iterator.remove(); + } + if (c.getState() == ACTIVE) { + active++; + } } } - } - if (active < NETWORK_MAGIC_NUMBER) { - List addresses = ctx.getNodeRegistry().getKnownAddresses( - NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); - for (NetworkAddress address : addresses) { - startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects)); + if (active < NETWORK_MAGIC_NUMBER) { + List addresses = ctx.getNodeRegistry().getKnownAddresses( + NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); + for (NetworkAddress address : addresses) { + startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects)); + } } + Thread.sleep(30000); + } catch (InterruptedException e) { + running = false; + } catch (Exception e) { + LOG.error("Error in connection manager. Ignored.", e); } - Thread.sleep(30000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOG.error("Error in connection manager. Ignored.", e); } + } finally { + LOG.debug("Connection manager shutting down."); + running = false; } - LOG.debug("Connection manager shutting down."); } - }, "connection-manager"); - connectionManager.start(); + }); } catch (IOException e) { throw new RuntimeException(e); } @@ -143,18 +151,17 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { @Override public boolean isRunning() { - return connectionManager != null && connectionManager.isAlive(); + return running; } @Override public void stop() { - connectionManager.interrupt(); + running = false; try { serverSocket.close(); } catch (IOException e) { LOG.debug(e.getMessage(), e); } - pool.shutdown(); synchronized (connections) { for (Connection c : connections) { c.disconnect(); @@ -170,7 +177,8 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { } connections.add(c); } - pool.execute(c); + pool.execute(c.getReader()); + pool.execute(c.getWriter()); } @Override @@ -222,8 +230,7 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { i++; } return new Property("network", null, - new Property("connectionManager", - connectionManager != null && connectionManager.isAlive() ? "running" : "stopped"), + new Property("connectionManager", running ? "running" : "stopped"), new Property("connections", null, streamProperties) ); } diff --git a/repositories/build.gradle b/repositories/build.gradle index 7fc6f25..fecebce 100644 --- a/repositories/build.gradle +++ b/repositories/build.gradle @@ -10,6 +10,8 @@ uploadArchives { } } +sourceCompatibility = 1.8 + dependencies { compile project(':domain') compile 'org.flywaydb:flyway-core:3.2.1' diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcInventory.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcInventory.java index 4adc532..9c9c9e7 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcInventory.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcInventory.java @@ -27,12 +27,17 @@ import org.slf4j.LoggerFactory; import java.sql.*; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import static ch.dissem.bitmessage.utils.UnixTime.MINUTE; import static ch.dissem.bitmessage.utils.UnixTime.now; public class JdbcInventory extends JdbcHelper implements Inventory { private static final Logger LOG = LoggerFactory.getLogger(JdbcInventory.class); + private final Map> cache = new ConcurrentHashMap<>(); + public JdbcInventory(JdbcConfig config) { super(config); } @@ -40,36 +45,43 @@ public class JdbcInventory extends JdbcHelper implements Inventory { @Override public List getInventory(long... streams) { List result = new LinkedList<>(); - try (Connection connection = config.getConnection()) { - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE expires > " + now() + - " AND stream IN (" + join(streams) + ")"); - while (rs.next()) { - result.add(new InventoryVector(rs.getBytes("hash"))); - } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); + for (long stream : streams) { + getCache(stream).entrySet().stream() + .filter(e -> e.getValue() > now()) + .forEach(e -> result.add(e.getKey())); } return result; } - private List getFullInventory(long... streams) { - List result = new LinkedList<>(); - try (Connection connection = config.getConnection()) { - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE stream IN (" + join(streams) + ")"); - while (rs.next()) { - result.add(new InventoryVector(rs.getBytes("hash"))); + private Map getCache(long stream) { + Map result = cache.get(stream); + if (result == null) { + synchronized (cache) { + if (cache.get(stream) == null) { + result = new ConcurrentHashMap<>(); + cache.put(stream, result); + + try (Connection connection = config.getConnection()) { + Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT hash, expires FROM Inventory WHERE expires > " + + now(-5 * MINUTE) + " AND stream = " + stream); + while (rs.next()) { + result.put(new InventoryVector(rs.getBytes("hash")), rs.getLong("expires")); + } + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + } } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); } return result; } @Override public List getMissing(List offer, long... streams) { - offer.removeAll(getFullInventory(streams)); + for (long stream : streams) { + getCache(stream).forEach((iv, t) -> offer.remove(iv)); + } return offer; } @@ -131,6 +143,7 @@ public class JdbcInventory extends JdbcHelper implements Inventory { ps.setLong(5, object.getType()); ps.setLong(6, object.getVersion()); ps.executeUpdate(); + getCache(object.getStream()).put(iv, object.getExpiresTime()); } catch (SQLException e) { LOG.debug("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e); } catch (Exception e) { @@ -140,28 +153,19 @@ public class JdbcInventory extends JdbcHelper implements Inventory { @Override public boolean contains(ObjectMessage object) { - try (Connection connection = config.getConnection()) { - Statement stmt = connection.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT count(1) FROM Inventory WHERE hash = X'" - + object.getInventoryVector() + "'"); - if (rs.next()) { - return rs.getInt(1) > 0; - } else { - throw new RuntimeException("Couldn't query if inventory contains " + object.getInventoryVector()); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e); - } + return getCache(object.getStream()).entrySet().stream() + .anyMatch(x -> x.getKey().equals(object.getInventoryVector())); } @Override public void cleanup() { try (Connection connection = config.getConnection()) { - // We delete only objects that expired 5 minutes ago or earlier, so we don't request objects we just deleted - connection.createStatement().executeUpdate("DELETE FROM Inventory WHERE expires < " + (now() - 300)); + connection.createStatement().executeUpdate("DELETE FROM Inventory WHERE expires < " + now(-5 * MINUTE)); } catch (SQLException e) { LOG.debug(e.getMessage(), e); } + for (Map c : cache.values()) { + c.entrySet().removeIf(e -> e.getValue() < now(-5 * MINUTE)); + } } }