Code cleanup & improvements

- most notably removed some unnecessary synchronize blocks in the DefaultNetworkHandler
This commit is contained in:
2016-02-26 14:34:08 +01:00
parent 382cb80a87
commit bc68a5d3ec
19 changed files with 286 additions and 217 deletions

View File

@ -21,7 +21,6 @@ import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.Factory;

View File

@ -0,0 +1,120 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.DefaultNetworkHandler.NETWORK_MAGIC_NUMBER;
/**
* @author Christian Basler
*/
public class ConnectionOrganizer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionOrganizer.class);
private final InternalContext ctx;
private final DefaultNetworkHandler networkHandler;
private final NetworkHandler.MessageListener listener;
private Connection initialConnection;
public ConnectionOrganizer(InternalContext ctx,
DefaultNetworkHandler networkHandler,
NetworkHandler.MessageListener listener) {
this.ctx = ctx;
this.networkHandler = networkHandler;
this.listener = listener;
}
@Override
public void run() {
try {
while (networkHandler.isRunning()) {
try {
int active = 0;
long now = UnixTime.now();
int diff = networkHandler.connections.size() - ctx.getConnectionLimit();
if (diff > 0) {
for (Connection c : networkHandler.connections) {
c.disconnect();
diff--;
if (diff == 0) break;
}
}
boolean forcedDisconnect = false;
for (Iterator<Connection> iterator = networkHandler.connections.iterator(); iterator.hasNext(); ) {
Connection c = iterator.next();
// Just in case they were all created at the same time, don't disconnect
// all at once.
if (!forcedDisconnect && now - c.getStartTime() > ctx.getConnectionTTL()) {
c.disconnect();
forcedDisconnect = true;
}
switch (c.getState()) {
case DISCONNECTED:
iterator.remove();
break;
case ACTIVE:
active++;
break;
default:
// nothing to do
}
}
if (active < NETWORK_MAGIC_NUMBER) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(
NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
boolean first = active == 0 && initialConnection == null;
for (NetworkAddress address : addresses) {
Connection c = new Connection(ctx, CLIENT, address, listener, networkHandler.requestedObjects);
if (first) {
initialConnection = c;
first = false;
}
networkHandler.startConnection(c);
}
Thread.sleep(10000);
} else if (initialConnection != null) {
initialConnection.disconnect();
initialConnection = null;
Thread.sleep(10000);
} else {
Thread.sleep(30000);
}
} catch (InterruptedException e) {
networkHandler.stop();
} catch (Exception e) {
LOG.error("Error in connection manager. Ignored.", e);
}
}
} finally {
LOG.debug("Connection manager shutting down.");
networkHandler.stop();
}
}
}

View File

@ -22,25 +22,21 @@ import ch.dissem.bitmessage.entity.CustomMessage;
import ch.dissem.bitmessage.entity.GetData;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.exception.ApplicationException;
import ch.dissem.bitmessage.exception.NodeException;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.Collections;
import ch.dissem.bitmessage.utils.Property;
import ch.dissem.bitmessage.utils.UnixTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.*;
import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT;
import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER;
import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE;
import static ch.dissem.bitmessage.utils.DebugUtils.inc;
@ -54,13 +50,13 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
public final static int NETWORK_MAGIC_NUMBER = 8;
private final Collection<Connection> connections = new ConcurrentLinkedQueue<>();
final Collection<Connection> connections = new ConcurrentLinkedQueue<>();
private final ExecutorService pool;
private InternalContext ctx;
private ServerSocket serverSocket;
private ServerRunnable server;
private volatile boolean running;
private Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(50_000));
final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(50_000));
public DefaultNetworkHandler() {
pool = Executors.newCachedThreadPool(new ThreadFactory() {
@ -122,93 +118,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
try {
running = true;
connections.clear();
serverSocket = new ServerSocket(ctx.getPort());
pool.execute(new Runnable() {
@Override
public void run() {
while (!serverSocket.isClosed()) {
try {
Socket socket = serverSocket.accept();
socket.setSoTimeout(Connection.READ_TIMEOUT);
startConnection(new Connection(ctx, SERVER, socket, listener, requestedObjects));
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
}
}
});
pool.execute(new Runnable() {
public Connection initialConnection;
@Override
public void run() {
try {
while (running) {
try {
int active = 0;
long now = UnixTime.now();
synchronized (connections) {
int diff = connections.size() - ctx.getConnectionLimit();
if (diff > 0) {
for (Connection c : connections) {
c.disconnect();
diff--;
if (diff == 0) break;
}
}
boolean forcedDisconnect = false;
for (Iterator<Connection> iterator = connections.iterator(); iterator.hasNext(); ) {
Connection c = iterator.next();
// Just in case they were all created at the same time, don't disconnect
// all at once.
if (!forcedDisconnect && now - c.getStartTime() > ctx.getConnectionTTL()) {
c.disconnect();
forcedDisconnect = true;
}
switch (c.getState()) {
case DISCONNECTED:
iterator.remove();
break;
case ACTIVE:
active++;
break;
default:
// nothing to do
}
}
}
if (active < NETWORK_MAGIC_NUMBER) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(
NETWORK_MAGIC_NUMBER - active, ctx.getStreams());
boolean first = active == 0 && initialConnection == null;
for (NetworkAddress address : addresses) {
Connection c = new Connection(ctx, CLIENT, address, listener, requestedObjects);
if (first) {
initialConnection = c;
first = false;
}
startConnection(c);
}
Thread.sleep(10000);
} else if (initialConnection != null) {
initialConnection.disconnect();
initialConnection = null;
Thread.sleep(10000);
} else {
Thread.sleep(30000);
}
} catch (InterruptedException e) {
running = false;
} catch (Exception e) {
LOG.error("Error in connection manager. Ignored.", e);
}
}
} finally {
LOG.debug("Connection manager shutting down.");
running = false;
}
}
});
server = new ServerRunnable(ctx, this, listener);
pool.execute(server);
pool.execute(new ConnectionOrganizer(ctx, this, listener));
} catch (IOException e) {
throw new ApplicationException(e);
}
@ -221,13 +133,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
@Override
public void stop() {
running = false;
try {
serverSocket.close();
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
server.close();
synchronized (connections) {
running = false;
for (Connection c : connections) {
c.disconnect();
}
@ -235,8 +143,12 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
requestedObjects.clear();
}
private void startConnection(Connection c) {
void startConnection(Connection c) {
if (!running) return;
synchronized (connections) {
if (!running) return;
// prevent connecting twice to the same node
if (connections.contains(c)) {
return;
@ -250,11 +162,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
@Override
public void offer(final InventoryVector iv) {
List<Connection> target = new LinkedList<>();
synchronized (connections) {
for (Connection connection : connections) {
if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) {
target.add(connection);
}
for (Connection connection : connections) {
if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) {
target.add(connection);
}
}
List<Connection> randomSubset = Collections.selectRandom(NETWORK_MAGIC_NUMBER, target);
@ -269,16 +179,14 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
TreeMap<Long, Integer> incomingConnections = new TreeMap<>();
TreeMap<Long, Integer> outgoingConnections = new TreeMap<>();
synchronized (connections) {
for (Connection connection : connections) {
if (connection.getState() == ACTIVE) {
long stream = connection.getNode().getStream();
streams.add(stream);
if (connection.getMode() == SERVER) {
inc(incomingConnections, stream);
} else {
inc(outgoingConnections, stream);
}
for (Connection connection : connections) {
if (connection.getState() == ACTIVE) {
long stream = connection.getNode().getStream();
streams.add(stream);
if (connection.getMode() == SERVER) {
inc(incomingConnections, stream);
} else {
inc(outgoingConnections, stream);
}
}
}
@ -303,53 +211,47 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder {
void request(Set<InventoryVector> inventoryVectors) {
if (!running || inventoryVectors.isEmpty()) return;
synchronized (connections) {
Map<Connection, List<InventoryVector>> distribution = new HashMap<>();
for (Connection connection : connections) {
if (connection.getState() == ACTIVE) {
distribution.put(connection, new LinkedList<InventoryVector>());
}
}
Iterator<InventoryVector> iterator = inventoryVectors.iterator();
InventoryVector next;
if (iterator.hasNext()) {
next = iterator.next();
} else {
return;
}
boolean firstRound = true;
while (firstRound || iterator.hasNext()) {
if (!firstRound) {
next = iterator.next();
firstRound = true;
} else {
firstRound = false;
}
for (Connection connection : distribution.keySet()) {
if (connection.knowsOf(next)) {
List<InventoryVector> ivs = distribution.get(connection);
if (ivs.size() == 50_000) {
connection.send(new GetData.Builder().inventory(ivs).build());
ivs.clear();
}
ivs.add(next);
iterator.remove();
if (iterator.hasNext()) {
next = iterator.next();
firstRound = true;
} else {
firstRound = false;
break;
}
Map<Connection, List<InventoryVector>> distribution = new HashMap<>();
for (Connection connection : connections) {
if (connection.getState() == ACTIVE) {
distribution.put(connection, new LinkedList<InventoryVector>());
}
}
Iterator<InventoryVector> iterator = inventoryVectors.iterator();
if (!iterator.hasNext()) {
return;
}
InventoryVector next = iterator.next();
Connection previous = null;
do {
for (Connection connection : distribution.keySet()) {
if (connection == previous) {
next = iterator.next();
}
if (connection.knowsOf(next)) {
List<InventoryVector> ivs = distribution.get(connection);
if (ivs.size() == GetData.MAX_INVENTORY_SIZE) {
connection.send(new GetData.Builder().inventory(ivs).build());
ivs.clear();
}
ivs.add(next);
iterator.remove();
if (iterator.hasNext()) {
next = iterator.next();
previous = connection;
} else {
break;
}
}
}
for (Connection connection : distribution.keySet()) {
List<InventoryVector> ivs = distribution.get(connection);
if (!ivs.isEmpty()) {
connection.send(new GetData.Builder().inventory(ivs).build());
}
} while (iterator.hasNext());
for (Connection connection : distribution.keySet()) {
List<InventoryVector> ivs = distribution.get(connection);
if (!ivs.isEmpty()) {
connection.send(new GetData.Builder().inventory(ivs).build());
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2016 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.networking;
import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.ports.NetworkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER;
/**
* @author Christian Basler
*/
public class ServerRunnable implements Runnable, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ServerRunnable.class);
private final InternalContext ctx;
private final ServerSocket serverSocket;
private final DefaultNetworkHandler networkHandler;
private final NetworkHandler.MessageListener listener;
public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler, NetworkHandler.MessageListener listener) throws IOException {
this.ctx = ctx;
this.networkHandler = networkHandler;
this.listener = listener;
this.serverSocket = new ServerSocket(ctx.getPort());
}
@Override
public void run() {
while (!serverSocket.isClosed()) {
try {
Socket socket = serverSocket.accept();
socket.setSoTimeout(Connection.READ_TIMEOUT);
networkHandler.startConnection(new Connection(ctx, SERVER, socket, listener,
networkHandler.requestedObjects));
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
}
}
@Override
public void close() {
try {
serverSocket.close();
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
}
}