Improvements

- Massively reduced logging, especially at debug level
- Optimizations to reduce system load
- Use bootstrapping to find stable nodes
This commit is contained in:
Christian Basler 2015-11-08 10:14:37 +01:00
parent 2a8834e3c6
commit 1f05a52f05
10 changed files with 88 additions and 94 deletions

View File

@ -33,8 +33,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.Executors;
import static ch.dissem.bitmessage.entity.Plaintext.Status.*; import static ch.dissem.bitmessage.entity.Plaintext.Status.*;
import static ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST; import static ch.dissem.bitmessage.entity.Plaintext.Type.BROADCAST;
@ -201,13 +200,15 @@ public class BitmessageContext {
* @param wait waits for the synchronization thread to finish * @param wait waits for the synchronization thread to finish
*/ */
public void synchronize(InetAddress host, int port, long timeoutInSeconds, boolean wait) { public void synchronize(InetAddress host, int port, long timeoutInSeconds, boolean wait) {
Thread t = ctx.getNetworkHandler().synchronize(host, port, networkListener, timeoutInSeconds); Future<?> future = ctx.getNetworkHandler().synchronize(host, port, networkListener, timeoutInSeconds);
if (wait) { if (wait) {
try { try {
t.join(); future.get();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Thread was interrupted. Trying to shut down synchronization and returning."); LOG.info("Thread was interrupted. Trying to shut down synchronization and returning.");
t.interrupt(); future.cancel(true);
} catch (CancellationException | ExecutionException e) {
LOG.debug(e.getMessage(), e);
} }
} }
} }
@ -241,7 +242,7 @@ public class BitmessageContext {
ctx.getAddressRepo().save(address); ctx.getAddressRepo().save(address);
break; break;
} else { } else {
LOG.debug("Found pubkey for " + address + " but signature is invalid"); LOG.info("Found pubkey for " + address + " but signature is invalid");
} }
} }
} else { } else {

View File

@ -71,7 +71,8 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
protected void receive(ObjectMessage object, GetPubkey getPubkey) { protected void receive(ObjectMessage object, GetPubkey getPubkey) {
BitmessageAddress identity = ctx.getAddressRepo().findIdentity(getPubkey.getRipeTag()); BitmessageAddress identity = ctx.getAddressRepo().findIdentity(getPubkey.getRipeTag());
if (identity != null && identity.getPrivateKey() != null) { if (identity != null && identity.getPrivateKey() != null) {
LOG.debug("Got pubkey request for identity " + identity); LOG.info("Got pubkey request for identity " + identity);
// FIXME: only send pubkey if it wasn't sent in the last 28 days
ctx.sendPubkey(identity, object.getStream()); ctx.sendPubkey(identity, object.getStream());
} }
} }
@ -90,10 +91,10 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
} }
if (address != null) { if (address != null) {
address.setPubkey(pubkey); address.setPubkey(pubkey);
LOG.debug("Got pubkey for contact " + address); LOG.info("Got pubkey for contact " + address);
ctx.getAddressRepo().save(address); ctx.getAddressRepo().save(address);
List<Plaintext> messages = ctx.getMessageRepository().findMessages(Plaintext.Status.PUBKEY_REQUESTED, address); List<Plaintext> messages = ctx.getMessageRepository().findMessages(Plaintext.Status.PUBKEY_REQUESTED, address);
LOG.debug("Sending " + messages.size() + " messages for contact " + address); LOG.info("Sending " + messages.size() + " messages for contact " + address);
for (Plaintext msg : messages) { for (Plaintext msg : messages) {
msg.setStatus(DOING_PROOF_OF_WORK); msg.setStatus(DOING_PROOF_OF_WORK);
ctx.getMessageRepository().save(msg); ctx.getMessageRepository().save(msg);

View File

@ -157,7 +157,7 @@ public class CryptoBox implements Streamable {
} }
public Builder curveType(int curveType) { public Builder curveType(int curveType) {
if (curveType != 0x2CA) LOG.debug("Unexpected curve type " + curveType); if (curveType != 0x2CA) LOG.trace("Unexpected curve type " + curveType);
this.curveType = curveType; this.curveType = curveType;
return this; return this;
} }

View File

@ -123,7 +123,6 @@ public abstract class AbstractSecurity implements Security, InternalContext.Cont
private byte[] getProofOfWorkTarget(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) throws IOException { private byte[] getProofOfWorkTarget(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) throws IOException {
BigInteger TTL = BigInteger.valueOf(object.getExpiresTime() - UnixTime.now()); BigInteger TTL = BigInteger.valueOf(object.getExpiresTime() - UnixTime.now());
LOG.debug("TTL: " + TTL + "s");
BigInteger numerator = TWO.pow(64); BigInteger numerator = TWO.pow(64);
BigInteger powLength = BigInteger.valueOf(object.getPayloadBytesWithoutNonce().length + extraBytes); BigInteger powLength = BigInteger.valueOf(object.getPayloadBytesWithoutNonce().length + extraBytes);
BigInteger denominator = BigInteger.valueOf(nonceTrialsPerByte).multiply(powLength.add(powLength.multiply(TTL).divide(BigInteger.valueOf(2).pow(16)))); BigInteger denominator = BigInteger.valueOf(nonceTrialsPerByte).multiply(powLength.add(powLength.multiply(TTL).divide(BigInteger.valueOf(2).pow(16))));

View File

@ -37,10 +37,7 @@ public class MemoryNodeRegistry implements NodeRegistry {
private final Map<Long, Set<NetworkAddress>> stableNodes = new ConcurrentHashMap<>(); private final Map<Long, Set<NetworkAddress>> stableNodes = new ConcurrentHashMap<>();
private final Map<Long, Set<NetworkAddress>> knownNodes = new ConcurrentHashMap<>(); private final Map<Long, Set<NetworkAddress>> knownNodes = new ConcurrentHashMap<>();
public MemoryNodeRegistry() { private void loadStableNodes() {
new Thread(new Runnable() {
@Override
public void run() {
try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) { try (InputStream in = getClass().getClassLoader().getResourceAsStream("nodes.txt")) {
Scanner scanner = new Scanner(in); Scanner scanner = new Scanner(in);
long stream = 0; long stream = 0;
@ -58,20 +55,25 @@ public class MemoryNodeRegistry implements NodeRegistry {
stableNodes.put(stream, streamSet); stableNodes.put(stream, streamSet);
} else if (streamSet != null) { } else if (streamSet != null) {
int portIndex = line.lastIndexOf(':'); int portIndex = line.lastIndexOf(':');
InetAddress inetAddress = InetAddress.getByName(line.substring(0, portIndex)); InetAddress[] inetAddresses = InetAddress.getAllByName(line.substring(0, portIndex));
int port = Integer.valueOf(line.substring(portIndex + 1)); int port = Integer.valueOf(line.substring(portIndex + 1));
for (InetAddress inetAddress : inetAddresses) {
streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build()); streamSet.add(new NetworkAddress.Builder().ip(inetAddress).port(port).stream(stream).build());
} }
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
} }
} }
if (LOG.isDebugEnabled()) {
for (Map.Entry<Long, Set<NetworkAddress>> e : stableNodes.entrySet()) {
LOG.debug("Stream " + e.getKey() + ": loaded " + e.getValue().size() + " bootstrap nodes.");
}
}
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
}, "node registry initializer").start();
}
@Override @Override
public List<NetworkAddress> getKnownAddresses(int limit, long... streams) { public List<NetworkAddress> getKnownAddresses(int limit, long... streams) {
@ -86,9 +88,16 @@ public class MemoryNodeRegistry implements NodeRegistry {
known.remove(node); known.remove(node);
} }
} }
} else if (stableNodes.containsKey(stream)) { } else {
Set<NetworkAddress> nodes = stableNodes.get(stream);
if (nodes == null || nodes.isEmpty()) {
loadStableNodes();
nodes = stableNodes.get(stream);
}
if (nodes != null && !nodes.isEmpty()) {
// To reduce load on stable nodes, only return one // To reduce load on stable nodes, only return one
result.add(selectRandom(stableNodes.get(stream))); result.add(selectRandom(nodes));
}
} }
} }
return selectRandom(limit, result); return selectRandom(limit, result);

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.utils.Property;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.concurrent.Future;
/** /**
* Handles incoming messages * Handles incoming messages
@ -33,7 +34,7 @@ public interface NetworkHandler {
* An implementation should disconnect if either the timeout is reached or the returned thread is interrupted. * An implementation should disconnect if either the timeout is reached or the returned thread is interrupted.
* </p> * </p>
*/ */
Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds); Future<?> synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds);
/** /**
* Start a full network node, accepting incoming connections and relaying objects. * Start a full network node, accepting incoming connections and relaying objects.

View File

@ -1,18 +1,8 @@
[stream 1] [stream 1]
5.45.99.75:8444
75.167.159.54:8444
95.165.168.168:8444
85.180.139.241:8444
178.62.12.187:8448
[2604:2000:1380:9f:82e:148b:2746:d0c7]:8080
158.222.211.81:8080
24.188.198.204:8111
109.147.204.113:1195
178.11.46.221:8444
dissem.ch:8444 dissem.ch:8444
bootstrap8080.bitmessage.org:8080
bootstrap8444.bitmessage.org:8444
[stream 2] [stream 2]
# none yet # none yet

View File

@ -235,10 +235,9 @@ public class Connection {
ObjectMessage objectMessage = (ObjectMessage) messagePayload; ObjectMessage objectMessage = (ObjectMessage) messagePayload;
try { try {
if (ctx.getInventory().contains(objectMessage)) { if (ctx.getInventory().contains(objectMessage)) {
LOG.debug("Received object " + objectMessage.getInventoryVector() + " - already in inventory"); LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory");
break; break;
} }
LOG.debug("Received object " + objectMessage.getInventoryVector());
security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
listener.receive(objectMessage); listener.receive(objectMessage);
ctx.getInventory().storeObject(objectMessage); ctx.getInventory().storeObject(objectMessage);
@ -294,7 +293,6 @@ public class Connection {
} }
public void offer(InventoryVector iv) { public void offer(InventoryVector iv) {
LOG.debug("Offering " + iv + " to node " + node.toString());
sendingQueue.offer(new Inv.Builder() sendingQueue.offer(new Inv.Builder()
.addInventoryVector(iv) .addInventoryVector(iv)
.build()); .build());
@ -321,14 +319,7 @@ public class Connection {
private synchronized void initSocket(Socket socket) throws IOException { private synchronized void initSocket(Socket socket) throws IOException {
if (!socketInitialized) { if (!socketInitialized) {
if (!socket.isConnected()) { if (!socket.isConnected()) {
LOG.debug("Trying to connect to node " + node); LOG.trace("Trying to connect to node " + node);
socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT);
}
socket.setSoTimeout(READ_TIMEOUT);
in = socket.getInputStream();
out = socket.getOutputStream();
if (!socket.isConnected()) {
LOG.debug("Trying to connect to node " + node);
socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT); socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT);
} }
socket.setSoTimeout(READ_TIMEOUT); socket.setSoTimeout(READ_TIMEOUT);
@ -359,6 +350,7 @@ public class Connection {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
} }
while (state != DISCONNECTED) { while (state != DISCONNECTED) {
Thread.sleep(100);
try { try {
NetworkMessage msg = Factory.getNetworkMessage(version, in); NetworkMessage msg = Factory.getNetworkMessage(version, in);
if (msg == null) if (msg == null)
@ -413,15 +405,13 @@ public class Connection {
if (syncFinished(null)) disconnect(); if (syncFinished(null)) disconnect();
} }
} }
Thread.yield();
} }
} catch (IOException | NodeException e) { } catch (InterruptedException | IOException | NodeException e) {
disconnect(); LOG.trace("Reader disconnected from node " + node + ": " + e.getMessage());
LOG.debug("Reader disconnected from node " + node + ": " + e.getMessage());
} catch (RuntimeException e) { } catch (RuntimeException e) {
LOG.debug("Reader disconnecting from node " + node + " due to error: " + e.getMessage(), e); LOG.trace("Reader disconnecting from node " + node + " due to error: " + e.getMessage(), e);
disconnect();
} finally { } finally {
disconnect();
try { try {
socket.close(); socket.close();
} catch (Exception e) { } catch (Exception e) {
@ -438,14 +428,13 @@ public class Connection {
initSocket(socket); initSocket(socket);
while (state != DISCONNECTED) { while (state != DISCONNECTED) {
if (sendingQueue.size() > 0) { if (sendingQueue.size() > 0) {
LOG.debug("Sending " + sendingQueue.size() + " messages to node " + node);
send(sendingQueue.poll()); send(sendingQueue.poll());
} else { } else {
Thread.sleep(100); Thread.sleep(100);
} }
} }
} catch (IOException | InterruptedException e) { } catch (IOException | InterruptedException e) {
LOG.debug("Writer disconnected from node " + node + ": " + e.getMessage()); LOG.trace("Writer disconnected from node " + node + ": " + e.getMessage());
disconnect(); disconnect();
} }
} }

View File

@ -32,10 +32,7 @@ import java.net.InetAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER;
@ -58,7 +55,14 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
private ConcurrentMap<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>(); private ConcurrentMap<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>();
public DefaultNetworkHandler() { public DefaultNetworkHandler() {
pool = Executors.newCachedThreadPool(); pool = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setPriority(Thread.MIN_PRIORITY);
return thread;
}
});
} }
@Override @Override
@ -67,14 +71,12 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
} }
@Override @Override
public Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds) { public Future<?> synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds) {
try { try {
Connection connection = Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds); Connection connection = Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds);
Thread tr = new Thread(connection.getReader()); Future<?> reader = pool.submit(connection.getReader());
Thread tw = new Thread(connection.getWriter()); pool.execute(connection.getWriter());
tr.start(); return reader;
tw.start();
return tr;
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -143,8 +145,10 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
for (NetworkAddress address : addresses) { for (NetworkAddress address : addresses) {
startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects)); startConnection(new Connection(ctx, CLIENT, address, listener, requestedObjects));
} }
} Thread.sleep(10000);
} else {
Thread.sleep(30000); Thread.sleep(30000);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
running = false; running = false;
} catch (Exception e) { } catch (Exception e) {
@ -204,7 +208,6 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
} }
} }
} }
LOG.debug(target.size() + " connections available to offer " + iv);
List<Connection> randomSubset = Collections.selectRandom(NETWORK_MAGIC_NUMBER, target); List<Connection> randomSubset = Collections.selectRandom(NETWORK_MAGIC_NUMBER, target);
for (Connection connection : randomSubset) { for (Connection connection : randomSubset) {
connection.offer(iv); connection.offer(iv);

View File

@ -29,6 +29,7 @@ import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -116,10 +117,10 @@ public class NetworkHandlerTest {
"V1Msg.payload" "V1Msg.payload"
); );
Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001,
mock(NetworkHandler.MessageListener.class), mock(NetworkHandler.MessageListener.class),
10); 10);
t.join(); future.get();
assertInventorySize(3, nodeInventory); assertInventorySize(3, nodeInventory);
assertInventorySize(3, peerInventory); assertInventorySize(3, peerInventory);
} }
@ -133,10 +134,10 @@ public class NetworkHandlerTest {
nodeInventory.init(); nodeInventory.init();
Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001,
mock(NetworkHandler.MessageListener.class), mock(NetworkHandler.MessageListener.class),
10); 10);
t.join(); future.get();
assertInventorySize(2, nodeInventory); assertInventorySize(2, nodeInventory);
assertInventorySize(2, peerInventory); assertInventorySize(2, peerInventory);
} }
@ -149,10 +150,10 @@ public class NetworkHandlerTest {
"V1Msg.payload" "V1Msg.payload"
); );
Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001,
mock(NetworkHandler.MessageListener.class), mock(NetworkHandler.MessageListener.class),
10); 10);
t.join(); future.get();
assertInventorySize(1, nodeInventory); assertInventorySize(1, nodeInventory);
assertInventorySize(1, peerInventory); assertInventorySize(1, peerInventory);
} }