Code cleanup

This commit is contained in:
2016-02-28 23:03:00 +01:00
parent 57057298a1
commit 3e5e431d6f
6 changed files with 313 additions and 261 deletions

View File

@ -203,51 +203,16 @@ class Connection {
private void receiveMessage(MessagePayload messagePayload) {
switch (messagePayload.getCommand()) {
case INV:
Inv inv = (Inv) messagePayload;
int originalSize = inv.getInventory().size();
updateIvCache(inv.getInventory());
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
missing.removeAll(commonRequestedObjects);
LOG.debug("Received inventory with " + originalSize + " elements, of which are "
+ missing.size() + " missing.");
send(new GetData.Builder().inventory(missing).build());
receiveMessage((Inv) messagePayload);
break;
case GETDATA:
GetData getData = (GetData) messagePayload;
for (InventoryVector iv : getData.getInventory()) {
ObjectMessage om = ctx.getInventory().getObject(iv);
if (om != null) sendingQueue.offer(om);
}
receiveMessage((GetData) messagePayload);
break;
case OBJECT:
ObjectMessage objectMessage = (ObjectMessage) messagePayload;
try {
requestedObjects.remove(objectMessage.getInventoryVector());
if (ctx.getInventory().contains(objectMessage)) {
LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory");
break;
}
listener.receive(objectMessage);
security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
ctx.getInventory().storeObject(objectMessage);
// offer object to some random nodes so it gets distributed throughout the network:
networkHandler.offer(objectMessage.getInventoryVector());
lastObjectTime = UnixTime.now();
} catch (InsufficientProofOfWorkException e) {
LOG.warn(e.getMessage());
// DebugUtils.saveToFile(objectMessage); // this line must not be committed active
} catch (IOException e) {
LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e);
} finally {
if (commonRequestedObjects.remove(objectMessage.getInventoryVector())) {
LOG.debug("Received object that wasn't requested.");
}
}
receiveMessage((ObjectMessage) messagePayload);
break;
case ADDR:
Addr addr = (Addr) messagePayload;
LOG.debug("Received " + addr.getAddresses().size() + " addresses.");
ctx.getNodeRegistry().offerAddresses(addr.getAddresses());
receiveMessage((Addr) messagePayload);
break;
case CUSTOM:
case VERACK:
@ -257,6 +222,53 @@ class Connection {
}
}
private void receiveMessage(Inv inv) {
int originalSize = inv.getInventory().size();
updateIvCache(inv.getInventory());
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
missing.removeAll(commonRequestedObjects);
LOG.debug("Received inventory with " + originalSize + " elements, of which are "
+ missing.size() + " missing.");
send(new GetData.Builder().inventory(missing).build());
}
private void receiveMessage(GetData getData) {
for (InventoryVector iv : getData.getInventory()) {
ObjectMessage om = ctx.getInventory().getObject(iv);
if (om != null) sendingQueue.offer(om);
}
}
private void receiveMessage(ObjectMessage objectMessage) {
requestedObjects.remove(objectMessage.getInventoryVector());
if (ctx.getInventory().contains(objectMessage)) {
LOG.trace("Received object " + objectMessage.getInventoryVector() + " - already in inventory");
return;
}
try {
listener.receive(objectMessage);
security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
ctx.getInventory().storeObject(objectMessage);
// offer object to some random nodes so it gets distributed throughout the network:
networkHandler.offer(objectMessage.getInventoryVector());
lastObjectTime = UnixTime.now();
} catch (InsufficientProofOfWorkException e) {
LOG.warn(e.getMessage());
// DebugUtils.saveToFile(objectMessage); // this line must not be committed active
} catch (IOException e) {
LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e);
} finally {
if (commonRequestedObjects.remove(objectMessage.getInventoryVector())) {
LOG.debug("Received object that wasn't requested.");
}
}
}
private void receiveMessage(Addr addr) {
LOG.debug("Received " + addr.getAddresses().size() + " addresses.");
ctx.getNodeRegistry().offerAddresses(addr.getAddresses());
}
private void sendAddresses() {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(1000, streams);
sendingQueue.offer(new Addr.Builder().addresses(addresses).build());
@ -358,77 +370,10 @@ class Connection {
Thread.sleep(100);
}
}
try {
NetworkMessage msg = Factory.getNetworkMessage(version, in);
if (msg == null)
continue;
switch (state) {
case ACTIVE:
receiveMessage(msg.getPayload());
break;
default:
switch (msg.getPayload().getCommand()) {
case VERSION:
Version payload = (Version) msg.getPayload();
if (payload.getNonce() == ctx.getClientNonce()) {
LOG.info("Tried to connect to self, disconnecting.");
disconnect();
} else if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) {
version = payload.getVersion();
streams = payload.getStreams();
send(new VerAck());
switch (mode) {
case SERVER:
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
break;
case CLIENT:
case SYNC:
activateConnection();
break;
default:
// NO OP
}
} else {
LOG.info("Received unsupported version " + payload.getVersion() + ", disconnecting.");
disconnect();
}
break;
case VERACK:
switch (mode) {
case SERVER:
activateConnection();
break;
case CLIENT:
case SYNC:
default:
// NO OP
break;
}
break;
case CUSTOM:
MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) msg.getPayload());
if (response != null) {
send(response);
}
disconnect();
break;
default:
throw new NodeException("Command 'version' or 'verack' expected, but was '"
+ msg.getPayload().getCommand() + "'");
}
}
if (socket.isClosed() || syncFinished(msg) || checkOpenRequests()) disconnect();
} catch (SocketTimeoutException ignore) {
if (state == ACTIVE) {
if (syncFinished(null)) disconnect();
}
}
receive();
}
} catch (InterruptedException | IOException | NodeException e) {
} catch (Exception e) {
LOG.trace("Reader disconnected from node " + node + ": " + e.getMessage());
} catch (RuntimeException e) {
LOG.trace("Reader disconnecting from node " + node + " due to error: " + e.getMessage(), e);
} finally {
disconnect();
try {
@ -438,6 +383,81 @@ class Connection {
}
}
}
private void receive() throws InterruptedException {
try {
NetworkMessage msg = Factory.getNetworkMessage(version, in);
if (msg == null)
return;
switch (state) {
case ACTIVE:
receiveMessage(msg.getPayload());
break;
default:
handleCommand(msg.getPayload());
break;
}
if (socket.isClosed() || syncFinished(msg) || checkOpenRequests()) disconnect();
} catch (SocketTimeoutException ignore) {
if (state == ACTIVE && syncFinished(null)) disconnect();
}
}
private void handleCommand(MessagePayload payload) {
switch (payload.getCommand()) {
case VERSION:
handleVersion((Version) payload);
break;
case VERACK:
switch (mode) {
case SERVER:
activateConnection();
break;
case CLIENT:
case SYNC:
default:
// NO OP
break;
}
break;
case CUSTOM:
MessagePayload response = ctx.getCustomCommandHandler().handle((CustomMessage) payload);
if (response != null) {
send(response);
}
disconnect();
break;
default:
throw new NodeException("Command 'version' or 'verack' expected, but was '"
+ payload.getCommand() + "'");
}
}
private void handleVersion(Version version) {
if (version.getNonce() == ctx.getClientNonce()) {
LOG.info("Tried to connect to self, disconnecting.");
disconnect();
} else if (version.getVersion() >= BitmessageContext.CURRENT_VERSION) {
Connection.this.version = version.getVersion();
streams = version.getStreams();
send(new VerAck());
switch (mode) {
case SERVER:
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
break;
case CLIENT:
case SYNC:
activateConnection();
break;
default:
// NO OP
}
} else {
LOG.info("Received unsupported version " + version.getVersion() + ", disconnecting.");
disconnect();
}
}
}
private boolean checkOpenRequests() {
@ -450,10 +470,10 @@ class Connection {
try (Socket socket = Connection.this.socket) {
initSocket(socket);
while (state != DISCONNECTED) {
if (!sendingQueue.isEmpty()) {
send(sendingQueue.poll());
} else {
if (sendingQueue.isEmpty()) {
Thread.sleep(1000);
} else {
send(sendingQueue.poll());
}
}
} catch (IOException | InterruptedException e) {