Greatly improved network code - the "manage the node repository" part of issue #5 should now be OK
This commit is contained in:
@ -22,23 +22,28 @@ import ch.dissem.bitmessage.entity.*;
|
||||
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
|
||||
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
|
||||
import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException;
|
||||
import ch.dissem.bitmessage.exception.NodeException;
|
||||
import ch.dissem.bitmessage.factory.Factory;
|
||||
import ch.dissem.bitmessage.ports.NetworkHandler.MessageListener;
|
||||
import ch.dissem.bitmessage.utils.DebugUtils;
|
||||
import ch.dissem.bitmessage.utils.Security;
|
||||
import ch.dissem.bitmessage.utils.UnixTime;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
|
||||
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
|
||||
import static ch.dissem.bitmessage.networking.Connection.State.*;
|
||||
|
||||
/**
|
||||
@ -46,9 +51,11 @@ import static ch.dissem.bitmessage.networking.Connection.State.*;
|
||||
*/
|
||||
public class Connection implements Runnable {
|
||||
private final static Logger LOG = LoggerFactory.getLogger(Connection.class);
|
||||
private static final int CONNECT_TIMEOUT = 10000;
|
||||
|
||||
private InternalContext ctx;
|
||||
|
||||
private Mode mode;
|
||||
private State state;
|
||||
private Socket socket;
|
||||
private InputStream in;
|
||||
@ -63,17 +70,30 @@ public class Connection implements Runnable {
|
||||
|
||||
private Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>();
|
||||
|
||||
public Connection(InternalContext context, State state, Socket socket, MessageListener listener) throws IOException {
|
||||
public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener) throws IOException {
|
||||
this.ctx = context;
|
||||
this.state = state;
|
||||
this.mode = mode;
|
||||
this.state = CONNECTING;
|
||||
this.socket = socket;
|
||||
this.in = socket.getInputStream();
|
||||
this.out = socket.getOutputStream();
|
||||
this.listener = listener;
|
||||
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
|
||||
this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build();
|
||||
}
|
||||
|
||||
public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener) {
|
||||
this.ctx = context;
|
||||
this.mode = mode;
|
||||
this.state = CONNECTING;
|
||||
this.socket = new Socket();
|
||||
this.node = node;
|
||||
this.listener = listener;
|
||||
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
|
||||
}
|
||||
|
||||
public Mode getMode() {
|
||||
return mode;
|
||||
}
|
||||
|
||||
public State getState() {
|
||||
return state;
|
||||
}
|
||||
@ -84,58 +104,86 @@ public class Connection implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (state == CLIENT) {
|
||||
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
|
||||
}
|
||||
while (state != DISCONNECTED) {
|
||||
try {
|
||||
NetworkMessage msg = Factory.getNetworkMessage(version, in);
|
||||
if (msg == null)
|
||||
continue;
|
||||
switch (state) {
|
||||
case ACTIVE:
|
||||
receiveMessage(msg.getPayload());
|
||||
sendQueue();
|
||||
break;
|
||||
try (Socket socket = this.socket) {
|
||||
if (!socket.isConnected()) {
|
||||
socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT);
|
||||
}
|
||||
this.in = socket.getInputStream();
|
||||
this.out = socket.getOutputStream();
|
||||
if (mode == CLIENT) {
|
||||
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
|
||||
}
|
||||
while (state != DISCONNECTED) {
|
||||
try {
|
||||
NetworkMessage msg = Factory.getNetworkMessage(version, in);
|
||||
if (msg == null)
|
||||
continue;
|
||||
switch (state) {
|
||||
case ACTIVE:
|
||||
receiveMessage(msg.getPayload());
|
||||
sendQueue();
|
||||
break;
|
||||
|
||||
default:
|
||||
switch (msg.getPayload().getCommand()) {
|
||||
case VERSION:
|
||||
Version payload = (Version) msg.getPayload();
|
||||
if (payload.getNonce() == ctx.getClientNonce()) {
|
||||
LOG.info("Tried to connect to self, disconnecting.");
|
||||
disconnect();
|
||||
} else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) {
|
||||
this.version = payload.getVersion();
|
||||
this.streams = payload.getStreams();
|
||||
send(new VerAck());
|
||||
state = ACTIVE;
|
||||
sendAddresses();
|
||||
sendInventory();
|
||||
} else {
|
||||
LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting.");
|
||||
disconnect();
|
||||
}
|
||||
break;
|
||||
case VERACK:
|
||||
if (state == SERVER) {
|
||||
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Command 'version' or 'verack' expected, but was '"
|
||||
+ msg.getPayload().getCommand() + "'");
|
||||
}
|
||||
}
|
||||
if (socket.isClosed()) state = DISCONNECTED;
|
||||
} catch (SocketTimeoutException ignore) {
|
||||
if (state == ACTIVE) {
|
||||
sendQueue();
|
||||
default:
|
||||
switch (msg.getPayload().getCommand()) {
|
||||
case VERSION:
|
||||
Version payload = (Version) msg.getPayload();
|
||||
if (payload.getNonce() == ctx.getClientNonce()) {
|
||||
LOG.info("Tried to connect to self, disconnecting.");
|
||||
disconnect();
|
||||
} else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) {
|
||||
this.version = payload.getVersion();
|
||||
this.streams = payload.getStreams();
|
||||
send(new VerAck());
|
||||
switch (mode) {
|
||||
case SERVER:
|
||||
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
|
||||
break;
|
||||
case CLIENT:
|
||||
activateConnection();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting.");
|
||||
disconnect();
|
||||
}
|
||||
break;
|
||||
case VERACK:
|
||||
switch (mode) {
|
||||
case SERVER:
|
||||
activateConnection();
|
||||
break;
|
||||
case CLIENT:
|
||||
// NO OP
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Command 'version' or 'verack' expected, but was '"
|
||||
+ msg.getPayload().getCommand() + "'");
|
||||
}
|
||||
}
|
||||
if (socket.isClosed()) state = DISCONNECTED;
|
||||
} catch (SocketTimeoutException ignore) {
|
||||
if (state == ACTIVE) {
|
||||
sendQueue();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException | NodeException e) {
|
||||
LOG.debug("disconnection from node " + node + ": " + e.getMessage(), e);
|
||||
disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
private void activateConnection() {
|
||||
state = ACTIVE;
|
||||
sendAddresses();
|
||||
sendInventory();
|
||||
node.setTime(UnixTime.now());
|
||||
ctx.getNodeRegistry().offerAddresses(Arrays.asList(node));
|
||||
}
|
||||
|
||||
private void sendQueue() {
|
||||
LOG.debug("Sending " + sendingQueue.size() + " messages to node " + node);
|
||||
for (MessagePayload msg = sendingQueue.poll(); msg != null; msg = sendingQueue.poll()) {
|
||||
@ -166,6 +214,8 @@ public class Connection implements Runnable {
|
||||
Security.checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
|
||||
listener.receive(objectMessage);
|
||||
ctx.getInventory().storeObject(objectMessage);
|
||||
// offer object to some random nodes so it gets distributed throughout the network:
|
||||
ctx.getNetworkHandler().offer(objectMessage.getInventoryVector());
|
||||
} catch (InsufficientProofOfWorkException e) {
|
||||
LOG.warn(e.getMessage());
|
||||
} catch (IOException e) {
|
||||
@ -199,12 +249,7 @@ public class Connection implements Runnable {
|
||||
}
|
||||
|
||||
public void disconnect() {
|
||||
try {
|
||||
state = DISCONNECTED;
|
||||
socket.close();
|
||||
} catch (IOException e) {
|
||||
LOG.debug(e.getMessage(), e);
|
||||
}
|
||||
state = DISCONNECTED;
|
||||
}
|
||||
|
||||
private void send(MessagePayload payload) {
|
||||
@ -236,5 +281,7 @@ public class Connection implements Runnable {
|
||||
return Objects.hash(node);
|
||||
}
|
||||
|
||||
public enum State {SERVER, CLIENT, ACTIVE, DISCONNECTED}
|
||||
public enum Mode {SERVER, CLIENT}
|
||||
|
||||
public enum State {CONNECTING, ACTIVE, DISCONNECTED}
|
||||
}
|
||||
|
@ -22,19 +22,23 @@ import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
|
||||
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
|
||||
import ch.dissem.bitmessage.ports.NetworkHandler;
|
||||
import ch.dissem.bitmessage.utils.Collections;
|
||||
import ch.dissem.bitmessage.utils.DebugUtils;
|
||||
import ch.dissem.bitmessage.utils.Property;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static ch.dissem.bitmessage.networking.Connection.State.*;
|
||||
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
|
||||
import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER;
|
||||
import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE;
|
||||
import static ch.dissem.bitmessage.networking.Connection.State.DISCONNECTED;
|
||||
import static ch.dissem.bitmessage.utils.DebugUtils.inc;
|
||||
|
||||
/**
|
||||
* Handles all the networky stuff.
|
||||
@ -91,11 +95,7 @@ public class NetworkNode implements NetworkHandler, ContextHolder {
|
||||
if (connections.size() < 8) {
|
||||
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(8 - connections.size(), ctx.getStreams());
|
||||
for (NetworkAddress address : addresses) {
|
||||
try {
|
||||
startConnection(new Connection(ctx, CLIENT, new Socket(address.toInetAddress(), address.getPort()), listener));
|
||||
} catch (IOException e) {
|
||||
LOG.debug(e.getMessage(), e);
|
||||
}
|
||||
startConnection(new Connection(ctx, CLIENT, address, listener));
|
||||
}
|
||||
}
|
||||
try {
|
||||
@ -142,12 +142,55 @@ public class NetworkNode implements NetworkHandler, ContextHolder {
|
||||
|
||||
@Override
|
||||
public void offer(final InventoryVector iv) {
|
||||
List<Connection> active = new LinkedList<>();
|
||||
synchronized (connections) {
|
||||
LOG.debug(connections.size() + " connections available to offer " + iv);
|
||||
List<Connection> random8 = Collections.selectRandom(8, this.connections);
|
||||
for (Connection connection : random8) {
|
||||
connection.offer(iv);
|
||||
for (Connection connection : connections) {
|
||||
if (connection.getState() == ACTIVE) {
|
||||
active.add(connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG.debug(active.size() + " connections available to offer " + iv);
|
||||
List<Connection> random8 = Collections.selectRandom(8, active);
|
||||
for (Connection connection : random8) {
|
||||
connection.offer(iv);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Property getNetworkStatus() {
|
||||
TreeSet<Long> streams = new TreeSet<>();
|
||||
TreeMap<Long, Integer> incomingConnections = new TreeMap<>();
|
||||
TreeMap<Long, Integer> outgoingConnections = new TreeMap<>();
|
||||
|
||||
synchronized (connections) {
|
||||
for (Connection connection : connections) {
|
||||
if (connection.getState() == ACTIVE) {
|
||||
long stream = connection.getNode().getStream();
|
||||
streams.add(stream);
|
||||
if (connection.getMode() == SERVER) {
|
||||
inc(incomingConnections, stream);
|
||||
} else {
|
||||
inc(outgoingConnections, stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Property[] streamProperties = new Property[streams.size()];
|
||||
int i = 0;
|
||||
for (Long stream : streams) {
|
||||
int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0;
|
||||
int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0;
|
||||
streamProperties[i] = new Property("stream " + stream,
|
||||
null, new Property("nodes", incoming + outgoing),
|
||||
new Property("incoming", incoming),
|
||||
new Property("outgoing", outgoing)
|
||||
);
|
||||
i++;
|
||||
}
|
||||
return new Property("network", null,
|
||||
new Property("connectionManager", connectionManager.isAlive() ? "running" : "stopped"),
|
||||
new Property("connections", null, streamProperties)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user