diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java index 2f6f9fd..d93c3c8 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java @@ -22,7 +22,10 @@ import ch.dissem.bitmessage.entity.Plaintext; import ch.dissem.bitmessage.entity.payload.Pubkey; import ch.dissem.bitmessage.networking.DefaultNetworkHandler; import ch.dissem.bitmessage.ports.MemoryNodeRegistry; -import ch.dissem.bitmessage.repository.*; +import ch.dissem.bitmessage.repository.JdbcAddressRepository; +import ch.dissem.bitmessage.repository.JdbcConfig; +import ch.dissem.bitmessage.repository.JdbcInventory; +import ch.dissem.bitmessage.repository.JdbcMessageRepository; import ch.dissem.bitmessage.security.bc.BouncySecurity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,18 +53,19 @@ public class Application { .networkHandler(new DefaultNetworkHandler()) .security(new BouncySecurity()) .port(48444) + .listener(new BitmessageContext.Listener() { + @Override + public void receive(Plaintext plaintext) { + try { + System.out.println(new String(plaintext.getMessage(), "UTF-8")); + } catch (UnsupportedEncodingException e) { + LOG.error(e.getMessage(), e); + } + } + }) .build(); - ctx.startup(new BitmessageContext.Listener() { - @Override - public void receive(Plaintext plaintext) { - try { - System.out.println(new String(plaintext.getMessage(), "UTF-8")); - } catch (UnsupportedEncodingException e) { - LOG.error(e.getMessage(), e); - } - } - }); + ctx.startup(); scanner = new Scanner(System.in); diff --git a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index 2cd851e..0b3ad1d 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -31,6 +31,7 @@ import ch.dissem.bitmessage.utils.Property; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetAddress; import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -62,10 +63,14 @@ public class BitmessageContext { private final InternalContext ctx; - private Listener listener; + private final Listener listener; + private final NetworkHandler.MessageListener networkListener; private BitmessageContext(Builder builder) { ctx = new InternalContext(builder); + listener = builder.listener; + networkListener = new DefaultMessageListener(ctx, listener); + // As this thread is used for parts that do POW, which itself uses parallel threads, only // one should be executed at any time. pool = Executors.newFixedThreadPool(1); @@ -179,15 +184,22 @@ public class BitmessageContext { ); } - public void startup(Listener listener) { - this.listener = listener; - ctx.getNetworkHandler().start(new DefaultMessageListener(ctx, listener)); + public void startup() { + ctx.getNetworkHandler().start(networkListener); } public void shutdown() { ctx.getNetworkHandler().stop(); } + public void synchronize(InetAddress host, int port, long timeoutInSeconds) { + ctx.getNetworkHandler().synchronize(host, port, networkListener, timeoutInSeconds); + } + + public void cleanup() { + ctx.getInventory().cleanup(); + } + public boolean isRunning() { return ctx.getNetworkHandler().isRunning(); } @@ -268,6 +280,7 @@ public class BitmessageContext { ProofOfWorkEngine proofOfWorkEngine; Security security; MessageCallback messageCallback; + Listener listener; public Builder() { } @@ -317,6 +330,11 @@ public class BitmessageContext { return this; } + public Builder listener(Listener listener) { + this.listener = listener; + return this; + } + public BitmessageContext build() { nonNull("inventory", inventory); nonNull("nodeRegistry", nodeRegistry); diff --git a/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java b/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java index 403e9c4..6d969bd 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java +++ b/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java @@ -82,7 +82,7 @@ public class InternalContext { streams.add(1L); } - init(inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine); + init(security, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine); for (BitmessageAddress identity : addressRepository.getIdentities()) { streams.add(identity.getStream()); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java b/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java index 934a4d0..a10d50f 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/Inventory.java @@ -36,5 +36,7 @@ public interface Inventory { void storeObject(ObjectMessage object); + boolean contains(ObjectMessage object); + void cleanup(); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java b/domain/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java index 04fa2f9..9c97b87 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/MemoryNodeRegistry.java @@ -34,38 +34,43 @@ import static java.util.Collections.newSetFromMap; public class MemoryNodeRegistry implements NodeRegistry { private static final Logger LOG = LoggerFactory.getLogger(MemoryNodeRegistry.class); - private final Map> stableNodes = new HashMap<>(); + private final Map> stableNodes = new ConcurrentHashMap<>(); private final Map> knownNodes = new ConcurrentHashMap<>(); public MemoryNodeRegistry() { - try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) { - Scanner scanner = new Scanner(in); - long stream = 0; - Set streamSet = null; - while (scanner.hasNext()) { - try { - String line = scanner.nextLine().trim(); - if (line.startsWith("#") || line.isEmpty()) { - // Ignore - continue; - } - if (line.startsWith("[stream")) { - stream = Long.parseLong(line.substring(8, line.lastIndexOf(']'))); - streamSet = new HashSet<>(); - stableNodes.put(stream, streamSet); - } else if (streamSet != null) { - int portIndex = line.lastIndexOf(':'); - InetAddress inetAddress = InetAddress.getByName(line.substring(0, portIndex)); - int port = Integer.valueOf(line.substring(portIndex + 1)); - streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build()); + new Thread(new Runnable() { + @Override + public void run() { + try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) { + Scanner scanner = new Scanner(in); + long stream = 0; + Set streamSet = null; + while (scanner.hasNext()) { + try { + String line = scanner.nextLine().trim(); + if (line.startsWith("#") || line.isEmpty()) { + // Ignore + continue; + } + if (line.startsWith("[stream")) { + stream = Long.parseLong(line.substring(8, line.lastIndexOf(']'))); + streamSet = new HashSet<>(); + stableNodes.put(stream, streamSet); + } else if (streamSet != null) { + int portIndex = line.lastIndexOf(':'); + InetAddress inetAddress = InetAddress.getByName(line.substring(0, portIndex)); + int port = Integer.valueOf(line.substring(portIndex + 1)); + streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build()); + } + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + } } } catch (IOException e) { - LOG.warn(e.getMessage(), e); + throw new RuntimeException(e); } } - } catch (IOException e) { - throw new RuntimeException(e); - } + }, "node registry initializer").start(); } @Override diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java b/domain/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java index 6d8970b..af7b2bc 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/MessageRepository.java @@ -28,6 +28,8 @@ public interface MessageRepository { List