From f00c6018e70b0a99e4a4654dd5a6ad68eafb0606 Mon Sep 17 00:00:00 2001 From: Christian Basler Date: Fri, 24 Apr 2015 11:11:08 +0200 Subject: [PATCH] Some refactoring - I didn't like the way the context was initialized - The DatabaseRepository got too complicated --- .../java/ch/dissem/bitmessage/demo/Main.java | 24 ++- .../dissem/bitmessage/BitmessageContext.java | 174 +++++++++++++++ .../java/ch/dissem/bitmessage/Context.java | 97 --------- .../ch/dissem/bitmessage/entity/Version.java | 4 +- .../bitmessage/ports/NetworkHandler.java | 1 + .../ch/dissem/bitmessage/utils/Security.java | 2 +- .../inventory/DatabaseRepository.java | 199 ------------------ .../inventory/JdbcAddressRepository.java | 4 +- .../bitmessage/inventory/JdbcHelper.java | 63 ++++++ .../bitmessage/inventory/JdbcInventory.java | 103 +++++++++ .../inventory/JdbcNodeRegistry.java | 95 +++++++++ .../bitmessage/networking/Connection.java | 10 +- .../bitmessage/networking/NetworkNode.java | 18 +- .../networking/NetworkNodeTest.java | 2 - 14 files changed, 472 insertions(+), 324 deletions(-) create mode 100644 domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java delete mode 100644 domain/src/main/java/ch/dissem/bitmessage/Context.java delete mode 100644 inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java create mode 100644 inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcHelper.java create mode 100644 inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcInventory.java create mode 100644 inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcNodeRegistry.java diff --git a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java index c7ca4f3..671bb29 100644 --- a/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java +++ b/demo/src/main/java/ch/dissem/bitmessage/demo/Main.java @@ -16,17 +16,17 @@ package ch.dissem.bitmessage.demo; -import ch.dissem.bitmessage.Context; +import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.entity.payload.ObjectPayload; -import ch.dissem.bitmessage.inventory.DatabaseRepository; +import ch.dissem.bitmessage.inventory.JdbcAddressRepository; +import ch.dissem.bitmessage.inventory.JdbcInventory; +import ch.dissem.bitmessage.inventory.JdbcNodeRegistry; import ch.dissem.bitmessage.networking.NetworkNode; import ch.dissem.bitmessage.ports.NetworkHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; import java.util.Scanner; /** @@ -36,11 +36,15 @@ public class Main { private final static Logger LOG = LoggerFactory.getLogger(Main.class); public static void main(String[] args) throws IOException { - NetworkNode networkNode = new NetworkNode(); - DatabaseRepository repo = new DatabaseRepository(); - Context.init(repo, repo, networkNode, 48444); - Context.getInstance().addStream(1); - networkNode.start(new NetworkHandler.MessageListener() { + BitmessageContext ctx = new BitmessageContext.Builder() + .addressRepo(new JdbcAddressRepository()) + .inventory(new JdbcInventory()) + .nodeRegistry(new JdbcNodeRegistry()) + .networkHandler(new NetworkNode()) + .port(48444) + .streams(1) + .build(); + ctx.getNetworkHandler().start(new NetworkHandler.MessageListener() { @Override public void receive(ObjectPayload payload) { // LOG.info("message received: " + payload); @@ -52,6 +56,6 @@ public class Main { Scanner scanner = new Scanner(System.in); scanner.nextLine(); LOG.info("Shutting down client"); - networkNode.stop(); + ctx.getNetworkHandler().stop(); } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java new file mode 100644 index 0000000..ed355ab --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -0,0 +1,174 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage; + +import ch.dissem.bitmessage.ports.AddressRepository; +import ch.dissem.bitmessage.ports.Inventory; +import ch.dissem.bitmessage.ports.NetworkHandler; +import ch.dissem.bitmessage.ports.NodeRegistry; + +import java.util.Collection; +import java.util.TreeSet; + +/** + * Created by chris on 05.04.15. + */ +public class BitmessageContext { + public static final int CURRENT_VERSION = 3; + + private Inventory inventory; + private NodeRegistry nodeRegistry; + private NetworkHandler networkHandler; + private AddressRepository addressRepo; + + private Collection streams = new TreeSet<>(); + + private int port; + + private long networkNonceTrialsPerByte = 1000; + private long networkExtraBytes = 1000; + + private BitmessageContext(Builder builder) { + port = builder.port; + inventory = builder.inventory; + nodeRegistry = builder.nodeRegistry; + networkHandler = builder.networkHandler; + addressRepo = builder.addressRepo; + streams = builder.streams; + + init(inventory, nodeRegistry, networkHandler, addressRepo); + } + + private void init(Object... objects) { + for (Object o : objects) { + if (o instanceof ContextHolder) { + ((ContextHolder) o).setContext(this); + } + } + } + + public Inventory getInventory() { + return inventory; + } + + public NodeRegistry getAddressRepository() { + return nodeRegistry; + } + + public NetworkHandler getNetworkHandler() { + return networkHandler; + } + + public int getPort() { + return port; + } + + public long[] getStreams() { + long[] result = new long[streams.size()]; + int i = 0; + for (long stream : streams) { + result[i++] = stream; + } + return result; + } + + public void addStream(long stream) { + streams.add(stream); + } + + public void removeStream(long stream) { + streams.remove(stream); + } + + public long getNetworkNonceTrialsPerByte() { + return networkNonceTrialsPerByte; + } + + public long getNetworkExtraBytes() { + return networkExtraBytes; + } + + + public interface ContextHolder { + void setContext(BitmessageContext context); + } + + public static final class Builder { + private int port = 8444; + private Inventory inventory; + private NodeRegistry nodeRegistry; + private NetworkHandler networkHandler; + private AddressRepository addressRepo; + private Collection streams; + + public Builder() { + } + + public Builder port(int port) { + this.port = port; + return this; + } + + public Builder inventory(Inventory inventory) { + this.inventory = inventory; + return this; + } + + public Builder nodeRegistry(NodeRegistry nodeRegistry) { + this.nodeRegistry = nodeRegistry; + return this; + } + + public Builder networkHandler(NetworkHandler networkHandler) { + this.networkHandler = networkHandler; + return this; + } + + public Builder addressRepo(AddressRepository addressRepo) { + this.addressRepo = addressRepo; + return this; + } + + public Builder streams(Collection streams) { + this.streams = streams; + return this; + } + + public Builder streams(long... streams) { + this.streams = new TreeSet<>(); + for (long stream : streams) { + this.streams.add(stream); + } + return this; + } + + public BitmessageContext build() { + nonNull("inventory", inventory); + nonNull("nodeRegistry", nodeRegistry); + nonNull("networkHandler", networkHandler); + nonNull("addressRepo", addressRepo); + if (streams == null) { + streams(1); + } + return new BitmessageContext(this); + } + + private void nonNull(String name, Object o) { + if (o == null) throw new IllegalStateException(name + " must not be null"); + } + } +} diff --git a/domain/src/main/java/ch/dissem/bitmessage/Context.java b/domain/src/main/java/ch/dissem/bitmessage/Context.java deleted file mode 100644 index 02cad5f..0000000 --- a/domain/src/main/java/ch/dissem/bitmessage/Context.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage; - -import ch.dissem.bitmessage.ports.NodeRegistry; -import ch.dissem.bitmessage.ports.Inventory; -import ch.dissem.bitmessage.ports.NetworkHandler; - -import java.util.Collection; -import java.util.TreeSet; - -/** - * Created by chris on 05.04.15. - */ -public class Context { - public static final int CURRENT_VERSION = 3; - - private static Context instance; - - private Inventory inventory; - private NodeRegistry addressRepo; - private NetworkHandler networkHandler; - - private Collection streams = new TreeSet<>(); - - private int port; - - private long networkNonceTrialsPerByte = 1000; - private long networkExtraBytes = 1000; - - private Context(Inventory inventory, NodeRegistry addressRepo, - NetworkHandler networkHandler, int port) { - this.inventory = inventory; - this.addressRepo = addressRepo; - this.networkHandler = networkHandler; - this.port = port; - } - - public static void init(Inventory inventory, NodeRegistry nodeRegistry, NetworkHandler networkHandler, int port) { - instance = new Context(inventory, nodeRegistry, networkHandler, port); - } - - public static Context getInstance() { - return instance; - } - - public Inventory getInventory() { - return inventory; - } - - public NodeRegistry getAddressRepository() { - return addressRepo; - } - - public int getPort() { - return port; - } - - public long[] getStreams() { - long[] result = new long[streams.size()]; - int i = 0; - for (long stream : streams) { - result[i++] = stream; - } - return result; - } - - public void addStream(long stream) { - streams.add(stream); - } - - public void removeStream(long stream) { - streams.remove(stream); - } - - public long getNetworkNonceTrialsPerByte() { - return networkNonceTrialsPerByte; - } - - public long getNetworkExtraBytes() { - return networkExtraBytes; - } -} diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java b/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java index 1fc8a21..70c2ad2 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/Version.java @@ -16,7 +16,7 @@ package ch.dissem.bitmessage.entity; -import ch.dissem.bitmessage.Context; +import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.utils.Encode; import ch.dissem.bitmessage.utils.UnixTime; @@ -147,7 +147,7 @@ public class Version implements MessagePayload { } public Builder defaults() { - version = Context.CURRENT_VERSION; + version = BitmessageContext.CURRENT_VERSION; services = 1; timestamp = UnixTime.now(); nonce = new Random().nextInt(); diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java index 70bda06..616673f 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/NetworkHandler.java @@ -16,6 +16,7 @@ package ch.dissem.bitmessage.ports; +import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.entity.payload.ObjectPayload; /** diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/Security.java b/domain/src/main/java/ch/dissem/bitmessage/utils/Security.java index 80ae000..2036604 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/utils/Security.java +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/Security.java @@ -133,7 +133,7 @@ public class Security { long nonceTrialsPerByte, long extraBytes, Pubkey.Feature... features) { byte[] publicSigningKey = EC_PARAMETERS.getG().multiply(keyToBigInt(privateSigningKey)).getEncoded(false); byte[] publicEncryptionKey = EC_PARAMETERS.getG().multiply(keyToBigInt(privateEncryptionKey)).getEncoded(false); - return Factory.createPubkey(Bytes.subArray(publicSigningKey, 1, 64), Bytes.subArray(publicEncryptionKey, 1, 64), + return Factory.createPubkey(version, Bytes.subArray(publicSigningKey, 1, 64), Bytes.subArray(publicEncryptionKey, 1, 64), nonceTrialsPerByte, extraBytes, features); } diff --git a/inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java b/inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java deleted file mode 100644 index 4c05d42..0000000 --- a/inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright 2015 Christian Basler - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.dissem.bitmessage.inventory; - -import ch.dissem.bitmessage.entity.ObjectMessage; -import ch.dissem.bitmessage.entity.Streamable; -import ch.dissem.bitmessage.entity.valueobject.InventoryVector; -import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.factory.Factory; -import ch.dissem.bitmessage.ports.Inventory; -import ch.dissem.bitmessage.ports.NodeRegistry; -import org.flywaydb.core.Flyway; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.sql.*; -import java.util.LinkedList; -import java.util.List; - -import static ch.dissem.bitmessage.utils.Strings.join; -import static ch.dissem.bitmessage.utils.UnixTime.now; - -/** - * Stores everything in a database - */ -public class DatabaseRepository implements Inventory, NodeRegistry { - private static final Logger LOG = LoggerFactory.getLogger(DatabaseRepository.class); - - private static final String DB_URL = "jdbc:h2:~/jabit"; - private static final String DB_USER = "sa"; - private static final String DB_PWD = null; - - - public DatabaseRepository() { - Flyway flyway = new Flyway(); - flyway.setDataSource(DB_URL, DB_USER, null); - flyway.migrate(); - } - - @Override - public List getKnownAddresses(int limit, long... streams) { - List result = new LinkedList<>(); - try { - Statement stmt = getConnection().createStatement(); - ResultSet rs = stmt.executeQuery("SELECT * FROM Node WHERE Stream IN (" + join(streams) + ")"); - while (rs.next()) { -// result.add(new NetworkAddress.Builder() -// .ipv6(rs.getBytes("ip")) -// .port(rs.getByte("port")) -// .services(rs.getLong("services")) -// .stream(rs.getLong("stream")) -// .time(rs.getLong("time")) -// .build()); - } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - if (result.isEmpty()) { - // FIXME: this is for testing purposes, remove it! - result.add(new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build()); - } - return result; - } - - @Override - public void offerAddresses(List addresses) { - try { - Connection connection = getConnection(); - PreparedStatement exists = connection.prepareStatement("SELECT port FROM Node WHERE ip = ? AND port = ? AND stream = ?"); - PreparedStatement insert = connection.prepareStatement( - "INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)"); - PreparedStatement update = connection.prepareStatement( - "UPDATE Node SET services = ?, time = ? WHERE ip = ? AND port = ? AND stream = ?"); - for (NetworkAddress node : addresses) { - exists.setBytes(1, node.getIPv6()); - exists.setInt(2, node.getPort()); - exists.setLong(3, node.getStream()); - if (exists.executeQuery().next()) { - update.setLong(1, node.getServices()); - update.setLong(2, node.getTime()); - - update.setBytes(3, node.getIPv6()); - update.setInt(4, node.getPort()); - update.setLong(5, node.getStream()); - update.executeUpdate(); - } else { - insert.setBytes(1, node.getIPv6()); - insert.setInt(2, node.getPort()); - insert.setLong(3, node.getServices()); - insert.setLong(4, node.getStream()); - insert.setLong(5, node.getTime()); - insert.executeUpdate(); - } - } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - } - - @Override - public List getInventory(long... streams) { - List result = new LinkedList<>(); - try { - Statement stmt = getConnection().createStatement(); - ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE expires > " + now() + - " AND stream IN (" + join(streams) + ")"); - while (rs.next()) { - result.add(new InventoryVector(rs.getBytes("hash"))); - } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - return result; - } - - @Override - public List getMissing(List offer, long... streams) { - offer.removeAll(getInventory(streams)); - return offer; - } - - @Override - public ObjectMessage getObject(InventoryVector vector) { - try { - Statement stmt = getConnection().createStatement(); - ResultSet rs = stmt.executeQuery("SELECT data, version FROM Inventory WHERE hash = " + vector); - Blob data = rs.getBlob("data"); - return Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length()); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e); - } - } - - @Override - public void storeObject(int version, ObjectMessage object) { - try { - PreparedStatement ps = getConnection().prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, type, version) VALUES (?, ?, ?, ?, ?, ?)"); - InventoryVector iv = object.getInventoryVector(); - LOG.trace("Storing object " + iv); - ps.setBytes(1, iv.getHash()); - ps.setLong(2, object.getStream()); - ps.setLong(3, object.getExpiresTime()); - ByteArrayOutputStream os = new ByteArrayOutputStream(); - object.write(os); - ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray()); - ps.setBlob(4, is); - ps.setLong(5, object.getType()); - ps.setInt(6, version); - ps.executeUpdate(); - } catch (SQLException e) { - LOG.error("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - - protected void writeBlob(PreparedStatement ps, int parameterIndex, Streamable data) throws SQLException, IOException { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - data.write(os); - ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray()); - ps.setBlob(parameterIndex, is); - } - - @Override - public void cleanup() { - try { - // We delete only objects that expired 5 minutes ago or earlier, so we don't request objects we just deleted - getConnection().createStatement().executeUpdate("DELETE FROM Inventory WHERE expires < " + (now() - 300)); - } catch (SQLException e) { - LOG.debug(e.getMessage(), e); - } - } - - protected Connection getConnection() { - try { - return DriverManager.getConnection(DB_URL, DB_USER, DB_PWD); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } -} diff --git a/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcAddressRepository.java b/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcAddressRepository.java index 7921030..dd5fc3c 100644 --- a/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcAddressRepository.java +++ b/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcAddressRepository.java @@ -32,8 +32,8 @@ import java.util.List; /** * Created by chris on 23.04.15. */ -public class JdbcAddressRepository extends DatabaseRepository implements AddressRepository { - private static final Logger LOG = LoggerFactory.getLogger(DatabaseRepository.class); +public class JdbcAddressRepository extends JdbcHelper implements AddressRepository { + private static final Logger LOG = LoggerFactory.getLogger(JdbcAddressRepository.class); @Override public List findIdentities() { diff --git a/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcHelper.java b/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcHelper.java new file mode 100644 index 0000000..2403302 --- /dev/null +++ b/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcHelper.java @@ -0,0 +1,63 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.inventory; + +import ch.dissem.bitmessage.entity.Streamable; +import org.flywaydb.core.Flyway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Helper class that does Flyway migration, provides JDBC connections and some helper methods. + */ +abstract class JdbcHelper { + private static final Logger LOG = LoggerFactory.getLogger(JdbcHelper.class); + + private static final String DB_URL = "jdbc:h2:~/jabit"; + private static final String DB_USER = "sa"; + private static final String DB_PWD = null; + + + static { + Flyway flyway = new Flyway(); + flyway.setDataSource(DB_URL, DB_USER, null); + flyway.migrate(); + } + + protected void writeBlob(PreparedStatement ps, int parameterIndex, Streamable data) throws SQLException, IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + data.write(os); + ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray()); + ps.setBlob(parameterIndex, is); + } + + protected Connection getConnection() { + try { + return DriverManager.getConnection(DB_URL, DB_USER, DB_PWD); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcInventory.java b/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcInventory.java new file mode 100644 index 0000000..02db760 --- /dev/null +++ b/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcInventory.java @@ -0,0 +1,103 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.inventory; + +import ch.dissem.bitmessage.entity.ObjectMessage; +import ch.dissem.bitmessage.entity.valueobject.InventoryVector; +import ch.dissem.bitmessage.factory.Factory; +import ch.dissem.bitmessage.ports.Inventory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.LinkedList; +import java.util.List; + +import static ch.dissem.bitmessage.utils.Strings.join; +import static ch.dissem.bitmessage.utils.UnixTime.now; + +/** + * Created by chris on 24.04.15. + */ +public class JdbcInventory extends JdbcHelper implements Inventory { + private static final Logger LOG = LoggerFactory.getLogger(JdbcInventory.class); + + @Override + public List getInventory(long... streams) { + List result = new LinkedList<>(); + try { + Statement stmt = getConnection().createStatement(); + ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE expires > " + now() + + " AND stream IN (" + join(streams) + ")"); + while (rs.next()) { + result.add(new InventoryVector(rs.getBytes("hash"))); + } + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + return result; + } + + @Override + public List getMissing(List offer, long... streams) { + offer.removeAll(getInventory(streams)); + return offer; + } + + @Override + public ObjectMessage getObject(InventoryVector vector) { + try { + Statement stmt = getConnection().createStatement(); + ResultSet rs = stmt.executeQuery("SELECT data, version FROM Inventory WHERE hash = " + vector); + Blob data = rs.getBlob("data"); + return Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length()); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + @Override + public void storeObject(int version, ObjectMessage object) { + try { + PreparedStatement ps = getConnection().prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, type, version) VALUES (?, ?, ?, ?, ?, ?)"); + InventoryVector iv = object.getInventoryVector(); + LOG.trace("Storing object " + iv); + ps.setBytes(1, iv.getHash()); + ps.setLong(2, object.getStream()); + ps.setLong(3, object.getExpiresTime()); + writeBlob(ps, 4, object); + ps.setLong(5, object.getType()); + ps.setInt(6, version); + ps.executeUpdate(); + } catch (SQLException e) { + LOG.error("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + @Override + public void cleanup() { + try { + // We delete only objects that expired 5 minutes ago or earlier, so we don't request objects we just deleted + getConnection().createStatement().executeUpdate("DELETE FROM Inventory WHERE expires < " + (now() - 300)); + } catch (SQLException e) { + LOG.debug(e.getMessage(), e); + } + } +} diff --git a/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcNodeRegistry.java b/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcNodeRegistry.java new file mode 100644 index 0000000..13450bd --- /dev/null +++ b/inventory/src/main/java/ch/dissem/bitmessage/inventory/JdbcNodeRegistry.java @@ -0,0 +1,95 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.inventory; + +import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; +import ch.dissem.bitmessage.ports.NodeRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.LinkedList; +import java.util.List; + +import static ch.dissem.bitmessage.utils.Strings.join; + +/** + * Created by chris on 24.04.15. + */ +public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry { + private static final Logger LOG = LoggerFactory.getLogger(JdbcNodeRegistry.class); + + @Override + public List getKnownAddresses(int limit, long... streams) { + List result = new LinkedList<>(); + try { + Statement stmt = getConnection().createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM Node WHERE Stream IN (" + join(streams) + ")"); + while (rs.next()) { +// result.add(new NetworkAddress.Builder() +// .ipv6(rs.getBytes("ip")) +// .port(rs.getByte("port")) +// .services(rs.getLong("services")) +// .stream(rs.getLong("stream")) +// .time(rs.getLong("time")) +// .build()); + } + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + if (result.isEmpty()) { + // FIXME: this is for testing purposes, remove it! + result.add(new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build()); + } + return result; + } + + @Override + public void offerAddresses(List addresses) { + try { + Connection connection = getConnection(); + PreparedStatement exists = connection.prepareStatement("SELECT port FROM Node WHERE ip = ? AND port = ? AND stream = ?"); + PreparedStatement insert = connection.prepareStatement( + "INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)"); + PreparedStatement update = connection.prepareStatement( + "UPDATE Node SET services = ?, time = ? WHERE ip = ? AND port = ? AND stream = ?"); + for (NetworkAddress node : addresses) { + exists.setBytes(1, node.getIPv6()); + exists.setInt(2, node.getPort()); + exists.setLong(3, node.getStream()); + if (exists.executeQuery().next()) { + update.setLong(1, node.getServices()); + update.setLong(2, node.getTime()); + + update.setBytes(3, node.getIPv6()); + update.setInt(4, node.getPort()); + update.setLong(5, node.getStream()); + update.executeUpdate(); + } else { + insert.setBytes(1, node.getIPv6()); + insert.setInt(2, node.getPort()); + insert.setLong(3, node.getServices()); + insert.setLong(4, node.getStream()); + insert.setLong(5, node.getTime()); + insert.executeUpdate(); + } + } + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + } +} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java index a6e2d0a..e6cada4 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/Connection.java @@ -16,7 +16,7 @@ package ch.dissem.bitmessage.networking; -import ch.dissem.bitmessage.Context; +import ch.dissem.bitmessage.BitmessageContext; import ch.dissem.bitmessage.entity.*; import ch.dissem.bitmessage.entity.valueobject.InventoryVector; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; @@ -43,7 +43,7 @@ import static ch.dissem.bitmessage.networking.Connection.State.*; public class Connection implements Runnable { private final static Logger LOG = LoggerFactory.getLogger(Connection.class); - private Context ctx; + private BitmessageContext ctx; private State state; private Socket socket; @@ -59,8 +59,8 @@ public class Connection implements Runnable { private Queue sendingQueue = new ConcurrentLinkedDeque<>(); - public Connection(State state, Socket socket, MessageListener listener) throws IOException { - this.ctx = Context.getInstance(); + public Connection(BitmessageContext context, State state, Socket socket, MessageListener listener) throws IOException { + this.ctx = context; this.state = state; this.socket = socket; this.in = socket.getInputStream(); @@ -97,7 +97,7 @@ public class Connection implements Runnable { switch (msg.getPayload().getCommand()) { case VERSION: Version payload = (Version) msg.getPayload(); - if (payload.getVersion() >= Context.CURRENT_VERSION) { + if (payload.getVersion() >= BitmessageContext.CURRENT_VERSION) { this.version = payload.getVersion(); this.streams = payload.getStreams(); send(new VerAck()); diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java index 82a146b..8cd85f6 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/NetworkNode.java @@ -16,7 +16,8 @@ package ch.dissem.bitmessage.networking; -import ch.dissem.bitmessage.Context; +import ch.dissem.bitmessage.BitmessageContext; +import ch.dissem.bitmessage.BitmessageContext.ContextHolder; import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; import ch.dissem.bitmessage.ports.NetworkHandler; @@ -37,8 +38,9 @@ import static ch.dissem.bitmessage.networking.Connection.State.*; /** * Handles all the networky stuff. */ -public class NetworkNode implements NetworkHandler { +public class NetworkNode implements NetworkHandler, ContextHolder { private final static Logger LOG = LoggerFactory.getLogger(NetworkNode.class); + private BitmessageContext ctx; private final ExecutorService pool; private final List connections = new LinkedList<>(); private ServerSocket serverSocket; @@ -48,21 +50,25 @@ public class NetworkNode implements NetworkHandler { pool = Executors.newCachedThreadPool(); } + @Override + public void setContext(BitmessageContext context) { + this.ctx = context; + } + @Override public void start(final MessageListener listener) { - final Context ctx = Context.getInstance(); if (listener == null) { throw new IllegalStateException("Listener must be set at start"); } try { - serverSocket = new ServerSocket(Context.getInstance().getPort()); + serverSocket = new ServerSocket(ctx.getPort()); pool.execute(new Runnable() { @Override public void run() { try { Socket socket = serverSocket.accept(); socket.setSoTimeout(10000); - startConnection(new Connection(SERVER, socket, listener)); + startConnection(new Connection(ctx, SERVER, socket, listener)); } catch (IOException e) { LOG.debug(e.getMessage(), e); } @@ -85,7 +91,7 @@ public class NetworkNode implements NetworkHandler { List addresses = ctx.getAddressRepository().getKnownAddresses(8, ctx.getStreams()); for (NetworkAddress address : addresses) { try { - startConnection(new Connection(CLIENT, new Socket(address.toInetAddress(), address.getPort()), listener)); + startConnection(new Connection(ctx, CLIENT, new Socket(address.toInetAddress(), address.getPort()), listener)); } catch (IOException e) { LOG.debug(e.getMessage(), e); } diff --git a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java index cd2411e..c5ec7bb 100644 --- a/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java +++ b/networking/src/test/java/ch/dissem/bitmessage/networking/NetworkNodeTest.java @@ -18,9 +18,7 @@ package ch.dissem.bitmessage.networking; import ch.dissem.bitmessage.entity.NetworkMessage; import ch.dissem.bitmessage.entity.Version; -import ch.dissem.bitmessage.entity.payload.ObjectPayload; import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; -import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.utils.UnixTime; import org.junit.Test;