Network code now works well enough for the server to think it successfully established a connection

This commit is contained in:
2015-04-07 18:48:58 +02:00
parent 3299d8ca4a
commit 35088ca033
27 changed files with 636 additions and 150 deletions

View File

@ -8,7 +8,7 @@ repositories {
}
dependencies {
compile ':domain'
compile project(':domain')
compile 'com.google.guava:guava-concurrent:r03'
testCompile 'org.slf4j:slf4j-simple:1.7.12'
testCompile 'junit:junit:4.11'

View File

@ -21,7 +21,7 @@ import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.NetworkMessageReceiver.MessageListener;
import ch.dissem.bitmessage.ports.NetworkHandler.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -69,6 +69,14 @@ public class Connection implements Runnable {
this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).build();
}
public State getState() {
return state;
}
public NetworkAddress getNode() {
return node;
}
@Override
public void run() {
if (state == CLIENT) {
@ -77,6 +85,8 @@ public class Connection implements Runnable {
while (state != DISCONNECTED) {
try {
NetworkMessage msg = Factory.getNetworkMessage(version, in);
if (msg == null)
continue;
switch (state) {
case ACTIVE:
receiveMessage(msg.getPayload());
@ -90,20 +100,16 @@ public class Connection implements Runnable {
this.version = payload.getVersion();
this.streams = payload.getStreams();
send(new VerAck());
if (state == SERVER) {
state = ACTIVE;
}
state = ACTIVE;
sendAddresses();
sendInventory();
} else {
LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting.");
disconnect();
}
break;
case VERACK:
if (state == CLIENT) {
sendAddresses();
sendInventory();
state = ACTIVE;
} else {
if (state == SERVER) {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
}
break;
@ -155,7 +161,7 @@ public class Connection implements Runnable {
private void sendAddresses() {
List<NetworkAddress> addresses = ctx.getAddressRepository().getKnownAddresses(1000, streams);
send(new Addr.Builder().addresses(addresses).build());
sendingQueue.offer(new Addr.Builder().addresses(addresses).build());
}
private void sendInventory() {

View File

@ -16,86 +16,113 @@
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.Context;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.NetworkMessageReceiver;
import ch.dissem.bitmessage.ports.NetworkMessageSender;
import ch.dissem.bitmessage.ports.NetworkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static ch.dissem.bitmessage.networking.Connection.State.CLIENT;
import static ch.dissem.bitmessage.networking.Connection.State.SERVER;
import static ch.dissem.bitmessage.networking.Connection.State.*;
/**
* Handles all the networky stuff.
*/
public class NetworkNode implements NetworkMessageSender, NetworkMessageReceiver {
public class NetworkNode implements NetworkHandler {
private final static Logger LOG = LoggerFactory.getLogger(NetworkNode.class);
/**
* This is only to be used where it's ignored
*/
private final static NetworkAddress LOCALHOST = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build();
private final ExecutorService pool;
private final List<Connection> connections = new LinkedList<>();
private MessageListener listener;
public NetworkNode() {
pool = Executors.newCachedThreadPool();
// TODO: sending
// Thread sender = new Thread(new Runnable() {
// @Override
// public void run() {
// while (true) {
// try {
// NetworkMessage message = sendingQueue.take();
//
// try (Socket socket = getSocket(message.getTargetNode())) {
// message.write(socket.getOutputStream());
// } catch (Exception e) {
// e.printStackTrace();
// }
// } catch (InterruptedException e) {
// // Ignore?
// }
// }
// }
// }, "Sender");
// sender.setDaemon(true);
// sender.start();
}
@Override
public void registerListener(final int port, final MessageListener listener) throws IOException {
final ServerSocket serverSocket = new ServerSocket(port);
pool.execute(new Runnable() {
@Override
public void run() {
NetworkAddress address = null;
try {
Socket socket = serverSocket.accept();
socket.setSoTimeout(20000);
pool.execute(new Connection(SERVER, socket, listener));
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
public void setListener(final MessageListener listener) {
if (this.listener != null) {
throw new IllegalStateException("Listener can only be set once");
}
this.listener = listener;
}
@Override
public void start() {
final Context ctx = Context.getInstance();
if (listener == null) {
throw new IllegalStateException("Listener must be set at start");
}
try {
final ServerSocket serverSocket = new ServerSocket(Context.getInstance().getPort());
pool.execute(new Runnable() {
@Override
public void run() {
try {
Socket socket = serverSocket.accept();
socket.setSoTimeout(20000);
startConnection(new Connection(SERVER, socket, listener));
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
}
}
});
});
Thread connectionManager = new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.interrupted()) {
synchronized (connections) {
for (Iterator<Connection> iterator = connections.iterator(); iterator.hasNext(); ) {
Connection c = iterator.next();
if (c.getState() == DISCONNECTED) {
// Remove the current element from the iterator and the list.
iterator.remove();
}
}
}
if (connections.size() < 1) {
List<NetworkAddress> addresses = ctx.getAddressRepository().getKnownAddresses(8, ctx.getStreams());
for (NetworkAddress address : addresses) {
try {
startConnection(new Connection(CLIENT, new Socket(address.toInetAddress(), address.getPort()), listener));
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
}
// FIXME: prevent connecting twice to the same node
}
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
LOG.debug(e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
}
}, "connection-manager");
connectionManager.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void startConnection(Connection c) {
synchronized (connections) {
connections.add(c);
}
pool.execute(c);
}
@Override
public void registerListener(final NetworkAddress node, final MessageListener listener) throws IOException {
pool.execute(new Connection(CLIENT, new Socket(node.toInetAddress(), node.getPort()), listener));
}
@Override
public void send(final NetworkAddress node, final NetworkMessage message) {
public void send(final ObjectPayload payload) {
// TODO: sendingQueue.add(message);
}
}

View File

@ -20,7 +20,7 @@ import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.ports.NetworkMessageReceiver;
import ch.dissem.bitmessage.ports.NetworkHandler;
import org.junit.Test;
/**
@ -33,13 +33,13 @@ public class NetworkNodeTest {
public void testSendMessage() throws Exception {
final Thread baseThread = Thread.currentThread();
NetworkNode net = new NetworkNode();
net.registerListener(localhost, new NetworkMessageReceiver.MessageListener() {
@Override
public void receive(ObjectPayload payload) {
System.out.println(payload);
baseThread.interrupt();
}
});
// net.setListener(localhost, new NetworkHandler.MessageListener() {
// @Override
// public void receive(ObjectPayload payload) {
// System.out.println(payload);
// baseThread.interrupt();
// }
// });
NetworkMessage ver = new NetworkMessage(
new Version.Builder()
.version(3)
@ -52,7 +52,7 @@ public class NetworkNodeTest {
.streams(1, 2)
.build()
);
net.send(localhost, ver);
// net.send(localhost, ver);
Thread.sleep(20000);
}
}