diff --git a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index d3a1329..9f4d9a3 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -33,8 +33,7 @@ import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.util.Arrays; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import static ch.dissem.bitmessage.entity.Plaintext.Status.*; import static ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST; @@ -201,13 +200,15 @@ public class BitmessageContext { * @param wait waits for the synchronization thread to finish */ public void synchronize(InetAddress host, int port, long timeoutInSeconds, boolean wait) { - Thread t = ctx.getNetworkHandler().synchronize(host, port, networkListener, timeoutInSeconds); + Future future = ctx.getNetworkHandler().synchronize(host, port, networkListener, timeoutInSeconds); if (wait) { try { - t.join(); + future.get(); } catch (InterruptedException e) { LOG.info("Thread was interrupted. Trying to shut down synchronization and returning."); - t.interrupt(); + future.cancel(true); + } catch (CancellationException | ExecutionException e) { + LOG.debug(e.getMessage(), e); } } } @@ -241,7 +242,7 @@ public class BitmessageContext { ctx.getAddressRepo().save(address); break; } else { - LOG.debug("Found pubkey for " + address + " but signature is invalid"); + LOG.info("Found pubkey for " + address + " but signature is invalid"); } } } else { diff --git a/domain/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java b/domain/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java index 105cc96..e069704 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java +++ b/domain/src/main/java/ch/dissem/bitmessage/DefaultMessageListener.java @@ -71,7 +71,8 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { protected void receive(ObjectMessage object, GetPubkey getPubkey) { BitmessageAddress identity = ctx.getAddressRepo().findIdentity(getPubkey.getRipeTag()); if (identity != null && identity.getPrivateKey() != null) { - LOG.debug("Got pubkey request for identity " + identity); + LOG.info("Got pubkey request for identity " + identity); + // FIXME: only send pubkey if it wasn't sent in the last 28 days ctx.sendPubkey(identity, object.getStream()); } } @@ -90,10 +91,10 @@ class DefaultMessageListener implements NetworkHandler.MessageListener { } if (address != null) { address.setPubkey(pubkey); - LOG.debug("Got pubkey for contact " + address); + LOG.info("Got pubkey for contact " + address); ctx.getAddressRepo().save(address); List messages = ctx.getMessageRepository().findMessages(Plaintext.Status.PUBKEY_REQUESTED, address); - LOG.debug("Sending " + messages.size() + " messages for contact " + address); + LOG.info("Sending " + messages.size() + " messages for contact " + address); for (Plaintext msg : messages) { msg.setStatus(DOING_PROOF_OF_WORK); ctx.getMessageRepository().save(msg); diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/CryptoBox.java b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/CryptoBox.java index 7f994d1..a870b32 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/payload/CryptoBox.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/payload/CryptoBox.java @@ -157,7 +157,7 @@ public class CryptoBox implements Streamable { } public Builder curveType(int curveType) { - if (curveType != 0x2CA) LOG.debug("Unexpected curve type " + curveType); + if (curveType != 0x2CA) LOG.trace("Unexpected curve type " + curveType); this.curveType = curveType; return this; } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/AbstractSecurity.java b/domain/src/main/java/ch/dissem/bitmessage/ports/AbstractSecurity.java index 1c4b4dc..bd55180 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/AbstractSecurity.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/AbstractSecurity.java @@ -123,7 +123,6 @@ public abstract class AbstractSecurity implements Security, InternalContext.Cont private byte[] getProofOfWorkTarget(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) throws IOException { BigInteger TTL = BigInteger.valueOf(object.getExpiresTime() - UnixTime.now()); - LOG.debug("TTL: " + TTL + "s"); BigInteger numerator = TWO.pow(64); BigInteger powLength = BigInteger.valueOf(object.getPayloadBytesWithoutNonce().length + extraBytes); BigInteger denominator = BigInteger.valueOf(nonceTrialsPerByte).multiply(powLength.add(powLength.multiply(TTL).divide(BigInteger.valueOf(2).pow(16)))); diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java b/domain/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java index 9c97b87..8d43423 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java @@ -37,40 +37,42 @@ public class MemoryNodeRegistry implements NodeRegistry { private final Map<Long, Set<NetworkAddress>> stableNodes = new ConcurrentHashMap<>(); private final Map<Long, Set<NetworkAddress>> knownNodes = new ConcurrentHashMap<>(); - public MemoryNodeRegistry() { - new Thread(new Runnable() { - @Override - public void run() { - try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) { - Scanner scanner = new Scanner(in); - long stream = 0; - Set<NetworkAddress> streamSet = null; - while (scanner.hasNext()) { - try { - String line = scanner.nextLine().trim(); - if (line.startsWith("#") || line.isEmpty()) { - // Ignore - continue; - } - if (line.startsWith("[stream")) { - stream = Long.parseLong(line.substring(8, line.lastIndexOf(']'))); - streamSet = new HashSet<>(); - stableNodes.put(stream, streamSet); - } else if (streamSet != null) { - int portIndex = line.lastIndexOf(':'); - InetAddress inetAddress = InetAddress.getByName(line.substring(0, portIndex)); - int port = Integer.valueOf(line.substring(portIndex + 1)); - streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build()); - } - } catch (IOException e) { - LOG.warn(e.getMessage(), e); + private void loadStableNodes() { + try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) { + Scanner scanner = new Scanner(in); + long stream = 0; + Set<NetworkAddress> streamSet = null; + while (scanner.hasNext()) { + try { + String line = scanner.nextLine().trim(); + if (line.startsWith("#") || line.isEmpty()) { + // Ignore + continue; + } + if (line.startsWith("[stream")) { + stream = Long.parseLong(line.substring(8, line.lastIndexOf(']'))); + streamSet = new HashSet<>(); + stableNodes.put(stream, streamSet); + } else if (streamSet != null) { + int portIndex = line.lastIndexOf(':'); + InetAddress[] inetAddresses = InetAddress.getAllByName(line.substring(0, portIndex)); + int port = Integer.valueOf(line.substring(portIndex + 1)); + for (InetAddress inetAddress : inetAddresses) { + streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build()); } } } catch (IOException e) { - throw new RuntimeException(e); + LOG.warn(e.getMessage(), e); } } - }, "node registry initializer").start(); + if (LOG.isDebugEnabled()) { + for (Map.Entry<Long, Set<NetworkAddress>> e : stableNodes.entrySet()) { + LOG.debug("Stream " + e.getKey() + ": loaded " + e.getValue().size() + " bootstrap nodes."); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override @@ -86,9 +88,16 @@ public class MemoryNodeRegistry implements NodeRegistry { known.remove(node); } } - } else if (stableNodes.containsKey(stream)) { - // To reduce load on stable nodes, only return one - result.add(selectRandom(stableNodes.get(stream))); + } else { + Set<NetworkAddress> nodes = stableNodes.get(stream); + if (nodes == null || nodes.isEmpty()) { + loadStableNodes(); + nodes = stableNodes.get(stream); + } + if (nodes != null && !nodes.isEmpty()) { + // To reduce load on stable nodes, only return one + result.add(selectRandom(nodes)); + } } } return selectRandom(limit, result); diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java index 2c79b1f..e2fd170 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java @@ -22,6 +22,7 @@ import ch.dissem.bitmessage.utils.Property; import java.io.IOException; import java.net.InetAddress; +import java.util.concurrent.Future; /** * Handles incoming messages @@ -33,7 +34,7 @@ public interface NetworkHandler { * An implementation should disconnect if either the timeout is reached or the returned thread is interrupted. * </p> */ - Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds); + Future<?> synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds); /** * Start a full network node, accepting incoming connections and relaying objects. diff --git a/domain/src/main/resources/nodes.txt b/domain/src/main/resources/nodes.txt index e0a334e..9466a85 100644 --- a/domain/src/main/resources/nodes.txt +++ b/domain/src/main/resources/nodes.txt @@ -1,18 +1,8 @@ [stream 1] -5.45.99.75:8444 -75.167.159.54:8444 -95.165.168.168:8444 -85.180.139.241:8444 -178.62.12.187:8448 - -[2604:2000:1380:9f:82e:148b:2746:d0c7]:8080 -158.222.211.81:8080 -24.188.198.204:8111 -109.147.204.113:1195 -178.11.46.221:8444 - dissem.ch:8444 +bootstrap8080.bitmessage.org:8080 +bootstrap8444.bitmessage.org:8444 [stream 2] # none yet \ No newline at end of file diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index b2c2e40..8ed2fb4 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -235,10 +235,9 @@ public class Connection { ObjectMessage objectMessage = (ObjectMessage) messagePayload; try { if (ctx.getInventory().contains(objectMessage)) { - LOG.debug("Received object " + objectMessage.getInventoryVector() + " - already in inventory"); + LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory"); break; } - LOG.debug("Received object " + objectMessage.getInventoryVector()); security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); listener.receive(objectMessage); ctx.getInventory().storeObject(objectMessage); @@ -294,7 +293,6 @@ public class Connection { } public void offer(InventoryVector iv) { - LOG.debug("Offering " + iv + " to node " + node.toString()); sendingQueue.offer(new Inv.Builder() .addInventoryVector(iv) .build()); @@ -321,14 +319,7 @@ public class Connection { private synchronized void initSocket(Socket socket) throws IOException { if (!socketInitialized) { if (!socket.isConnected()) { - LOG.debug("Trying to connect to node " + node); - socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT); - } - socket.setSoTimeout(READ_TIMEOUT); - in = socket.getInputStream(); - out = socket.getOutputStream(); - if (!socket.isConnected()) { - LOG.debug("Trying to connect to node " + node); + LOG.trace("Trying to connect to node " + node); socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT); } socket.setSoTimeout(READ_TIMEOUT); @@ -359,6 +350,7 @@ public class Connection { send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); } while (state != DISCONNECTED) { + Thread.sleep(100); try { NetworkMessage msg = Factory.getNetworkMessage(version, in); if (msg == null) @@ -413,15 +405,13 @@ public class Connection { if (syncFinished(null)) disconnect(); } } - Thread.yield(); } - } catch (IOException | NodeException e) { - disconnect(); - LOG.debug("Reader disconnected from node " + node + ": " + e.getMessage()); + } catch (InterruptedException | IOException | NodeException e) { + LOG.trace("Reader disconnected from node " + node + ": " + e.getMessage()); } catch (RuntimeException e) { - LOG.debug("Reader disconnecting from node " + node + " due to error: " + e.getMessage(), e); - disconnect(); + LOG.trace("Reader disconnecting from node " + node + " due to error: " + e.getMessage(), e); } finally { + disconnect(); try { socket.close(); } catch (Exception e) { @@ -438,14 +428,13 @@ public class Connection { initSocket(socket); while (state != DISCONNECTED) { if (sendingQueue.size() > 0) { - LOG.debug("Sending " + sendingQueue.size() + " messages to node " + node); send(sendingQueue.poll()); } else { Thread.sleep(100); } } } catch (IOException | InterruptedException e) { - LOG.debug("Writer disconnected from node " + node + ": " + e.getMessage()); + LOG.trace("Writer disconnected from node " + node + ": " + e.getMessage()); disconnect(); } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index 7b49978..3944378 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -32,10 +32,7 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; @@ -58,7 +55,14 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { private ConcurrentMap<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>(); public DefaultNetworkHandler() { - pool = Executors.newCachedThreadPool(); + pool = Executors.newCachedThreadPool(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setPriority(Thread.MIN_PRIORITY); + return thread; + } + }); } @Override @@ -67,14 +71,12 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { } @Override - public Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds) { + public Future<?> synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds) { try { Connection connection = Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds); - Thread tr = new Thread(connection.getReader()); - Thread tw = new Thread(connection.getWriter()); - tr.start(); - tw.start(); - return tr; + Future<?> reader = pool.submit(connection.getReader()); + pool.execute(connection.getWriter()); + return reader; } catch (IOException e) { throw new RuntimeException(e); } @@ -143,8 +145,10 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { for (NetworkAddress address : addresses) { startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects)); } + Thread.sleep(10000); + } else { + Thread.sleep(30000); } - Thread.sleep(30000); } catch (InterruptedException e) { running = false; } catch (Exception e) { @@ -204,7 +208,6 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { } } } - LOG.debug(target.size() + " connections available to offer " + iv); List<Connection> randomSubset = Collections.selectRandom(NETWORK_MAGIC_NUMBER, target); for (Connection connection : randomSubset) { connection.offer(iv); diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java index 74b3ff1..9fb2ea5 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkHandlerTest.java @@ -29,6 +29,7 @@ import org.junit.Test; import org.mockito.Mockito; import java.net.InetAddress; +import java.util.concurrent.Future; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; @@ -116,10 +117,10 @@ public class NetworkHandlerTest { "V1Msg.payload" ); - Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, + Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, mock(NetworkHandler.MessageListener.class), 10); - t.join(); + future.get(); assertInventorySize(3, nodeInventory); assertInventorySize(3, peerInventory); } @@ -133,10 +134,10 @@ public class NetworkHandlerTest { nodeInventory.init(); - Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, + Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, mock(NetworkHandler.MessageListener.class), 10); - t.join(); + future.get(); assertInventorySize(2, nodeInventory); assertInventorySize(2, peerInventory); } @@ -149,10 +150,10 @@ public class NetworkHandlerTest { "V1Msg.payload" ); - Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, + Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, mock(NetworkHandler.MessageListener.class), 10); - t.join(); + future.get(); assertInventorySize(1, nodeInventory); assertInventorySize(1, peerInventory); }