Minor DB updates

A node can support multiple streams. Unfortunately, an address can't so there might be multiple entries of the same address for different streams.
Also, store object types. I think we'll need them later, specially in search for public keys we requested.
This commit is contained in:
Christian Basler 2015-04-18 14:31:17 +02:00
parent e611e5e20a
commit ceae11b5c6
5 changed files with 54 additions and 13 deletions

View File

@ -68,6 +68,10 @@ public class ObjectMessage implements MessagePayload {
return expiresTime; return expiresTime;
} }
public long getType() {
return objectType;
}
public ObjectPayload getPayload() { public ObjectPayload getPayload() {
return payload; return payload;
} }

View File

@ -0,0 +1,29 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.utils;
/**
* Created by chris on 18.04.15.
*/
public class UnixTime {
/**
* Returns the time in second based Unix time ({@link System#currentTimeMillis()}/1000)
*/
public static long now() {
return System.currentTimeMillis() / 1000;
}
}

View File

@ -33,6 +33,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import static ch.dissem.bitmessage.utils.Strings.join; import static ch.dissem.bitmessage.utils.Strings.join;
import static ch.dissem.bitmessage.utils.UnixTime.now;
/** /**
* Stores everything in a database * Stores everything in a database
@ -80,21 +81,22 @@ public class DatabaseRepository implements Inventory, AddressRepository {
public void offerAddresses(List<NetworkAddress> addresses) { public void offerAddresses(List<NetworkAddress> addresses) {
try { try {
Connection connection = getConnection(); Connection connection = getConnection();
PreparedStatement exists = connection.prepareStatement("SELECT port FROM Node WHERE ip = ? AND port = ?"); PreparedStatement exists = connection.prepareStatement("SELECT port FROM Node WHERE ip = ? AND port = ? AND stream = ?");
PreparedStatement insert = connection.prepareStatement( PreparedStatement insert = connection.prepareStatement(
"INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)"); "INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)");
PreparedStatement update = connection.prepareStatement( PreparedStatement update = connection.prepareStatement(
"UPDATE Node SET services = ?, stream = ?, time = ? WHERE ip = ? AND port = ?"); "UPDATE Node SET services = ?, time = ? WHERE ip = ? AND port = ? AND stream = ?");
for (NetworkAddress node : addresses) { for (NetworkAddress node : addresses) {
exists.setBytes(1, node.getIPv6()); exists.setBytes(1, node.getIPv6());
exists.setInt(2, node.getPort()); exists.setInt(2, node.getPort());
exists.setLong(3, node.getStream());
if (exists.executeQuery().next()) { if (exists.executeQuery().next()) {
update.setLong(1, node.getServices()); update.setLong(1, node.getServices());
update.setLong(2, node.getStream()); update.setLong(2, node.getTime());
update.setLong(3, node.getTime());
update.setBytes(4, node.getIPv6()); update.setBytes(3, node.getIPv6());
update.setInt(5, node.getPort()); update.setInt(4, node.getPort());
update.setLong(5, node.getStream());
update.executeUpdate(); update.executeUpdate();
} else { } else {
insert.setBytes(1, node.getIPv6()); insert.setBytes(1, node.getIPv6());
@ -115,7 +117,8 @@ public class DatabaseRepository implements Inventory, AddressRepository {
List<InventoryVector> result = new LinkedList<>(); List<InventoryVector> result = new LinkedList<>();
try { try {
Statement stmt = getConnection().createStatement(); Statement stmt = getConnection().createStatement();
ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE Stream IN (" + join(streams) + ")"); ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE expires > " + now() +
" AND stream IN (" + join(streams) + ")");
while (rs.next()) { while (rs.next()) {
result.add(new InventoryVector(rs.getBytes("hash"))); result.add(new InventoryVector(rs.getBytes("hash")));
} }
@ -147,9 +150,9 @@ public class DatabaseRepository implements Inventory, AddressRepository {
@Override @Override
public void storeObject(int version, ObjectMessage object) { public void storeObject(int version, ObjectMessage object) {
try { try {
PreparedStatement ps = getConnection().prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, version) VALUES (?, ?, ?, ?, ?)"); PreparedStatement ps = getConnection().prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, type, version) VALUES (?, ?, ?, ?, ?, ?)");
InventoryVector iv = object.getInventoryVector(); InventoryVector iv = object.getInventoryVector();
LOG.error("Storing object " + iv); LOG.trace("Storing object " + iv);
ps.setBytes(1, iv.getHash()); ps.setBytes(1, iv.getHash());
ps.setLong(2, object.getStream()); ps.setLong(2, object.getStream());
ps.setLong(3, object.getExpiresTime()); ps.setLong(3, object.getExpiresTime());
@ -157,8 +160,11 @@ public class DatabaseRepository implements Inventory, AddressRepository {
object.write(os); object.write(os);
ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray()); ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray());
ps.setBlob(4, is); ps.setBlob(4, is);
ps.setInt(5, version); ps.setLong(5, object.getType());
ps.setInt(6, version);
ps.executeUpdate(); ps.executeUpdate();
} catch (SQLException e) {
LOG.error("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e);
} catch (Exception e) { } catch (Exception e) {
LOG.error(e.getMessage(), e); LOG.error(e.getMessage(), e);
} }
@ -167,7 +173,8 @@ public class DatabaseRepository implements Inventory, AddressRepository {
@Override @Override
public void cleanup() { public void cleanup() {
try { try {
getConnection().createStatement().executeUpdate("DELETE FROM Inventory WHERE time < " + (System.currentTimeMillis() / 1000)); // We delete only objects that expired 5 minutes ago or earlier, so we don't request objects we just deleted
getConnection().createStatement().executeUpdate("DELETE FROM Inventory WHERE expires < " + (now() - 300));
} catch (SQLException e) { } catch (SQLException e) {
LOG.debug(e.getMessage(), e); LOG.debug(e.getMessage(), e);
} }

View File

@ -1,9 +1,9 @@
CREATE TABLE Node ( CREATE TABLE Node (
ip BINARY(16) NOT NULL, ip BINARY(16) NOT NULL,
port INT NOT NULL, port INT NOT NULL,
services BIGINT NOT NULL,
stream BIGINT NOT NULL, stream BIGINT NOT NULL,
services BIGINT NOT NULL,
time BIGINT NOT NULL, time BIGINT NOT NULL,
PRIMARY KEY (ip, port) PRIMARY KEY (ip, port, stream)
); );

View File

@ -3,5 +3,6 @@ CREATE TABLE Inventory (
stream BIGINT NOT NULL, stream BIGINT NOT NULL,
expires BIGINT NOT NULL, expires BIGINT NOT NULL,
data BLOB NOT NULL, data BLOB NOT NULL,
type BIGINT NOT NULL,
version INT NOT NULL version INT NOT NULL
); );