Networking code, untested

This commit is contained in:
Christian Basler 2015-04-06 10:01:03 +02:00
parent 0c4b39bdee
commit 3299d8ca4a
21 changed files with 573 additions and 151 deletions

View File

@ -8,5 +8,6 @@ repositories {
}
dependencies {
compile 'org.slf4j:slf4j-api:1.7.12'
testCompile group: 'junit', name: 'junit', version: '4.11'
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage;
import ch.dissem.bitmessage.ports.AddressRepository;
import ch.dissem.bitmessage.ports.Inventory;
import ch.dissem.bitmessage.ports.NetworkMessageReceiver;
import ch.dissem.bitmessage.ports.NetworkMessageSender;
/**
* Created by chris on 05.04.15.
*/
public class Context {
public static final int CURRENT_VERSION = 3;
private static Context instance;
private Inventory inventory;
private AddressRepository addressRepo;
private NetworkMessageSender sender;
private NetworkMessageReceiver receiver;
private Context(Inventory inventory, AddressRepository addressRepo,
NetworkMessageSender sender, NetworkMessageReceiver receiver) {
this.inventory = inventory;
this.addressRepo = addressRepo;
this.sender = sender;
this.receiver = receiver;
}
public static void init(Inventory inventory, AddressRepository addressRepository, NetworkMessageSender sender, NetworkMessageReceiver receiver) {
instance = new Context(inventory, addressRepository, sender, receiver);
}
public static Context getInstance() {
return instance;
}
public Inventory getInventory() {
return inventory;
}
public AddressRepository getAddressRepository() {
return addressRepo;
}
}

View File

@ -22,12 +22,13 @@ import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* The 'addr' command holds a list of known active Bitmessage nodes.
*/
public class Addr implements Command {
public class Addr implements MessagePayload {
private final List<NetworkAddress> addresses;
private Addr(Builder builder) {
@ -35,8 +36,12 @@ public class Addr implements Command {
}
@Override
public String getCommand() {
return "addr";
public Command getCommand() {
return Command.ADDR;
}
public List<NetworkAddress> getAddresses() {
return addresses;
}
@Override
@ -53,6 +58,11 @@ public class Addr implements Command {
public Builder() {
}
public Builder addresses(Collection<NetworkAddress> addresses){
this.addresses.addAll(addresses);
return this;
}
public Builder addAddress(final NetworkAddress address) {
this.addresses.add(address);
return this;

View File

@ -27,7 +27,7 @@ import java.util.List;
/**
* The 'getdata' command is used to request objects from a node.
*/
public class GetData implements Command {
public class GetData implements MessagePayload {
List<InventoryVector> inventory;
private GetData(Builder builder) {
@ -35,8 +35,12 @@ public class GetData implements Command {
}
@Override
public String getCommand() {
return "getdata";
public Command getCommand() {
return Command.GETDATA;
}
public List<InventoryVector> getInventory() {
return inventory;
}
@Override

View File

@ -27,16 +27,20 @@ import java.util.List;
/**
* The 'inv' command holds up to 50000 inventory vectors, i.e. hashes of inventory items.
*/
public class Inv implements Command {
public class Inv implements MessagePayload {
private List<InventoryVector> inventory;
private Inv(Builder builder) {
inventory = builder.inventory;
}
public List<InventoryVector> getInventory() {
return inventory;
}
@Override
public String getCommand() {
return "inv";
public Command getCommand() {
return Command.INV;
}
@Override

View File

@ -19,6 +19,10 @@ package ch.dissem.bitmessage.entity;
/**
* A command can hold a network message payload
*/
public interface Command extends Streamable {
String getCommand();
public interface MessagePayload extends Streamable {
Command getCommand();
enum Command {
VERSION, VERACK, ADDR, INV, GETDATA, OBJECT
}
}

View File

@ -16,7 +16,6 @@
package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.utils.Encode;
import java.io.ByteArrayOutputStream;
@ -39,12 +38,9 @@ public class NetworkMessage implements Streamable {
public final static int MAGIC = 0xE9BEB4D9;
public final static byte[] MAGIC_BYTES = ByteBuffer.allocate(4).putInt(MAGIC).array();
private final NetworkAddress targetNode;
private final MessagePayload payload;
private final Command payload;
public NetworkMessage(NetworkAddress target, Command payload) {
this.targetNode = target;
public NetworkMessage(MessagePayload payload) {
this.payload = payload;
}
@ -59,22 +55,19 @@ public class NetworkMessage implements Streamable {
/**
* The actual data, a message or an object. Not to be confused with objectPayload.
*/
public Command getPayload() {
public MessagePayload getPayload() {
return payload;
}
public NetworkAddress getTargetNode() {
return targetNode;
}
@Override
public void write(OutputStream stream) throws IOException {
// magic
Encode.int32(MAGIC, stream);
// ASCII string identifying the packet content, NULL padded (non-NULL padding results in packet rejected)
stream.write(payload.getCommand().getBytes("ASCII"));
for (int i = payload.getCommand().length(); i < 12; i++) {
String command = payload.getCommand().name().toLowerCase();
stream.write(command.getBytes("ASCII"));
for (int i = command.length(); i < 12; i++) {
stream.write('\0');
}

View File

@ -17,6 +17,7 @@
package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException;
@ -25,7 +26,7 @@ import java.io.OutputStream;
/**
* The 'object' command sends an object that is shared throughout the network.
*/
public class ObjectMessage implements Command {
public class ObjectMessage implements MessagePayload {
private long nonce;
private long expiresTime;
private long objectType;
@ -47,8 +48,17 @@ public class ObjectMessage implements Command {
}
@Override
public String getCommand() {
return "object";
public Command getCommand() {
return Command.OBJECT;
}
public ObjectPayload getPayload() {
return payload;
}
public InventoryVector getInventoryVector() {
// TODO
return null;
}
@Override

View File

@ -22,10 +22,10 @@ import java.io.OutputStream;
/**
* The 'verack' command answers a 'version' command, accepting the other node's version.
*/
public class VerAck implements Command {
public class VerAck implements MessagePayload {
@Override
public String getCommand() {
return "verack";
public Command getCommand() {
return Command.VERACK;
}
@Override

View File

@ -16,6 +16,7 @@
package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.Context;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.utils.Encode;
@ -26,8 +27,7 @@ import java.util.Random;
/**
* The 'version' command advertises this node's latest supported protocol version upon initiation.
*/
public class Version implements Command {
public static final int CURRENT = 3;
public class Version implements MessagePayload {
/**
* Identifies protocol version being used by the node. Should equal 3. Nodes should disconnect if the remote node's
* version is lower but continue with the connection if it is higher.
@ -71,6 +71,17 @@ public class Version implements Command {
*/
private final long[] streamNumbers;
private Version(Builder builder) {
version = builder.version;
services = builder.services;
timestamp = builder.timestamp;
addrRecv = builder.addrRecv;
addrFrom = builder.addrFrom;
nonce = builder.nonce;
userAgent = builder.userAgent;
streamNumbers = builder.streamNumbers;
}
public int getVersion() {
return version;
}
@ -99,24 +110,13 @@ public class Version implements Command {
return userAgent;
}
public long[] getStreamNumbers() {
public long[] getStreams() {
return streamNumbers;
}
private Version(Builder builder) {
version = builder.version;
services = builder.services;
timestamp = builder.timestamp;
addrRecv = builder.addrRecv;
addrFrom = builder.addrFrom;
nonce = builder.nonce;
userAgent = builder.userAgent;
streamNumbers = builder.streamNumbers;
}
@Override
public String getCommand() {
return "version";
public Command getCommand() {
return Command.VERSION;
}
@Override
@ -146,7 +146,7 @@ public class Version implements Command {
}
public Builder defaults() {
version = CURRENT;
version = Context.CURRENT;
services = 1;
timestamp = System.currentTimeMillis() / 1000;
nonce = new Random().nextInt();

View File

@ -40,7 +40,7 @@ class V3MessageFactory {
byte[] payloadBytes = Decode.bytes(stream, length);
if (testChecksum(checksum, payloadBytes)) {
Command payload = getPayload(command, new ByteArrayInputStream(payloadBytes), length);
MessagePayload payload = getPayload(command, new ByteArrayInputStream(payloadBytes), length);
return new NetworkMessage(payload);
} else {
throw new IOException("Checksum failed for message '" + command + "'");
@ -49,7 +49,7 @@ class V3MessageFactory {
return null;
}
private Command getPayload(String command, InputStream stream, int length) throws IOException {
private MessagePayload getPayload(String command, InputStream stream, int length) throws IOException {
switch (command) {
case "version":
return parseVersion(stream);

View File

@ -0,0 +1,30 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import java.util.List;
/**
* Stores and provides known peers.
*/
public interface AddressRepository {
List<NetworkAddress> getKnownAddresses(int limit, long... streams);
void offerAddresses(List<NetworkAddress> addresses);
}

View File

@ -16,6 +16,7 @@
package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.entity.ObjectMessage;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
@ -25,13 +26,13 @@ import java.util.List;
* The Inventory stores and retrieves objects, cleans up outdated objects and can tell which objects are still missing.
*/
public interface Inventory {
public List<InventoryVector> getInventory();
public List<InventoryVector> getInventory(long... streams);
public List<InventoryVector> getMissing(List<InventoryVector> offer);
public ObjectPayload getObject(InventoryVector vector);
public ObjectMessage getObject(InventoryVector vector);
public void storeObject(InventoryVector vector, ObjectPayload object);
public void storeObject(ObjectMessage object);
public void cleanup();
}

View File

@ -17,6 +17,7 @@
package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import java.io.IOException;
@ -25,11 +26,11 @@ import java.io.IOException;
* Handles incoming messages
*/
public interface NetworkMessageReceiver {
public void registerListener(int port) throws IOException;
void registerListener(int port, MessageListener listener) throws IOException;
public void registerListener(NetworkAddress node, MessageListener listener) throws IOException;
void registerListener(NetworkAddress node, MessageListener listener) throws IOException;
public static interface MessageListener {
public void receive(NetworkMessage message);
interface MessageListener {
void receive(ObjectPayload payload);
}
}

View File

@ -16,12 +16,12 @@
package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
/**
* Sends messages
*/
public interface NetworkMessageSender {
public void send(NetworkAddress node, NetworkMessage message);
void send(NetworkAddress node, ObjectPayload payload);
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.utils;
/**
* Indicates an illegal Bitmessage address
*/
public class AddressFormatException extends RuntimeException {
public AddressFormatException(String message) {
super(message);
}
}

View File

@ -1,11 +1,12 @@
/*
* Copyright 2011 Google Inc.
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -16,17 +17,151 @@
package ch.dissem.bitmessage.utils;
import java.io.UnsupportedEncodingException;
/**
* Base58 encoder and decoder
*/
public class Base58 {
private static char[] ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz".toCharArray();
public static String encode(byte[] input) {
return null; // TODO
private static final int[] INDEXES = new int[128];
static {
for (int i = 0; i < INDEXES.length; i++) {
INDEXES[i] = -1;
}
for (int i = 0; i < ALPHABET.length; i++) {
INDEXES[ALPHABET[i]] = i;
}
}
public static byte[] decode(String input) {
return null; // TODO
/**
* Encodes the given bytes in base58. No checksum is appended.
*/
public static String encode(byte[] input) {
if (input.length == 0) {
return "";
}
input = copyOfRange(input, 0, input.length);
// Count leading zeroes.
int zeroCount = 0;
while (zeroCount < input.length && input[zeroCount] == 0) {
++zeroCount;
}
// The actual encoding.
byte[] temp = new byte[input.length * 2];
int j = temp.length;
int startAt = zeroCount;
while (startAt < input.length) {
byte mod = divmod58(input, startAt);
if (input[startAt] == 0) {
++startAt;
}
temp[--j] = (byte) ALPHABET[mod];
}
// Strip extra '1' if there are some after decoding.
while (j < temp.length && temp[j] == ALPHABET[0]) {
++j;
}
// Add as many leading '1' as there were leading zeros.
while (--zeroCount >= 0) {
temp[--j] = (byte) ALPHABET[0];
}
byte[] output = copyOfRange(temp, j, temp.length);
try {
return new String(output, "US-ASCII");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e); // Cannot happen.
}
}
public static byte[] decode(String input) throws AddressFormatException {
if (input.length() == 0) {
return new byte[0];
}
byte[] input58 = new byte[input.length()];
// Transform the String to a base58 byte sequence
for (int i = 0; i < input.length(); ++i) {
char c = input.charAt(i);
int digit58 = -1;
if (c >= 0 && c < 128) {
digit58 = INDEXES[c];
}
if (digit58 < 0) {
throw new AddressFormatException("Illegal character " + c + " at " + i);
}
input58[i] = (byte) digit58;
}
// Count leading zeroes
int zeroCount = 0;
while (zeroCount < input58.length && input58[zeroCount] == 0) {
++zeroCount;
}
// The encoding
byte[] temp = new byte[input.length()];
int j = temp.length;
int startAt = zeroCount;
while (startAt < input58.length) {
byte mod = divmod256(input58, startAt);
if (input58[startAt] == 0) {
++startAt;
}
temp[--j] = mod;
}
// Do no add extra leading zeroes, move j to first non null byte.
while (j < temp.length && temp[j] == 0) {
++j;
}
return copyOfRange(temp, j - zeroCount, temp.length);
}
//
// number -> number / 58, returns number % 58
//
private static byte divmod58(byte[] number, int startAt) {
int remainder = 0;
for (int i = startAt; i < number.length; i++) {
int digit256 = (int) number[i] & 0xFF;
int temp = remainder * 256 + digit256;
number[i] = (byte) (temp / 58);
remainder = temp % 58;
}
return (byte) remainder;
}
//
// number -> number / 256, returns number % 256
//
private static byte divmod256(byte[] number58, int startAt) {
int remainder = 0;
for (int i = startAt; i < number58.length; i++) {
int digit58 = (int) number58[i] & 0xFF;
int temp = remainder * 58 + digit58;
number58[i] = (byte) (temp / 256);
remainder = temp % 256;
}
return (byte) remainder;
}
private static byte[] copyOfRange(byte[] source, int from, int to) {
byte[] range = new byte[to - from];
System.arraycopy(source, from, range, 0, range.length);
return range;
}
}

View File

@ -9,5 +9,7 @@ repositories {
dependencies {
compile ':domain'
compile 'com.google.guava:guava-concurrent:r03'
testCompile 'org.slf4j:slf4j-simple:1.7.12'
testCompile 'junit:junit:4.11'
}

View File

@ -0,0 +1,189 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.Context;
import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.NetworkMessageReceiver.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import static ch.dissem.bitmessage.networking.Connection.State.*;
/**
* A connection to a specific node
*/
public class Connection implements Runnable {
private final static Logger LOG = LoggerFactory.getLogger(Connection.class);
private Context ctx;
private State state;
private Socket socket;
private InputStream in;
private OutputStream out;
private MessageListener listener;
private int version;
private long[] streams;
private NetworkAddress host;
private NetworkAddress node;
private Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>();
public Connection(State state, Socket socket, MessageListener listener) throws IOException {
this.ctx = Context.getInstance();
this.state = state;
this.socket = socket;
this.in = socket.getInputStream();
this.out = socket.getOutputStream();
this.listener = listener;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).build();
}
@Override
public void run() {
if (state == CLIENT) {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
}
while (state != DISCONNECTED) {
try {
NetworkMessage msg = Factory.getNetworkMessage(version, in);
switch (state) {
case ACTIVE:
receiveMessage(msg.getPayload());
break;
default:
switch (msg.getPayload().getCommand()) {
case VERSION:
Version payload = (Version) msg.getPayload();
if (payload.getVersion() >= Context.CURRENT_VERSION) {
this.version = payload.getVersion();
this.streams = payload.getStreams();
send(new VerAck());
if (state == SERVER) {
state = ACTIVE;
}
} else {
disconnect();
}
break;
case VERACK:
if (state == CLIENT) {
sendAddresses();
sendInventory();
state = ACTIVE;
} else {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
}
break;
default:
throw new RuntimeException("Command 'version' or 'verack' expected, but was "
+ msg.getPayload().getCommand());
}
}
} catch (SocketTimeoutException e) {
if (state == ACTIVE) {
for (MessagePayload msg = sendingQueue.poll(); msg != null; msg = sendingQueue.poll()) {
send(msg);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void receiveMessage(MessagePayload messagePayload) {
switch (messagePayload.getCommand()) {
case INV:
Inv inv = (Inv) messagePayload;
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory());
send(new GetData.Builder().inventory(missing).build());
break;
case GETDATA:
GetData getData = (GetData) messagePayload;
for (InventoryVector iv : getData.getInventory()) {
ObjectMessage om = ctx.getInventory().getObject(iv);
sendingQueue.offer(om);
}
break;
case OBJECT:
ObjectMessage objectMessage = (ObjectMessage) messagePayload;
ctx.getInventory().storeObject(objectMessage);
listener.receive(objectMessage.getPayload());
break;
case ADDR:
Addr addr = (Addr) messagePayload;
ctx.getAddressRepository().offerAddresses(addr.getAddresses());
break;
case VERACK:
case VERSION:
throw new RuntimeException("Unexpectedly received '" + messagePayload.getCommand() + "' command");
}
}
private void sendAddresses() {
List<NetworkAddress> addresses = ctx.getAddressRepository().getKnownAddresses(1000, streams);
send(new Addr.Builder().addresses(addresses).build());
}
private void sendInventory() {
List<InventoryVector> inventory = ctx.getInventory().getInventory(streams);
for (int i = 0; i < inventory.size(); i += 50000) {
sendingQueue.offer(new Inv.Builder()
.inventory(inventory.subList(i, Math.min(inventory.size(), i + 50000)))
.build());
}
}
private void disconnect() {
try {
state = DISCONNECTED;
socket.close();
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
}
private void send(MessagePayload payload) {
try {
new NetworkMessage(payload).write(out);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
disconnect();
}
}
public enum State {SERVER, CLIENT, ACTIVE, DISCONNECTED}
}

View File

@ -22,69 +22,68 @@ import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.NetworkMessageReceiver;
import ch.dissem.bitmessage.ports.NetworkMessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import static ch.dissem.bitmessage.networking.Connection.State.CLIENT;
import static ch.dissem.bitmessage.networking.Connection.State.SERVER;
/**
* Handles all the networky stuff.
*/
public class NetworkNode implements NetworkMessageSender, NetworkMessageReceiver {
private final BlockingQueue<NetworkMessage> sendingQueue = new LinkedBlockingQueue<>();
private final ExecutorService pool;
private final Map<NetworkAddress, Socket> sockets = new HashMap<>();
private final Map<NetworkAddress, Integer> versions = new HashMap<>();
private final static Logger LOG = LoggerFactory.getLogger(NetworkNode.class);
/**
* This is only to be used where it's ignored
*/
private final static NetworkAddress LOCALHOST = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build();
private final ExecutorService pool;
public NetworkNode() {
pool = Executors.newCachedThreadPool();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
NetworkMessage message = sendingQueue.take();
Socket socket = getSocket(message.getTargetNode());
message.write(socket.getOutputStream());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, "Sender");
// TODO: sending
// Thread sender = new Thread(new Runnable() {
// @Override
// public void run() {
// while (true) {
// try {
// NetworkMessage message = sendingQueue.take();
//
// try (Socket socket = getSocket(message.getTargetNode())) {
// message.write(socket.getOutputStream());
// } catch (Exception e) {
// e.printStackTrace();
// }
// } catch (InterruptedException e) {
// // Ignore?
// }
// }
// }
// }, "Sender");
// sender.setDaemon(true);
// sender.start();
}
@Override
public void registerListener(final int port) throws IOException {
public void registerListener(final int port, final MessageListener listener) throws IOException {
final ServerSocket serverSocket = new ServerSocket(port);
pool.execute(new Runnable() {
@Override
public void run() {
NetworkAddress address = null;
try {
Socket socket = serverSocket.accept();
socket.setSoTimeout(20000);
// FIXME: addd to sockets
registerListener(getVersion(null), socket, new MessageListener() {
@Override
public void receive(NetworkMessage message) {
// TODO
}
});
pool.execute(new Connection(SERVER, socket, listener));
} catch (IOException e) {
e.printStackTrace();
LOG.debug(e.getMessage(), e);
}
}
});
@ -92,60 +91,11 @@ public class NetworkNode implements NetworkMessageSender, NetworkMessageReceiver
@Override
public void registerListener(final NetworkAddress node, final MessageListener listener) throws IOException {
final Socket socket = getSocket(node);
final int version = getVersion(node);
sendVersion(node);
pool.execute(new Runnable() {
@Override
public void run() {
try {
registerListener(version, socket, listener);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
private void sendVersion(NetworkAddress node) {
send(node, new NetworkMessage(node, new Version.Builder().defaults().addrFrom(LOCALHOST).addrRecv(node).build()));
}
private void registerListener(int version, Socket socket, MessageListener listener) throws IOException {
NetworkMessage message = Factory.getNetworkMessage(version, socket.getInputStream());
if (message.getPayload() instanceof Version) {
version = ((Version) message.getPayload()).getVersion();
synchronized (versions) {
versions.put(new NetworkAddress.Builder()
.ip(socket.getInetAddress())
.port(socket.getPort())
.build(), version);
}
}
listener.receive(message);
pool.execute(new Connection(CLIENT, new Socket(node.toInetAddress(), node.getPort()), listener));
}
@Override
public void send(final NetworkAddress node, final NetworkMessage message) {
sendingQueue.add(message);
}
private Socket getSocket(NetworkAddress node) throws IOException {
synchronized (sockets) {
Socket socket = sockets.get(node);
if (socket == null) {
socket = new Socket(node.toInetAddress(), node.getPort());
sockets.put(node, socket);
}
return socket;
}
}
private synchronized int getVersion(NetworkAddress node) {
synchronized (versions) {
Integer version = versions.get(node);
return version == null ? 3 : version;
}
// TODO: sendingQueue.add(message);
}
}

View File

@ -18,6 +18,7 @@ package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.Version;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.ports.NetworkMessageReceiver;
import org.junit.Test;
@ -34,12 +35,12 @@ public class NetworkNodeTest {
NetworkNode net = new NetworkNode();
net.registerListener(localhost, new NetworkMessageReceiver.MessageListener() {
@Override
public void receive(NetworkMessage message) {
System.out.println(message);
public void receive(ObjectPayload payload) {
System.out.println(payload);
baseThread.interrupt();
}
});
NetworkMessage ver = new NetworkMessage(localhost,
NetworkMessage ver = new NetworkMessage(
new Version.Builder()
.version(3)
.services(1)
@ -49,7 +50,8 @@ public class NetworkNodeTest {
.nonce(-1)
.userAgent("Test")
.streams(1, 2)
.build());
.build()
);
net.send(localhost, ver);
Thread.sleep(20000);
}