smarter network code, fixed various issues

- deciding on the stream at creation time is just silly - it should be done based on the identities (this part is TODO)
- changed NodeRegistry so it doesn't store nodes - this should help to connect faster
- inventory items shouldn't be advertised to nodes that are already aware of them (issue #13)
- objects shouldn't be requested more than once (issue #9)
This commit is contained in:
Christian Basler 2015-07-01 06:57:30 +02:00
parent c8960df2b3
commit 2c4d95af2f
11 changed files with 248 additions and 205 deletions

View File

@ -43,11 +43,10 @@ public class Application {
ctx = new BitmessageContext.Builder() ctx = new BitmessageContext.Builder()
.addressRepo(new JdbcAddressRepository(jdbcConfig)) .addressRepo(new JdbcAddressRepository(jdbcConfig))
.inventory(new JdbcInventory(jdbcConfig)) .inventory(new JdbcInventory(jdbcConfig))
.nodeRegistry(new JdbcNodeRegistry(jdbcConfig)) .nodeRegistry(new MemoryNodeRegistry())
.messageRepo(new JdbcMessageRepository(jdbcConfig)) .messageRepo(new JdbcMessageRepository(jdbcConfig))
.networkHandler(new NetworkNode()) .networkHandler(new NetworkNode())
.port(48444) .port(48444)
.streams(1)
.build(); .build();
ctx.startup(new BitmessageContext.Listener() { ctx.startup(new BitmessageContext.Listener() {

View File

@ -267,7 +267,6 @@ public class BitmessageContext {
AddressRepository addressRepo; AddressRepository addressRepo;
MessageRepository messageRepo; MessageRepository messageRepo;
ProofOfWorkEngine proofOfWorkEngine; ProofOfWorkEngine proofOfWorkEngine;
TreeSet<Long> streams;
public Builder() { public Builder() {
} }
@ -307,28 +306,12 @@ public class BitmessageContext {
return this; return this;
} }
public Builder streams(Collection<Long> streams) {
this.streams = new TreeSet<>(streams);
return this;
}
public Builder streams(long... streams) {
this.streams = new TreeSet<>();
for (long stream : streams) {
this.streams.add(stream);
}
return this;
}
public BitmessageContext build() { public BitmessageContext build() {
nonNull("inventory", inventory); nonNull("inventory", inventory);
nonNull("nodeRegistry", nodeRegistry); nonNull("nodeRegistry", nodeRegistry);
nonNull("networkHandler", networkHandler); nonNull("networkHandler", networkHandler);
nonNull("addressRepo", addressRepo); nonNull("addressRepo", addressRepo);
nonNull("messageRepo", messageRepo); nonNull("messageRepo", messageRepo);
if (streams == null) {
streams(1);
}
if (proofOfWorkEngine == null) { if (proofOfWorkEngine == null) {
proofOfWorkEngine = new MultiThreadedPOWEngine(); proofOfWorkEngine = new MultiThreadedPOWEngine();
} }

View File

@ -49,7 +49,7 @@ public class InternalContext {
private final MessageRepository messageRepository; private final MessageRepository messageRepository;
private final ProofOfWorkEngine proofOfWorkEngine; private final ProofOfWorkEngine proofOfWorkEngine;
private final TreeSet<Long> streams; private final TreeSet<Long> streams = new TreeSet<>();
private final int port; private final int port;
private long networkNonceTrialsPerByte = 1000; private long networkNonceTrialsPerByte = 1000;
private long networkExtraBytes = 1000; private long networkExtraBytes = 1000;
@ -65,7 +65,7 @@ public class InternalContext {
this.clientNonce = Security.randomNonce(); this.clientNonce = Security.randomNonce();
port = builder.port; port = builder.port;
streams = builder.streams; streams.add(1L); // FIXME
init(inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine); init(inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine);
} }

View File

@ -16,14 +16,11 @@
package ch.dissem.bitmessage.ports; package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.BitmessageContext;
import ch.dissem.bitmessage.entity.ObjectMessage; import ch.dissem.bitmessage.entity.ObjectMessage;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.utils.Property; import ch.dissem.bitmessage.utils.Property;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
/** /**
* Handles incoming messages * Handles incoming messages

View File

@ -57,4 +57,15 @@ public class Collections {
} }
return result; return result;
} }
public static <T> T selectRandom(Collection<T> collection) {
int index = RANDOM.nextInt(collection.size());
for (T item : collection) {
if (index == 0) {
return item;
}
index--;
}
throw new IllegalArgumentException("Empty collection? Size: " + collection.size());
}
} }

View File

@ -20,10 +20,14 @@ package ch.dissem.bitmessage.utils;
* A simple utility class that simplifies using the second based time used in Bitmessage. * A simple utility class that simplifies using the second based time used in Bitmessage.
*/ */
public class UnixTime { public class UnixTime {
/**
* Length of a minute in seconds, intended for use with {@link #now(long)}.
*/
public static final int MINUTE = 60;
/** /**
* Length of an hour in seconds, intended for use with {@link #now(long)}. * Length of an hour in seconds, intended for use with {@link #now(long)}.
*/ */
public static final long HOUR = 60 * 60; public static final long HOUR = 60 * MINUTE;
/** /**
* Length of a day in seconds, intended for use with {@link #now(long)}. * Length of a day in seconds, intended for use with {@link #now(long)}.
*/ */

View File

@ -37,14 +37,14 @@ import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.Arrays; import java.util.*;
import java.util.List; import java.util.concurrent.ConcurrentHashMap;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.Connection.State.*; import static ch.dissem.bitmessage.networking.Connection.State.*;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/** /**
* A connection to a specific node * A connection to a specific node
@ -53,41 +53,44 @@ public class Connection implements Runnable {
public static final int READ_TIMEOUT = 2000; public static final int READ_TIMEOUT = 2000;
private final static Logger LOG = LoggerFactory.getLogger(Connection.class); private final static Logger LOG = LoggerFactory.getLogger(Connection.class);
private static final int CONNECT_TIMEOUT = 5000; private static final int CONNECT_TIMEOUT = 5000;
private final ConcurrentMap<InventoryVector, Long> ivCache;
private InternalContext ctx; private InternalContext ctx;
private Mode mode; private Mode mode;
private State state; private State state;
private Socket socket; private Socket socket;
private InputStream in; private InputStream in;
private OutputStream out; private OutputStream out;
private MessageListener listener; private MessageListener listener;
private int version; private int version;
private long[] streams; private long[] streams;
private NetworkAddress host; private NetworkAddress host;
private NetworkAddress node; private NetworkAddress node;
private Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>(); private Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>();
private ConcurrentMap<InventoryVector, Long> requestedObjects;
public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener) throws IOException { public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener,
this.ctx = context; ConcurrentMap<InventoryVector, Long> requestedObjectsMap) throws IOException {
this.mode = mode; this(context, mode, listener, requestedObjectsMap);
this.state = CONNECTING;
this.socket = socket; this.socket = socket;
this.listener = listener;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(); this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build();
} }
public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener) { public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener,
ConcurrentMap<InventoryVector, Long> requestedObjectsMap) {
this(context, mode, listener, requestedObjectsMap);
this.socket = new Socket();
this.node = node;
}
private Connection(InternalContext context, Mode mode, MessageListener listener,
ConcurrentMap<InventoryVector, Long> requestedObjectsMap) {
this.ctx = context; this.ctx = context;
this.mode = mode; this.mode = mode;
this.state = CONNECTING; this.state = CONNECTING;
this.socket = new Socket();
this.node = node;
this.listener = listener; this.listener = listener;
this.requestedObjects = requestedObjectsMap;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
ivCache = new ConcurrentHashMap<>();
} }
public Mode getMode() { public Mode getMode() {
@ -199,13 +202,66 @@ public class Connection implements Runnable {
} }
} }
private void cleanupIvCache() {
Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE);
for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) {
if (entry.getValue() < fiveMinutesAgo) {
ivCache.remove(entry.getKey());
}
}
}
private void updateIvCache(InventoryVector... inventory) {
cleanupIvCache();
Long now = UnixTime.now();
for (InventoryVector iv : inventory) {
ivCache.put(iv, now);
}
}
private void updateIvCache(List<InventoryVector> inventory) {
cleanupIvCache();
Long now = UnixTime.now();
for (InventoryVector iv : inventory) {
ivCache.put(iv, now);
}
}
private void updateRequestedObjects(List<InventoryVector> missing) {
Long now = UnixTime.now();
Long fiveMinutesAgo = now - 5 * MINUTE;
Long tenMinutesAgo = now - 10 * MINUTE;
List<InventoryVector> stillMissing = new LinkedList<>();
for (Map.Entry<InventoryVector, Long> entry : requestedObjects.entrySet()) {
if (entry.getValue() < fiveMinutesAgo) {
stillMissing.add(entry.getKey());
// If it's still not available after 10 minutes, we won't look for it
// any longer (except it's announced again)
if (entry.getValue() < tenMinutesAgo) {
requestedObjects.remove(entry.getKey());
}
}
}
for (InventoryVector iv : missing) {
requestedObjects.put(iv, now);
}
if (!stillMissing.isEmpty()) {
LOG.debug(stillMissing.size() + " items are still missing.");
missing.addAll(stillMissing);
}
}
private void receiveMessage(MessagePayload messagePayload) { private void receiveMessage(MessagePayload messagePayload) {
switch (messagePayload.getCommand()) { switch (messagePayload.getCommand()) {
case INV: case INV:
Inv inv = (Inv) messagePayload; Inv inv = (Inv) messagePayload;
updateIvCache(inv.getInventory());
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams); List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
missing.removeAll(requestedObjects.keySet());
LOG.debug("Received inventory with " + inv.getInventory().size() + " elements, of which are " LOG.debug("Received inventory with " + inv.getInventory().size() + " elements, of which are "
+ missing.size() + " missing."); + missing.size() + " missing.");
updateRequestedObjects(missing);
send(new GetData.Builder().inventory(missing).build()); send(new GetData.Builder().inventory(missing).build());
break; break;
case GETDATA: case GETDATA:
@ -276,6 +332,11 @@ public class Connection implements Runnable {
sendingQueue.offer(new Inv.Builder() sendingQueue.offer(new Inv.Builder()
.addInventoryVector(iv) .addInventoryVector(iv)
.build()); .build());
updateIvCache(iv);
}
public boolean knowsOf(InventoryVector iv) {
return ivCache.containsKey(iv);
} }
@Override @Override
@ -291,6 +352,13 @@ public class Connection implements Runnable {
return Objects.hash(node); return Objects.hash(node);
} }
public void request(InventoryVector key) {
sendingQueue.offer(new GetData.Builder()
.addInventoryVector(key)
.build()
);
}
public enum Mode {SERVER, CLIENT} public enum Mode {SERVER, CLIENT}
public enum State {CONNECTING, ACTIVE, DISCONNECTED} public enum State {CONNECTING, ACTIVE, DISCONNECTED}

View File

@ -30,6 +30,8 @@ import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -43,13 +45,17 @@ import static ch.dissem.bitmessage.utils.DebugUtils.inc;
* Handles all the networky stuff. * Handles all the networky stuff.
*/ */
public class NetworkNode implements NetworkHandler, ContextHolder { public class NetworkNode implements NetworkHandler, ContextHolder {
public final static int NETWORK_MAGIC_NUMBER = 8;
private final static Logger LOG = LoggerFactory.getLogger(NetworkNode.class); private final static Logger LOG = LoggerFactory.getLogger(NetworkNode.class);
private final ExecutorService pool; private final ExecutorService pool;
private final List<Connection> connections = new LinkedList<>(); private final List<Connection> connections = new LinkedList<>();
private InternalContext ctx; private InternalContext ctx;
private ServerSocket serverSocket; private ServerSocket serverSocket;
private Thread serverThread;
private Thread connectionManager; private Thread connectionManager;
private ConcurrentMap<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>();
public NetworkNode() { public NetworkNode() {
pool = Executors.newCachedThreadPool(); pool = Executors.newCachedThreadPool();
} }
@ -66,18 +72,21 @@ public class NetworkNode implements NetworkHandler, ContextHolder {
} }
try { try {
serverSocket = new ServerSocket(ctx.getPort()); serverSocket = new ServerSocket(ctx.getPort());
pool.execute(new Runnable() { serverThread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
try { while (!serverSocket.isClosed()) {
Socket socket = serverSocket.accept(); try {
socket.setSoTimeout(10000); Socket socket = serverSocket.accept();
startConnection(new Connection(ctx, SERVER, socket, listener)); socket.setSoTimeout(Connection.READ_TIMEOUT);
} catch (IOException e) { startConnection(new Connection(ctx, SERVER, socket, listener, requestedObjects));
LOG.debug(e.getMessage(), e); } catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
} }
} }
}); }, "server");
serverThread.start();
connectionManager = new Thread(new Runnable() { connectionManager = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -96,10 +105,11 @@ public class NetworkNode implements NetworkHandler, ContextHolder {
} }
} }
} }
if (active < 8) { if (active < NETWORK_MAGIC_NUMBER) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(8 - active, ctx.getStreams()); List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(
NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
for (NetworkAddress address : addresses) { for (NetworkAddress address : addresses) {
startConnection(new Connection(ctx, CLIENT, address, listener)); startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects));
} }
} }
Thread.sleep(30000); Thread.sleep(30000);
@ -126,12 +136,12 @@ public class NetworkNode implements NetworkHandler, ContextHolder {
} catch (IOException e) { } catch (IOException e) {
LOG.debug(e.getMessage(), e); LOG.debug(e.getMessage(), e);
} }
pool.shutdown();
synchronized (connections) { synchronized (connections) {
for (Connection c : connections) { for (Connection c : connections) {
c.disconnect(); c.disconnect();
} }
} }
pool.shutdown();
} }
private void startConnection(Connection c) { private void startConnection(Connection c) {
@ -147,17 +157,17 @@ public class NetworkNode implements NetworkHandler, ContextHolder {
@Override @Override
public void offer(final InventoryVector iv) { public void offer(final InventoryVector iv) {
List<Connection> active = new LinkedList<>(); List<Connection> target = new LinkedList<>();
synchronized (connections) { synchronized (connections) {
for (Connection connection : connections) { for (Connection connection : connections) {
if (connection.getState() == ACTIVE) { if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) {
active.add(connection); target.add(connection);
} }
} }
} }
LOG.debug(active.size() + " connections available to offer " + iv); LOG.debug(target.size() + " connections available to offer " + iv);
List<Connection> random8 = Collections.selectRandom(8, active); List<Connection> randomSubset = Collections.selectRandom(NETWORK_MAGIC_NUMBER, target);
for (Connection connection : random8) { for (Connection connection : randomSubset) {
connection.offer(iv); connection.offer(iv);
} }
} }

View File

@ -1,144 +0,0 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.repository;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.ports.NodeRegistry;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.sql.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Scanner;
import static ch.dissem.bitmessage.utils.UnixTime.HOUR;
public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry {
private static final Logger LOG = LoggerFactory.getLogger(JdbcNodeRegistry.class);
public JdbcNodeRegistry(JdbcConfig config) {
super(config);
}
@Override
public List<NetworkAddress> getKnownAddresses(int limit, long... streams) {
List<NetworkAddress> result = doGetKnownNodes(limit, streams);
if (result.isEmpty()) {
try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) {
Scanner scanner = new Scanner(in);
long stream = 1;
while (scanner.hasNext()) {
try {
String line = scanner.nextLine().trim();
if (line.startsWith("#") || line.isEmpty()) {
// Ignore
continue;
}
if (line.startsWith("[stream")) {
stream = Long.parseLong(line.substring(8, line.lastIndexOf(']')));
} else {
int portIndex = line.lastIndexOf(':');
InetAddress inetAddress = InetAddress.getByName(line.substring(0, portIndex));
int port = Integer.valueOf(line.substring(portIndex + 1));
result.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build());
}
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
}
offerAddresses(result);
return doGetKnownNodes(limit, streams);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
return result;
}
private List<NetworkAddress> doGetKnownNodes(int limit, long... streams) {
List<NetworkAddress> result = new LinkedList<>();
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM Node WHERE stream IN (" + join(streams) + ") ORDER BY RANDOM() LIMIT " + limit);
while (rs.next()) {
result.add(new NetworkAddress.Builder()
.ipv6(rs.getBytes("ip"))
.port(rs.getInt("port"))
.services(rs.getLong("services"))
.stream(rs.getLong("stream"))
.time(rs.getLong("time"))
.build());
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
return result;
}
@Override
public void offerAddresses(List<NetworkAddress> addresses) {
try (Connection connection = config.getConnection()) {
PreparedStatement exists = connection.prepareStatement("SELECT time FROM Node WHERE ip = ? AND port = ? AND stream = ?");
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)");
PreparedStatement update = connection.prepareStatement(
"UPDATE Node SET services = ?, time = ? WHERE ip = ? AND port = ? AND stream = ?");
connection.setAutoCommit(false);
for (NetworkAddress node : addresses) {
exists.setBytes(1, node.getIPv6());
exists.setInt(2, node.getPort());
exists.setLong(3, node.getStream());
ResultSet lastConnectionTime = exists.executeQuery();
if (lastConnectionTime.next()) {
long time = lastConnectionTime.getLong("time");
if (time < node.getTime() && node.getTime() <= UnixTime.now()) {
time = node.getTime();
update.setLong(1, node.getServices());
update.setLong(2, time);
update.setBytes(3, node.getIPv6());
update.setInt(4, node.getPort());
update.setLong(5, node.getStream());
update.executeUpdate();
}
} else if (node.getTime() <= UnixTime.now()) {
insert.setBytes(1, node.getIPv6());
insert.setInt(2, node.getPort());
insert.setLong(3, node.getServices());
insert.setLong(4, node.getStream());
insert.setLong(5, node.getTime());
insert.executeUpdate();
}
connection.commit();
}
if (addresses.size() > 100) {
// Let's clean up after we received an update from another node. This way, we shouldn't end up with an
// empty node registry.
PreparedStatement cleanup = connection.prepareStatement("DELETE FROM Node WHERE time < ?");
cleanup.setLong(1, UnixTime.now(-3 * HOUR));
cleanup.execute();
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,115 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.repository;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.ports.NodeRegistry;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static ch.dissem.bitmessage.utils.Collections.selectRandom;
import static ch.dissem.bitmessage.utils.UnixTime.HOUR;
import static java.util.Collections.newSetFromMap;
public class MemoryNodeRegistry implements NodeRegistry {
private static final Logger LOG = LoggerFactory.getLogger(MemoryNodeRegistry.class);
private final Map<Long, Set<NetworkAddress>> stableNodes = new HashMap<>();
private final Map<Long, Set<NetworkAddress>> knownNodes = new ConcurrentHashMap<>();
public MemoryNodeRegistry() {
try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) {
Scanner scanner = new Scanner(in);
long stream = 0;
Set<NetworkAddress> streamSet = null;
while (scanner.hasNext()) {
try {
String line = scanner.nextLine().trim();
if (line.startsWith("#") || line.isEmpty()) {
// Ignore
continue;
}
if (line.startsWith("[stream")) {
stream = Long.parseLong(line.substring(8, line.lastIndexOf(']')));
streamSet = new HashSet<>();
stableNodes.put(stream, streamSet);
} else if (streamSet != null) {
int portIndex = line.lastIndexOf(':');
InetAddress inetAddress = InetAddress.getByName(line.substring(0, portIndex));
int port = Integer.valueOf(line.substring(portIndex + 1));
streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build());
}
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public List<NetworkAddress> getKnownAddresses(int limit, long... streams) {
List<NetworkAddress> result = new LinkedList<>();
for (long stream : streams) {
Set<NetworkAddress> known = knownNodes.get(stream);
if (known != null && !known.isEmpty()) {
for (NetworkAddress node : known) {
if (node.getTime() > UnixTime.now(-3 * HOUR)) {
result.add(node);
} else {
known.remove(node);
}
}
} else if (stableNodes.containsKey(stream)) {
// To reduce load on stable nodes, only return one
result.add(selectRandom(stableNodes.get(stream)));
}
}
return selectRandom(limit, result);
}
@Override
public void offerAddresses(List<NetworkAddress> addresses) {
for (NetworkAddress node : addresses) {
if (node.getTime() <= UnixTime.now()) {
if (!knownNodes.containsKey(node.getStream())) {
synchronized (knownNodes) {
if (!knownNodes.containsKey(node.getStream())) {
knownNodes.put(
node.getStream(),
newSetFromMap(new ConcurrentHashMap<NetworkAddress, Boolean>())
);
}
}
}
if (node.getTime() <= UnixTime.now()) {
// TODO: This isn't quite correct
// If the node is already known, the one with the more recent time should be used
knownNodes.get(node.getStream()).add(node);
}
}
}
}
}

View File

@ -35,7 +35,7 @@ public class JdbcNodeRegistryTest {
public void setUp() throws Exception { public void setUp() throws Exception {
config = new TestJdbcConfig(); config = new TestJdbcConfig();
config.reset(); config.reset();
registry = new JdbcNodeRegistry(config); registry = new MemoryNodeRegistry();
registry.offerAddresses(Arrays.asList( registry.offerAddresses(Arrays.asList(
createAddress(1, 8444, 1, now()), createAddress(1, 8444, 1, now()),