Connections now use two separate threads for writing and listening
- this should avoid dead locks, specifically when connecting to Jabit :/ - also, Java 8 features are now allowed in modules not needed by Android clients
This commit is contained in:
@ -49,7 +49,7 @@ import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
|
||||
/**
|
||||
* A connection to a specific node
|
||||
*/
|
||||
public class Connection implements Runnable {
|
||||
public class Connection {
|
||||
public static final int READ_TIMEOUT = 2000;
|
||||
private final static Logger LOG = LoggerFactory.getLogger(Connection.class);
|
||||
private static final int CONNECT_TIMEOUT = 5000;
|
||||
@ -63,13 +63,16 @@ public class Connection implements Runnable {
|
||||
private final Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>();
|
||||
private final Map<InventoryVector, Long> requestedObjects;
|
||||
private final long syncTimeout;
|
||||
private final ReaderRunnable reader = new ReaderRunnable();
|
||||
private final WriterRunnable writer = new WriterRunnable();
|
||||
|
||||
private State state;
|
||||
private volatile State state;
|
||||
private InputStream in;
|
||||
private OutputStream out;
|
||||
private int version;
|
||||
private long[] streams;
|
||||
private int readTimeoutCounter;
|
||||
private boolean socketInitialized;
|
||||
|
||||
public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener,
|
||||
ConcurrentMap<InventoryVector, Long> requestedObjectsMap) throws IOException {
|
||||
@ -118,86 +121,6 @@ public class Connection implements Runnable {
|
||||
return node;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try (Socket socket = this.socket) {
|
||||
if (!socket.isConnected()) {
|
||||
LOG.debug("Trying to connect to node " + node);
|
||||
socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT);
|
||||
}
|
||||
socket.setSoTimeout(READ_TIMEOUT);
|
||||
this.in = socket.getInputStream();
|
||||
this.out = socket.getOutputStream();
|
||||
if (mode == CLIENT) {
|
||||
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
|
||||
}
|
||||
while (state != DISCONNECTED) {
|
||||
try {
|
||||
NetworkMessage msg = Factory.getNetworkMessage(version, in);
|
||||
if (msg == null)
|
||||
continue;
|
||||
switch (state) {
|
||||
case ACTIVE:
|
||||
receiveMessage(msg.getPayload());
|
||||
sendQueue();
|
||||
break;
|
||||
|
||||
default:
|
||||
switch (msg.getPayload().getCommand()) {
|
||||
case VERSION:
|
||||
Version payload = (Version) msg.getPayload();
|
||||
if (payload.getNonce() == ctx.getClientNonce()) {
|
||||
LOG.info("Tried to connect to self, disconnecting.");
|
||||
disconnect();
|
||||
} else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) {
|
||||
this.version = payload.getVersion();
|
||||
this.streams = payload.getStreams();
|
||||
send(new VerAck());
|
||||
switch (mode) {
|
||||
case SERVER:
|
||||
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
|
||||
break;
|
||||
case CLIENT:
|
||||
activateConnection();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting.");
|
||||
disconnect();
|
||||
}
|
||||
break;
|
||||
case VERACK:
|
||||
switch (mode) {
|
||||
case SERVER:
|
||||
activateConnection();
|
||||
break;
|
||||
case CLIENT:
|
||||
// NO OP
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new NodeException("Command 'version' or 'verack' expected, but was '"
|
||||
+ msg.getPayload().getCommand() + "'");
|
||||
}
|
||||
}
|
||||
if (socket.isClosed() || syncFinished(msg)) disconnect();
|
||||
} catch (SocketTimeoutException ignore) {
|
||||
if (state == ACTIVE) {
|
||||
sendQueue();
|
||||
if (syncFinished(null)) disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException | NodeException e) {
|
||||
disconnect();
|
||||
LOG.debug("Disconnected from node " + node + ": " + e.getMessage());
|
||||
} catch (RuntimeException e) {
|
||||
disconnect();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("RedundantIfStatement")
|
||||
private boolean syncFinished(NetworkMessage msg) {
|
||||
if (syncTimeout == 0 || state != ACTIVE) {
|
||||
@ -229,15 +152,6 @@ public class Connection implements Runnable {
|
||||
ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node));
|
||||
}
|
||||
|
||||
private void sendQueue() {
|
||||
if (sendingQueue.size() > 0) {
|
||||
LOG.debug("Sending " + sendingQueue.size() + " messages to node " + node);
|
||||
}
|
||||
for (MessagePayload msg = sendingQueue.poll(); msg != null; msg = sendingQueue.poll()) {
|
||||
send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupIvCache() {
|
||||
Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE);
|
||||
for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) {
|
||||
@ -310,10 +224,12 @@ public class Connection implements Runnable {
|
||||
case OBJECT:
|
||||
ObjectMessage objectMessage = (ObjectMessage) messagePayload;
|
||||
try {
|
||||
if (ctx.getInventory().contains(objectMessage)) {
|
||||
LOG.debug("Received object " + objectMessage.getInventoryVector() + " - already in inventory");
|
||||
break;
|
||||
}
|
||||
LOG.debug("Received object " + objectMessage.getInventoryVector());
|
||||
security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
|
||||
if (ctx.getInventory().contains(objectMessage))
|
||||
break;
|
||||
listener.receive(objectMessage);
|
||||
ctx.getInventory().storeObject(objectMessage);
|
||||
// offer object to some random nodes so it gets distributed throughout the network:
|
||||
@ -392,14 +308,136 @@ public class Connection implements Runnable {
|
||||
return Objects.hash(node);
|
||||
}
|
||||
|
||||
public void request(InventoryVector key) {
|
||||
sendingQueue.offer(new GetData.Builder()
|
||||
.addInventoryVector(key)
|
||||
.build()
|
||||
);
|
||||
private synchronized void initSocket(Socket socket) throws IOException {
|
||||
if (!socketInitialized) {
|
||||
if (!socket.isConnected()) {
|
||||
LOG.debug("Trying to connect to node " + node);
|
||||
socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT);
|
||||
}
|
||||
socket.setSoTimeout(READ_TIMEOUT);
|
||||
in = socket.getInputStream();
|
||||
out = socket.getOutputStream();
|
||||
if (!socket.isConnected()) {
|
||||
LOG.debug("Trying to connect to node " + node);
|
||||
socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT);
|
||||
}
|
||||
socket.setSoTimeout(READ_TIMEOUT);
|
||||
in = socket.getInputStream();
|
||||
out = socket.getOutputStream();
|
||||
socketInitialized = true;
|
||||
}
|
||||
}
|
||||
|
||||
public ReaderRunnable getReader() {
|
||||
return reader;
|
||||
}
|
||||
|
||||
public WriterRunnable getWriter() {
|
||||
return writer;
|
||||
}
|
||||
|
||||
public enum Mode {SERVER, CLIENT}
|
||||
|
||||
public enum State {CONNECTING, ACTIVE, DISCONNECTED}
|
||||
|
||||
public class ReaderRunnable implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
try (Socket socket = Connection.this.socket) {
|
||||
initSocket(socket);
|
||||
if (mode == CLIENT) {
|
||||
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
|
||||
}
|
||||
while (state != DISCONNECTED) {
|
||||
try {
|
||||
NetworkMessage msg = Factory.getNetworkMessage(version, in);
|
||||
if (msg == null)
|
||||
continue;
|
||||
switch (state) {
|
||||
case ACTIVE:
|
||||
receiveMessage(msg.getPayload());
|
||||
break;
|
||||
|
||||
default:
|
||||
switch (msg.getPayload().getCommand()) {
|
||||
case VERSION:
|
||||
Version payload = (Version) msg.getPayload();
|
||||
if (payload.getNonce() == ctx.getClientNonce()) {
|
||||
LOG.info("Tried to connect to self, disconnecting.");
|
||||
disconnect();
|
||||
} else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) {
|
||||
version = payload.getVersion();
|
||||
streams = payload.getStreams();
|
||||
send(new VerAck());
|
||||
switch (mode) {
|
||||
case SERVER:
|
||||
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
|
||||
break;
|
||||
case CLIENT:
|
||||
activateConnection();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting.");
|
||||
disconnect();
|
||||
}
|
||||
break;
|
||||
case VERACK:
|
||||
switch (mode) {
|
||||
case SERVER:
|
||||
activateConnection();
|
||||
break;
|
||||
case CLIENT:
|
||||
// NO OP
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new NodeException("Command 'version' or 'verack' expected, but was '"
|
||||
+ msg.getPayload().getCommand() + "'");
|
||||
}
|
||||
}
|
||||
if (socket.isClosed() || syncFinished(msg)) disconnect();
|
||||
} catch (SocketTimeoutException ignore) {
|
||||
if (state == ACTIVE) {
|
||||
if (syncFinished(null)) disconnect();
|
||||
}
|
||||
}
|
||||
Thread.yield();
|
||||
}
|
||||
} catch (IOException | NodeException e) {
|
||||
disconnect();
|
||||
LOG.debug("Reader disconnected from node " + node + ": " + e.getMessage());
|
||||
} catch (RuntimeException e) {
|
||||
LOG.debug("Reader disconnecting from node " + node + " due to error: " + e.getMessage(), e);
|
||||
disconnect();
|
||||
} finally {
|
||||
try {
|
||||
socket.close();
|
||||
} catch (Exception e) {
|
||||
LOG.debug(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class WriterRunnable implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
try (Socket socket = Connection.this.socket) {
|
||||
initSocket(socket);
|
||||
while (state != DISCONNECTED) {
|
||||
if (sendingQueue.size() > 0) {
|
||||
LOG.debug("Sending " + sendingQueue.size() + " messages to node " + node);
|
||||
send(sendingQueue.poll());
|
||||
} else {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
} catch (IOException | InterruptedException e) {
|
||||
LOG.debug("Writer disconnected from node " + node + ": " + e.getMessage());
|
||||
disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -49,14 +49,17 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
|
||||
public final static int NETWORK_MAGIC_NUMBER = 8;
|
||||
private final static Logger LOG = LoggerFactory.getLogger(DefaultNetworkHandler.class);
|
||||
private final List<Connection> connections = new LinkedList<>();
|
||||
private final ExecutorService pool;
|
||||
private InternalContext ctx;
|
||||
private ServerSocket serverSocket;
|
||||
private ExecutorService pool;
|
||||
private Thread serverThread;
|
||||
private Thread connectionManager;
|
||||
private volatile boolean running;
|
||||
|
||||
private ConcurrentMap<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>();
|
||||
|
||||
public DefaultNetworkHandler() {
|
||||
pool = Executors.newCachedThreadPool();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setContext(InternalContext context) {
|
||||
this.ctx = context;
|
||||
@ -65,9 +68,12 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
|
||||
@Override
|
||||
public Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds) {
|
||||
try {
|
||||
Thread t = new Thread(Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds));
|
||||
t.start();
|
||||
return t;
|
||||
Connection connection = Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds);
|
||||
Thread tr = new Thread(connection.getReader());
|
||||
Thread tw = new Thread(connection.getWriter());
|
||||
tr.start();
|
||||
tw.start();
|
||||
return tr;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@ -78,14 +84,14 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
|
||||
if (listener == null) {
|
||||
throw new IllegalStateException("Listener must be set at start");
|
||||
}
|
||||
if (pool != null && !pool.isShutdown()) {
|
||||
if (running) {
|
||||
throw new IllegalStateException("Network already running - you need to stop first.");
|
||||
}
|
||||
try {
|
||||
pool = Executors.newCachedThreadPool();
|
||||
running = true;
|
||||
connections.clear();
|
||||
serverSocket = new ServerSocket(ctx.getPort());
|
||||
serverThread = new Thread(new Runnable() {
|
||||
pool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!serverSocket.isClosed()) {
|
||||
@ -98,44 +104,46 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
|
||||
}
|
||||
}
|
||||
}
|
||||
}, "server");
|
||||
serverThread.start();
|
||||
connectionManager = new Thread(new Runnable() {
|
||||
});
|
||||
pool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!Thread.interrupted()) {
|
||||
try {
|
||||
int active = 0;
|
||||
synchronized (connections) {
|
||||
for (Iterator<Connection> iterator = connections.iterator(); iterator.hasNext(); ) {
|
||||
Connection c = iterator.next();
|
||||
if (c.getState() == DISCONNECTED) {
|
||||
// Remove the current element from the iterator and the list.
|
||||
iterator.remove();
|
||||
}
|
||||
if (c.getState() == ACTIVE) {
|
||||
active++;
|
||||
try {
|
||||
while (running) {
|
||||
try {
|
||||
int active = 0;
|
||||
synchronized (connections) {
|
||||
for (Iterator<Connection> iterator = connections.iterator(); iterator.hasNext(); ) {
|
||||
Connection c = iterator.next();
|
||||
if (c.getState() == DISCONNECTED) {
|
||||
// Remove the current element from the iterator and the list.
|
||||
iterator.remove();
|
||||
}
|
||||
if (c.getState() == ACTIVE) {
|
||||
active++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (active < NETWORK_MAGIC_NUMBER) {
|
||||
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(
|
||||
NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
|
||||
for (NetworkAddress address : addresses) {
|
||||
startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects));
|
||||
if (active < NETWORK_MAGIC_NUMBER) {
|
||||
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(
|
||||
NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
|
||||
for (NetworkAddress address : addresses) {
|
||||
startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects));
|
||||
}
|
||||
}
|
||||
Thread.sleep(30000);
|
||||
} catch (InterruptedException e) {
|
||||
running = false;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error in connection manager. Ignored.", e);
|
||||
}
|
||||
Thread.sleep(30000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error in connection manager. Ignored.", e);
|
||||
}
|
||||
} finally {
|
||||
LOG.debug("Connection manager shutting down.");
|
||||
running = false;
|
||||
}
|
||||
LOG.debug("Connection manager shutting down.");
|
||||
}
|
||||
}, "connection-manager");
|
||||
connectionManager.start();
|
||||
});
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@ -143,18 +151,17 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return connectionManager != null && connectionManager.isAlive();
|
||||
return running;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
connectionManager.interrupt();
|
||||
running = false;
|
||||
try {
|
||||
serverSocket.close();
|
||||
} catch (IOException e) {
|
||||
LOG.debug(e.getMessage(), e);
|
||||
}
|
||||
pool.shutdown();
|
||||
synchronized (connections) {
|
||||
for (Connection c : connections) {
|
||||
c.disconnect();
|
||||
@ -170,7 +177,8 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
|
||||
}
|
||||
connections.add(c);
|
||||
}
|
||||
pool.execute(c);
|
||||
pool.execute(c.getReader());
|
||||
pool.execute(c.getWriter());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -222,8 +230,7 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
|
||||
i++;
|
||||
}
|
||||
return new Property("network", null,
|
||||
new Property("connectionManager",
|
||||
connectionManager != null && connectionManager.isAlive() ? "running" : "stopped"),
|
||||
new Property("connectionManager", running ? "running" : "stopped"),
|
||||
new Property("connections", null, streamProperties)
|
||||
);
|
||||
}
|
||||
|
Reference in New Issue
Block a user