Merge branch 'feature/threaded-connections' into develop

This commit is contained in:
Christian Basler 2015-10-14 20:38:37 +02:00
commit ddaa52f416
10 changed files with 248 additions and 182 deletions

View File

@ -14,6 +14,8 @@ uploadArchives {
} }
} }
sourceCompatibility = 1.8
task fatCapsule(type: FatCapsule) { task fatCapsule(type: FatCapsule) {
applicationClass 'ch.dissem.bitmessage.demo.Main' applicationClass 'ch.dissem.bitmessage.demo.Main'
} }
@ -26,6 +28,6 @@ dependencies {
compile project(':wif') compile project(':wif')
compile 'org.slf4j:slf4j-simple:1.7.12' compile 'org.slf4j:slf4j-simple:1.7.12'
compile 'args4j:args4j:2.32' compile 'args4j:args4j:2.32'
compile 'com.h2database:h2:1.4.187' compile 'com.h2database:h2:1.4.190'
testCompile 'junit:junit:4.11' testCompile 'junit:junit:4.11'
} }

View File

@ -109,6 +109,7 @@ public class Application {
} }
} while (!"e".equals(command)); } while (!"e".equals(command));
LOG.info("Shutting down client"); LOG.info("Shutting down client");
ctx.cleanup();
ctx.shutdown(); ctx.shutdown();
} }

View File

@ -23,7 +23,6 @@ import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.payload.*; import ch.dissem.bitmessage.entity.payload.*;
import ch.dissem.bitmessage.entity.valueobject.PrivateKey; import ch.dissem.bitmessage.entity.valueobject.PrivateKey;
import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.ports.Security;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -155,7 +154,7 @@ public class Factory {
} }
} }
// fallback: just store the message - we don't really care what it is // fallback: just store the message - we don't really care what it is
// LOG.info("Unexpected object type: " + objectType); LOG.trace("Unexpected object type: " + objectType);
return GenericPayload.read(version, stream, streamNumber, length); return GenericPayload.read(version, stream, streamNumber, length);
} }

View File

@ -26,17 +26,32 @@ import java.util.List;
* The Inventory stores and retrieves objects, cleans up outdated objects and can tell which objects are still missing. * The Inventory stores and retrieves objects, cleans up outdated objects and can tell which objects are still missing.
*/ */
public interface Inventory { public interface Inventory {
/**
* Returns the IVs of all valid objects we have for the given streams
*/
List<InventoryVector> getInventory(long... streams); List<InventoryVector> getInventory(long... streams);
/**
* Returns the IVs of all objects in the offer that we don't have already. Implementations are allowed to
* ignore the streams parameter, but it must be set when calling this method.
*/
List<InventoryVector> getMissing(List<InventoryVector> offer, long... streams); List<InventoryVector> getMissing(List<InventoryVector> offer, long... streams);
ObjectMessage getObject(InventoryVector vector); ObjectMessage getObject(InventoryVector vector);
/**
* This method is mainly used to search for public keys to newly added addresses or broadcasts from new
* subscriptions.
*/
List<ObjectMessage> getObjects(long stream, long version, ObjectType... types); List<ObjectMessage> getObjects(long stream, long version, ObjectType... types);
void storeObject(ObjectMessage object); void storeObject(ObjectMessage object);
boolean contains(ObjectMessage object); boolean contains(ObjectMessage object);
/**
* Deletes all objects that expired 5 minutes ago or earlier
* (so we don't accidentally request objects we just deleted)
*/
void cleanup(); void cleanup();
} }

View File

@ -43,8 +43,6 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine {
for (int i = 0; i < cores; i++) { for (int i = 0; i < cores; i++) {
Worker w = new Worker(workers, (byte) cores, i, initialHash, target); Worker w = new Worker(workers, (byte) cores, i, initialHash, target);
workers.add(w); workers.add(w);
}
for (Worker w : workers) {
w.start(); w.start();
} }
for (Worker w : workers) { for (Worker w : workers) {

View File

@ -12,7 +12,6 @@
109.147.204.113:1195 109.147.204.113:1195
178.11.46.221:8444 178.11.46.221:8444
# Add named nodes at the end, as resolving them might take time
dissem.ch:8444 dissem.ch:8444
[stream 2] [stream 2]

View File

@ -49,7 +49,7 @@ import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/** /**
* A connection to a specific node * A connection to a specific node
*/ */
public class Connection implements Runnable { public class Connection {
public static final int READ_TIMEOUT = 2000; public static final int READ_TIMEOUT = 2000;
private final static Logger LOG = LoggerFactory.getLogger(Connection.class); private final static Logger LOG = LoggerFactory.getLogger(Connection.class);
private static final int CONNECT_TIMEOUT = 5000; private static final int CONNECT_TIMEOUT = 5000;
@ -63,13 +63,16 @@ public class Connection implements Runnable {
private final Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>(); private final Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>();
private final Map<InventoryVector, Long> requestedObjects; private final Map<InventoryVector, Long> requestedObjects;
private final long syncTimeout; private final long syncTimeout;
private final ReaderRunnable reader = new ReaderRunnable();
private final WriterRunnable writer = new WriterRunnable();
private State state; private volatile State state;
private InputStream in; private InputStream in;
private OutputStream out; private OutputStream out;
private int version; private int version;
private long[] streams; private long[] streams;
private int readTimeoutCounter; private int readTimeoutCounter;
private boolean socketInitialized;
public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener,
ConcurrentMap<InventoryVector, Long> requestedObjectsMap) throws IOException { ConcurrentMap<InventoryVector, Long> requestedObjectsMap) throws IOException {
@ -118,86 +121,6 @@ public class Connection implements Runnable {
return node; return node;
} }
@Override
public void run() {
try (Socket socket = this.socket) {
if (!socket.isConnected()) {
LOG.debug("Trying to connect to node " + node);
socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT);
}
socket.setSoTimeout(READ_TIMEOUT);
this.in = socket.getInputStream();
this.out = socket.getOutputStream();
if (mode == CLIENT) {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
}
while (state != DISCONNECTED) {
try {
NetworkMessage msg = Factory.getNetworkMessage(version, in);
if (msg == null)
continue;
switch (state) {
case ACTIVE:
receiveMessage(msg.getPayload());
sendQueue();
break;
default:
switch (msg.getPayload().getCommand()) {
case VERSION:
Version payload = (Version) msg.getPayload();
if (payload.getNonce() == ctx.getClientNonce()) {
LOG.info("Tried to connect to self, disconnecting.");
disconnect();
} else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) {
this.version = payload.getVersion();
this.streams = payload.getStreams();
send(new VerAck());
switch (mode) {
case SERVER:
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
break;
case CLIENT:
activateConnection();
break;
}
} else {
LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting.");
disconnect();
}
break;
case VERACK:
switch (mode) {
case SERVER:
activateConnection();
break;
case CLIENT:
// NO OP
break;
}
break;
default:
throw new NodeException("Command 'version' or 'verack' expected, but was '"
+ msg.getPayload().getCommand() + "'");
}
}
if (socket.isClosed() || syncFinished(msg)) disconnect();
} catch (SocketTimeoutException ignore) {
if (state == ACTIVE) {
sendQueue();
if (syncFinished(null)) disconnect();
}
}
}
} catch (IOException | NodeException e) {
disconnect();
LOG.debug("Disconnected from node " + node + ": " + e.getMessage());
} catch (RuntimeException e) {
disconnect();
throw e;
}
}
@SuppressWarnings("RedundantIfStatement") @SuppressWarnings("RedundantIfStatement")
private boolean syncFinished(NetworkMessage msg) { private boolean syncFinished(NetworkMessage msg) {
if (syncTimeout == 0 || state != ACTIVE) { if (syncTimeout == 0 || state != ACTIVE) {
@ -229,15 +152,6 @@ public class Connection implements Runnable {
ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node)); ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node));
} }
private void sendQueue() {
if (sendingQueue.size() > 0) {
LOG.debug("Sending " + sendingQueue.size() + " messages to node " + node);
}
for (MessagePayload msg = sendingQueue.poll(); msg != null; msg = sendingQueue.poll()) {
send(msg);
}
}
private void cleanupIvCache() { private void cleanupIvCache() {
Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE); Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE);
for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) { for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) {
@ -310,10 +224,12 @@ public class Connection implements Runnable {
case OBJECT: case OBJECT:
ObjectMessage objectMessage = (ObjectMessage) messagePayload; ObjectMessage objectMessage = (ObjectMessage) messagePayload;
try { try {
if (ctx.getInventory().contains(objectMessage)) {
LOG.debug("Received object " + objectMessage.getInventoryVector() + " - already in inventory");
break;
}
LOG.debug("Received object " + objectMessage.getInventoryVector()); LOG.debug("Received object " + objectMessage.getInventoryVector());
security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
if (ctx.getInventory().contains(objectMessage))
break;
listener.receive(objectMessage); listener.receive(objectMessage);
ctx.getInventory().storeObject(objectMessage); ctx.getInventory().storeObject(objectMessage);
// offer object to some random nodes so it gets distributed throughout the network: // offer object to some random nodes so it gets distributed throughout the network:
@ -392,14 +308,136 @@ public class Connection implements Runnable {
return Objects.hash(node); return Objects.hash(node);
} }
public void request(InventoryVector key) { private synchronized void initSocket(Socket socket) throws IOException {
sendingQueue.offer(new GetData.Builder() if (!socketInitialized) {
.addInventoryVector(key) if (!socket.isConnected()) {
.build() LOG.debug("Trying to connect to node " + node);
); socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT);
}
socket.setSoTimeout(READ_TIMEOUT);
in = socket.getInputStream();
out = socket.getOutputStream();
if (!socket.isConnected()) {
LOG.debug("Trying to connect to node " + node);
socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT);
}
socket.setSoTimeout(READ_TIMEOUT);
in = socket.getInputStream();
out = socket.getOutputStream();
socketInitialized = true;
}
}
public ReaderRunnable getReader() {
return reader;
}
public WriterRunnable getWriter() {
return writer;
} }
public enum Mode {SERVER, CLIENT} public enum Mode {SERVER, CLIENT}
public enum State {CONNECTING, ACTIVE, DISCONNECTED} public enum State {CONNECTING, ACTIVE, DISCONNECTED}
public class ReaderRunnable implements Runnable {
@Override
public void run() {
try (Socket socket = Connection.this.socket) {
initSocket(socket);
if (mode == CLIENT) {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
}
while (state != DISCONNECTED) {
try {
NetworkMessage msg = Factory.getNetworkMessage(version, in);
if (msg == null)
continue;
switch (state) {
case ACTIVE:
receiveMessage(msg.getPayload());
break;
default:
switch (msg.getPayload().getCommand()) {
case VERSION:
Version payload = (Version) msg.getPayload();
if (payload.getNonce() == ctx.getClientNonce()) {
LOG.info("Tried to connect to self, disconnecting.");
disconnect();
} else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) {
version = payload.getVersion();
streams = payload.getStreams();
send(new VerAck());
switch (mode) {
case SERVER:
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
break;
case CLIENT:
activateConnection();
break;
}
} else {
LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting.");
disconnect();
}
break;
case VERACK:
switch (mode) {
case SERVER:
activateConnection();
break;
case CLIENT:
// NO OP
break;
}
break;
default:
throw new NodeException("Command 'version' or 'verack' expected, but was '"
+ msg.getPayload().getCommand() + "'");
}
}
if (socket.isClosed() || syncFinished(msg)) disconnect();
} catch (SocketTimeoutException ignore) {
if (state == ACTIVE) {
if (syncFinished(null)) disconnect();
}
}
Thread.yield();
}
} catch (IOException | NodeException e) {
disconnect();
LOG.debug("Reader disconnected from node " + node + ": " + e.getMessage());
} catch (RuntimeException e) {
LOG.debug("Reader disconnecting from node " + node + " due to error: " + e.getMessage(), e);
disconnect();
} finally {
try {
socket.close();
} catch (Exception e) {
LOG.debug(e.getMessage(), e);
}
}
}
}
public class WriterRunnable implements Runnable {
@Override
public void run() {
try (Socket socket = Connection.this.socket) {
initSocket(socket);
while (state != DISCONNECTED) {
if (sendingQueue.size() > 0) {
LOG.debug("Sending " + sendingQueue.size() + " messages to node " + node);
send(sendingQueue.poll());
} else {
Thread.sleep(100);
}
}
} catch (IOException | InterruptedException e) {
LOG.debug("Writer disconnected from node " + node + ": " + e.getMessage());
disconnect();
}
}
}
} }

View File

@ -48,12 +48,11 @@ import static ch.dissem.bitmessage.utils.DebugUtils.inc;
public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
public final static int NETWORK_MAGIC_NUMBER = 8; public final static int NETWORK_MAGIC_NUMBER = 8;
private final static Logger LOG = LoggerFactory.getLogger(DefaultNetworkHandler.class); private final static Logger LOG = LoggerFactory.getLogger(DefaultNetworkHandler.class);
private final ExecutorService pool;
private final List<Connection> connections = new LinkedList<>(); private final List<Connection> connections = new LinkedList<>();
private final ExecutorService pool;
private InternalContext ctx; private InternalContext ctx;
private ServerSocket serverSocket; private ServerSocket serverSocket;
private Thread serverThread; private volatile boolean running;
private Thread connectionManager;
private ConcurrentMap<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>(); private ConcurrentMap<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>();
@ -69,9 +68,12 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
@Override @Override
public Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds) { public Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds) {
try { try {
Thread t = new Thread(Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds)); Connection connection = Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds);
t.start(); Thread tr = new Thread(connection.getReader());
return t; Thread tw = new Thread(connection.getWriter());
tr.start();
tw.start();
return tr;
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -82,9 +84,14 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
if (listener == null) { if (listener == null) {
throw new IllegalStateException("Listener must be set at start"); throw new IllegalStateException("Listener must be set at start");
} }
if (running) {
throw new IllegalStateException("Network already running - you need to stop first.");
}
try { try {
running = true;
connections.clear();
serverSocket = new ServerSocket(ctx.getPort()); serverSocket = new ServerSocket(ctx.getPort());
serverThread = new Thread(new Runnable() { pool.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
while (!serverSocket.isClosed()) { while (!serverSocket.isClosed()) {
@ -97,44 +104,46 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
} }
} }
} }
}, "server"); });
serverThread.start(); pool.execute(new Runnable() {
connectionManager = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
while (!Thread.interrupted()) { try {
try { while (running) {
int active = 0; try {
synchronized (connections) { int active = 0;
for (Iterator<Connection> iterator = connections.iterator(); iterator.hasNext(); ) { synchronized (connections) {
Connection c = iterator.next(); for (Iterator<Connection> iterator = connections.iterator(); iterator.hasNext(); ) {
if (c.getState() == DISCONNECTED) { Connection c = iterator.next();
// Remove the current element from the iterator and the list. if (c.getState() == DISCONNECTED) {
iterator.remove(); // Remove the current element from the iterator and the list.
} iterator.remove();
if (c.getState() == ACTIVE) { }
active++; if (c.getState() == ACTIVE) {
active++;
}
} }
} }
} if (active < NETWORK_MAGIC_NUMBER) {
if (active < NETWORK_MAGIC_NUMBER) { List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses( NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); for (NetworkAddress address : addresses) {
for (NetworkAddress address : addresses) { startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects));
startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects)); }
} }
Thread.sleep(30000);
} catch (InterruptedException e) {
running = false;
} catch (Exception e) {
LOG.error("Error in connection manager. Ignored.", e);
} }
Thread.sleep(30000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LOG.error("Error in connection manager. Ignored.", e);
} }
} finally {
LOG.debug("Connection manager shutting down.");
running = false;
} }
LOG.debug("Connection manager shutting down.");
} }
}, "connection-manager"); });
connectionManager.start();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -142,18 +151,17 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
@Override @Override
public boolean isRunning() { public boolean isRunning() {
return connectionManager != null && connectionManager.isAlive(); return running;
} }
@Override @Override
public void stop() { public void stop() {
connectionManager.interrupt(); running = false;
try { try {
serverSocket.close(); serverSocket.close();
} catch (IOException e) { } catch (IOException e) {
LOG.debug(e.getMessage(), e); LOG.debug(e.getMessage(), e);
} }
pool.shutdown();
synchronized (connections) { synchronized (connections) {
for (Connection c : connections) { for (Connection c : connections) {
c.disconnect(); c.disconnect();
@ -169,7 +177,8 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
} }
connections.add(c); connections.add(c);
} }
pool.execute(c); pool.execute(c.getReader());
pool.execute(c.getWriter());
} }
@Override @Override
@ -221,8 +230,7 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
i++; i++;
} }
return new Property("network", null, return new Property("network", null,
new Property("connectionManager", new Property("connectionManager", running ? "running" : "stopped"),
connectionManager != null && connectionManager.isAlive() ? "running" : "stopped"),
new Property("connections", null, streamProperties) new Property("connections", null, streamProperties)
); );
} }

View File

@ -10,11 +10,13 @@ uploadArchives {
} }
} }
sourceCompatibility = 1.8
dependencies { dependencies {
compile project(':domain') compile project(':domain')
compile 'org.flywaydb:flyway-core:3.2.1' compile 'org.flywaydb:flyway-core:3.2.1'
testCompile 'junit:junit:4.11' testCompile 'junit:junit:4.12'
testCompile 'com.h2database:h2:1.4.187' testCompile 'com.h2database:h2:1.4.190'
testCompile 'org.mockito:mockito-core:1.10.19' testCompile 'org.mockito:mockito-core:1.10.19'
testCompile project(':security-bc') testCompile project(':security-bc')
} }

View File

@ -27,12 +27,17 @@ import org.slf4j.LoggerFactory;
import java.sql.*; import java.sql.*;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
import static ch.dissem.bitmessage.utils.UnixTime.now; import static ch.dissem.bitmessage.utils.UnixTime.now;
public class JdbcInventory extends JdbcHelper implements Inventory { public class JdbcInventory extends JdbcHelper implements Inventory {
private static final Logger LOG = LoggerFactory.getLogger(JdbcInventory.class); private static final Logger LOG = LoggerFactory.getLogger(JdbcInventory.class);
private final Map<Long, Map<InventoryVector, Long>> cache = new ConcurrentHashMap<>();
public JdbcInventory(JdbcConfig config) { public JdbcInventory(JdbcConfig config) {
super(config); super(config);
} }
@ -40,36 +45,43 @@ public class JdbcInventory extends JdbcHelper implements Inventory {
@Override @Override
public List<InventoryVector> getInventory(long... streams) { public List<InventoryVector> getInventory(long... streams) {
List<InventoryVector> result = new LinkedList<>(); List<InventoryVector> result = new LinkedList<>();
try (Connection connection = config.getConnection()) { for (long stream : streams) {
Statement stmt = connection.createStatement(); getCache(stream).entrySet().stream()
ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE expires > " + now() + .filter(e -> e.getValue() > now())
" AND stream IN (" + join(streams) + ")"); .forEach(e -> result.add(e.getKey()));
while (rs.next()) {
result.add(new InventoryVector(rs.getBytes("hash")));
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
} }
return result; return result;
} }
private List<InventoryVector> getFullInventory(long... streams) { private Map<InventoryVector, Long> getCache(long stream) {
List<InventoryVector> result = new LinkedList<>(); Map<InventoryVector, Long> result = cache.get(stream);
try (Connection connection = config.getConnection()) { if (result == null) {
Statement stmt = connection.createStatement(); synchronized (cache) {
ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE stream IN (" + join(streams) + ")"); if (cache.get(stream) == null) {
while (rs.next()) { result = new ConcurrentHashMap<>();
result.add(new InventoryVector(rs.getBytes("hash"))); cache.put(stream, result);
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT hash, expires FROM Inventory WHERE expires > "
+ now(-5 * MINUTE) + " AND stream = " + stream);
while (rs.next()) {
result.put(new InventoryVector(rs.getBytes("hash")), rs.getLong("expires"));
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
} }
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
} }
return result; return result;
} }
@Override @Override
public List<InventoryVector> getMissing(List<InventoryVector> offer, long... streams) { public List<InventoryVector> getMissing(List<InventoryVector> offer, long... streams) {
offer.removeAll(getFullInventory(streams)); for (long stream : streams) {
getCache(stream).forEach((iv, t) -> offer.remove(iv));
}
return offer; return offer;
} }
@ -131,6 +143,7 @@ public class JdbcInventory extends JdbcHelper implements Inventory {
ps.setLong(5, object.getType()); ps.setLong(5, object.getType());
ps.setLong(6, object.getVersion()); ps.setLong(6, object.getVersion());
ps.executeUpdate(); ps.executeUpdate();
getCache(object.getStream()).put(iv, object.getExpiresTime());
} catch (SQLException e) { } catch (SQLException e) {
LOG.debug("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e); LOG.debug("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e);
} catch (Exception e) { } catch (Exception e) {
@ -140,28 +153,19 @@ public class JdbcInventory extends JdbcHelper implements Inventory {
@Override @Override
public boolean contains(ObjectMessage object) { public boolean contains(ObjectMessage object) {
try (Connection connection = config.getConnection()) { return getCache(object.getStream()).entrySet().stream()
Statement stmt = connection.createStatement(); .anyMatch(x -> x.getKey().equals(object.getInventoryVector()));
ResultSet rs = stmt.executeQuery("SELECT count(1) FROM Inventory WHERE hash = X'"
+ object.getInventoryVector() + "'");
if (rs.next()) {
return rs.getInt(1) > 0;
} else {
throw new RuntimeException("Couldn't query if inventory contains " + object.getInventoryVector());
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
}
} }
@Override @Override
public void cleanup() { public void cleanup() {
try (Connection connection = config.getConnection()) { try (Connection connection = config.getConnection()) {
// We delete only objects that expired 5 minutes ago or earlier, so we don't request objects we just deleted connection.createStatement().executeUpdate("DELETE FROM Inventory WHERE expires < " + now(-5 * MINUTE));
connection.createStatement().executeUpdate("DELETE FROM Inventory WHERE expires < " + (now() - 300));
} catch (SQLException e) { } catch (SQLException e) {
LOG.debug(e.getMessage(), e); LOG.debug(e.getMessage(), e);
} }
for (Map<InventoryVector, Long> c : cache.values()) {
c.entrySet().removeIf(e -> e.getValue() < now(-5 * MINUTE));
}
} }
} }