Fixes and improvements, SystemTest still broken
This commit is contained in:
@ -21,6 +21,7 @@ import ch.dissem.bitmessage.entity.payload.Pubkey;
|
||||
import ch.dissem.bitmessage.entity.payload.V3Pubkey;
|
||||
import ch.dissem.bitmessage.entity.payload.V4Pubkey;
|
||||
import ch.dissem.bitmessage.entity.valueobject.PrivateKey;
|
||||
import ch.dissem.bitmessage.exception.ApplicationException;
|
||||
import ch.dissem.bitmessage.factory.Factory;
|
||||
import ch.dissem.bitmessage.ports.AddressRepository;
|
||||
import org.slf4j.Logger;
|
||||
@ -137,16 +138,14 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito
|
||||
try (
|
||||
Connection connection = config.getConnection();
|
||||
Statement stmt = connection.createStatement();
|
||||
ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM Address " +
|
||||
ResultSet rs = stmt.executeQuery("SELECT '1' FROM Address " +
|
||||
"WHERE address='" + address.getAddress() + "'")
|
||||
) {
|
||||
if (rs.next()) {
|
||||
return rs.getInt(1) > 0;
|
||||
}
|
||||
return rs.next();
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage(), e);
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1,344 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Christian Basler
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package ch.dissem.bitmessage.repository;
|
||||
|
||||
import ch.dissem.bitmessage.entity.Plaintext;
|
||||
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
|
||||
import ch.dissem.bitmessage.entity.valueobject.Label;
|
||||
import ch.dissem.bitmessage.exception.ApplicationException;
|
||||
import ch.dissem.bitmessage.ports.AbstractMessageRepository;
|
||||
import ch.dissem.bitmessage.ports.MessageRepository;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.sql.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static ch.dissem.bitmessage.repository.JdbcHelper.writeBlob;
|
||||
|
||||
public class JdbcMessageRepository extends AbstractMessageRepository implements MessageRepository {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JdbcMessageRepository.class);
|
||||
|
||||
private final JdbcConfig config;
|
||||
|
||||
public JdbcMessageRepository(JdbcConfig config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Label> findLabels(String where) {
|
||||
try (
|
||||
Connection connection = config.getConnection()
|
||||
) {
|
||||
return findLabels(connection, where);
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage(), e);
|
||||
}
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
private Label getLabel(ResultSet rs) throws SQLException {
|
||||
String typeName = rs.getString("type");
|
||||
Label.Type type = null;
|
||||
if (typeName != null) {
|
||||
type = Label.Type.valueOf(typeName);
|
||||
}
|
||||
Label label = new Label(rs.getString("label"), type, rs.getInt("color"));
|
||||
label.setId(rs.getLong("id"));
|
||||
|
||||
return label;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int countUnread(Label label) {
|
||||
String where;
|
||||
if (label == null) {
|
||||
where = "";
|
||||
} else {
|
||||
where = "id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.getId() + ") AND ";
|
||||
}
|
||||
where += "id IN (SELECT message_id FROM Message_Label WHERE label_id IN (" +
|
||||
"SELECT id FROM Label WHERE type = '" + Label.Type.UNREAD.name() + "'))";
|
||||
|
||||
try (
|
||||
Connection connection = config.getConnection();
|
||||
Statement stmt = connection.createStatement();
|
||||
ResultSet rs = stmt.executeQuery("SELECT count(*) FROM Message WHERE " + where)
|
||||
) {
|
||||
if (rs.next()) {
|
||||
return rs.getInt(1);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage(), e);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Plaintext> find(String where) {
|
||||
List<Plaintext> result = new LinkedList<>();
|
||||
try (
|
||||
Connection connection = config.getConnection();
|
||||
Statement stmt = connection.createStatement();
|
||||
ResultSet rs = stmt.executeQuery(
|
||||
"SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, initial_hash, status, ttl, retries, next_try, conversation " +
|
||||
"FROM Message WHERE " + where)
|
||||
) {
|
||||
while (rs.next()) {
|
||||
byte[] iv = rs.getBytes("iv");
|
||||
InputStream data = rs.getBinaryStream("data");
|
||||
Plaintext.Type type = Plaintext.Type.valueOf(rs.getString("type"));
|
||||
Plaintext.Builder builder = Plaintext.readWithoutSignature(type, data);
|
||||
long id = rs.getLong("id");
|
||||
builder.id(id);
|
||||
builder.IV(InventoryVector.fromHash(iv));
|
||||
builder.from(getCtx().getAddressRepository().getAddress(rs.getString("sender")));
|
||||
builder.to(getCtx().getAddressRepository().getAddress(rs.getString("recipient")));
|
||||
builder.ackData(rs.getBytes("ack_data"));
|
||||
builder.sent(rs.getObject("sent", Long.class));
|
||||
builder.received(rs.getObject("received", Long.class));
|
||||
builder.status(Plaintext.Status.valueOf(rs.getString("status")));
|
||||
builder.ttl(rs.getLong("ttl"));
|
||||
builder.retries(rs.getInt("retries"));
|
||||
builder.nextTry(rs.getObject("next_try", Long.class));
|
||||
builder.conversation(rs.getObject("conversation", UUID.class));
|
||||
builder.labels(findLabels(connection,
|
||||
"id IN (SELECT label_id FROM Message_Label WHERE message_id=" + id + ") ORDER BY ord"));
|
||||
Plaintext message = builder.build();
|
||||
message.setInitialHash(rs.getBytes("initial_hash"));
|
||||
result.add(message);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage(), e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private List<Label> findLabels(Connection connection, String where) {
|
||||
List<Label> result = new ArrayList<>();
|
||||
try (
|
||||
Statement stmt = connection.createStatement();
|
||||
ResultSet rs = stmt.executeQuery("SELECT id, label, type, color FROM Label WHERE " + where)
|
||||
) {
|
||||
while (rs.next()) {
|
||||
result.add(getLabel(rs));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage(), e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void save(Plaintext message) {
|
||||
saveContactIfNecessary(message.getFrom());
|
||||
saveContactIfNecessary(message.getTo());
|
||||
|
||||
try (Connection connection = config.getConnection()) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
save(connection, message);
|
||||
updateParents(connection, message);
|
||||
updateLabels(connection, message);
|
||||
connection.commit();
|
||||
} catch (IOException | SQLException e) {
|
||||
connection.rollback();
|
||||
throw e;
|
||||
}
|
||||
} catch (IOException | SQLException e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void save(Connection connection, Plaintext message) throws IOException, SQLException {
|
||||
if (message.getId() == null) {
|
||||
insert(connection, message);
|
||||
} else {
|
||||
update(connection, message);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateLabels(Connection connection, Plaintext message) throws SQLException {
|
||||
// remove existing labels
|
||||
try (Statement stmt = connection.createStatement()) {
|
||||
stmt.executeUpdate("DELETE FROM Message_Label WHERE message_id=" + message.getId());
|
||||
}
|
||||
// save new labels
|
||||
try (PreparedStatement ps = connection.prepareStatement("INSERT INTO Message_Label VALUES (" +
|
||||
message.getId() + ", ?)")) {
|
||||
for (Label label : message.getLabels()) {
|
||||
ps.setLong(1, (Long) label.getId());
|
||||
ps.executeUpdate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateParents(Connection connection, Plaintext message) throws SQLException {
|
||||
if (message.getInventoryVector() == null || message.getParents().isEmpty()) {
|
||||
// There are no parents to save yet (they are saved in the extended data, that's enough for now)
|
||||
return;
|
||||
}
|
||||
// remove existing parents
|
||||
try (PreparedStatement ps = connection.prepareStatement("DELETE FROM Message_Parent WHERE child=?")) {
|
||||
ps.setBytes(1, message.getInitialHash());
|
||||
ps.executeUpdate();
|
||||
}
|
||||
byte[] childIV = message.getInventoryVector().getHash();
|
||||
// save new parents
|
||||
int order = 0;
|
||||
try (PreparedStatement ps = connection.prepareStatement("INSERT INTO Message_Parent VALUES (?, ?, ?, ?)")) {
|
||||
for (InventoryVector parentIV : message.getParents()) {
|
||||
Plaintext parent = getMessage(parentIV);
|
||||
mergeConversations(connection, parent.getConversationId(), message.getConversationId());
|
||||
order++;
|
||||
ps.setBytes(1, parentIV.getHash());
|
||||
ps.setBytes(2, childIV);
|
||||
ps.setInt(3, order); // FIXME: this might not be necessary
|
||||
ps.setObject(4, message.getConversationId());
|
||||
ps.executeUpdate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void insert(Connection connection, Plaintext message) throws SQLException, IOException {
|
||||
try (PreparedStatement ps = connection.prepareStatement(
|
||||
"INSERT INTO Message (iv, type, sender, recipient, data, ack_data, sent, received, " +
|
||||
"status, initial_hash, ttl, retries, next_try, conversation) " +
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
Statement.RETURN_GENERATED_KEYS)
|
||||
) {
|
||||
ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash());
|
||||
ps.setString(2, message.getType().name());
|
||||
ps.setString(3, message.getFrom().getAddress());
|
||||
ps.setString(4, message.getTo() == null ? null : message.getTo().getAddress());
|
||||
writeBlob(ps, 5, message);
|
||||
ps.setBytes(6, message.getAckData());
|
||||
ps.setObject(7, message.getSent());
|
||||
ps.setObject(8, message.getReceived());
|
||||
ps.setString(9, message.getStatus() == null ? null : message.getStatus().name());
|
||||
ps.setBytes(10, message.getInitialHash());
|
||||
ps.setLong(11, message.getTTL());
|
||||
ps.setInt(12, message.getRetries());
|
||||
ps.setObject(13, message.getNextTry());
|
||||
ps.setObject(14, message.getConversationId());
|
||||
|
||||
ps.executeUpdate();
|
||||
// get generated id
|
||||
try (ResultSet rs = ps.getGeneratedKeys()) {
|
||||
rs.next();
|
||||
message.setId(rs.getLong(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void update(Connection connection, Plaintext message) throws SQLException, IOException {
|
||||
try (PreparedStatement ps = connection.prepareStatement(
|
||||
"UPDATE Message SET iv=?, type=?, sender=?, recipient=?, data=?, ack_data=?, sent=?, received=?, " +
|
||||
"status=?, initial_hash=?, ttl=?, retries=?, next_try=? " +
|
||||
"WHERE id=?")) {
|
||||
ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash());
|
||||
ps.setString(2, message.getType().name());
|
||||
ps.setString(3, message.getFrom().getAddress());
|
||||
ps.setString(4, message.getTo() == null ? null : message.getTo().getAddress());
|
||||
writeBlob(ps, 5, message);
|
||||
ps.setBytes(6, message.getAckData());
|
||||
ps.setObject(7, message.getSent());
|
||||
ps.setObject(8, message.getReceived());
|
||||
ps.setString(9, message.getStatus() == null ? null : message.getStatus().name());
|
||||
ps.setBytes(10, message.getInitialHash());
|
||||
ps.setLong(11, message.getTTL());
|
||||
ps.setInt(12, message.getRetries());
|
||||
ps.setObject(13, message.getNextTry());
|
||||
ps.setLong(14, (Long) message.getId());
|
||||
ps.executeUpdate();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(Plaintext message) {
|
||||
try (Connection connection = config.getConnection()) {
|
||||
connection.setAutoCommit(false);
|
||||
try (Statement stmt = connection.createStatement()) {
|
||||
stmt.executeUpdate("DELETE FROM Message_Label WHERE message_id = " + message.getId());
|
||||
stmt.executeUpdate("DELETE FROM Message WHERE id = " + message.getId());
|
||||
connection.commit();
|
||||
} catch (SQLException e) {
|
||||
try {
|
||||
connection.rollback();
|
||||
} catch (SQLException e1) {
|
||||
LOG.debug(e1.getMessage(), e);
|
||||
}
|
||||
LOG.error(e.getMessage(), e);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<UUID> findConversations(Label label) {
|
||||
String where;
|
||||
if (label == null) {
|
||||
where = "id NOT IN (SELECT message_id FROM Message_Label)";
|
||||
} else {
|
||||
where = "id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.getId() + ")";
|
||||
}
|
||||
List<UUID> result = new LinkedList<>();
|
||||
try (
|
||||
Connection connection = config.getConnection();
|
||||
Statement stmt = connection.createStatement();
|
||||
ResultSet rs = stmt.executeQuery(
|
||||
"SELECT DISTINCT conversation FROM Message WHERE " + where)
|
||||
) {
|
||||
while (rs.next()) {
|
||||
result.add((UUID) rs.getObject(1));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage(), e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces every occurrence of the source conversation ID with the target ID
|
||||
*
|
||||
* @param source ID of the conversation to be merged
|
||||
* @param target ID of the merge target
|
||||
*/
|
||||
private void mergeConversations(Connection connection, UUID source, UUID target) {
|
||||
try (
|
||||
PreparedStatement ps1 = connection.prepareStatement(
|
||||
"UPDATE Message SET conversation=? WHERE conversation=?");
|
||||
PreparedStatement ps2 = connection.prepareStatement(
|
||||
"UPDATE Message_Parent SET conversation=? WHERE conversation=?")
|
||||
) {
|
||||
ps1.setObject(1, target);
|
||||
ps1.setObject(2, source);
|
||||
ps1.executeUpdate();
|
||||
ps2.setObject(1, target);
|
||||
ps2.setObject(2, source);
|
||||
ps2.executeUpdate();
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,344 @@
|
||||
/*
|
||||
* Copyright 2015 Christian Basler
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package ch.dissem.bitmessage.repository
|
||||
|
||||
import ch.dissem.bitmessage.entity.Plaintext
|
||||
import ch.dissem.bitmessage.entity.valueobject.InventoryVector
|
||||
import ch.dissem.bitmessage.entity.valueobject.Label
|
||||
import ch.dissem.bitmessage.ports.AbstractMessageRepository
|
||||
import ch.dissem.bitmessage.ports.MessageRepository
|
||||
import ch.dissem.bitmessage.repository.JdbcHelper.writeBlob
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.IOException
|
||||
import java.sql.Connection
|
||||
import java.sql.ResultSet
|
||||
import java.sql.SQLException
|
||||
import java.sql.Statement
|
||||
import java.util.*
|
||||
|
||||
class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRepository(), MessageRepository {
|
||||
|
||||
override fun findLabels(where: String): List<Label> {
|
||||
try {
|
||||
config.connection.use {
|
||||
connection ->
|
||||
return findLabels(connection, where)
|
||||
}
|
||||
} catch (e: SQLException) {
|
||||
LOG.error(e.message, e)
|
||||
return ArrayList()
|
||||
}
|
||||
}
|
||||
|
||||
private fun getLabel(rs: ResultSet): Label {
|
||||
val typeName = rs.getString("type")
|
||||
val type = if (typeName == null) {
|
||||
null
|
||||
} else {
|
||||
Label.Type.valueOf(typeName)
|
||||
}
|
||||
val label = Label(rs.getString("label"), type, rs.getInt("color"))
|
||||
label.id = rs.getLong("id")
|
||||
|
||||
return label
|
||||
}
|
||||
|
||||
override fun countUnread(label: Label?): Int {
|
||||
val where = if (label == null) {
|
||||
""
|
||||
} else {
|
||||
"id IN (SELECT message_id FROM Message_Label WHERE label_id=${label.id}) AND "
|
||||
} + "id IN (SELECT message_id FROM Message_Label WHERE label_id IN (" +
|
||||
"SELECT id FROM Label WHERE type = '" + Label.Type.UNREAD.name + "'))"
|
||||
|
||||
try {
|
||||
config.connection.use { connection ->
|
||||
connection.createStatement().use { stmt ->
|
||||
stmt.executeQuery("SELECT count(*) FROM Message WHERE $where").use { rs ->
|
||||
if (rs.next()) {
|
||||
return rs.getInt(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e: SQLException) {
|
||||
LOG.error(e.message, e)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
override fun find(where: String): List<Plaintext> {
|
||||
val result = LinkedList<Plaintext>()
|
||||
try {
|
||||
config.connection.use { connection ->
|
||||
connection.createStatement().use { stmt ->
|
||||
stmt.executeQuery(
|
||||
"""SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, initial_hash, status, ttl, retries, next_try, conversation
|
||||
FROM Message WHERE $where""").use { rs ->
|
||||
while (rs.next()) {
|
||||
val iv = rs.getBytes("iv")
|
||||
val data = rs.getBinaryStream("data")
|
||||
val type = Plaintext.Type.valueOf(rs.getString("type"))
|
||||
val builder = Plaintext.readWithoutSignature(type, data)
|
||||
val id = rs.getLong("id")
|
||||
builder.id(id)
|
||||
builder.IV(InventoryVector.fromHash(iv))
|
||||
builder.from(ctx.addressRepository.getAddress(rs.getString("sender"))!!)
|
||||
builder.to(ctx.addressRepository.getAddress(rs.getString("recipient")))
|
||||
builder.ackData(rs.getBytes("ack_data"))
|
||||
builder.sent(rs.getObject("sent", Long::class.java))
|
||||
builder.received(rs.getObject("received", Long::class.java))
|
||||
builder.status(Plaintext.Status.valueOf(rs.getString("status")))
|
||||
builder.ttl(rs.getLong("ttl"))
|
||||
builder.retries(rs.getInt("retries"))
|
||||
builder.nextTry(rs.getObject("next_try", Long::class.java))
|
||||
builder.conversation(rs.getObject("conversation", UUID::class.java))
|
||||
builder.labels(findLabels(connection,
|
||||
"id IN (SELECT label_id FROM Message_Label WHERE message_id=$id) ORDER BY ord"))
|
||||
val message = builder.build()
|
||||
message.initialHash = rs.getBytes("initial_hash")
|
||||
result.add(message)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e: SQLException) {
|
||||
LOG.error(e.message, e)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
private fun findLabels(connection: Connection, where: String): List<Label> {
|
||||
val result = ArrayList<Label>()
|
||||
try {
|
||||
connection.createStatement().use { stmt ->
|
||||
stmt.executeQuery("SELECT id, label, type, color FROM Label WHERE $where").use { rs ->
|
||||
while (rs.next()) {
|
||||
result.add(getLabel(rs))
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e: SQLException) {
|
||||
LOG.error(e.message, e)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
override fun save(message: Plaintext) {
|
||||
saveContactIfNecessary(message.from)
|
||||
saveContactIfNecessary(message.to)
|
||||
|
||||
config.connection.use { connection ->
|
||||
try {
|
||||
connection.autoCommit = false
|
||||
save(connection, message)
|
||||
updateParents(connection, message)
|
||||
updateLabels(connection, message)
|
||||
connection.commit()
|
||||
} catch (e: Exception) {
|
||||
connection.rollback()
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun save(connection: Connection, message: Plaintext) {
|
||||
if (message.id == null) {
|
||||
insert(connection, message)
|
||||
} else {
|
||||
update(connection, message)
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateLabels(connection: Connection, message: Plaintext) {
|
||||
// remove existing labels
|
||||
connection.createStatement().use { stmt -> stmt.executeUpdate("DELETE FROM Message_Label WHERE message_id=${message.id!!}") }
|
||||
// save new labels
|
||||
connection.prepareStatement("INSERT INTO Message_Label VALUES (${message.id}, ?)").use { ps ->
|
||||
for (label in message.labels) {
|
||||
ps.setLong(1, (label.id as Long?)!!)
|
||||
ps.executeUpdate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateParents(connection: Connection, message: Plaintext) {
|
||||
if (message.inventoryVector == null || message.parents.isEmpty()) {
|
||||
// There are no parents to save yet (they are saved in the extended data, that's enough for now)
|
||||
return
|
||||
}
|
||||
// remove existing parents
|
||||
connection.prepareStatement("DELETE FROM Message_Parent WHERE child=?").use { ps ->
|
||||
ps.setBytes(1, message.initialHash)
|
||||
ps.executeUpdate()
|
||||
}
|
||||
val childIV = message.inventoryVector!!.hash
|
||||
// save new parents
|
||||
var order = 0
|
||||
connection.prepareStatement("INSERT INTO Message_Parent VALUES (?, ?, ?, ?)").use { ps ->
|
||||
for (parentIV in message.parents) {
|
||||
val parent = getMessage(parentIV)
|
||||
mergeConversations(connection, parent!!.conversationId, message.conversationId)
|
||||
order++
|
||||
ps.setBytes(1, parentIV.hash)
|
||||
ps.setBytes(2, childIV)
|
||||
ps.setInt(3, order) // FIXME: this might not be necessary
|
||||
ps.setObject(4, message.conversationId)
|
||||
ps.executeUpdate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun insert(connection: Connection, message: Plaintext) {
|
||||
connection.prepareStatement(
|
||||
"INSERT INTO Message (iv, type, sender, recipient, data, ack_data, sent, received, " +
|
||||
"status, initial_hash, ttl, retries, next_try, conversation) " +
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
Statement.RETURN_GENERATED_KEYS).use { ps ->
|
||||
ps.setBytes(1, if (message.inventoryVector == null) null else message.inventoryVector!!.hash)
|
||||
ps.setString(2, message.type.name)
|
||||
ps.setString(3, message.from.address)
|
||||
ps.setString(4, if (message.to == null) null else message.to!!.address)
|
||||
writeBlob(ps, 5, message)
|
||||
ps.setBytes(6, message.ackData)
|
||||
ps.setObject(7, message.sent)
|
||||
ps.setObject(8, message.received)
|
||||
ps.setString(9, message.status.name)
|
||||
ps.setBytes(10, message.initialHash)
|
||||
ps.setLong(11, message.ttl)
|
||||
ps.setInt(12, message.retries)
|
||||
ps.setObject(13, message.nextTry)
|
||||
ps.setObject(14, message.conversationId)
|
||||
|
||||
ps.executeUpdate()
|
||||
// get generated id
|
||||
ps.generatedKeys.use { rs ->
|
||||
rs.next()
|
||||
message.id = rs.getLong(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Throws(SQLException::class, IOException::class)
|
||||
private fun update(connection: Connection, message: Plaintext) {
|
||||
connection.prepareStatement(
|
||||
"UPDATE Message SET iv=?, type=?, sender=?, recipient=?, data=?, ack_data=?, sent=?, received=?, " +
|
||||
"status=?, initial_hash=?, ttl=?, retries=?, next_try=? " +
|
||||
"WHERE id=?").use { ps ->
|
||||
ps.setBytes(1, if (message.inventoryVector == null) null else message.inventoryVector!!.hash)
|
||||
ps.setString(2, message.type.name)
|
||||
ps.setString(3, message.from.address)
|
||||
ps.setString(4, if (message.to == null) null else message.to!!.address)
|
||||
writeBlob(ps, 5, message)
|
||||
ps.setBytes(6, message.ackData)
|
||||
ps.setObject(7, message.sent)
|
||||
ps.setObject(8, message.received)
|
||||
ps.setString(9, message.status.name)
|
||||
ps.setBytes(10, message.initialHash)
|
||||
ps.setLong(11, message.ttl)
|
||||
ps.setInt(12, message.retries)
|
||||
ps.setObject(13, message.nextTry)
|
||||
ps.setLong(14, (message.id as Long?)!!)
|
||||
ps.executeUpdate()
|
||||
}
|
||||
}
|
||||
|
||||
override fun remove(message: Plaintext) {
|
||||
try {
|
||||
config.connection.use { connection ->
|
||||
connection.autoCommit = false
|
||||
try {
|
||||
connection.createStatement().use { stmt ->
|
||||
stmt.executeUpdate("DELETE FROM Message_Label WHERE message_id = " + message.id!!)
|
||||
stmt.executeUpdate("DELETE FROM Message WHERE id = " + message.id!!)
|
||||
connection.commit()
|
||||
}
|
||||
} catch (e: SQLException) {
|
||||
try {
|
||||
connection.rollback()
|
||||
} catch (e1: SQLException) {
|
||||
LOG.debug(e1.message, e)
|
||||
}
|
||||
|
||||
LOG.error(e.message, e)
|
||||
}
|
||||
}
|
||||
} catch (e: SQLException) {
|
||||
LOG.error(e.message, e)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
override fun findConversations(label: Label?): List<UUID> {
|
||||
val where: String
|
||||
if (label == null) {
|
||||
where = "id NOT IN (SELECT message_id FROM Message_Label)"
|
||||
} else {
|
||||
where = "id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.id + ")"
|
||||
}
|
||||
val result = LinkedList<UUID>()
|
||||
try {
|
||||
config.connection.use { connection ->
|
||||
connection.createStatement().use { stmt ->
|
||||
stmt.executeQuery(
|
||||
"SELECT DISTINCT conversation FROM Message WHERE " + where).use { rs ->
|
||||
while (rs.next()) {
|
||||
result.add(rs.getObject(1) as UUID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e: SQLException) {
|
||||
LOG.error(e.message, e)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces every occurrence of the source conversation ID with the target ID
|
||||
|
||||
* @param source ID of the conversation to be merged
|
||||
* *
|
||||
* @param target ID of the merge target
|
||||
*/
|
||||
private fun mergeConversations(connection: Connection, source: UUID, target: UUID) {
|
||||
try {
|
||||
connection.prepareStatement(
|
||||
"UPDATE Message SET conversation=? WHERE conversation=?").use { ps1 ->
|
||||
connection.prepareStatement(
|
||||
"UPDATE Message_Parent SET conversation=? WHERE conversation=?").use { ps2 ->
|
||||
ps1.setObject(1, target)
|
||||
ps1.setObject(2, source)
|
||||
ps1.executeUpdate()
|
||||
ps2.setObject(1, target)
|
||||
ps2.setObject(2, source)
|
||||
ps2.executeUpdate()
|
||||
}
|
||||
}
|
||||
} catch (e: SQLException) {
|
||||
LOG.error(e.message, e)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val LOG = LoggerFactory.getLogger(JdbcMessageRepository::class.java)
|
||||
}
|
||||
}
|
@ -105,9 +105,9 @@ public class JdbcProofOfWorkRepository extends JdbcHelper implements ProofOfWork
|
||||
"nonce_trials_per_byte, extra_bytes, expiration_time, message_id) " +
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?)")
|
||||
) {
|
||||
ps.setBytes(1, cryptography().getInitialHash(item.getObject()));
|
||||
writeBlob(ps, 2, item.getObject());
|
||||
ps.setLong(3, item.getObject().getVersion());
|
||||
ps.setBytes(1, cryptography().getInitialHash(item.getObjectMessage()));
|
||||
writeBlob(ps, 2, item.getObjectMessage());
|
||||
ps.setLong(3, item.getObjectMessage().getVersion());
|
||||
ps.setLong(4, item.getNonceTrialsPerByte());
|
||||
ps.setLong(5, item.getExtraBytes());
|
||||
|
||||
@ -120,7 +120,7 @@ public class JdbcProofOfWorkRepository extends JdbcHelper implements ProofOfWork
|
||||
}
|
||||
ps.executeUpdate();
|
||||
} catch (IOException | SQLException e) {
|
||||
LOG.debug("Error storing object of type " + item.getObject().getPayload().getClass().getSimpleName(), e);
|
||||
LOG.debug("Error storing object of type " + item.getObjectMessage().getPayload().getClass().getSimpleName(), e);
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
}
|
||||
|
@ -129,7 +129,7 @@ class JdbcProofOfWorkRepositoryTest : TestBase() {
|
||||
fun `ensure item can be retrieved`() {
|
||||
val item = repo.getItem(initialHash1)
|
||||
assertThat(item, notNullValue())
|
||||
assertThat<ObjectPayload>(item.`object`.payload, instanceOf<ObjectPayload>(GetPubkey::class.java))
|
||||
assertThat<ObjectPayload>(item.objectMessage.payload, instanceOf<ObjectPayload>(GetPubkey::class.java))
|
||||
assertThat(item.nonceTrialsPerByte, `is`(1000L))
|
||||
assertThat(item.extraBytes, `is`(1000L))
|
||||
}
|
||||
@ -138,7 +138,7 @@ class JdbcProofOfWorkRepositoryTest : TestBase() {
|
||||
fun `ensure ack item can be retrieved`() {
|
||||
val item = repo.getItem(initialHash2)
|
||||
assertThat(item, notNullValue())
|
||||
assertThat<ObjectPayload>(item.`object`.payload, instanceOf<ObjectPayload>(GenericPayload::class.java))
|
||||
assertThat<ObjectPayload>(item.objectMessage.payload, instanceOf<ObjectPayload>(GenericPayload::class.java))
|
||||
assertThat(item.nonceTrialsPerByte, `is`(1000L))
|
||||
assertThat(item.extraBytes, `is`(1000L))
|
||||
assertThat(item.expirationTime, not<Number>(0))
|
||||
|
@ -21,16 +21,11 @@ import ch.dissem.bitmessage.ports.MultiThreadedPOWEngine
|
||||
import ch.dissem.bitmessage.utils.Singleton
|
||||
import ch.dissem.bitmessage.utils.TestUtils.mockedInternalContext
|
||||
|
||||
/**
|
||||
* Created by chris on 20.07.15.
|
||||
*/
|
||||
open class TestBase {
|
||||
companion object {
|
||||
init {
|
||||
val security = BouncyCryptography()
|
||||
Singleton.initialize(security)
|
||||
mockedInternalContext(
|
||||
cryptography = security,
|
||||
cryptography = BouncyCryptography(),
|
||||
proofOfWorkEngine = MultiThreadedPOWEngine()
|
||||
)
|
||||
}
|
||||
|
Reference in New Issue
Block a user