diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java b/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java index 51a388d..e595bef 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/ObjectMessage.java @@ -68,6 +68,10 @@ public class ObjectMessage implements MessagePayload { return expiresTime; } + public long getType() { + return objectType; + } + public ObjectPayload getPayload() { return payload; } diff --git a/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java b/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java new file mode 100644 index 0000000..374f077 --- /dev/null +++ b/domain/src/main/java/ch/dissem/bitmessage/utils/UnixTime.java @@ -0,0 +1,29 @@ +/* + * Copyright 2015 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.utils; + +/** + * Created by chris on 18.04.15. + */ +public class UnixTime { + /** + * Returns the time in second based Unix time ({@link System#currentTimeMillis()}/1000) + */ + public static long now() { + return System.currentTimeMillis() / 1000; + } +} diff --git a/inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java b/inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java index eb84886..d355d8f 100644 --- a/inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java +++ b/inventory/src/main/java/ch/dissem/bitmessage/inventory/DatabaseRepository.java @@ -33,6 +33,7 @@ import java.util.LinkedList; import java.util.List; import static ch.dissem.bitmessage.utils.Strings.join; +import static ch.dissem.bitmessage.utils.UnixTime.now; /** * Stores everything in a database @@ -80,21 +81,22 @@ public class DatabaseRepository implements Inventory, AddressRepository { public void offerAddresses(List addresses) { try { Connection connection = getConnection(); - PreparedStatement exists = connection.prepareStatement("SELECT port FROM Node WHERE ip = ? AND port = ?"); + PreparedStatement exists = connection.prepareStatement("SELECT port FROM Node WHERE ip = ? AND port = ? AND stream = ?"); PreparedStatement insert = connection.prepareStatement( "INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)"); PreparedStatement update = connection.prepareStatement( - "UPDATE Node SET services = ?, stream = ?, time = ? WHERE ip = ? AND port = ?"); + "UPDATE Node SET services = ?, time = ? WHERE ip = ? AND port = ? AND stream = ?"); for (NetworkAddress node : addresses) { exists.setBytes(1, node.getIPv6()); exists.setInt(2, node.getPort()); + exists.setLong(3, node.getStream()); if (exists.executeQuery().next()) { update.setLong(1, node.getServices()); - update.setLong(2, node.getStream()); - update.setLong(3, node.getTime()); + update.setLong(2, node.getTime()); - update.setBytes(4, node.getIPv6()); - update.setInt(5, node.getPort()); + update.setBytes(3, node.getIPv6()); + update.setInt(4, node.getPort()); + update.setLong(5, node.getStream()); update.executeUpdate(); } else { insert.setBytes(1, node.getIPv6()); @@ -115,7 +117,8 @@ public class DatabaseRepository implements Inventory, AddressRepository { List result = new LinkedList<>(); try { Statement stmt = getConnection().createStatement(); - ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE Stream IN (" + join(streams) + ")"); + ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE expires > " + now() + + " AND stream IN (" + join(streams) + ")"); while (rs.next()) { result.add(new InventoryVector(rs.getBytes("hash"))); } @@ -147,9 +150,9 @@ public class DatabaseRepository implements Inventory, AddressRepository { @Override public void storeObject(int version, ObjectMessage object) { try { - PreparedStatement ps = getConnection().prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, version) VALUES (?, ?, ?, ?, ?)"); + PreparedStatement ps = getConnection().prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, type, version) VALUES (?, ?, ?, ?, ?, ?)"); InventoryVector iv = object.getInventoryVector(); - LOG.error("Storing object " + iv); + LOG.trace("Storing object " + iv); ps.setBytes(1, iv.getHash()); ps.setLong(2, object.getStream()); ps.setLong(3, object.getExpiresTime()); @@ -157,8 +160,11 @@ public class DatabaseRepository implements Inventory, AddressRepository { object.write(os); ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray()); ps.setBlob(4, is); - ps.setInt(5, version); + ps.setLong(5, object.getType()); + ps.setInt(6, version); ps.executeUpdate(); + } catch (SQLException e) { + LOG.error("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e); } catch (Exception e) { LOG.error(e.getMessage(), e); } @@ -167,7 +173,8 @@ public class DatabaseRepository implements Inventory, AddressRepository { @Override public void cleanup() { try { - getConnection().createStatement().executeUpdate("DELETE FROM Inventory WHERE time < " + (System.currentTimeMillis() / 1000)); + // We delete only objects that expired 5 minutes ago or earlier, so we don't request objects we just deleted + getConnection().createStatement().executeUpdate("DELETE FROM Inventory WHERE expires < " + (now() - 300)); } catch (SQLException e) { LOG.debug(e.getMessage(), e); } diff --git a/inventory/src/main/resources/db/migration/V1.0__Create_node_table.sql b/inventory/src/main/resources/db/migration/V1.0__Create_node_table.sql index 874b1f5..2472634 100644 --- a/inventory/src/main/resources/db/migration/V1.0__Create_node_table.sql +++ b/inventory/src/main/resources/db/migration/V1.0__Create_node_table.sql @@ -1,9 +1,9 @@ CREATE TABLE Node ( ip BINARY(16) NOT NULL, port INT NOT NULL, - services BIGINT NOT NULL, stream BIGINT NOT NULL, + services BIGINT NOT NULL, time BIGINT NOT NULL, - PRIMARY KEY (ip, port) + PRIMARY KEY (ip, port, stream) ); \ No newline at end of file diff --git a/inventory/src/main/resources/db/migration/V1.1__Create_inventory_table.sql b/inventory/src/main/resources/db/migration/V1.1__Create_inventory_table.sql index 0edd765..5f3855f 100644 --- a/inventory/src/main/resources/db/migration/V1.1__Create_inventory_table.sql +++ b/inventory/src/main/resources/db/migration/V1.1__Create_inventory_table.sql @@ -3,5 +3,6 @@ CREATE TABLE Inventory ( stream BIGINT NOT NULL, expires BIGINT NOT NULL, data BLOB NOT NULL, + type BIGINT NOT NULL, version INT NOT NULL ); \ No newline at end of file