Added tests for all repositories, fixed some bugs and made database configurable

This commit is contained in:
2015-06-05 13:43:56 +02:00
parent 274c16b748
commit f76864eebd
25 changed files with 860 additions and 184 deletions

View File

@ -34,12 +34,13 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
/**
* Created by chris on 23.04.15.
*/
public class JdbcAddressRepository extends JdbcHelper implements AddressRepository {
private static final Logger LOG = LoggerFactory.getLogger(JdbcAddressRepository.class);
public JdbcAddressRepository(JdbcConfig config) {
super(config);
}
@Override
public BitmessageAddress findContact(byte[] ripeOrTag) {
for (BitmessageAddress address : find("public_key is null")) {
@ -81,8 +82,8 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito
private List<BitmessageAddress> find(String where) {
List<BitmessageAddress> result = new LinkedList<>();
try {
Statement stmt = getConnection().createStatement();
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT address, alias, public_key, private_key, subscribed FROM Address WHERE " + where);
while (rs.next()) {
BitmessageAddress address;
@ -114,8 +115,8 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito
}
private boolean exists(BitmessageAddress address) {
try {
Statement stmt = getConnection().createStatement();
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM Address WHERE address='" + address.getAddress() + "'");
rs.next();
return rs.getInt(1) > 0;
@ -139,23 +140,26 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito
}
private void update(BitmessageAddress address) throws IOException, SQLException {
PreparedStatement ps = getConnection().prepareStatement(
try(Connection connection = config.getConnection()){
PreparedStatement ps = connection.prepareStatement(
"UPDATE Address SET alias=?, public_key=?, private_key=? WHERE address=?");
ps.setString(1, address.getAlias());
writePubkey(ps, 2, address.getPubkey());
writeBlob(ps, 3, address.getPrivateKey());
ps.setString(4, address.getAddress());
ps.executeUpdate();
}
}}
private void insert(BitmessageAddress address) throws IOException, SQLException {
PreparedStatement ps = getConnection().prepareStatement(
"INSERT INTO Address (address, alias, public_key, private_key) VALUES (?, ?, ?, ?)");
ps.setString(1, address.getAddress());
ps.setString(2, address.getAlias());
writePubkey(ps, 3, address.getPubkey());
writeBlob(ps, 4, address.getPrivateKey());
ps.executeUpdate();
try (Connection connection = config.getConnection()) {
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO Address (address, alias, public_key, private_key) VALUES (?, ?, ?, ?)");
ps.setString(1, address.getAddress());
ps.setString(2, address.getAlias());
writePubkey(ps, 3, address.getPubkey());
writeBlob(ps, 4, address.getPrivateKey());
ps.executeUpdate();
}
}
protected void writePubkey(PreparedStatement ps, int parameterIndex, Pubkey data) throws SQLException, IOException {
@ -171,8 +175,8 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito
@Override
public void remove(BitmessageAddress address) {
try {
Statement stmt = getConnection().createStatement();
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
stmt.executeUpdate("DELETE FROM Address WHERE address = '" + address.getAddress() + "'");
} catch (SQLException e) {
LOG.error(e.getMessage(), e);

View File

@ -0,0 +1,55 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.repository;
import org.flywaydb.core.Flyway;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* The base configuration for all JDBC based repositories. You should only make one instance,
* as flyway initializes/updates the database at object creation.
*/
public class JdbcConfig {
protected final Flyway flyway;
private final String dbUrl;
private final String dbUser;
private final String dbPassword;
public JdbcConfig(String dbUrl, String dbUser, String dbPassword) {
this.dbUrl = dbUrl;
this.dbUser = dbUser;
this.dbPassword = dbPassword;
this.flyway = new Flyway();
flyway.setDataSource(dbUrl, dbUser, dbPassword);
flyway.migrate();
}
public JdbcConfig() {
this("jdbc:h2:~/jabit", "sa", null);
}
public Connection getConnection() {
try {
return DriverManager.getConnection(dbUrl, dbUser, dbPassword);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -17,6 +17,7 @@
package ch.dissem.bitmessage.repository;
import ch.dissem.bitmessage.entity.Streamable;
import ch.dissem.bitmessage.entity.payload.ObjectType;
import org.flywaydb.core.Flyway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,21 +27,54 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.sql.*;
import static ch.dissem.bitmessage.utils.Strings.hex;
/**
* Helper class that does Flyway migration, provides JDBC connections and some helper methods.
*/
abstract class JdbcHelper {
private static final Logger LOG = LoggerFactory.getLogger(JdbcHelper.class);
private static final String DB_URL = "jdbc:h2:~/jabit";
private static final String DB_USER = "sa";
private static final String DB_PWD = null;
protected final JdbcConfig config;
protected JdbcHelper(JdbcConfig config) {
this.config = config;
}
static {
Flyway flyway = new Flyway();
flyway.setDataSource(DB_URL, DB_USER, DB_PWD);
flyway.migrate();
public static StringBuilder join(long... objects) {
StringBuilder streamList = new StringBuilder();
for (int i = 0; i < objects.length; i++) {
if (i > 0) streamList.append(", ");
streamList.append(objects[i]);
}
return streamList;
}
public static StringBuilder join(byte[]... objects) {
StringBuilder streamList = new StringBuilder();
for (int i = 0; i < objects.length; i++) {
if (i > 0) streamList.append(", ");
streamList.append(hex(objects[i]));
}
return streamList;
}
public static StringBuilder join(ObjectType... types) {
StringBuilder streamList = new StringBuilder();
for (int i = 0; i < types.length; i++) {
if (i > 0) streamList.append(", ");
streamList.append(types[i].getNumber());
}
return streamList;
}
public static StringBuilder join(Enum... types) {
StringBuilder streamList = new StringBuilder();
for (int i = 0; i < types.length; i++) {
if (i > 0) streamList.append(", ");
streamList.append('\'').append(types[i].name()).append('\'');
}
return streamList;
}
protected void writeBlob(PreparedStatement ps, int parameterIndex, Streamable data) throws SQLException, IOException {
@ -53,12 +87,4 @@ abstract class JdbcHelper {
ps.setBlob(parameterIndex, (Blob) null);
}
}
protected Connection getConnection() {
try {
return DriverManager.getConnection(DB_URL, DB_USER, DB_PWD);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -28,20 +28,20 @@ import java.sql.*;
import java.util.LinkedList;
import java.util.List;
import static ch.dissem.bitmessage.utils.Strings.join;
import static ch.dissem.bitmessage.utils.UnixTime.now;
/**
* Created by chris on 24.04.15.
*/
public class JdbcInventory extends JdbcHelper implements Inventory {
private static final Logger LOG = LoggerFactory.getLogger(JdbcInventory.class);
public JdbcInventory(JdbcConfig config) {
super(config);
}
@Override
public List<InventoryVector> getInventory(long... streams) {
List<InventoryVector> result = new LinkedList<>();
try {
Statement stmt = getConnection().createStatement();
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE expires > " + now() +
" AND stream IN (" + join(streams) + ")");
while (rs.next()) {
@ -54,8 +54,8 @@ public class JdbcInventory extends JdbcHelper implements Inventory {
}
private List<InventoryVector> getFullInventory(long... streams) {
List<InventoryVector> result = new LinkedList<>();
try {
Statement stmt = getConnection().createStatement();
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE stream IN (" + join(streams) + ")");
while (rs.next()) {
result.add(new InventoryVector(rs.getBytes("hash")));
@ -74,8 +74,8 @@ public class JdbcInventory extends JdbcHelper implements Inventory {
@Override
public ObjectMessage getObject(InventoryVector vector) {
try {
Statement stmt = getConnection().createStatement();
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT data, version FROM Inventory WHERE hash = X'" + vector + "'");
if (rs.next()) {
Blob data = rs.getBlob("data");
@ -92,7 +92,7 @@ public class JdbcInventory extends JdbcHelper implements Inventory {
@Override
public List<ObjectMessage> getObjects(long stream, long version, ObjectType... types) {
try {
try (Connection connection = config.getConnection()) {
StringBuilder query = new StringBuilder("SELECT data, version FROM Inventory WHERE 1=1");
if (stream > 0) {
query.append(" AND stream = ").append(stream);
@ -103,7 +103,7 @@ public class JdbcInventory extends JdbcHelper implements Inventory {
if (types.length > 0) {
query.append(" AND type IN (").append(join(types)).append(")");
}
Statement stmt = getConnection().createStatement();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(query.toString());
List<ObjectMessage> result = new LinkedList<>();
while (rs.next()) {
@ -119,8 +119,8 @@ public class JdbcInventory extends JdbcHelper implements Inventory {
@Override
public void storeObject(ObjectMessage object) {
try {
PreparedStatement ps = getConnection().prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, type, version) VALUES (?, ?, ?, ?, ?, ?)");
try (Connection connection = config.getConnection()) {
PreparedStatement ps = connection.prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, type, version) VALUES (?, ?, ?, ?, ?, ?)");
InventoryVector iv = object.getInventoryVector();
LOG.trace("Storing object " + iv);
ps.setBytes(1, iv.getHash());
@ -139,9 +139,9 @@ public class JdbcInventory extends JdbcHelper implements Inventory {
@Override
public void cleanup() {
try {
try (Connection connection = config.getConnection()) {
// We delete only objects that expired 5 minutes ago or earlier, so we don't request objects we just deleted
getConnection().createStatement().executeUpdate("DELETE FROM Inventory WHERE expires < " + (now() - 300));
connection.createStatement().executeUpdate("DELETE FROM Inventory WHERE expires < " + (now() - 300));
} catch (SQLException e) {
LOG.debug(e.getMessage(), e);
}

View File

@ -21,7 +21,6 @@ import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.ports.MessageRepository;
import ch.dissem.bitmessage.utils.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,17 +36,21 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
private InternalContext ctx;
public JdbcMessageRepository(JdbcConfig config) {
super(config);
}
@Override
public List<Label> getLabels() {
List<Label> result = new LinkedList<>();
try {
Statement stmt = getConnection().createStatement();
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT id, label, type, color FROM Label ORDER BY ord");
while (rs.next()) {
result.add(getLabel(rs));
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
}
return result;
}
@ -67,9 +70,9 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
@Override
public List<Label> getLabels(Label.Type... types) {
List<Label> result = new LinkedList<>();
try {
Statement stmt = getConnection().createStatement();
ResultSet rs = stmt.executeQuery("SELECT id, label, type, color FROM Label WHERE type IN (" + Strings.join(types) +
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT id, label, type, color FROM Label WHERE type IN (" + join(types) +
") ORDER BY ord");
while (rs.next()) {
result.add(getLabel(rs));
@ -97,8 +100,8 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
private List<Plaintext> find(String where) {
List<Plaintext> result = new LinkedList<>();
try {
Statement stmt = getConnection().createStatement();
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT id, sender, recipient, data, sent, received, status FROM Message WHERE " + where);
while (rs.next()) {
Blob data = rs.getBlob("data");
@ -110,7 +113,7 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
builder.sent(rs.getLong("sent"));
builder.received(rs.getLong("received"));
builder.status(Plaintext.Status.valueOf(rs.getString("status")));
builder.labels(findLabels(id));
builder.labels(findLabels(connection, id));
result.add(builder.build());
}
} catch (IOException | SQLException e) {
@ -119,10 +122,10 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
return result;
}
private Collection<Label> findLabels(long messageId) {
private Collection<Label> findLabels(Connection connection, long messageId) {
List<Label> result = new ArrayList<>();
try {
Statement stmt = getConnection().createStatement();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT id, label, type, color FROM Label WHERE id IN (SELECT label_id FROM Message_Label WHERE message_id=" + messageId + ")");
while (rs.next()) {
result.add(getLabel(rs));
@ -146,34 +149,37 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
}
}
Connection connection = getConnection();
try {
connection.setAutoCommit(false);
// save message
if (message.getId() == null) {
insert(connection, message);
// remove existing labels
Statement stmt = connection.createStatement();
stmt.executeUpdate("DELETE FROM Message_Label WHERE message_id=" + message.getId());
} else {
update(connection, message);
}
// save labels
PreparedStatement ps = connection.prepareStatement("INSERT INTO Message_Label VALUES (" + message.getId() + ", ?)");
for (Label label : message.getLabels()) {
ps.setLong(1, (Long) label.getId());
ps.executeUpdate();
}
connection.commit();
} catch (IOException | SQLException e) {
try (Connection connection = config.getConnection()) {
try {
connection.rollback();
} catch (SQLException e1) {
LOG.debug(e1.getMessage(), e);
connection.setAutoCommit(false);
// save message
if (message.getId() == null) {
insert(connection, message);
// remove existing labels
Statement stmt = connection.createStatement();
stmt.executeUpdate("DELETE FROM Message_Label WHERE message_id=" + message.getId());
} else {
update(connection, message);
}
// save labels
PreparedStatement ps = connection.prepareStatement("INSERT INTO Message_Label VALUES (" + message.getId() + ", ?)");
for (Label label : message.getLabels()) {
ps.setLong(1, (Long) label.getId());
ps.executeUpdate();
}
connection.commit();
} catch (IOException | SQLException e) {
try {
connection.rollback();
} catch (SQLException e1) {
LOG.debug(e1.getMessage(), e);
}
throw new RuntimeException(e);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@ -204,18 +210,15 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
"UPDATE Message SET sent=?, received=?, status=? WHERE id=?");
ps.setLong(1, message.getSent());
ps.setLong(2, message.getReceived());
ps.setLong(3, (Long) message.getId());
if (message.getStatus() != null)
ps.setString(3, message.getStatus().name());
else
ps.setString(3, null);
ps.setString(3, message.getStatus() != null ? message.getStatus().name() : null);
ps.setLong(4, (Long) message.getId());
ps.executeUpdate();
}
@Override
public void remove(Plaintext message) {
try {
Statement stmt = getConnection().createStatement();
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
stmt.executeUpdate("DELETE FROM Message WHERE id = " + message.getId());
} catch (SQLException e) {
LOG.error(e.getMessage(), e);

View File

@ -25,28 +25,27 @@ import java.sql.*;
import java.util.LinkedList;
import java.util.List;
import static ch.dissem.bitmessage.utils.Strings.join;
/**
* Created by chris on 24.04.15.
*/
public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry {
private static final Logger LOG = LoggerFactory.getLogger(JdbcNodeRegistry.class);
public JdbcNodeRegistry(JdbcConfig config) {
super(config);
}
@Override
public List<NetworkAddress> getKnownAddresses(int limit, long... streams) {
List<NetworkAddress> result = new LinkedList<>();
try {
Statement stmt = getConnection().createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM Node WHERE stream IN (" + join(streams) + ")");
try (Connection connection = config.getConnection()) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM Node WHERE stream IN (" + join(streams) + ") LIMIT " + limit);
while (rs.next()) {
// result.add(new NetworkAddress.Builder()
// .ipv6(rs.getBytes("ip"))
// .port(rs.getByte("port"))
// .services(rs.getLong("services"))
// .stream(rs.getLong("stream"))
// .time(rs.getLong("time"))
// .build());
result.add(new NetworkAddress.Builder()
.ipv6(rs.getBytes("ip"))
.port(rs.getInt("port"))
.services(rs.getLong("services"))
.stream(rs.getLong("stream"))
.time(rs.getLong("time"))
.build());
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
@ -60,8 +59,7 @@ public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry {
@Override
public void offerAddresses(List<NetworkAddress> addresses) {
try {
Connection connection = getConnection();
try (Connection connection = config.getConnection()) {
PreparedStatement exists = connection.prepareStatement("SELECT port FROM Node WHERE ip = ? AND port = ? AND stream = ?");
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)");