Synchronisation API and related refactorings / improvements

-> lets you synchronize with the Bitmessage network without staying connected
This commit is contained in:
Christian Basler 2015-10-07 21:50:41 +02:00
parent c3fdee79ca
commit f9ff22bebe
13 changed files with 154 additions and 49 deletions

View File

@ -22,7 +22,10 @@ import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.payload.Pubkey; import ch.dissem.bitmessage.entity.payload.Pubkey;
import ch.dissem.bitmessage.networking.DefaultNetworkHandler; import ch.dissem.bitmessage.networking.DefaultNetworkHandler;
import ch.dissem.bitmessage.ports.MemoryNodeRegistry; import ch.dissem.bitmessage.ports.MemoryNodeRegistry;
import ch.dissem.bitmessage.repository.*; import ch.dissem.bitmessage.repository.JdbcAddressRepository;
import ch.dissem.bitmessage.repository.JdbcConfig;
import ch.dissem.bitmessage.repository.JdbcInventory;
import ch.dissem.bitmessage.repository.JdbcMessageRepository;
import ch.dissem.bitmessage.security.bc.BouncySecurity; import ch.dissem.bitmessage.security.bc.BouncySecurity;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -50,18 +53,19 @@ public class Application {
.networkHandler(new DefaultNetworkHandler()) .networkHandler(new DefaultNetworkHandler())
.security(new BouncySecurity()) .security(new BouncySecurity())
.port(48444) .port(48444)
.listener(new BitmessageContext.Listener() {
@Override
public void receive(Plaintext plaintext) {
try {
System.out.println(new String(plaintext.getMessage(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
LOG.error(e.getMessage(), e);
}
}
})
.build(); .build();
ctx.startup(new BitmessageContext.Listener() { ctx.startup();
@Override
public void receive(Plaintext plaintext) {
try {
System.out.println(new String(plaintext.getMessage(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
LOG.error(e.getMessage(), e);
}
}
});
scanner = new Scanner(System.in); scanner = new Scanner(System.in);

View File

@ -31,6 +31,7 @@ import ch.dissem.bitmessage.utils.Property;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -62,10 +63,14 @@ public class BitmessageContext {
private final InternalContext ctx; private final InternalContext ctx;
private Listener listener; private final Listener listener;
private final NetworkHandler.MessageListener networkListener;
private BitmessageContext(Builder builder) { private BitmessageContext(Builder builder) {
ctx = new InternalContext(builder); ctx = new InternalContext(builder);
listener = builder.listener;
networkListener = new DefaultMessageListener(ctx, listener);
// As this thread is used for parts that do POW, which itself uses parallel threads, only // As this thread is used for parts that do POW, which itself uses parallel threads, only
// one should be executed at any time. // one should be executed at any time.
pool = Executors.newFixedThreadPool(1); pool = Executors.newFixedThreadPool(1);
@ -179,15 +184,22 @@ public class BitmessageContext {
); );
} }
public void startup(Listener listener) { public void startup() {
this.listener = listener; ctx.getNetworkHandler().start(networkListener);
ctx.getNetworkHandler().start(new DefaultMessageListener(ctx, listener));
} }
public void shutdown() { public void shutdown() {
ctx.getNetworkHandler().stop(); ctx.getNetworkHandler().stop();
} }
public void synchronize(InetAddress host, int port, long timeoutInSeconds) {
ctx.getNetworkHandler().synchronize(host, port, networkListener, timeoutInSeconds);
}
public void cleanup() {
ctx.getInventory().cleanup();
}
public boolean isRunning() { public boolean isRunning() {
return ctx.getNetworkHandler().isRunning(); return ctx.getNetworkHandler().isRunning();
} }
@ -268,6 +280,7 @@ public class BitmessageContext {
ProofOfWorkEngine proofOfWorkEngine; ProofOfWorkEngine proofOfWorkEngine;
Security security; Security security;
MessageCallback messageCallback; MessageCallback messageCallback;
Listener listener;
public Builder() { public Builder() {
} }
@ -317,6 +330,11 @@ public class BitmessageContext {
return this; return this;
} }
public Builder listener(Listener listener) {
this.listener = listener;
return this;
}
public BitmessageContext build() { public BitmessageContext build() {
nonNull("inventory", inventory); nonNull("inventory", inventory);
nonNull("nodeRegistry", nodeRegistry); nonNull("nodeRegistry", nodeRegistry);

View File

@ -82,7 +82,7 @@ public class InternalContext {
streams.add(1L); streams.add(1L);
} }
init(inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine); init(security, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine);
for (BitmessageAddress identity : addressRepository.getIdentities()) { for (BitmessageAddress identity : addressRepository.getIdentities()) {
streams.add(identity.getStream()); streams.add(identity.getStream());
} }

View File

@ -36,5 +36,7 @@ public interface Inventory {
void storeObject(ObjectMessage object); void storeObject(ObjectMessage object);
boolean contains(ObjectMessage object);
void cleanup(); void cleanup();
} }

View File

@ -34,38 +34,43 @@ import static java.util.Collections.newSetFromMap;
public class MemoryNodeRegistry implements NodeRegistry { public class MemoryNodeRegistry implements NodeRegistry {
private static final Logger LOG = LoggerFactory.getLogger(MemoryNodeRegistry.class); private static final Logger LOG = LoggerFactory.getLogger(MemoryNodeRegistry.class);
private final Map<Long, Set<NetworkAddress>> stableNodes = new HashMap<>(); private final Map<Long, Set<NetworkAddress>> stableNodes = new ConcurrentHashMap<>();
private final Map<Long, Set<NetworkAddress>> knownNodes = new ConcurrentHashMap<>(); private final Map<Long, Set<NetworkAddress>> knownNodes = new ConcurrentHashMap<>();
public MemoryNodeRegistry() { public MemoryNodeRegistry() {
try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) { new Thread(new Runnable() {
Scanner scanner = new Scanner(in); @Override
long stream = 0; public void run() {
Set<NetworkAddress> streamSet = null; try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) {
while (scanner.hasNext()) { Scanner scanner = new Scanner(in);
try { long stream = 0;
String line = scanner.nextLine().trim(); Set<NetworkAddress> streamSet = null;
if (line.startsWith("#") || line.isEmpty()) { while (scanner.hasNext()) {
// Ignore try {
continue; String line = scanner.nextLine().trim();
} if (line.startsWith("#") || line.isEmpty()) {
if (line.startsWith("[stream")) { // Ignore
stream = Long.parseLong(line.substring(8, line.lastIndexOf(']'))); continue;
streamSet = new HashSet<>(); }
stableNodes.put(stream, streamSet); if (line.startsWith("[stream")) {
} else if (streamSet != null) { stream = Long.parseLong(line.substring(8, line.lastIndexOf(']')));
int portIndex = line.lastIndexOf(':'); streamSet = new HashSet<>();
InetAddress inetAddress = InetAddress.getByName(line.substring(0, portIndex)); stableNodes.put(stream, streamSet);
int port = Integer.valueOf(line.substring(portIndex + 1)); } else if (streamSet != null) {
streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build()); int portIndex = line.lastIndexOf(':');
InetAddress inetAddress = InetAddress.getByName(line.substring(0, portIndex));
int port = Integer.valueOf(line.substring(portIndex + 1));
streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build());
}
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
} }
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); throw new RuntimeException(e);
} }
} }
} catch (IOException e) { }, "node registry initializer").start();
throw new RuntimeException(e);
}
} }
@Override @Override

View File

@ -28,6 +28,8 @@ public interface MessageRepository {
List<Label> getLabels(Label.Type... types); List<Label> getLabels(Label.Type... types);
int countUnread(Label label);
List<Plaintext> findMessages(Label label); List<Plaintext> findMessages(Label label);
List<Plaintext> findMessages(Status status); List<Plaintext> findMessages(Status status);

View File

@ -1,15 +1,19 @@
[stream 1] [stream 1]
[2604:2000:1380:9f:82e:148b:2746:d0c7]:8080
5.45.99.75:8444 5.45.99.75:8444
75.167.159.54:8444 75.167.159.54:8444
95.165.168.168:8444 95.165.168.168:8444
85.180.139.241:8444 85.180.139.241:8444
158.222.211.81:8080
178.62.12.187:8448 178.62.12.187:8448
[2604:2000:1380:9f:82e:148b:2746:d0c7]:8080
158.222.211.81:8080
24.188.198.204:8111 24.188.198.204:8111
109.147.204.113:1195 109.147.204.113:1195
178.11.46.221:8444 178.11.46.221:8444
# Add named nodes at the end, as resolving them might take time
dissem.ch:8444
[stream 2] [stream 2]
# none yet # none yet

View File

@ -310,6 +310,8 @@ public class Connection implements Runnable {
try { try {
LOG.debug("Received object " + objectMessage.getInventoryVector()); LOG.debug("Received object " + objectMessage.getInventoryVector());
security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
if (ctx.getInventory().contains(objectMessage))
break;
listener.receive(objectMessage); listener.receive(objectMessage);
ctx.getInventory().storeObject(objectMessage); ctx.getInventory().storeObject(objectMessage);
// offer object to some random nodes so it gets distributed throughout the network: // offer object to some random nodes so it gets distributed throughout the network:

View File

@ -55,8 +55,9 @@ public class NetworkHandlerTest {
.nodeRegistry(new TestNodeRegistry()) .nodeRegistry(new TestNodeRegistry())
.networkHandler(new DefaultNetworkHandler()) .networkHandler(new DefaultNetworkHandler())
.security(new BouncySecurity()) .security(new BouncySecurity())
.listener(Mockito.mock(BitmessageContext.Listener.class))
.build(); .build();
peer.startup(Mockito.mock(BitmessageContext.Listener.class)); peer.startup();
nodeInventory = new TestInventory(); nodeInventory = new TestInventory();
networkHandler = new DefaultNetworkHandler(); networkHandler = new DefaultNetworkHandler();
@ -68,13 +69,14 @@ public class NetworkHandlerTest {
.nodeRegistry(new TestNodeRegistry(localhost)) .nodeRegistry(new TestNodeRegistry(localhost))
.networkHandler(networkHandler) .networkHandler(networkHandler)
.security(new BouncySecurity()) .security(new BouncySecurity())
.listener(Mockito.mock(BitmessageContext.Listener.class))
.build(); .build();
} }
@Test(timeout = 20_000) @Test(timeout = 20_000)
public void ensureNodesAreConnecting() { public void ensureNodesAreConnecting() {
try { try {
node.startup(Mockito.mock(BitmessageContext.Listener.class)); node.startup();
Property status; Property status;
do { do {
Thread.yield(); Thread.yield();

View File

@ -60,6 +60,11 @@ public class TestInventory implements Inventory {
inventory.put(object.getInventoryVector(), object); inventory.put(object.getInventoryVector(), object);
} }
@Override
public boolean contains(ObjectMessage object) {
return inventory.containsKey(object.getInventoryVector());
}
@Override @Override
public void cleanup() { public void cleanup() {

View File

@ -24,7 +24,6 @@ import ch.dissem.bitmessage.ports.Inventory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.sql.*; import java.sql.*;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -139,6 +138,23 @@ public class JdbcInventory extends JdbcHelper implements Inventory {
} }
} }
@Override
public boolean contains(ObjectMessage object) {
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT count(1) FROM Inventory WHERE hash = X'"
+ object.getInventoryVector() + "'");
if (rs.next()) {
return rs.getInt(1) > 0;
} else {
throw new RuntimeException("Couldn't query if inventory contains " + object.getInventoryVector());
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override @Override
public void cleanup() { public void cleanup() {
try (Connection connection = config.getConnection()) { try (Connection connection = config.getConnection()) {

View File

@ -25,7 +25,6 @@ import ch.dissem.bitmessage.ports.MessageRepository;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.sql.*; import java.sql.*;
@ -86,6 +85,29 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
return result; return result;
} }
@Override
public int countUnread(Label label) {
String where;
if (label != null) {
where = "id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.getId() + ") AND ";
} else {
where = "";
}
where += "id IN (SELECT message_id FROM Message_Label WHERE label_id IN (" +
"SELECT id FROM Label WHERE type = '" + Label.Type.UNREAD.name() + "'))";
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT count(*) FROM Message WHERE " + where);
if (rs.next()) {
return rs.getInt(1);
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
return 0;
}
@Override @Override
public List<Plaintext> findMessages(Label label) { public List<Plaintext> findMessages(Label label) {
return find("id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.getId() + ")"); return find("id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.getId() + ")");
@ -230,8 +252,20 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
@Override @Override
public void remove(Plaintext message) { public void remove(Plaintext message) {
try (Connection connection = config.getConnection()) { try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement(); try {
stmt.executeUpdate("DELETE FROM Message WHERE id = " + message.getId()); connection.setAutoCommit(false);
Statement stmt = connection.createStatement();
stmt.executeUpdate("DELETE FROM Message_Label WHERE message_id = " + message.getId());
stmt.executeUpdate("DELETE FROM Message WHERE id = " + message.getId());
connection.commit();
} catch (SQLException e) {
try {
connection.rollback();
} catch (SQLException e1) {
LOG.debug(e1.getMessage(), e);
}
LOG.error(e.getMessage(), e);
}
} catch (SQLException e) { } catch (SQLException e) {
LOG.error(e.getMessage(), e); LOG.error(e.getMessage(), e);
} }

View File

@ -110,6 +110,17 @@ public class JdbcInventoryTest extends TestBase {
assertNotNull(inventory.getObject(object.getInventoryVector())); assertNotNull(inventory.getObject(object.getInventoryVector()));
} }
@Test
public void testContains() {
ObjectMessage object = getObjectMessage(5, 0, getGetPubkey());
assertFalse(inventory.contains(object));
inventory.storeObject(object);
assertTrue(inventory.contains(object));
}
@Test @Test
public void testCleanup() throws Exception { public void testCleanup() throws Exception {
assertNotNull(inventory.getObject(inventoryVectorIgnore)); assertNotNull(inventory.getObject(inventoryVectorIgnore));