Some code to work with conversations

This commit is contained in:
2017-03-13 22:49:40 +01:00
parent 10a45cc79c
commit d9090eb70c
16 changed files with 573 additions and 39 deletions

View File

@ -31,6 +31,7 @@ import java.sql.*;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import static ch.dissem.bitmessage.repository.JdbcHelper.writeBlob;
@ -99,7 +100,7 @@ public class JdbcMessageRepository extends AbstractMessageRepository implements
Connection connection = config.getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(
"SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, initial_hash, status, ttl, retries, next_try " +
"SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, initial_hash, status, ttl, retries, next_try, conversation " +
"FROM Message WHERE " + where)
) {
while (rs.next()) {
@ -119,6 +120,7 @@ public class JdbcMessageRepository extends AbstractMessageRepository implements
builder.ttl(rs.getLong("ttl"));
builder.retries(rs.getInt("retries"));
builder.nextTry(rs.getLong("next_try"));
builder.conversation((UUID) rs.getObject("conversation"));
builder.labels(findLabels(connection,
"id IN (SELECT label_id FROM Message_Label WHERE message_id=" + id + ") ORDER BY ord"));
Plaintext message = builder.build();
@ -155,6 +157,7 @@ public class JdbcMessageRepository extends AbstractMessageRepository implements
try {
connection.setAutoCommit(false);
save(connection, message);
updateParents(connection, message);
updateLabels(connection, message);
connection.commit();
} catch (IOException | SQLException e) {
@ -189,11 +192,38 @@ public class JdbcMessageRepository extends AbstractMessageRepository implements
}
}
private void updateParents(Connection connection, Plaintext message) throws SQLException {
if (message.getInventoryVector() == null || message.getParents().isEmpty()) {
// There are no parents to save yet (they are saved in the extended data, that's enough for now)
return;
}
// remove existing parents
try (PreparedStatement ps = connection.prepareStatement("DELETE FROM Message_Parent WHERE child=?")) {
ps.setBytes(1, message.getInitialHash());
ps.executeUpdate();
}
byte[] childIV = message.getInventoryVector().getHash();
// save new parents
int order = 0;
for (InventoryVector parentIV : message.getParents()) {
Plaintext parent = getMessage(parentIV);
mergeConversations(connection, parent.getConversationId(), message.getConversationId());
order++;
try (PreparedStatement ps = connection.prepareStatement("INSERT INTO Message_Parent VALUES (?, ?, ?, ?)")) {
ps.setBytes(1, parentIV.getHash());
ps.setBytes(2, childIV);
ps.setInt(3, order); // FIXME: this might not be necessary
ps.setObject(4, message.getConversationId());
ps.executeUpdate();
}
}
}
private void insert(Connection connection, Plaintext message) throws SQLException, IOException {
try (PreparedStatement ps = connection.prepareStatement(
"INSERT INTO Message (iv, type, sender, recipient, data, ack_data, sent, received, " +
"status, initial_hash, ttl, retries, next_try) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"status, initial_hash, ttl, retries, next_try, conversation) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
Statement.RETURN_GENERATED_KEYS)
) {
ps.setBytes(1, message.getInventoryVector() == null ? null : message.getInventoryVector().getHash());
@ -209,6 +239,7 @@ public class JdbcMessageRepository extends AbstractMessageRepository implements
ps.setLong(11, message.getTTL());
ps.setInt(12, message.getRetries());
ps.setObject(13, message.getNextTry());
ps.setObject(14, message.getConversationId());
ps.executeUpdate();
// get generated id
@ -262,4 +293,52 @@ public class JdbcMessageRepository extends AbstractMessageRepository implements
LOG.error(e.getMessage(), e);
}
}
@Override
public List<UUID> findConversations(Label label) {
String where;
if (label == null) {
where = "id NOT IN (SELECT message_id FROM Message_Label)";
} else {
where = "id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.getId() + ")";
}
List<UUID> result = new LinkedList<>();
try (
Connection connection = config.getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(
"SELECT DISTINCT conversation FROM Message WHERE " + where)
) {
while (rs.next()) {
result.add((UUID) rs.getObject(1));
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
return result;
}
/**
* Replaces every occurrence of the source conversation ID with the target ID
*
* @param source ID of the conversation to be merged
* @param target ID of the merge target
*/
private void mergeConversations(Connection connection, UUID source, UUID target) {
try (
PreparedStatement ps1 = connection.prepareStatement(
"UPDATE Message SET conversation=? WHERE conversation=?");
PreparedStatement ps2 = connection.prepareStatement(
"UPDATE Message_Parent SET conversation=? WHERE conversation=?")
) {
ps1.setObject(1, target);
ps1.setObject(2, source);
ps1.executeUpdate();
ps2.setObject(1, target);
ps2.setObject(2, source);
ps2.executeUpdate();
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
}

View File

@ -69,6 +69,19 @@ public class JdbcNodeRegistry extends JdbcHelper implements NodeRegistry {
}
}
@Override
public void clear() {
try (
Connection connection = config.getConnection();
PreparedStatement ps = connection.prepareStatement(
"DELETE FROM Node")
) {
ps.executeUpdate();
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
@Override
public List<NetworkAddress> getKnownAddresses(int limit, long... streams) {
List<NetworkAddress> result = new LinkedList<>();

View File

@ -0,0 +1,11 @@
ALTER TABLE Message ADD COLUMN conversation UUID NOT NULL DEFAULT RANDOM_UUID();
CREATE TABLE Message_Parent (
parent BINARY(64) NOT NULL,
child BINARY(64) NOT NULL,
pos INT NOT NULL,
conversation UUID,
PRIMARY KEY (parent, child),
FOREIGN KEY (child) REFERENCES Message (iv)
);

View File

@ -21,18 +21,23 @@ import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.ObjectMessage;
import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.ExtendedEncoding;
import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.entity.valueobject.PrivateKey;
import ch.dissem.bitmessage.entity.valueobject.extended.Message;
import ch.dissem.bitmessage.ports.AddressRepository;
import ch.dissem.bitmessage.ports.MessageRepository;
import ch.dissem.bitmessage.utils.TestUtils;
import ch.dissem.bitmessage.utils.UnixTime;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
import static ch.dissem.bitmessage.entity.payload.Pubkey.Feature.DOES_ACK;
@ -49,6 +54,7 @@ public class JdbcMessageRepositoryTest extends TestBase {
private MessageRepository repo;
private Label inbox;
private Label sent;
private Label drafts;
private Label unread;
@ -59,9 +65,9 @@ public class JdbcMessageRepositoryTest extends TestBase {
AddressRepository addressRepo = new JdbcAddressRepository(config);
repo = new JdbcMessageRepository(config);
new InternalContext(new BitmessageContext.Builder()
.cryptography(cryptography())
.addressRepo(addressRepo)
.messageRepo(repo)
.cryptography(cryptography())
.addressRepo(addressRepo)
.messageRepo(repo)
);
BitmessageAddress tmp = new BitmessageAddress(new PrivateKey(false, 1, 1000, 1000, DOES_ACK));
@ -75,6 +81,7 @@ public class JdbcMessageRepositoryTest extends TestBase {
addressRepo.save(identity);
inbox = repo.getLabels(Label.Type.INBOX).get(0);
sent = repo.getLabels(Label.Type.SENT).get(0);
drafts = repo.getLabels(Label.Type.DRAFT).get(0);
unread = repo.getLabels(Label.Type.UNREAD).get(0);
@ -163,12 +170,12 @@ public class JdbcMessageRepositoryTest extends TestBase {
@Test
public void testSave() throws Exception {
Plaintext message = new Plaintext.Builder(MSG)
.IV(TestUtils.randomInventoryVector())
.from(identity)
.to(contactA)
.message("Subject", "Message")
.status(Plaintext.Status.DOING_PROOF_OF_WORK)
.build();
.IV(TestUtils.randomInventoryVector())
.from(identity)
.to(contactA)
.message("Subject", "Message")
.status(Plaintext.Status.DOING_PROOF_OF_WORK)
.build();
repo.save(message);
assertNotNull(message.getId());
@ -207,19 +214,19 @@ public class JdbcMessageRepositoryTest extends TestBase {
@Test
public void ensureUnacknowledgedMessagesAreFoundForResend() throws Exception {
Plaintext message = new Plaintext.Builder(MSG)
.IV(TestUtils.randomInventoryVector())
.from(identity)
.to(contactA)
.message("Subject", "Message")
.status(Plaintext.Status.SENT)
.ttl(2)
.build();
.IV(TestUtils.randomInventoryVector())
.from(identity)
.to(contactA)
.message("Subject", "Message")
.status(Plaintext.Status.SENT)
.ttl(2)
.build();
message.updateNextTry();
assertThat(message.getRetries(), is(1));
assertThat(message.getNextTry(), greaterThan(UnixTime.now()));
assertThat(message.getNextTry(), lessThanOrEqualTo(UnixTime.now(+2)));
repo.save(message);
Thread.sleep(4100);
Thread.sleep(4100); // somewhat longer than 2*TTL
List<Plaintext> messagesToResend = repo.findMessagesToResend();
assertThat(messagesToResend, hasSize(1));
@ -231,14 +238,105 @@ public class JdbcMessageRepositoryTest extends TestBase {
assertThat(messagesToResend, empty());
}
private void addMessage(BitmessageAddress from, BitmessageAddress to, Plaintext.Status status, Label... labels) {
@Test
public void ensureParentsAreSaved() {
Plaintext parent = storeConversation();
List<Plaintext> responses = repo.findResponses(parent);
assertThat(responses, hasSize(2));
assertThat(responses, hasItem(hasMessage("Re: new test", "Nice!")));
assertThat(responses, hasItem(hasMessage("Re: new test", "PS: it did work!")));
}
@Test
public void ensureConversationCanBeRetrieved() {
Plaintext root = storeConversation();
List<UUID> conversations = repo.findConversations(inbox);
assertThat(conversations, hasSize(2));
assertThat(conversations, hasItem(root.getConversationId()));
}
private Plaintext addMessage(BitmessageAddress from, BitmessageAddress to, Plaintext.Status status, Label... labels) {
ExtendedEncoding content = new Message.Builder()
.subject("Subject")
.body("Message")
.build();
return addMessage(from, to, content, status, labels);
}
private Plaintext addMessage(BitmessageAddress from, BitmessageAddress to,
ExtendedEncoding content, Plaintext.Status status, Label... labels) {
Plaintext message = new Plaintext.Builder(MSG)
.from(from)
.to(to)
.message("Subject", "Message")
.status(status)
.labels(Arrays.asList(labels))
.build();
.IV(TestUtils.randomInventoryVector())
.from(from)
.to(to)
.message(content)
.status(status)
.labels(Arrays.asList(labels))
.build();
repo.save(message);
return message;
}
private Plaintext storeConversation() {
Plaintext older = addMessage(identity, contactA,
new Message.Builder()
.subject("hey there")
.body("does it work?")
.build(),
Plaintext.Status.SENT, sent);
Plaintext root = addMessage(identity, contactA,
new Message.Builder()
.subject("new test")
.body("There's a new test in town!")
.build(),
Plaintext.Status.SENT, sent);
addMessage(contactA, identity,
new Message.Builder()
.subject("Re: new test")
.body("Nice!")
.addParent(root)
.build(),
Plaintext.Status.RECEIVED, inbox);
addMessage(contactA, identity,
new Message.Builder()
.subject("Re: new test")
.body("PS: it did work!")
.addParent(root)
.addParent(older)
.build(),
Plaintext.Status.RECEIVED, inbox);
return repo.getMessage(root.getId());
}
private Matcher<Plaintext> hasMessage(String subject, String body) {
return new BaseMatcher<Plaintext>() {
@Override
public void describeTo(Description description) {
description.appendText("Subject: ").appendText(subject);
description.appendText(", ");
description.appendText("Body: ").appendText(body);
}
@Override
public boolean matches(Object item) {
if (item instanceof Plaintext) {
Plaintext message = (Plaintext) item;
if (subject != null && !subject.equals(message.getSubject())) {
return false;
}
if (body != null && !body.equals(message.getText())) {
return false;
}
return true;
} else {
return false;
}
}
};
}
}

View File

@ -16,11 +16,27 @@
package ch.dissem.bitmessage.repository;
import org.h2.tools.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
/**
* JdbcConfig to be used for tests. Uses an in-memory database and adds a useful {@link #reset()} method resetting
* the database.
*/
public class TestJdbcConfig extends JdbcConfig {
private static final Logger LOG = LoggerFactory.getLogger(TestJdbcConfig.class);
static {
try {
Server.createTcpServer().start();
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
public TestJdbcConfig() {
super("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1", "sa", null);
}