diff --git a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index 60b7267..9f03926 100644 --- a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -302,9 +302,6 @@ public class BitmessageContext { long connectionTTL = 30 * MINUTE; boolean sendPubkeyOnIdentityCreation = true; - public Builder() { - } - public Builder port(int port) { this.port = port; return this; diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/Addr.java b/core/src/main/java/ch/dissem/bitmessage/entity/Addr.java index d00623e..2c0eb3e 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/Addr.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/Addr.java @@ -55,9 +55,6 @@ public class Addr implements MessagePayload { public static final class Builder { private List addresses = new ArrayList(); - public Builder() { - } - public Builder addresses(Collection addresses){ this.addresses.addAll(addresses); return this; diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/BitmessageAddress.java b/core/src/main/java/ch/dissem/bitmessage/entity/BitmessageAddress.java index 632bd70..62fabc3 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/BitmessageAddress.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/BitmessageAddress.java @@ -188,7 +188,7 @@ public class BitmessageAddress implements Serializable { @Override public String toString() { - return alias != null ? alias : address; + return alias == null ? address : alias; } public byte[] getRipe() { diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/GetData.java b/core/src/main/java/ch/dissem/bitmessage/entity/GetData.java index e272bbc..9569929 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/GetData.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/GetData.java @@ -28,6 +28,8 @@ import java.util.List; * The 'getdata' command is used to request objects from a node. */ public class GetData implements MessagePayload { + public static final int MAX_INVENTORY_SIZE = 50_000; + List inventory; private GetData(Builder builder) { @@ -54,9 +56,6 @@ public class GetData implements MessagePayload { public static final class Builder { private List inventory = new LinkedList<>(); - public Builder() { - } - public Builder addInventoryVector(InventoryVector inventoryVector) { this.inventory.add(inventoryVector); return this; diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/Inv.java b/core/src/main/java/ch/dissem/bitmessage/entity/Inv.java index 0135ec0..7130fd7 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/Inv.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/Inv.java @@ -54,9 +54,6 @@ public class Inv implements MessagePayload { public static final class Builder { private List inventory = new LinkedList<>(); - public Builder() { - } - public Builder addInventoryVector(InventoryVector inventoryVector) { this.inventory.add(inventoryVector); return this; diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java b/core/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java index a24bf6b..89ddddb 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java @@ -156,10 +156,10 @@ public class ObjectMessage implements MessagePayload { @Override public void write(OutputStream out) throws IOException { - if (nonce != null) { - out.write(nonce); - } else { + if (nonce == null) { out.write(new byte[8]); + } else { + out.write(nonce); } out.write(getPayloadBytesWithoutNonce()); } diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/Version.java b/core/src/main/java/ch/dissem/bitmessage/entity/Version.java index 70c2ad2..6d24d92 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/Version.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/Version.java @@ -143,9 +143,6 @@ public class Version implements MessagePayload { private String userAgent; private long[] streamNumbers; - public Builder() { - } - public Builder defaults() { version = BitmessageContext.CURRENT_VERSION; services = 1; diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/payload/V2Pubkey.java b/core/src/main/java/ch/dissem/bitmessage/entity/payload/V2Pubkey.java index da16929..d915941 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/payload/V2Pubkey.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/payload/V2Pubkey.java @@ -96,9 +96,6 @@ public class V2Pubkey extends Pubkey { private byte[] publicSigningKey; private byte[] publicEncryptionKey; - public Builder() { - } - public Builder stream(long streamNumber) { this.streamNumber = streamNumber; return this; diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/payload/V3Pubkey.java b/core/src/main/java/ch/dissem/bitmessage/entity/payload/V3Pubkey.java index bf91afe..a3b4da2 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/payload/V3Pubkey.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/payload/V3Pubkey.java @@ -123,9 +123,6 @@ public class V3Pubkey extends V2Pubkey { private long extraBytes; private byte[] signature = new byte[0]; - public Builder() { - } - public Builder stream(long streamNumber) { this.streamNumber = streamNumber; return this; diff --git a/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java b/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java index da99350..0637da9 100644 --- a/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java +++ b/core/src/main/java/ch/dissem/bitmessage/entity/valueobject/NetworkAddress.java @@ -134,9 +134,6 @@ public class NetworkAddress implements Streamable { private byte[] ipv6; private int port; - public Builder() { - } - public Builder time(final long time) { this.time = time; return this; diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/SimplePOWEngine.java b/core/src/main/java/ch/dissem/bitmessage/ports/SimplePOWEngine.java index 4fa1116..a7d0d57 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/SimplePOWEngine.java +++ b/core/src/main/java/ch/dissem/bitmessage/ports/SimplePOWEngine.java @@ -20,6 +20,7 @@ import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.utils.Bytes; import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import static ch.dissem.bitmessage.utils.Bytes.inc; @@ -33,18 +34,17 @@ import static ch.dissem.bitmessage.utils.Bytes.inc; public class SimplePOWEngine implements ProofOfWorkEngine { @Override public void calculateNonce(byte[] initialHash, byte[] target, Callback callback) { - byte[] nonce = new byte[8]; - MessageDigest mda; try { - mda = MessageDigest.getInstance("SHA-512"); - } catch (Exception e) { + MessageDigest mda = MessageDigest.getInstance("SHA-512"); + byte[] nonce = new byte[8]; + do { + inc(nonce); + mda.update(nonce); + mda.update(initialHash); + } while (Bytes.lt(target, mda.digest(mda.digest()), 8)); + callback.onNonceCalculated(initialHash, nonce); + } catch (NoSuchAlgorithmException e) { throw new ApplicationException(e); } - do { - inc(nonce); - mda.update(nonce); - mda.update(initialHash); - } while (Bytes.lt(target, mda.digest(mda.digest()), 8)); - callback.onNonceCalculated(initialHash, nonce); } } diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java index 26d6652..dab43f3 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Application.java @@ -289,18 +289,18 @@ public class Application { System.out.println("Stream: " + address.getStream()); System.out.println("Version: " + address.getVersion()); if (address.getPrivateKey() == null) { - if (address.getPubkey() != null) { - System.out.println("Public key available"); - } else { + if (address.getPubkey() == null) { System.out.println("Public key still missing"); + } else { + System.out.println("Public key available"); } } } private void messages() { String command; - List messages = ctx.messages().findMessages(Plaintext.Status.RECEIVED); do { + List<Plaintext> messages = ctx.messages().findMessages(Plaintext.Status.RECEIVED); System.out.println(); int i = 0; for (Plaintext message : messages) { diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index 4f99144..43f1e30 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -21,7 +21,6 @@ import ch.dissem.bitmessage.InternalContext; import ch.dissem.bitmessage.entity.*; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.InsufficientProofOfWorkException; import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.factory.Factory; diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java new file mode 100644 index 0000000..3f667e6 --- /dev/null +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/ConnectionOrganizer.java @@ -0,0 +1,120 @@ +/* + * Copyright 2016 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking; + +import ch.dissem.bitmessage.InternalContext; +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.ports.NetworkHandler; +import ch.dissem.bitmessage.utils.UnixTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; + +import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; +import static ch.dissem.bitmessage.networking.DefaultNetworkHandler.NETWORK_MAGIC_NUMBER; + +/** + * @author Christian Basler + */ +public class ConnectionOrganizer implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ConnectionOrganizer.class); + + private final InternalContext ctx; + private final DefaultNetworkHandler networkHandler; + private final NetworkHandler.MessageListener listener; + + private Connection initialConnection; + + public ConnectionOrganizer(InternalContext ctx, + DefaultNetworkHandler networkHandler, + NetworkHandler.MessageListener listener) { + this.ctx = ctx; + this.networkHandler = networkHandler; + this.listener = listener; + } + + @Override + public void run() { + try { + while (networkHandler.isRunning()) { + try { + int active = 0; + long now = UnixTime.now(); + + int diff = networkHandler.connections.size() - ctx.getConnectionLimit(); + if (diff > 0) { + for (Connection c : networkHandler.connections) { + c.disconnect(); + diff--; + if (diff == 0) break; + } + } + boolean forcedDisconnect = false; + for (Iterator<Connection> iterator = networkHandler.connections.iterator(); iterator.hasNext(); ) { + Connection c = iterator.next(); + // Just in case they were all created at the same time, don't disconnect + // all at once. + if (!forcedDisconnect && now - c.getStartTime() > ctx.getConnectionTTL()) { + c.disconnect(); + forcedDisconnect = true; + } + switch (c.getState()) { + case DISCONNECTED: + iterator.remove(); + break; + case ACTIVE: + active++; + break; + default: + // nothing to do + } + } + + if (active < NETWORK_MAGIC_NUMBER) { + List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses( + NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); + boolean first = active == 0 && initialConnection == null; + for (NetworkAddress address : addresses) { + Connection c = new Connection(ctx, CLIENT, address, listener, networkHandler.requestedObjects); + if (first) { + initialConnection = c; + first = false; + } + networkHandler.startConnection(c); + } + Thread.sleep(10000); + } else if (initialConnection != null) { + initialConnection.disconnect(); + initialConnection = null; + Thread.sleep(10000); + } else { + Thread.sleep(30000); + } + } catch (InterruptedException e) { + networkHandler.stop(); + } catch (Exception e) { + LOG.error("Error in connection manager. Ignored.", e); + } + } + } finally { + LOG.debug("Connection manager shutting down."); + networkHandler.stop(); + } + } +} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index 44dd437..43333ac 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -22,25 +22,21 @@ import ch.dissem.bitmessage.entity.CustomMessage; import ch.dissem.bitmessage.entity.GetData; import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.exception.ApplicationException; import ch.dissem.bitmessage.exception.NodeException; import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.utils.Collections; import ch.dissem.bitmessage.utils.Property; -import ch.dissem.bitmessage.utils.UnixTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; -import java.net.ServerSocket; import java.net.Socket; import java.util.*; import java.util.concurrent.*; -import static ch.dissem.bitmessage.networking.Connection.Mode.CLIENT; import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE; import static ch.dissem.bitmessage.utils.DebugUtils.inc; @@ -54,13 +50,13 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { public final static int NETWORK_MAGIC_NUMBER = 8; - private final Collection<Connection> connections = new ConcurrentLinkedQueue<>(); + final Collection<Connection> connections = new ConcurrentLinkedQueue<>(); private final ExecutorService pool; private InternalContext ctx; - private ServerSocket serverSocket; + private ServerRunnable server; private volatile boolean running; - private Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(50_000)); + final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(50_000)); public DefaultNetworkHandler() { pool = Executors.newCachedThreadPool(new ThreadFactory() { @@ -122,93 +118,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { try { running = true; connections.clear(); - serverSocket = new ServerSocket(ctx.getPort()); - pool.execute(new Runnable() { - @Override - public void run() { - while (!serverSocket.isClosed()) { - try { - Socket socket = serverSocket.accept(); - socket.setSoTimeout(Connection.READ_TIMEOUT); - startConnection(new Connection(ctx, SERVER, socket, listener, requestedObjects)); - } catch (IOException e) { - LOG.debug(e.getMessage(), e); - } - } - } - }); - pool.execute(new Runnable() { - public Connection initialConnection; - - @Override - public void run() { - try { - while (running) { - try { - int active = 0; - long now = UnixTime.now(); - synchronized (connections) { - int diff = connections.size() - ctx.getConnectionLimit(); - if (diff > 0) { - for (Connection c : connections) { - c.disconnect(); - diff--; - if (diff == 0) break; - } - } - boolean forcedDisconnect = false; - for (Iterator<Connection> iterator = connections.iterator(); iterator.hasNext(); ) { - Connection c = iterator.next(); - // Just in case they were all created at the same time, don't disconnect - // all at once. - if (!forcedDisconnect && now - c.getStartTime() > ctx.getConnectionTTL()) { - c.disconnect(); - forcedDisconnect = true; - } - switch (c.getState()) { - case DISCONNECTED: - iterator.remove(); - break; - case ACTIVE: - active++; - break; - default: - // nothing to do - } - } - } - if (active < NETWORK_MAGIC_NUMBER) { - List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses( - NETWORK_MAGIC_NUMBER - active, ctx.getStreams()); - boolean first = active == 0 && initialConnection == null; - for (NetworkAddress address : addresses) { - Connection c = new Connection(ctx, CLIENT, address, listener, requestedObjects); - if (first) { - initialConnection = c; - first = false; - } - startConnection(c); - } - Thread.sleep(10000); - } else if (initialConnection != null) { - initialConnection.disconnect(); - initialConnection = null; - Thread.sleep(10000); - } else { - Thread.sleep(30000); - } - } catch (InterruptedException e) { - running = false; - } catch (Exception e) { - LOG.error("Error in connection manager. Ignored.", e); - } - } - } finally { - LOG.debug("Connection manager shutting down."); - running = false; - } - } - }); + server = new ServerRunnable(ctx, this, listener); + pool.execute(server); + pool.execute(new ConnectionOrganizer(ctx, this, listener)); } catch (IOException e) { throw new ApplicationException(e); } @@ -221,13 +133,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { @Override public void stop() { - running = false; - try { - serverSocket.close(); - } catch (IOException e) { - LOG.debug(e.getMessage(), e); - } + server.close(); synchronized (connections) { + running = false; for (Connection c : connections) { c.disconnect(); } @@ -235,8 +143,12 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { requestedObjects.clear(); } - private void startConnection(Connection c) { + void startConnection(Connection c) { + if (!running) return; + synchronized (connections) { + if (!running) return; + // prevent connecting twice to the same node if (connections.contains(c)) { return; @@ -250,11 +162,9 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { @Override public void offer(final InventoryVector iv) { List<Connection> target = new LinkedList<>(); - synchronized (connections) { - for (Connection connection : connections) { - if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) { - target.add(connection); - } + for (Connection connection : connections) { + if (connection.getState() == ACTIVE && !connection.knowsOf(iv)) { + target.add(connection); } } List<Connection> randomSubset = Collections.selectRandom(NETWORK_MAGIC_NUMBER, target); @@ -269,16 +179,14 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { TreeMap<Long, Integer> incomingConnections = new TreeMap<>(); TreeMap<Long, Integer> outgoingConnections = new TreeMap<>(); - synchronized (connections) { - for (Connection connection : connections) { - if (connection.getState() == ACTIVE) { - long stream = connection.getNode().getStream(); - streams.add(stream); - if (connection.getMode() == SERVER) { - inc(incomingConnections, stream); - } else { - inc(outgoingConnections, stream); - } + for (Connection connection : connections) { + if (connection.getState() == ACTIVE) { + long stream = connection.getNode().getStream(); + streams.add(stream); + if (connection.getMode() == SERVER) { + inc(incomingConnections, stream); + } else { + inc(outgoingConnections, stream); } } } @@ -303,53 +211,47 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { void request(Set<InventoryVector> inventoryVectors) { if (!running || inventoryVectors.isEmpty()) return; - synchronized (connections) { - Map<Connection, List<InventoryVector>> distribution = new HashMap<>(); - for (Connection connection : connections) { - if (connection.getState() == ACTIVE) { - distribution.put(connection, new LinkedList<InventoryVector>()); - } - } - Iterator<InventoryVector> iterator = inventoryVectors.iterator(); - InventoryVector next; - if (iterator.hasNext()) { - next = iterator.next(); - } else { - return; - } - boolean firstRound = true; - while (firstRound || iterator.hasNext()) { - if (!firstRound) { - next = iterator.next(); - firstRound = true; - } else { - firstRound = false; - } - for (Connection connection : distribution.keySet()) { - if (connection.knowsOf(next)) { - List<InventoryVector> ivs = distribution.get(connection); - if (ivs.size() == 50_000) { - connection.send(new GetData.Builder().inventory(ivs).build()); - ivs.clear(); - } - ivs.add(next); - iterator.remove(); - if (iterator.hasNext()) { - next = iterator.next(); - firstRound = true; - } else { - firstRound = false; - break; - } + Map<Connection, List<InventoryVector>> distribution = new HashMap<>(); + for (Connection connection : connections) { + if (connection.getState() == ACTIVE) { + distribution.put(connection, new LinkedList<InventoryVector>()); + } + } + Iterator<InventoryVector> iterator = inventoryVectors.iterator(); + if (!iterator.hasNext()) { + return; + } + InventoryVector next = iterator.next(); + Connection previous = null; + do { + for (Connection connection : distribution.keySet()) { + if (connection == previous) { + next = iterator.next(); + } + if (connection.knowsOf(next)) { + List<InventoryVector> ivs = distribution.get(connection); + if (ivs.size() == GetData.MAX_INVENTORY_SIZE) { + connection.send(new GetData.Builder().inventory(ivs).build()); + ivs.clear(); + } + ivs.add(next); + iterator.remove(); + + if (iterator.hasNext()) { + next = iterator.next(); + previous = connection; + } else { + break; } } } - for (Connection connection : distribution.keySet()) { - List<InventoryVector> ivs = distribution.get(connection); - if (!ivs.isEmpty()) { - connection.send(new GetData.Builder().inventory(ivs).build()); - } + } while (iterator.hasNext()); + + for (Connection connection : distribution.keySet()) { + List<InventoryVector> ivs = distribution.get(connection); + if (!ivs.isEmpty()) { + connection.send(new GetData.Builder().inventory(ivs).build()); } } } diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java new file mode 100644 index 0000000..bf6d6f6 --- /dev/null +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/ServerRunnable.java @@ -0,0 +1,70 @@ +/* + * Copyright 2016 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.networking; + +import ch.dissem.bitmessage.InternalContext; +import ch.dissem.bitmessage.ports.NetworkHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; + +/** + * @author Christian Basler + */ +public class ServerRunnable implements Runnable, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(ServerRunnable.class); + private final InternalContext ctx; + private final ServerSocket serverSocket; + private final DefaultNetworkHandler networkHandler; + private final NetworkHandler.MessageListener listener; + + public ServerRunnable(InternalContext ctx, DefaultNetworkHandler networkHandler, NetworkHandler.MessageListener listener) throws IOException { + this.ctx = ctx; + this.networkHandler = networkHandler; + this.listener = listener; + this.serverSocket = new ServerSocket(ctx.getPort()); + } + + @Override + public void run() { + while (!serverSocket.isClosed()) { + try { + Socket socket = serverSocket.accept(); + socket.setSoTimeout(Connection.READ_TIMEOUT); + networkHandler.startConnection(new Connection(ctx, SERVER, socket, listener, + networkHandler.requestedObjects)); + } catch (IOException e) { + LOG.debug(e.getMessage(), e); + } + } + } + + @Override + public void close() { + try { + serverSocket.close(); + } catch (IOException e) { + LOG.debug(e.getMessage(), e); + } + } +} diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java index 6e58f09..97202d1 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcAddressRepository.java @@ -99,10 +99,7 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito BitmessageAddress address; InputStream privateKeyStream = rs.getBinaryStream("private_key"); - if (privateKeyStream != null) { - PrivateKey privateKey = PrivateKey.read(privateKeyStream); - address = new BitmessageAddress(privateKey); - } else { + if (privateKeyStream == null) { address = new BitmessageAddress(rs.getString("address")); Blob publicKeyBlob = rs.getBlob("public_key"); if (publicKeyBlob != null) { @@ -113,6 +110,9 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito } address.setPubkey(pubkey); } + } else { + PrivateKey privateKey = PrivateKey.read(privateKeyStream); + address = new BitmessageAddress(privateKey); } address.setAlias(rs.getString("alias")); address.setSubscribed(rs.getBoolean("subscribed")); diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java index d583a71..1df12f6 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcHelper.java @@ -77,12 +77,12 @@ public abstract class JdbcHelper { } protected void writeBlob(PreparedStatement ps, int parameterIndex, Streamable data) throws SQLException, IOException { - if (data != null) { + if (data == null) { + ps.setBytes(parameterIndex, null); + } else { ByteArrayOutputStream os = new ByteArrayOutputStream(); data.write(os); ps.setBytes(parameterIndex, os.toByteArray()); - } else { - ps.setBytes(parameterIndex, null); } } } diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java index c01e147..d77cdfb 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcMessageRepository.java @@ -90,10 +90,10 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito @Override public int countUnread(Label label) { String where; - if (label != null) { - where = "id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.getId() + ") AND "; - } else { + if (label == null) { where = ""; + } else { + where = "id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.getId() + ") AND "; } where += "id IN (SELECT message_id FROM Message_Label WHERE label_id IN (" + "SELECT id FROM Label WHERE type = '" + Label.Type.UNREAD.name() + "'))"; @@ -237,14 +237,14 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito PreparedStatement ps = connection.prepareStatement( "INSERT INTO Message (iv, type, sender, recipient, data, sent, received, status, initial_hash) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", Statement.RETURN_GENERATED_KEYS); - ps.setBytes(1, message.getInventoryVector() != null ? message.getInventoryVector().getHash() : null); + ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash()); ps.setString(2, message.getType().name()); ps.setString(3, message.getFrom().getAddress()); - ps.setString(4, message.getTo() != null ? message.getTo().getAddress() : null); + ps.setString(4, message.getTo() == null ? null : message.getTo().getAddress()); writeBlob(ps, 5, message); ps.setLong(6, message.getSent()); ps.setLong(7, message.getReceived()); - ps.setString(8, message.getStatus() != null ? message.getStatus().name() : null); + ps.setString(8, message.getStatus() == null ? null : message.getStatus().name()); ps.setBytes(9, message.getInitialHash()); ps.executeUpdate(); @@ -258,10 +258,10 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito private void update(Connection connection, Plaintext message) throws SQLException, IOException { PreparedStatement ps = connection.prepareStatement( "UPDATE Message SET iv=?, sent=?, received=?, status=?, initial_hash=? WHERE id=?"); - ps.setBytes(1, message.getInventoryVector() != null ? message.getInventoryVector().getHash() : null); + ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash()); ps.setLong(2, message.getSent()); ps.setLong(3, message.getReceived()); - ps.setString(4, message.getStatus() != null ? message.getStatus().name() : null); + ps.setString(4, message.getStatus() == null ? null : message.getStatus().name()); ps.setBytes(5, message.getInitialHash()); ps.setLong(6, (Long) message.getId()); ps.executeUpdate();