Migrated networking and fixed networking tests

This commit is contained in:
2017-06-25 20:06:17 +02:00
parent 894e0ff724
commit 322bddcc4f
49 changed files with 2081 additions and 2936 deletions

View File

@ -1,343 +0,0 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.BitmessageContext;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES;
import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.*;
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/**
* Contains everything used by both the old streams-oriented NetworkHandler and the new NioNetworkHandler,
* respectively their connection objects.
*/
public abstract class AbstractConnection {
private static final Logger LOG = LoggerFactory.getLogger(AbstractConnection.class);
protected final InternalContext ctx;
protected final Mode mode;
protected final NetworkAddress host;
protected final NetworkAddress node;
protected final NetworkHandler.MessageListener listener;
protected final Map<InventoryVector, Long> ivCache;
protected final Deque<MessagePayload> sendingQueue;
protected final Map<InventoryVector, Long> commonRequestedObjects;
protected final Set<InventoryVector> requestedObjects;
protected volatile State state;
protected long lastObjectTime;
private final long syncTimeout;
private long syncReadTimeout = Long.MAX_VALUE;
protected long peerNonce;
protected int version;
protected long[] streams;
private boolean verackSent;
private boolean verackReceived;
public AbstractConnection(InternalContext context, Mode mode,
NetworkAddress node,
Map<InventoryVector, Long> commonRequestedObjects,
long syncTimeout) {
this.ctx = context;
this.mode = mode;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = node;
this.listener = context.getNetworkListener();
this.syncTimeout = (syncTimeout > 0 ? UnixTime.now() + syncTimeout : 0);
this.requestedObjects = Collections.newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000));
this.ivCache = new ConcurrentHashMap<>();
this.sendingQueue = new ConcurrentLinkedDeque<>();
this.state = CONNECTING;
this.commonRequestedObjects = commonRequestedObjects;
}
public Mode getMode() {
return mode;
}
public NetworkAddress getNode() {
return node;
}
public State getState() {
return state;
}
public long[] getStreams() {
return streams;
}
protected void handleMessage(MessagePayload payload) {
switch (state) {
case ACTIVE:
receiveMessage(payload);
break;
case DISCONNECTED:
break;
default:
handleCommand(payload);
break;
}
}
private void receiveMessage(MessagePayload messagePayload) {
switch (messagePayload.getCommand()) {
case INV:
receiveMessage((Inv) messagePayload);
break;
case GETDATA:
receiveMessage((GetData) messagePayload);
break;
case OBJECT:
receiveMessage((ObjectMessage) messagePayload);
break;
case ADDR:
receiveMessage((Addr) messagePayload);
break;
case CUSTOM:
case VERACK:
case VERSION:
default:
throw new IllegalStateException("Unexpectedly received '" + messagePayload.getCommand() + "' command");
}
}
private void receiveMessage(Inv inv) {
int originalSize = inv.getInventory().size();
updateIvCache(inv.getInventory());
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
missing.removeAll(commonRequestedObjects.keySet());
LOG.trace("Received inventory with " + originalSize + " elements, of which are "
+ missing.size() + " missing.");
send(new GetData(missing));
}
private void receiveMessage(GetData getData) {
for (InventoryVector iv : getData.getInventory()) {
ObjectMessage om = ctx.getInventory().getObject(iv);
if (om != null) sendingQueue.offer(om);
}
}
private void receiveMessage(ObjectMessage objectMessage) {
requestedObjects.remove(objectMessage.getInventoryVector());
if (ctx.getInventory().contains(objectMessage)) {
LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory");
return;
}
try {
listener.receive(objectMessage);
cryptography().checkProofOfWork(objectMessage, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES);
ctx.getInventory().storeObject(objectMessage);
// offer object to some random nodes so it gets distributed throughout the network:
ctx.getNetworkHandler().offer(objectMessage.getInventoryVector());
lastObjectTime = UnixTime.now();
} catch (InsufficientProofOfWorkException e) {
LOG.warn(e.getMessage());
// DebugUtils.saveToFile(objectMessage); // this line must not be committed active
} catch (IOException e) {
LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e);
} finally {
if (commonRequestedObjects.remove(objectMessage.getInventoryVector()) == null) {
LOG.debug("Received object that wasn't requested.");
}
}
}
private void receiveMessage(Addr addr) {
LOG.trace("Received " + addr.getAddresses().size() + " addresses.");
ctx.getNodeRegistry().offerAddresses(addr.getAddresses());
}
private void updateIvCache(List<InventoryVector> inventory) {
cleanupIvCache();
Long now = UnixTime.now();
for (InventoryVector iv : inventory) {
ivCache.put(iv, now);
}
}
public void offer(InventoryVector iv) {
sendingQueue.offer(new Inv(Collections.singletonList(iv)));
updateIvCache(Collections.singletonList(iv));
}
public boolean knowsOf(InventoryVector iv) {
return ivCache.containsKey(iv);
}
public boolean requested(InventoryVector iv) {
return requestedObjects.contains(iv);
}
private void cleanupIvCache() {
long fiveMinutesAgo = UnixTime.now() - 5 * MINUTE;
for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) {
if (entry.getValue() < fiveMinutesAgo) {
ivCache.remove(entry.getKey());
}
}
}
private void handleCommand(MessagePayload payload) {
switch (payload.getCommand()) {
case VERSION:
handleVersion((Version) payload);
break;
case VERACK:
if (verackSent) {
activateConnection();
}
verackReceived = true;
break;
case CUSTOM:
MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload);
if (response == null) {
disconnect();
} else {
send(response);
}
break;
default:
throw new NodeException("Command 'version' or 'verack' expected, but was '"
+ payload.getCommand() + "'");
}
}
private void activateConnection() {
LOG.info("Successfully established connection with node " + node);
state = ACTIVE;
node.setTime(UnixTime.now());
if (mode != SYNC) {
sendAddresses();
ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node));
}
sendInventory();
}
private void sendAddresses() {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(1000, streams);
sendingQueue.offer(new Addr(addresses));
}
private void sendInventory() {
List<InventoryVector> inventory = ctx.getInventory().getInventory(streams);
for (int i = 0; i < inventory.size(); i += 50000) {
sendingQueue.offer(new Inv(inventory.subList(i, Math.min(inventory.size(), i + 50000))));
}
}
private void handleVersion(Version version) {
if (version.getNonce() == ctx.getClientNonce()) {
LOG.info("Tried to connect to self, disconnecting.");
disconnect();
} else if (version.getVersion() >= BitmessageContext.CURRENT_VERSION) {
this.peerNonce = version.getNonce();
if (peerNonce == ctx.getClientNonce()) disconnect();
this.version = version.getVersion();
this.streams = version.getStreams();
verackSent = true;
send(new VerAck());
if (mode == SERVER) {
send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build());
}
if (verackReceived) {
activateConnection();
}
} else {
LOG.info("Received unsupported version " + version.getVersion() + ", disconnecting.");
disconnect();
}
}
@SuppressWarnings("RedundantIfStatement")
protected boolean syncFinished(NetworkMessage msg) {
if (mode != SYNC) {
return false;
}
if (Thread.interrupted()) {
return true;
}
if (state != ACTIVE) {
return false;
}
if (syncTimeout < UnixTime.now()) {
LOG.info("Synchronization timed out");
return true;
}
if (!sendingQueue.isEmpty()) {
syncReadTimeout = System.currentTimeMillis() + 1000;
return false;
}
if (msg == null) {
return syncReadTimeout < System.currentTimeMillis();
} else {
syncReadTimeout = System.currentTimeMillis() + 1000;
return false;
}
}
public void disconnect() {
state = DISCONNECTED;
// Make sure objects that are still missing are requested from other nodes
ctx.getNetworkHandler().request(requestedObjects);
}
protected abstract void send(MessagePayload payload);
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AbstractConnection that = (AbstractConnection) o;
return Objects.equals(node, that.node);
}
@Override
public int hashCode() {
return Objects.hash(node);
}
public enum Mode {SERVER, CLIENT, SYNC}
public enum State {CONNECTING, ACTIVE, DISCONNECTED}
}

View File

@ -1,225 +0,0 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.NetworkHandler.MessageListener;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED;
import static ch.dissem.bitmessage.utils.UnixTime.MINUTE;
/**
* A connection to a specific node
*/
class Connection extends AbstractConnection {
public static final int READ_TIMEOUT = 2000;
private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
private static final int CONNECT_TIMEOUT = 5000;
private final long startTime;
private final Socket socket;
private final ReaderRunnable reader = new ReaderRunnable();
private final WriterRunnable writer = new WriterRunnable();
private InputStream in;
private OutputStream out;
private boolean socketInitialized;
public Connection(InternalContext context, Mode mode, Socket socket,
Map<InventoryVector, Long> requestedObjectsMap) throws IOException {
this(context, mode, socket, requestedObjectsMap,
new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(),
0);
}
public Connection(InternalContext context, Mode mode, NetworkAddress node,
Map<InventoryVector, Long> requestedObjectsMap) {
this(context, mode, new Socket(), requestedObjectsMap,
node, 0);
}
private Connection(InternalContext context, Mode mode, Socket socket,
Map<InventoryVector, Long> commonRequestedObjects, NetworkAddress node, long syncTimeout) {
super(context, mode, node, commonRequestedObjects, syncTimeout);
this.startTime = UnixTime.now();
this.socket = socket;
}
public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener,
long timeoutInSeconds) throws IOException {
return new Connection(ctx, SYNC, new Socket(address, port),
new HashMap<InventoryVector, Long>(),
new NetworkAddress.Builder().ip(address).port(port).stream(1).build(),
timeoutInSeconds);
}
public long getStartTime() {
return startTime;
}
public Mode getMode() {
return mode;
}
public State getState() {
return state;
}
public NetworkAddress getNode() {
return node;
}
@Override
protected void send(MessagePayload payload) {
try {
if (payload instanceof GetData) {
requestedObjects.addAll(((GetData) payload).getInventory());
}
synchronized (this) {
new NetworkMessage(payload).write(out);
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
disconnect();
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Connection that = (Connection) o;
return Objects.equals(node, that.node);
}
@Override
public int hashCode() {
return Objects.hash(node);
}
private synchronized void initSocket(Socket socket) throws IOException {
if (!socketInitialized) {
if (!socket.isConnected()) {
LOG.trace("Trying to connect to node " + node);
socket.connect(new InetSocketAddress(node.toInetAddress(), node.getPort()), CONNECT_TIMEOUT);
}
socket.setSoTimeout(READ_TIMEOUT);
in = socket.getInputStream();
out = socket.getOutputStream();
socketInitialized = true;
}
}
public ReaderRunnable getReader() {
return reader;
}
public WriterRunnable getWriter() {
return writer;
}
public class ReaderRunnable implements Runnable {
@Override
public void run() {
try (Socket socket = Connection.this.socket) {
initSocket(socket);
if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build());
}
while (state != DISCONNECTED) {
if (mode != SYNC) {
if (state == ACTIVE && requestedObjects.isEmpty() && sendingQueue.isEmpty()) {
Thread.sleep(1000);
} else {
Thread.sleep(100);
}
}
receive();
}
} catch (Exception e) {
LOG.trace("Reader disconnected from node " + node + ": " + e.getMessage());
} finally {
disconnect();
try {
socket.close();
} catch (Exception e) {
LOG.debug(e.getMessage(), e);
}
}
}
private void receive() throws InterruptedException {
try {
NetworkMessage msg = Factory.getNetworkMessage(version, in);
if (msg == null)
return;
handleMessage(msg.getPayload());
if (socket.isClosed() || syncFinished(msg) || checkOpenRequests()) disconnect();
} catch (SocketTimeoutException ignore) {
if (state == ACTIVE && syncFinished(null)) disconnect();
}
}
}
private boolean checkOpenRequests() {
return !requestedObjects.isEmpty() && lastObjectTime > 0 && (UnixTime.now() - lastObjectTime) > 2 * MINUTE;
}
public class WriterRunnable implements Runnable {
@Override
public void run() {
try (Socket socket = Connection.this.socket) {
initSocket(socket);
while (state != DISCONNECTED) {
if (sendingQueue.isEmpty()) {
Thread.sleep(1000);
} else {
send(sendingQueue.poll());
}
}
} catch (IOException | InterruptedException e) {
LOG.trace("Writer disconnected from node " + node + ": " + e.getMessage());
disconnect();
}
}
}
}

View File

@ -1,118 +0,0 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER;
/**
* @author Christian Basler
*/
@Deprecated
@SuppressWarnings("deprecation")
public class ConnectionOrganizer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionOrganizer.class);
private final InternalContext ctx;
private final DefaultNetworkHandler networkHandler;
private Connection initialConnection;
public ConnectionOrganizer(InternalContext ctx,
DefaultNetworkHandler networkHandler) {
this.ctx = ctx;
this.networkHandler = networkHandler;
}
@Override
public void run() {
try {
while (networkHandler.isRunning()) {
try {
int active = 0;
long now = UnixTime.now();
int diff = networkHandler.connections.size() - ctx.getConnectionLimit();
if (diff > 0) {
for (Connection c : networkHandler.connections) {
c.disconnect();
diff--;
if (diff == 0) break;
}
}
boolean forcedDisconnect = false;
for (Iterator<Connection> iterator = networkHandler.connections.iterator(); iterator.hasNext(); ) {
Connection c = iterator.next();
// Just in case they were all created at the same time, don't disconnect
// all at once.
if (!forcedDisconnect && now - c.getStartTime() > ctx.getConnectionTTL()) {
c.disconnect();
forcedDisconnect = true;
}
switch (c.getState()) {
case DISCONNECTED:
iterator.remove();
break;
case ACTIVE:
active++;
break;
default:
// nothing to do
}
}
if (active < NETWORK_MAGIC_NUMBER) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(
NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
boolean first = active == 0 && initialConnection == null;
for (NetworkAddress address : addresses) {
Connection c = new Connection(ctx, CLIENT, address, networkHandler.requestedObjects);
if (first) {
initialConnection = c;
first = false;
}
networkHandler.startConnection(c);
}
Thread.sleep(10000);
} else if (initialConnection == null) {
Thread.sleep(30000);
} else {
initialConnection.disconnect();
initialConnection = null;
Thread.sleep(10000);
}
} catch (InterruptedException e) {
networkHandler.stop();
} catch (Exception e) {
LOG.error("Error in connection manager. Ignored.", e);
}
}
} finally {
LOG.debug("Connection manager shutting down.");
networkHandler.stop();
}
}
}

View File

@ -1,249 +0,0 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.InternalContext.ContextHolder;
import ch.dissem.bitmessage.entity.CustomMessage;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.Collections;
import ch.dissem.bitmessage.utils.Property;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.*;
import static ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE;
import static ch.dissem.bitmessage.utils.DebugUtils.inc;
import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool;
/**
* Handles all the networky stuff.
*
* @deprecated use {@link ch.dissem.bitmessage.networking.nio.NioNetworkHandler NioNetworkHandler} instead.
*/
@Deprecated
public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
final Collection<Connection> connections = new ConcurrentLinkedQueue<>();
private final ExecutorService pool = Executors.newCachedThreadPool(
pool("network")
.lowPrio()
.daemon()
.build());
private InternalContext ctx;
private ServerRunnable server;
private volatile boolean running;
final Map<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>(50_000);
@Override
public void setContext(InternalContext context) {
this.ctx = context;
}
@Override
public Future<?> synchronize(InetAddress server, int port, long timeoutInSeconds) {
try {
Connection connection = Connection.sync(ctx, server, port, ctx.getNetworkListener(), timeoutInSeconds);
Future<?> reader = pool.submit(connection.getReader());
pool.execute(connection.getWriter());
return reader;
} catch (IOException e) {
throw new ApplicationException(e);
}
}
@Override
public CustomMessage send(InetAddress server, int port, CustomMessage request) {
try (Socket socket = new Socket(server, port)) {
socket.setSoTimeout(Connection.READ_TIMEOUT);
new NetworkMessage(request).write(socket.getOutputStream());
NetworkMessage networkMessage = Factory.getNetworkMessage(3, socket.getInputStream());
if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) {
return (CustomMessage) networkMessage.getPayload();
} else {
if (networkMessage == null) {
throw new NodeException("No response from node " + server);
} else {
throw new NodeException("Unexpected response from node " +
server + ": " + networkMessage.getPayload().getCommand());
}
}
} catch (IOException e) {
throw new NodeException(e.getMessage(), e);
}
}
@Override
public void start() {
if (running) {
throw new IllegalStateException("Network already running - you need to stop first.");
}
try {
running = true;
connections.clear();
server = new ServerRunnable(ctx, this);
pool.execute(server);
pool.execute(new ConnectionOrganizer(ctx, this));
} catch (IOException e) {
throw new ApplicationException(e);
}
}
@Override
public boolean isRunning() {
return running;
}
@Override
public void stop() {
server.close();
synchronized (connections) {
running = false;
for (Connection c : connections) {
c.disconnect();
}
}
requestedObjects.clear();
}
void startConnection(Connection c) {
if (!running) return;
synchronized (connections) {
if (!running) return;
// prevent connecting twice to the same node
if (connections.contains(c)) {
return;
}
connections.add(c);
}
pool.execute(c.getReader());
pool.execute(c.getWriter());
}
@Override
public void offer(final InventoryVector iv) {
List<Connection> target = new LinkedList<>();
for (Connection connection : connections) {
if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) {
target.add(connection);
}
}
List<Connection> randomSubset = Collections.selectRandom(NETWORK_MAGIC_NUMBER, target);
for (Connection connection : randomSubset) {
connection.offer(iv);
}
}
@Override
public Property getNetworkStatus() {
TreeSet<Long> streams = new TreeSet<>();
TreeMap<Long, Integer> incomingConnections = new TreeMap<>();
TreeMap<Long, Integer> outgoingConnections = new TreeMap<>();
for (Connection connection : connections) {
if (connection.getState() == ACTIVE) {
for (long stream : connection.getStreams()) {
streams.add(stream);
if (connection.getMode() == SERVER) {
inc(incomingConnections, stream);
} else {
inc(outgoingConnections, stream);
}
}
}
}
Property[] streamProperties = new Property[streams.size()];
int i = 0;
for (Long stream : streams) {
int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0;
int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0;
streamProperties[i] = new Property("stream " + stream,
null, new Property("nodes", incoming + outgoing),
new Property("incoming", incoming),
new Property("outgoing", outgoing)
);
i++;
}
return new Property("network", null,
new Property("connectionManager", running ? "running" : "stopped"),
new Property("connections", streamProperties),
new Property("requestedObjects", requestedObjects.size())
);
}
@Override
public void request(Collection<InventoryVector> inventoryVectors) {
if (!running || inventoryVectors.isEmpty()) return;
Map<Connection, List<InventoryVector>> distribution = new HashMap<>();
for (Connection connection : connections) {
if (connection.getState() == ACTIVE) {
distribution.put(connection, new LinkedList<InventoryVector>());
}
}
Iterator<InventoryVector> iterator = inventoryVectors.iterator();
if (!iterator.hasNext()) {
return;
}
InventoryVector next = iterator.next();
Connection previous = null;
do {
for (Connection connection : distribution.keySet()) {
if (connection == previous) {
next = iterator.next();
}
if (connection.knowsOf(next)) {
List<InventoryVector> ivs = distribution.get(connection);
if (ivs.size() == GetData.MAX_INVENTORY_SIZE) {
connection.send(new GetData(ivs));
ivs.clear();
}
ivs.add(next);
iterator.remove();
if (iterator.hasNext()) {
next = iterator.next();
previous = connection;
} else {
break;
}
}
}
} while (iterator.hasNext());
for (Connection connection : distribution.keySet()) {
List<InventoryVector> ivs = distribution.get(connection);
if (!ivs.isEmpty()) {
connection.send(new GetData(ivs));
}
}
}
}

View File

@ -1,68 +0,0 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.ports.NetworkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
/**
* @author Christian Basler
*/
@Deprecated
public class ServerRunnable implements Runnable, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ServerRunnable.class);
private final InternalContext ctx;
private final ServerSocket serverSocket;
private final DefaultNetworkHandler networkHandler;
public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler) throws IOException {
this.ctx = ctx;
this.networkHandler = networkHandler;
this.serverSocket = new ServerSocket(ctx.getPort());
}
@Override
public void run() {
while (!serverSocket.isClosed()) {
try {
Socket socket = serverSocket.accept();
socket.setSoTimeout(Connection.READ_TIMEOUT);
networkHandler.startConnection(new Connection(ctx, SERVER, socket, networkHandler.requestedObjects));
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
}
}
@Override
public void close() {
try {
serverSocket.close();
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
}
}

View File

@ -1,163 +0,0 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.V3MessageReader;
import ch.dissem.bitmessage.networking.AbstractConnection;
import ch.dissem.bitmessage.utils.UnixTime;
import java.nio.ByteBuffer;
import java.util.*;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
/**
* Represents the current state of a connection.
*/
public class ConnectionInfo extends AbstractConnection {
private final ByteBuffer headerOut = ByteBuffer.allocate(24);
private ByteBuffer payloadOut;
private V3MessageReader reader = new V3MessageReader();
private boolean syncFinished;
private long lastUpdate = System.currentTimeMillis();
public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node,
Map<InventoryVector, Long> commonRequestedObjects, long syncTimeout) {
super(context, mode, node, commonRequestedObjects, syncTimeout);
headerOut.flip();
if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults(ctx.getClientNonce()).addrFrom(host).addrRecv(node).build());
}
}
public State getState() {
return state;
}
public boolean knowsOf(InventoryVector iv) {
return ivCache.containsKey(iv);
}
public Queue<MessagePayload> getSendingQueue() {
return sendingQueue;
}
public ByteBuffer getInBuffer() {
if (reader == null) {
throw new NodeException("Node is disconnected");
}
return reader.getActiveBuffer();
}
public void updateWriter() {
if (!headerOut.hasRemaining() && !sendingQueue.isEmpty()) {
headerOut.clear();
MessagePayload payload = sendingQueue.poll();
payloadOut = new NetworkMessage(payload).writeHeaderAndGetPayloadBuffer(headerOut);
headerOut.flip();
lastUpdate = System.currentTimeMillis();
}
}
public ByteBuffer[] getOutBuffers() {
return new ByteBuffer[]{headerOut, payloadOut};
}
public void cleanupBuffers() {
if (payloadOut != null && !payloadOut.hasRemaining()) {
payloadOut = null;
}
}
public void updateReader() {
reader.update();
if (!reader.getMessages().isEmpty()) {
Iterator<NetworkMessage> iterator = reader.getMessages().iterator();
NetworkMessage msg = null;
while (iterator.hasNext()) {
msg = iterator.next();
handleMessage(msg.getPayload());
iterator.remove();
}
syncFinished = syncFinished(msg);
}
lastUpdate = System.currentTimeMillis();
}
public void updateSyncStatus() {
if (!syncFinished) {
syncFinished = (reader == null || reader.getMessages().isEmpty()) && syncFinished(null);
}
}
public boolean isExpired() {
switch (state) {
case CONNECTING:
// the TCP timeout starts out at 20 seconds
return lastUpdate < System.currentTimeMillis() - 20_000;
case ACTIVE:
// after verack messages are exchanged, the timeout is raised to 10 minutes
return lastUpdate < System.currentTimeMillis() - 600_000;
case DISCONNECTED:
return true;
default:
throw new IllegalStateException("Unknown state: " + state);
}
}
@Override
public void disconnect() {
super.disconnect();
if (reader != null) {
reader.cleanup();
reader = null;
}
payloadOut = null;
}
public boolean isSyncFinished() {
return syncFinished;
}
@Override
protected void send(MessagePayload payload) {
sendingQueue.add(payload);
if (payload instanceof GetData) {
Long now = UnixTime.now();
List<InventoryVector> inventory = ((GetData) payload).getInventory();
requestedObjects.addAll(inventory);
for (InventoryVector iv : inventory) {
commonRequestedObjects.put(iv, now);
}
}
}
public boolean isWritePending() {
return !sendingQueue.isEmpty()
|| headerOut != null && headerOut.hasRemaining()
|| payloadOut != null && payloadOut.hasRemaining();
}
}

View File

@ -1,529 +0,0 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking.nio;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.CustomMessage;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.V3MessageReader;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.DebugUtils;
import ch.dissem.bitmessage.utils.Property;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import static ch.dissem.bitmessage.constants.Network.HEADER_SIZE;
import static ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE;
import static ch.dissem.bitmessage.networking.AbstractConnection.State.DISCONNECTED;
import static ch.dissem.bitmessage.utils.Collections.selectRandom;
import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool;
import static java.nio.channels.SelectionKey.*;
/**
* Network handler using java.nio, resulting in less threads.
*/
public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder {
private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class);
private static final long REQUESTED_OBJECTS_MAX_TIME = 2 * 60_000; // 2 minutes
private static final Long DELAYED = Long.MIN_VALUE;
private final ExecutorService threadPool = Executors.newCachedThreadPool(
pool("network")
.lowPrio()
.daemon()
.build());
private InternalContext ctx;
private Selector selector;
private ServerSocketChannel serverChannel;
private Queue<NetworkAddress> connectionQueue = new ConcurrentLinkedQueue<>();
private Map<ConnectionInfo, SelectionKey> connections = new ConcurrentHashMap<>();
private final Map<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>(10_000);
private Thread starter;
@NotNull
@Override
public Future<Void> synchronize(@NotNull final InetAddress server, final int port, final long timeoutInSeconds) {
return threadPool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) {
channel.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SYNC,
new NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
new HashMap<InventoryVector, Long>(), timeoutInSeconds);
while (channel.isConnected() && !connection.isSyncFinished()) {
write(channel, connection);
read(channel, connection);
Thread.sleep(10);
}
LOG.info("Synchronization finished");
}
return null;
}
});
}
@NotNull
@Override
public CustomMessage send(@NotNull InetAddress server, int port, @NotNull CustomMessage request) {
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(server, port))) {
channel.configureBlocking(true);
ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
ByteBuffer payloadBuffer = new NetworkMessage(request).writeHeaderAndGetPayloadBuffer(headerBuffer);
headerBuffer.flip();
while (headerBuffer.hasRemaining()) {
channel.write(headerBuffer);
}
while (payloadBuffer.hasRemaining()) {
channel.write(payloadBuffer);
}
V3MessageReader reader = new V3MessageReader();
while (channel.isConnected() && reader.getMessages().isEmpty()) {
if (channel.read(reader.getActiveBuffer()) > 0) {
reader.update();
} else {
throw new NodeException("No response from node " + server);
}
}
NetworkMessage networkMessage;
if (reader.getMessages().isEmpty()) {
throw new NodeException("No response from node " + server);
} else {
networkMessage = reader.getMessages().get(0);
}
if (networkMessage != null && networkMessage.getPayload() instanceof CustomMessage) {
return (CustomMessage) networkMessage.getPayload();
} else {
if (networkMessage == null) {
throw new NodeException("Empty response from node " + server);
} else {
throw new NodeException("Unexpected response from node " + server + ": "
+ networkMessage.getPayload().getClass());
}
}
} catch (IOException e) {
throw new ApplicationException(e);
}
}
@Override
public void start() {
if (selector != null && selector.isOpen()) {
throw new IllegalStateException("Network already running - you need to stop first.");
}
try {
selector = Selector.open();
} catch (IOException e) {
throw new ApplicationException(e);
}
requestedObjects.clear();
starter = thread("connection manager", new Runnable() {
@Override
public void run() {
while (selector.isOpen()) {
int missing = NETWORK_MAGIC_NUMBER;
for (ConnectionInfo connectionInfo : connections.keySet()) {
if (connectionInfo.getState() == ACTIVE) {
missing--;
if (missing == 0) break;
}
}
if (missing > 0) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(100, ctx.getStreams());
addresses = selectRandom(missing, addresses);
for (NetworkAddress address : addresses) {
if (!isConnectedTo(address)) {
connectionQueue.offer(address);
}
}
}
Iterator<Map.Entry<ConnectionInfo, SelectionKey>> it = connections.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<ConnectionInfo, SelectionKey> e = it.next();
if (!e.getValue().isValid() || e.getKey().isExpired()) {
try {
e.getValue().channel().close();
} catch (Exception ignore) {
}
e.getValue().cancel();
e.getValue().attach(null);
e.getKey().disconnect();
it.remove();
}
}
// The list 'requested objects' helps to prevent downloading an object
// twice. From time to time there is an error though, and an object is
// never downloaded. To prevent a large list of failed objects and give
// them a chance to get downloaded again, we will attempt to download an
// object from another node after some time out.
long timedOut = System.currentTimeMillis() - REQUESTED_OBJECTS_MAX_TIME;
List<InventoryVector> delayed = new LinkedList<>();
Iterator<Map.Entry<InventoryVector, Long>> iterator = requestedObjects.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<InventoryVector, Long> e = iterator.next();
//noinspection NumberEquality
if (e.getValue() == DELAYED) {
iterator.remove();
} else if (e.getValue() < timedOut) {
delayed.add(e.getKey());
e.setValue(DELAYED);
}
}
request(delayed);
try {
Thread.sleep(30_000);
} catch (InterruptedException e) {
return;
}
}
}
});
thread("selector worker", new Runnable() {
@Override
public void run() {
try {
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(ctx.getPort()));
serverChannel.register(selector, OP_ACCEPT, null);
while (selector.isOpen()) {
selector.select(1000);
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.attachment() == null) {
try {
if (key.isAcceptable()) {
// handle accept
try {
SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept();
accepted.configureBlocking(false);
ConnectionInfo connection = new ConnectionInfo(ctx, SERVER,
new NetworkAddress.Builder()
.ip(accepted.socket().getInetAddress())
.port(accepted.socket().getPort())
.stream(1)
.build(),
requestedObjects, 0
);
connections.put(
connection,
accepted.register(selector, OP_READ | OP_WRITE, connection)
);
} catch (AsynchronousCloseException e) {
LOG.trace(e.getMessage());
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
} catch (CancelledKeyException e) {
LOG.debug(e.getMessage(), e);
}
} else {
// handle read/write
SocketChannel channel = (SocketChannel) key.channel();
ConnectionInfo connection = (ConnectionInfo) key.attachment();
try {
if (key.isConnectable()) {
if (!channel.finishConnect()) {
continue;
}
}
if (key.isWritable()) {
write(channel, connection);
}
if (key.isReadable()) {
read(channel, connection);
}
if (connection.getState() == DISCONNECTED) {
key.interestOps(0);
channel.close();
} else if (connection.isWritePending()) {
key.interestOps(OP_READ | OP_WRITE);
} else {
key.interestOps(OP_READ);
}
} catch (CancelledKeyException | NodeException | IOException e) {
connection.disconnect();
}
}
}
// set interest ops
for (Map.Entry<ConnectionInfo, SelectionKey> e : connections.entrySet()) {
try {
if (e.getValue().isValid()
&& (e.getValue().interestOps() & OP_WRITE) == 0
&& (e.getValue().interestOps() & OP_CONNECT) == 0
&& !e.getKey().getSendingQueue().isEmpty()) {
e.getValue().interestOps(OP_READ | OP_WRITE);
}
} catch (CancelledKeyException x) {
e.getKey().disconnect();
}
}
// start new connections
if (!connectionQueue.isEmpty()) {
NetworkAddress address = connectionQueue.poll();
try {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(address.toInetAddress(), address.getPort()));
ConnectionInfo connection = new ConnectionInfo(ctx, CLIENT,
address,
requestedObjects, 0
);
connections.put(
connection,
channel.register(selector, OP_CONNECT, connection)
);
} catch (NoRouteToHostException ignore) {
// We'll try to connect to many offline nodes, so
// this is expected to happen quite a lot.
} catch (AsynchronousCloseException e) {
// The exception is expected if the network is being
// shut down, as we actually do asynchronously close
// the connections.
if (isRunning()) {
LOG.error(e.getMessage(), e);
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
}
selector.close();
} catch (ClosedSelectorException ignore) {
} catch (IOException e) {
throw new ApplicationException(e);
}
}
});
}
private static void write(SocketChannel channel, ConnectionInfo connection)
throws IOException {
writeBuffer(connection.getOutBuffers(), channel);
connection.updateWriter();
writeBuffer(connection.getOutBuffers(), channel);
connection.cleanupBuffers();
}
private static void writeBuffer(ByteBuffer[] buffers, SocketChannel channel) throws IOException {
if (buffers[1] == null) {
if (buffers[0].hasRemaining()) {
channel.write(buffers[0]);
}
} else if (buffers[1].hasRemaining() || buffers[0].hasRemaining()) {
channel.write(buffers);
}
}
private static void read(SocketChannel channel, ConnectionInfo connection) throws IOException {
if (channel.read(connection.getInBuffer()) > 0) {
connection.updateReader();
}
connection.updateSyncStatus();
}
private Thread thread(String threadName, Runnable runnable) {
Thread thread = new Thread(runnable, threadName);
thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY);
thread.start();
return thread;
}
@Override
public void stop() {
try {
serverChannel.socket().close();
selector.close();
for (SelectionKey selectionKey : connections.values()) {
selectionKey.channel().close();
}
} catch (IOException e) {
throw new ApplicationException(e);
}
}
@Override
public void offer(@NotNull InventoryVector iv) {
List<ConnectionInfo> target = new LinkedList<>();
for (ConnectionInfo connection : connections.keySet()) {
if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) {
target.add(connection);
}
}
List<ConnectionInfo> randomSubset = selectRandom(NETWORK_MAGIC_NUMBER, target);
for (ConnectionInfo connection : randomSubset) {
connection.offer(iv);
}
}
@Override
public void request(@NotNull Collection<InventoryVector> inventoryVectors) {
if (!isRunning()) {
requestedObjects.clear();
return;
}
Iterator<InventoryVector> iterator = inventoryVectors.iterator();
if (!iterator.hasNext()) {
return;
}
Map<ConnectionInfo, List<InventoryVector>> distribution = new HashMap<>();
for (ConnectionInfo connection : connections.keySet()) {
if (connection.getState() == ACTIVE) {
distribution.put(connection, new LinkedList<InventoryVector>());
}
}
if (distribution.isEmpty()) {
return;
}
InventoryVector next = iterator.next();
ConnectionInfo previous = null;
do {
for (ConnectionInfo connection : distribution.keySet()) {
if (connection == previous || previous == null) {
if (iterator.hasNext()) {
previous = connection;
next = iterator.next();
} else {
break;
}
}
if (connection.knowsOf(next) && !connection.requested(next)) {
List<InventoryVector> ivs = distribution.get(connection);
if (ivs.size() == GetData.MAX_INVENTORY_SIZE) {
connection.send(new GetData(ivs));
ivs.clear();
}
ivs.add(next);
iterator.remove();
if (iterator.hasNext()) {
next = iterator.next();
previous = connection;
} else {
break;
}
}
}
} while (iterator.hasNext());
// remove objects nobody knows of
for (InventoryVector iv : inventoryVectors) {
requestedObjects.remove(iv);
}
for (ConnectionInfo connection : distribution.keySet()) {
List<InventoryVector> ivs = distribution.get(connection);
if (!ivs.isEmpty()) {
connection.send(new GetData(ivs));
}
}
}
@NotNull
@Override
public Property getNetworkStatus() {
TreeSet<Long> streams = new TreeSet<>();
TreeMap<Long, Integer> incomingConnections = new TreeMap<>();
TreeMap<Long, Integer> outgoingConnections = new TreeMap<>();
for (ConnectionInfo connection : connections.keySet()) {
if (connection.getState() == ACTIVE) {
for (long stream : connection.getStreams()) {
streams.add(stream);
if (connection.getMode() == SERVER) {
DebugUtils.inc(incomingConnections, stream);
} else {
DebugUtils.inc(outgoingConnections, stream);
}
}
}
}
Property[] streamProperties = new Property[streams.size()];
int i = 0;
for (Long stream : streams) {
int incoming = incomingConnections.containsKey(stream) ? incomingConnections.get(stream) : 0;
int outgoing = outgoingConnections.containsKey(stream) ? outgoingConnections.get(stream) : 0;
streamProperties[i] = new Property("stream " + stream,
null, new Property("nodes", incoming + outgoing),
new Property("incoming", incoming),
new Property("outgoing", outgoing)
);
i++;
}
return new Property("network", null,
new Property("connectionManager", isRunning() ? "running" : "stopped"),
new Property("connections", streamProperties),
new Property("requestedObjects", requestedObjects.size())
);
}
private boolean isConnectedTo(NetworkAddress address) {
for (ConnectionInfo c : connections.keySet()) {
if (c.getNode().equals(address)) {
return true;
}
}
return false;
}
@Override
public boolean isRunning() {
return selector != null && selector.isOpen() && starter.isAlive();
}
@Override
public void setContext(@NotNull InternalContext context) {
this.ctx = context;
}
}

View File

@ -0,0 +1,202 @@
/*
* Copyright 2017 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking.nio
import ch.dissem.bitmessage.InternalContext
import ch.dissem.bitmessage.InternalContext.Companion.NETWORK_EXTRA_BYTES
import ch.dissem.bitmessage.InternalContext.Companion.NETWORK_NONCE_TRIALS_PER_BYTE
import ch.dissem.bitmessage.entity.*
import ch.dissem.bitmessage.entity.valueobject.InventoryVector
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress
import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException
import ch.dissem.bitmessage.ports.NetworkHandler
import ch.dissem.bitmessage.utils.Singleton.cryptography
import ch.dissem.bitmessage.utils.UnixTime
import ch.dissem.bitmessage.utils.UnixTime.MINUTE
import org.slf4j.LoggerFactory
import java.io.IOException
import java.util.*
import java.util.concurrent.ConcurrentHashMap
/**
* Contains everything used by both the old streams-oriented NetworkHandler and the new NioNetworkHandler,
* respectively their connection objects.
*/
class Connection(
private val ctx: InternalContext,
val mode: Mode,
val node: NetworkAddress,
private val commonRequestedObjects: MutableMap<InventoryVector, Long>,
syncTimeout: Long
) {
private val requestedObjects: MutableSet<InventoryVector> = Collections.newSetFromMap(ConcurrentHashMap<InventoryVector, Boolean>(10000))
internal val io = ConnectionIO(mode, syncTimeout, commonRequestedObjects, requestedObjects, { state }, this::handleMessage)
private var initializer: NetworkConnectionInitializer? = NetworkConnectionInitializer(ctx, node, mode, io::send) { s ->
state = State.ACTIVE
streams = s
initializer = null
}
private val listener: NetworkHandler.MessageListener = ctx.networkListener
private val ivCache: MutableMap<InventoryVector, Long> = ConcurrentHashMap()
private var lastObjectTime: Long = 0
lateinit var streams: LongArray
protected set
@Volatile var state = State.CONNECTING
private set
val isSyncFinished
get() = io.isSyncFinished
val nothingToSend
get() = io.sendingQueue.isEmpty()
init {
initializer!!.start()
}
fun send(payload: MessagePayload) = io.send(payload)
protected fun handleMessage(payload: MessagePayload) {
when (state) {
State.CONNECTING -> initializer!!.handleCommand(payload)
State.ACTIVE -> receiveMessage(payload)
State.DISCONNECTED -> disconnect()
}
}
private fun receiveMessage(messagePayload: MessagePayload) {
when (messagePayload.command) {
MessagePayload.Command.INV -> receiveMessage(messagePayload as Inv)
MessagePayload.Command.GETDATA -> receiveMessage(messagePayload as GetData)
MessagePayload.Command.OBJECT -> receiveMessage(messagePayload as ObjectMessage)
MessagePayload.Command.ADDR -> receiveMessage(messagePayload as Addr)
else -> throw IllegalStateException("Unexpectedly received '${messagePayload.command}' command")
}
}
private fun receiveMessage(inv: Inv) {
val originalSize = inv.inventory.size
updateIvCache(inv.inventory)
val missing = ctx.inventory.getMissing(inv.inventory, *streams)
LOG.trace("Received inventory with $originalSize elements, of which are ${missing.size} missing.")
io.send(GetData(missing - commonRequestedObjects.keys))
}
private fun receiveMessage(getData: GetData) {
getData.inventory.forEach { iv -> ctx.inventory.getObject(iv)?.let { obj -> io.send(obj) } }
}
private fun receiveMessage(objectMessage: ObjectMessage) {
requestedObjects.remove(objectMessage.inventoryVector)
if (ctx.inventory.contains(objectMessage)) {
LOG.trace("Received object " + objectMessage.inventoryVector + " - already in inventory")
return
}
try {
listener.receive(objectMessage)
cryptography().checkProofOfWork(objectMessage, NETWORK_NONCE_TRIALS_PER_BYTE, NETWORK_EXTRA_BYTES)
ctx.inventory.storeObject(objectMessage)
// offer object to some random nodes so it gets distributed throughout the network:
ctx.networkHandler.offer(objectMessage.inventoryVector)
lastObjectTime = UnixTime.now
} catch (e: InsufficientProofOfWorkException) {
LOG.warn(e.message)
// DebugUtils.saveToFile(objectMessage); // this line must not be committed active
} catch (e: IOException) {
LOG.error("Stream " + objectMessage.stream + ", object type " + objectMessage.type + ": " + e.message, e)
} finally {
if (commonRequestedObjects.remove(objectMessage.inventoryVector) == null) {
LOG.debug("Received object that wasn't requested.")
}
}
}
private fun receiveMessage(addr: Addr) {
LOG.trace("Received " + addr.addresses.size + " addresses.")
ctx.nodeRegistry.offerAddresses(addr.addresses)
}
private fun updateIvCache(inventory: List<InventoryVector>) {
cleanupIvCache()
val now = UnixTime.now
for (iv in inventory) {
ivCache.put(iv, now)
}
}
fun offer(iv: InventoryVector) {
io.send(Inv(listOf(iv)))
updateIvCache(listOf(iv))
}
fun knowsOf(iv: InventoryVector): Boolean {
return ivCache.containsKey(iv)
}
fun requested(iv: InventoryVector): Boolean {
return requestedObjects.contains(iv)
}
private fun cleanupIvCache() {
val fiveMinutesAgo = UnixTime.now - 5 * MINUTE
for ((key, value) in ivCache) {
if (value < fiveMinutesAgo) {
ivCache.remove(key)
}
}
}
// the TCP timeout starts out at 20 seconds
// after verack messages are exchanged, the timeout is raised to 10 minutes
fun isExpired(): Boolean = when (state) {
State.CONNECTING -> io.lastUpdate < System.currentTimeMillis() - 20000
State.ACTIVE -> io.lastUpdate < System.currentTimeMillis() - 600000
State.DISCONNECTED -> true
}
fun disconnect() {
state = State.DISCONNECTED
io.disconnect()
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is Connection) return false
return node == other.node
}
override fun hashCode(): Int {
return Objects.hash(node)
}
enum class State {
CONNECTING, ACTIVE, DISCONNECTED
}
enum class Mode {
SERVER, CLIENT, SYNC
}
companion object {
private val LOG = LoggerFactory.getLogger(Connection::class.java)
}
}

View File

@ -0,0 +1,158 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking.nio
import ch.dissem.bitmessage.entity.GetData
import ch.dissem.bitmessage.entity.MessagePayload
import ch.dissem.bitmessage.entity.NetworkMessage
import ch.dissem.bitmessage.entity.valueobject.InventoryVector
import ch.dissem.bitmessage.exception.NodeException
import ch.dissem.bitmessage.factory.V3MessageReader
import ch.dissem.bitmessage.utils.UnixTime
import org.slf4j.LoggerFactory
import java.nio.ByteBuffer
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
/**
* Represents the current state of a connection.
*/
class ConnectionIO(
private val mode: Connection.Mode,
syncTimeout: Long,
private val commonRequestedObjects: MutableMap<InventoryVector, Long>,
private val requestedObjects: MutableSet<InventoryVector>,
private val getState: () -> Connection.State,
private val handleMessage: (MessagePayload) -> Unit
) {
private val headerOut: ByteBuffer = ByteBuffer.allocate(24)
private var payloadOut: ByteBuffer? = null
private var reader: V3MessageReader? = V3MessageReader()
internal val sendingQueue: Deque<MessagePayload> = ConcurrentLinkedDeque<MessagePayload>()
internal var lastUpdate = System.currentTimeMillis()
private set
private val syncTimeout: Long = if (syncTimeout > 0) UnixTime.now + syncTimeout else 0
private var syncReadTimeout = java.lang.Long.MAX_VALUE
init {
headerOut.flip()
}
val inBuffer: ByteBuffer
get() = reader?.getActiveBuffer() ?: throw NodeException("Node is disconnected")
fun updateWriter() {
if (!headerOut.hasRemaining() && !sendingQueue.isEmpty()) {
headerOut.clear()
val payload = sendingQueue.poll()
payloadOut = NetworkMessage(payload).writeHeaderAndGetPayloadBuffer(headerOut)
headerOut.flip()
lastUpdate = System.currentTimeMillis()
}
}
val outBuffers: Array<ByteBuffer>
get() = payloadOut?.let { arrayOf(headerOut, it) } ?: arrayOf(headerOut)
fun cleanupBuffers() {
payloadOut?.let {
if (!it.hasRemaining()) payloadOut = null
}
}
fun updateReader() {
reader?.let { reader ->
reader.update()
if (!reader.getMessages().isEmpty()) {
val iterator = reader.getMessages().iterator()
var msg: NetworkMessage? = null
while (iterator.hasNext()) {
msg = iterator.next()
handleMessage(msg.payload)
iterator.remove()
}
isSyncFinished = syncFinished(msg)
}
lastUpdate = System.currentTimeMillis()
}
}
fun updateSyncStatus() {
if (!isSyncFinished) {
isSyncFinished = reader?.getMessages()?.isEmpty() ?: true && syncFinished(null)
}
}
protected fun syncFinished(msg: NetworkMessage?): Boolean {
if (mode != Connection.Mode.SYNC) {
return false
}
if (Thread.interrupted() || getState() == Connection.State.DISCONNECTED) {
return true
}
if (getState() == Connection.State.CONNECTING) {
return false
}
if (syncTimeout < UnixTime.now) {
LOG.info("Synchronization timed out")
return true
}
if (!nothingToSend()) {
syncReadTimeout = System.currentTimeMillis() + 1000
return false
}
if (msg == null) {
return syncReadTimeout < System.currentTimeMillis()
} else {
syncReadTimeout = System.currentTimeMillis() + 1000
return false
}
}
fun disconnect() {
reader?.let {
it.cleanup()
reader = null
}
payloadOut = null
}
fun send(payload: MessagePayload) {
sendingQueue.add(payload)
if (payload is GetData) {
val now = UnixTime.now
val inventory = payload.inventory
requestedObjects.addAll(inventory)
inventory.forEach { iv -> commonRequestedObjects.put(iv, now) }
}
}
var isSyncFinished = false
val isWritePending: Boolean
get() = !sendingQueue.isEmpty()
|| headerOut.hasRemaining()
|| payloadOut?.hasRemaining() ?: false
fun nothingToSend() = sendingQueue.isEmpty()
companion object {
val LOG = LoggerFactory.getLogger(ConnectionIO::class.java)
}
}

View File

@ -0,0 +1,113 @@
/*
* Copyright 2017 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking.nio
import ch.dissem.bitmessage.BitmessageContext
import ch.dissem.bitmessage.InternalContext
import ch.dissem.bitmessage.entity.*
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress
import ch.dissem.bitmessage.exception.NodeException
import ch.dissem.bitmessage.utils.UnixTime
import org.slf4j.LoggerFactory
/**
* Handles the initialization phase of connection and, due to their design, custom commands.
*/
class NetworkConnectionInitializer(
private val ctx: InternalContext,
val node: NetworkAddress,
val mode: Connection.Mode,
val send: (MessagePayload) -> Unit,
val markActive: (LongArray) -> Unit
) {
private lateinit var version: Version
private var verackSent: Boolean = false
private var verackReceived: Boolean = false
fun start() {
if (mode == Connection.Mode.CLIENT || mode == Connection.Mode.SYNC) {
send(Version(nonce = ctx.clientNonce, addrFrom = NetworkAddress.ANY, addrRecv = node))
}
}
fun handleCommand(payload: MessagePayload) {
when (payload.command) {
MessagePayload.Command.VERSION -> handleVersion(payload as Version)
MessagePayload.Command.VERACK -> {
if (verackSent) {
activateConnection()
}
verackReceived = true
}
MessagePayload.Command.CUSTOM -> {
ctx.customCommandHandler.handle(payload as CustomMessage)?.let { response ->
send(response)
} ?: throw NodeException("No response for custom command available")
}
else -> throw NodeException("Command 'version' or 'verack' expected, but was '${payload.command}'")
}
}
private fun handleVersion(version: Version) {
if (version.nonce == ctx.clientNonce) {
throw NodeException("Tried to connect to self, disconnecting.")
} else if (version.version >= BitmessageContext.CURRENT_VERSION) {
this.version = version
verackSent = true
send(VerAck())
if (mode == Connection.Mode.SERVER) {
send(Version.Builder().defaults(ctx.clientNonce).addrFrom(NetworkAddress.ANY).addrRecv(node).build())
}
if (verackReceived) {
activateConnection()
}
} else {
throw NodeException("Received unsupported version " + version.version + ", disconnecting.")
}
}
private fun activateConnection() {
LOG.info("Successfully established connection with node " + node)
markActive(version.streams)
node.time = UnixTime.now
if (mode != Connection.Mode.SYNC) {
sendAddresses()
ctx.nodeRegistry.offerAddresses(listOf(node))
}
sendInventory()
}
private fun sendAddresses() {
val addresses = ctx.nodeRegistry.getKnownAddresses(1000, *version.streams)
send(Addr(addresses))
}
private fun sendInventory() {
val inventory = ctx.inventory.getInventory(*version.streams)
var i = 0
while (i < inventory.size) {
send(Inv(inventory.subList(i, Math.min(inventory.size, i + 50000))))
i += 50000
}
}
companion object {
val LOG = LoggerFactory.getLogger(NetworkConnectionInitializer::class.java)!!
}
}

View File

@ -0,0 +1,476 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking.nio
import ch.dissem.bitmessage.InternalContext
import ch.dissem.bitmessage.constants.Network.HEADER_SIZE
import ch.dissem.bitmessage.constants.Network.NETWORK_MAGIC_NUMBER
import ch.dissem.bitmessage.entity.CustomMessage
import ch.dissem.bitmessage.entity.GetData
import ch.dissem.bitmessage.entity.NetworkMessage
import ch.dissem.bitmessage.entity.valueobject.InventoryVector
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress
import ch.dissem.bitmessage.exception.NodeException
import ch.dissem.bitmessage.factory.V3MessageReader
import ch.dissem.bitmessage.networking.nio.Connection.Mode.*
import ch.dissem.bitmessage.ports.NetworkHandler
import ch.dissem.bitmessage.utils.Collections.selectRandom
import ch.dissem.bitmessage.utils.DebugUtils
import ch.dissem.bitmessage.utils.Property
import ch.dissem.bitmessage.utils.ThreadFactoryBuilder.Companion.pool
import ch.dissem.bitmessage.utils.UnixTime.now
import org.slf4j.LoggerFactory
import java.io.IOException
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.NoRouteToHostException
import java.nio.ByteBuffer
import java.nio.channels.*
import java.nio.channels.SelectionKey.*
import java.util.*
import java.util.concurrent.*
/**
* Network handler using java.nio, resulting in less threads.
*/
class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
private val threadPool = Executors.newCachedThreadPool(
pool("network")
.lowPrio()
.daemon()
.build())
private lateinit var ctx: InternalContext
private var selector: Selector? = null
private var serverChannel: ServerSocketChannel? = null
private val connectionQueue = ConcurrentLinkedQueue<NetworkAddress>()
private val connections = ConcurrentHashMap<Connection, SelectionKey>()
private val requestedObjects = ConcurrentHashMap<InventoryVector, Long>(10000)
private var starter: Thread? = null
override fun setContext(context: InternalContext) {
ctx = context
}
override fun synchronize(server: InetAddress, port: Int, timeoutInSeconds: Long): Future<Void> {
return threadPool.submit(Callable<Void> {
SocketChannel.open(InetSocketAddress(server, port)).use { channel ->
channel.configureBlocking(false)
val connection = Connection(ctx, SYNC,
NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
HashMap<InventoryVector, Long>(), timeoutInSeconds)
while (channel.isConnected && !connection.isSyncFinished) {
write(channel, connection.io)
read(channel, connection.io)
Thread.sleep(10)
}
LOG.info("Synchronization finished")
}
null
})
}
override fun send(server: InetAddress, port: Int, request: CustomMessage): CustomMessage {
SocketChannel.open(InetSocketAddress(server, port)).use { channel ->
channel.configureBlocking(true)
val headerBuffer = ByteBuffer.allocate(HEADER_SIZE)
val payloadBuffer = NetworkMessage(request).writeHeaderAndGetPayloadBuffer(headerBuffer)
headerBuffer.flip()
while (headerBuffer.hasRemaining()) {
channel.write(headerBuffer)
}
while (payloadBuffer.hasRemaining()) {
channel.write(payloadBuffer)
}
val reader = V3MessageReader()
while (channel.isConnected && reader.getMessages().isEmpty()) {
if (channel.read(reader.getActiveBuffer()) > 0) {
reader.update()
} else {
throw NodeException("No response from node $server")
}
}
val networkMessage: NetworkMessage?
if (reader.getMessages().isEmpty()) {
throw NodeException("No response from node " + server)
} else {
networkMessage = reader.getMessages().first()
}
if (networkMessage.payload is CustomMessage) {
return networkMessage.payload as CustomMessage
} else {
throw NodeException("Unexpected response from node $server: ${networkMessage.payload.javaClass}")
}
}
}
override fun start() {
if (selector?.isOpen ?: false) {
throw IllegalStateException("Network already running - you need to stop first.")
}
val selector = Selector.open()
this.selector = selector
requestedObjects.clear()
starter = thread("connection manager") {
while (selector.isOpen) {
var missing = NETWORK_MAGIC_NUMBER
for (connection in connections.keys) {
if (connection.state == Connection.State.ACTIVE) {
missing--
if (missing == 0) break
}
}
if (missing > 0) {
var addresses = ctx.nodeRegistry.getKnownAddresses(100, *ctx.streams)
addresses = selectRandom(missing, addresses)
for (address in addresses) {
if (!isConnectedTo(address)) {
connectionQueue.offer(address)
}
}
}
val it = connections.entries.iterator()
while (it.hasNext()) {
val e = it.next()
if (!e.value.isValid || e.key.isExpired()) {
try {
e.value.channel().close()
} catch (ignore: Exception) {
}
e.value.cancel()
e.value.attach(null)
e.key.disconnect()
it.remove()
}
}
// The list 'requested objects' helps to prevent downloading an object
// twice. From time to time there is an error though, and an object is
// never downloaded. To prevent a large list of failed objects and give
// them a chance to get downloaded again, we will attempt to download an
// object from another node after some time out.
val timedOut = System.currentTimeMillis() - REQUESTED_OBJECTS_MAX_TIME
val delayed = mutableListOf<InventoryVector>()
val iterator = requestedObjects.entries.iterator()
while (iterator.hasNext()) {
val e = iterator.next()
if (e.value == DELAYED) {
iterator.remove()
} else if (e.value < timedOut) {
delayed.add(e.key)
e.setValue(DELAYED)
}
}
request(delayed)
try {
Thread.sleep(30000)
} catch (e: InterruptedException) {
return@thread
}
}
}
thread("selector worker", {
try {
val serverChannel = ServerSocketChannel.open()
this.serverChannel = serverChannel
serverChannel.configureBlocking(false)
serverChannel.socket().bind(InetSocketAddress(ctx.port))
serverChannel.register(selector, OP_ACCEPT, null)
while (selector.isOpen) {
selector.select(1000)
val keyIterator = selector.selectedKeys().iterator()
while (keyIterator.hasNext()) {
val key = keyIterator.next()
keyIterator.remove()
if (key.attachment() == null) {
try {
if (key.isAcceptable) {
// handle accept
try {
val accepted = (key.channel() as ServerSocketChannel).accept()
accepted.configureBlocking(false)
val connection = Connection(ctx, SERVER,
NetworkAddress(
time = now,
stream = 1L,
socket = accepted.socket()!!
),
requestedObjects, 0
)
connections.put(
connection,
accepted.register(selector, OP_READ or OP_WRITE, connection)
)
} catch (e: AsynchronousCloseException) {
LOG.trace(e.message)
} catch (e: IOException) {
LOG.error(e.message, e)
}
}
} catch (e: CancelledKeyException) {
LOG.debug(e.message, e)
}
} else {
// handle read/write
val channel = key.channel() as SocketChannel
val connection = key.attachment() as Connection
try {
if (key.isConnectable) {
if (!channel.finishConnect()) {
continue
}
}
if (key.isWritable) {
write(channel, connection.io)
}
if (key.isReadable) {
read(channel, connection.io)
}
if (connection.state == Connection.State.DISCONNECTED) {
key.interestOps(0)
channel.close()
} else if (connection.io.isWritePending) {
key.interestOps(OP_READ or OP_WRITE)
} else {
key.interestOps(OP_READ)
}
} catch (e: CancelledKeyException) {
connection.disconnect()
} catch (e: NodeException) {
connection.disconnect()
} catch (e: IOException) {
connection.disconnect()
}
}
}
// set interest ops
for ((connection, selectionKey) in connections) {
try {
if (selectionKey.isValid
&& selectionKey.interestOps() and OP_WRITE == 0
&& selectionKey.interestOps() and OP_CONNECT == 0
&& !connection.nothingToSend) {
selectionKey.interestOps(OP_READ or OP_WRITE)
}
} catch (x: CancelledKeyException) {
connection.disconnect()
}
}
// start new connections
if (!connectionQueue.isEmpty()) {
val address = connectionQueue.poll()
try {
val channel = SocketChannel.open()
channel.configureBlocking(false)
channel.connect(InetSocketAddress(address.toInetAddress(), address.port))
val connection = Connection(ctx, CLIENT, address, requestedObjects, 0)
connections.put(
connection,
channel.register(selector, OP_CONNECT, connection)
)
} catch (ignore: NoRouteToHostException) {
// We'll try to connect to many offline nodes, so
// this is expected to happen quite a lot.
} catch (e: AsynchronousCloseException) {
// The exception is expected if the network is being
// shut down, as we actually do asynchronously close
// the connections.
if (isRunning) {
LOG.error(e.message, e)
}
} catch (e: IOException) {
LOG.error(e.message, e)
}
}
}
selector.close()
} catch (_: ClosedSelectorException) {
}
})
}
private fun thread(threadName: String, runnable: () -> Unit): Thread {
val thread = Thread(runnable, threadName)
thread.isDaemon = true
thread.priority = Thread.MIN_PRIORITY
thread.start()
return thread
}
override fun stop() {
serverChannel?.socket()?.close()
selector?.close()
for (selectionKey in connections.values) {
selectionKey.channel().close()
}
}
override fun offer(iv: InventoryVector) {
val targetConnections = connections.keys.filter { it.state == Connection.State.ACTIVE && !it.knowsOf(iv) }
selectRandom(NETWORK_MAGIC_NUMBER, targetConnections).forEach { it.offer(iv) }
}
override fun request(inventoryVectors: MutableCollection<InventoryVector>) {
if (!isRunning) {
requestedObjects.clear()
return
}
val iterator = inventoryVectors.iterator()
if (!iterator.hasNext()) {
return
}
val distribution = HashMap<Connection, MutableList<InventoryVector>>()
for (connection in connections.keys) {
if (connection.state == Connection.State.ACTIVE) {
distribution.put(connection, mutableListOf<InventoryVector>())
}
}
if (distribution.isEmpty()) {
return
}
var next = iterator.next()
var previous: Connection? = null
do {
for (connection in distribution.keys) {
if (connection === previous || previous == null) {
if (iterator.hasNext()) {
previous = connection
next = iterator.next()
} else {
break
}
}
if (connection.knowsOf(next) && !connection.requested(next)) {
val ivs = distribution[connection] ?: throw IllegalStateException("distribution not available for $connection")
if (ivs.size == GetData.MAX_INVENTORY_SIZE) {
connection.send(GetData(ivs))
ivs.clear()
}
ivs.add(next)
iterator.remove()
if (iterator.hasNext()) {
next = iterator.next()
previous = connection
} else {
break
}
}
}
} while (iterator.hasNext())
// remove objects nobody knows of
for (iv in inventoryVectors) {
requestedObjects.remove(iv)
}
for (connection in distribution.keys) {
val ivs = distribution[connection] ?: throw IllegalStateException("distribution not available for $connection")
if (!ivs.isEmpty()) {
connection.send(GetData(ivs))
}
}
}
override fun getNetworkStatus(): Property {
val streams = TreeSet<Long>()
val incomingConnections = TreeMap<Long, Int>()
val outgoingConnections = TreeMap<Long, Int>()
for (connection in connections.keys) {
if (connection.state == Connection.State.ACTIVE) {
for (stream in connection.streams) {
streams.add(stream)
if (connection.mode == SERVER) {
DebugUtils.inc(incomingConnections, stream)
} else {
DebugUtils.inc(outgoingConnections, stream)
}
}
}
}
val streamProperties = mutableListOf<Property>()
for (stream in streams) {
val incoming = incomingConnections[stream] ?: 0
val outgoing = outgoingConnections[stream] ?: 0
streamProperties.add(Property("stream " + stream, Property("nodes", incoming + outgoing),
Property("incoming", incoming),
Property("outgoing", outgoing)
))
}
return Property("network",
Property("connectionManager", if (isRunning) "running" else "stopped"),
Property("connections", *streamProperties.toTypedArray()),
Property("requestedObjects", requestedObjects.size)
)
}
private fun isConnectedTo(address: NetworkAddress): Boolean {
for (c in connections.keys) {
if (c.node == address) {
return true
}
}
return false
}
override val isRunning: Boolean
get() = selector?.isOpen ?: false && starter?.isAlive ?: false
companion object {
private val LOG = LoggerFactory.getLogger(NioNetworkHandler::class.java)
private val REQUESTED_OBJECTS_MAX_TIME = (2 * 60000).toLong() // 2 minutes in ms
private val DELAYED = java.lang.Long.MIN_VALUE
private fun write(channel: SocketChannel, connection: ConnectionIO) {
writeBuffer(connection.outBuffers, channel)
connection.updateWriter()
writeBuffer(connection.outBuffers, channel)
connection.cleanupBuffers()
}
private fun writeBuffer(buffers: Array<ByteBuffer>, channel: SocketChannel) {
if (buffers.any { buf -> buf.hasRemaining() }) channel.write(buffers)
}
private fun read(channel: SocketChannel, connection: ConnectionIO) {
if (channel.read(connection.inBuffer) > 0) {
connection.updateReader()
}
connection.updateSyncStatus()
}
}
}

View File

@ -1,279 +0,0 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.BitmessageContext;
import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography;
import ch.dissem.bitmessage.entity.CustomMessage;
import ch.dissem.bitmessage.entity.MessagePayload;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.networking.nio.NioNetworkHandler;
import ch.dissem.bitmessage.ports.*;
import ch.dissem.bitmessage.testutils.TestInventory;
import ch.dissem.bitmessage.utils.Property;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import static ch.dissem.bitmessage.utils.Singleton.cryptography;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
/**
* Tests network handlers. This test is parametrized, so it can test both the nio and classic implementation
* as well as their combinations. It might be slightly over the top and will most probably be cleaned up once
* the nio implementation is deemed stable.
*/
@RunWith(Parameterized.class)
public class NetworkHandlerTest {
private static final Logger LOG = LoggerFactory.getLogger(NetworkHandlerTest.class);
private static NetworkAddress peerAddress = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(6001).build();
private TestInventory peerInventory;
private TestInventory nodeInventory;
private BitmessageContext peer;
private BitmessageContext node;
private final NetworkHandler peerNetworkHandler;
private final NetworkHandler nodeNetworkHandler;
@Rule
public final TestRule timeout = new DisableOnDebug(Timeout.seconds(60));
public NetworkHandlerTest(NetworkHandler peer, NetworkHandler node) {
this.peerNetworkHandler = peer;
this.nodeNetworkHandler = node;
}
@Parameterized.Parameters
@SuppressWarnings("deprecation")
public static List<Object[]> parameters() {
return Arrays.asList(new Object[][]{
{new DefaultNetworkHandler(), new DefaultNetworkHandler()},
{new DefaultNetworkHandler(), new NioNetworkHandler()},
{new NioNetworkHandler(), new DefaultNetworkHandler()},
{new NioNetworkHandler(), new NioNetworkHandler()}
});
}
@Before
public void setUp() throws InterruptedException {
peerInventory = new TestInventory();
peer = new BitmessageContext.Builder()
.addressRepo(mock(AddressRepository.class))
.inventory(peerInventory)
.messageRepo(mock(MessageRepository.class))
.powRepo(mock(ProofOfWorkRepository.class))
.port(peerAddress.getPort())
.nodeRegistry(new TestNodeRegistry())
.networkHandler(peerNetworkHandler)
.cryptography(new BouncyCryptography())
.listener(mock(BitmessageContext.Listener.class))
.customCommandHandler(new CustomCommandHandler() {
@Override
public MessagePayload handle(CustomMessage request) {
byte[] data = request.getData();
if (data.length > 0) {
switch (data[0]) {
case 0:
return null;
case 1:
break;
case 3:
data[0] = 0;
break;
default:
break;
}
}
return new CustomMessage("test response", request.getData());
}
})
.build();
peer.startup();
Thread.sleep(100);
nodeInventory = new TestInventory();
node = new BitmessageContext.Builder()
.addressRepo(mock(AddressRepository.class))
.inventory(nodeInventory)
.messageRepo(mock(MessageRepository.class))
.powRepo(mock(ProofOfWorkRepository.class))
.port(6002)
.nodeRegistry(new TestNodeRegistry(peerAddress))
.networkHandler(nodeNetworkHandler)
.cryptography(new BouncyCryptography())
.listener(mock(BitmessageContext.Listener.class))
.build();
}
@After
public void cleanUp() {
shutdown(peer);
shutdown(node);
shutdown(nodeNetworkHandler);
}
private static void shutdown(BitmessageContext ctx) {
if (!ctx.isRunning()) return;
ctx.shutdown();
do {
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
}
} while (ctx.isRunning());
}
private static void shutdown(NetworkHandler networkHandler) {
if (!networkHandler.isRunning()) return;
networkHandler.stop();
do {
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
if (networkHandler.isRunning()) {
LOG.warn("Thread interrupted while waiting for network shutdown - " +
"this could cause problems in subsequent tests.");
}
return;
}
} while (networkHandler.isRunning());
}
private Property waitForNetworkStatus(BitmessageContext ctx) throws InterruptedException {
Property status;
do {
Thread.sleep(100);
status = ctx.status().getProperty("network", "connections", "stream 1");
} while (status == null);
return status;
}
@Test
public void ensureNodesAreConnecting() throws Exception {
node.startup();
Property nodeStatus = waitForNetworkStatus(node);
Property peerStatus = waitForNetworkStatus(peer);
assertEquals(1, nodeStatus.getProperty("outgoing").getValue());
assertEquals(1, peerStatus.getProperty("incoming").getValue());
}
@Test
public void ensureCustomMessageIsSentAndResponseRetrieved() throws Exception {
byte[] data = cryptography().randomBytes(8);
data[0] = (byte) 1;
CustomMessage request = new CustomMessage("test request", data);
node.startup();
CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request);
assertThat(response, notNullValue());
assertThat(response.getCustomCommand(), is("test response"));
assertThat(response.getData(), is(data));
}
@Test(expected = NodeException.class)
public void ensureCustomMessageWithoutResponseYieldsException() throws Exception {
byte[] data = cryptography().randomBytes(8);
data[0] = (byte) 0;
CustomMessage request = new CustomMessage("test request", data);
CustomMessage response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.getPort(), request);
assertThat(response, notNullValue());
assertThat(response.getCustomCommand(), is("test response"));
assertThat(response.getData(), is(request.getData()));
}
@Test
public void ensureObjectsAreSynchronizedIfBothHaveObjects() throws Exception {
peerInventory.init(
"V4Pubkey.payload",
"V5Broadcast.payload"
);
nodeInventory.init(
"V1Msg.payload",
"V4Pubkey.payload"
);
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10);
future.get();
assertInventorySize(3, nodeInventory);
assertInventorySize(3, peerInventory);
}
@Test
public void ensureObjectsAreSynchronizedIfOnlyPeerHasObjects() throws Exception {
peerInventory.init(
"V4Pubkey.payload",
"V5Broadcast.payload"
);
nodeInventory.init();
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10);
future.get();
assertInventorySize(2, nodeInventory);
assertInventorySize(2, peerInventory);
}
@Test
public void ensureObjectsAreSynchronizedIfOnlyNodeHasObjects() throws Exception {
peerInventory.init();
nodeInventory.init(
"V1Msg.payload"
);
Future<?> future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.getPort(), 10);
future.get();
assertInventorySize(1, nodeInventory);
assertInventorySize(1, peerInventory);
}
private void assertInventorySize(int expected, TestInventory inventory) throws InterruptedException {
long timeout = System.currentTimeMillis() + 1000;
while (expected != inventory.getInventory().size() && System.currentTimeMillis() < timeout) {
Thread.sleep(10);
}
assertEquals(expected, inventory.getInventory().size());
}
}

View File

@ -0,0 +1,266 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking
import ch.dissem.bitmessage.BitmessageContext
import ch.dissem.bitmessage.cryptography.bc.BouncyCryptography
import ch.dissem.bitmessage.entity.CustomMessage
import ch.dissem.bitmessage.entity.MessagePayload
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress
import ch.dissem.bitmessage.exception.NodeException
import ch.dissem.bitmessage.networking.nio.NioNetworkHandler
import ch.dissem.bitmessage.ports.*
import ch.dissem.bitmessage.testutils.TestInventory
import ch.dissem.bitmessage.utils.Property
import ch.dissem.bitmessage.utils.Singleton.cryptography
import com.nhaarman.mockito_kotlin.mock
import org.hamcrest.Matchers.`is`
import org.hamcrest.Matchers.notNullValue
import org.junit.After
import org.junit.Assert.*
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.DisableOnDebug
import org.junit.rules.TestRule
import org.junit.rules.Timeout
import org.slf4j.LoggerFactory
/**
* Tests network handlers. This test is parametrized, so it can test both the nio and classic implementation
* as well as their combinations. It might be slightly over the top and will most probably be cleaned up once
* the nio implementation is deemed stable.
*/
class NetworkHandlerTest {
private lateinit var peerInventory: TestInventory
private lateinit var nodeInventory: TestInventory
private lateinit var peer: BitmessageContext
private lateinit var node: BitmessageContext
private lateinit var peerNetworkHandler: NetworkHandler
private lateinit var nodeNetworkHandler: NetworkHandler
@JvmField @Rule val timeout: TestRule = DisableOnDebug(Timeout.seconds(60))
@Before
fun setUp() {
peerInventory = TestInventory()
peerNetworkHandler = NioNetworkHandler()
peer = BitmessageContext(
cryptography = BouncyCryptography(),
inventory = peerInventory,
nodeRegistry = TestNodeRegistry(),
networkHandler = peerNetworkHandler,
addressRepository = mock<AddressRepository>(),
messageRepository = mock<MessageRepository>(),
proofOfWorkRepository = mock<ProofOfWorkRepository>(),
customCommandHandler = object : CustomCommandHandler {
override fun handle(request: CustomMessage): MessagePayload? {
val data = request.getData()
if (data.isNotEmpty()) {
when (data[0]) {
0.toByte() -> return null
1.toByte() -> {
}
3.toByte() -> data[0] = 0
}
}
return CustomMessage("test response", request.getData())
}
},
listener = mock<BitmessageContext.Listener>(),
port = peerAddress.port
)
peer.startup()
Thread.sleep(100)
nodeInventory = TestInventory()
nodeNetworkHandler = NioNetworkHandler()
node = BitmessageContext(
cryptography = BouncyCryptography(),
inventory = nodeInventory,
nodeRegistry = TestNodeRegistry(peerAddress),
networkHandler = nodeNetworkHandler,
addressRepository = mock<AddressRepository>(),
messageRepository = mock<MessageRepository>(),
proofOfWorkRepository = mock<ProofOfWorkRepository>(),
customCommandHandler = object : CustomCommandHandler {
override fun handle(request: CustomMessage): MessagePayload? {
val data = request.getData()
if (data.isNotEmpty()) {
when (data[0]) {
0.toByte() -> return null
1.toByte() -> {
}
3.toByte() -> data[0] = 0
}
}
return CustomMessage("test response", request.getData())
}
},
listener = mock<BitmessageContext.Listener>(),
port = 6002
)
}
@After
fun cleanUp() {
shutdown(peer)
shutdown(node)
shutdown(nodeNetworkHandler)
}
private fun waitForNetworkStatus(ctx: BitmessageContext): Property {
var status: Property?
do {
Thread.sleep(100)
status = ctx.status().getProperty("network", "connections", "stream 1")
} while (status == null)
return status
}
@Test
fun `ensure nodes are connecting`() {
node.startup()
val nodeStatus = waitForNetworkStatus(node)
val peerStatus = waitForNetworkStatus(peer)
assertEquals(1, nodeStatus.getProperty("outgoing")!!.value)
assertEquals(1, peerStatus.getProperty("incoming")!!.value)
}
@Test
fun `ensure CustomMessage is sent and response retrieved`() {
val data = cryptography().randomBytes(8)
data[0] = 1.toByte()
val request = CustomMessage("test request", data)
node.startup()
val response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.port, request)
assertThat(response, notNullValue())
assertThat(response.customCommand, `is`("test response"))
assertThat(response.getData(), `is`(data))
}
@Test(expected = NodeException::class)
fun `ensure CustomMessage without response yields exception`() {
val data = cryptography().randomBytes(8)
data[0] = 0.toByte()
val request = CustomMessage("test request", data)
val response = nodeNetworkHandler.send(peerAddress.toInetAddress(), peerAddress.port, request)
assertThat(response, notNullValue())
assertThat(response.customCommand, `is`("test response"))
assertThat(response.getData(), `is`(request.getData()))
}
@Test
fun `ensure objects are synchronized if both have objects`() {
peerInventory.init(
"V4Pubkey.payload",
"V5Broadcast.payload"
)
nodeInventory.init(
"V1Msg.payload",
"V4Pubkey.payload"
)
val future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.port, 10)
future.get()
assertInventorySize(3, nodeInventory)
assertInventorySize(3, peerInventory)
}
@Test
fun `ensure objects are synchronized if only peer has objects`() {
peerInventory.init(
"V4Pubkey.payload",
"V5Broadcast.payload"
)
nodeInventory.init()
val future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.port, 10)
future.get()
assertInventorySize(2, nodeInventory)
assertInventorySize(2, peerInventory)
}
@Test
fun `ensure objects are synchronized if only node has objects`() {
peerInventory.init()
nodeInventory.init(
"V1Msg.payload"
)
val future = nodeNetworkHandler.synchronize(peerAddress.toInetAddress(), peerAddress.port, 10)
future.get()
assertInventorySize(1, nodeInventory)
assertInventorySize(1, peerInventory)
}
private fun assertInventorySize(expected: Int, inventory: TestInventory) {
val timeout = System.currentTimeMillis() + 1000
while (expected != inventory.getInventory().size && System.currentTimeMillis() < timeout) {
Thread.sleep(10)
}
assertEquals(expected.toLong(), inventory.getInventory().size.toLong())
}
companion object {
private val LOG = LoggerFactory.getLogger(NetworkHandlerTest::class.java)
private val peerAddress = NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(6001).build()
private fun shutdown(ctx: BitmessageContext) {
if (!ctx.isRunning) return
ctx.shutdown()
do {
try {
Thread.sleep(100)
} catch (ignore: InterruptedException) {
}
} while (ctx.isRunning)
}
private fun shutdown(networkHandler: NetworkHandler) {
if (!networkHandler.isRunning) return
networkHandler.stop()
do {
try {
Thread.sleep(100)
} catch (ignore: InterruptedException) {
if (networkHandler.isRunning) {
LOG.warn("Thread interrupted while waiting for network shutdown - " + "this could cause problems in subsequent tests.")
}
return
}
} while (networkHandler.isRunning)
}
}
}