Different improvements

- catch all kinds of errors when selector binding fails
- pimped DefaultLabeler to support listeners
- added pagination capabilities to AbstractMessageRepository
This commit is contained in:
Christian Basler 2017-09-13 08:06:06 +02:00
parent 273d229709
commit bf0c946c52
5 changed files with 91 additions and 53 deletions

View File

@ -1,5 +1,5 @@
buildscript { buildscript {
ext.kotlin_version = '1.1.4-2' ext.kotlin_version = '1.1.4-3'
repositories { repositories {
mavenCentral() mavenCentral()
} }

View File

@ -60,52 +60,58 @@ abstract class AbstractMessageRepository : MessageRepository, InternalContext.Co
} }
override fun getMessage(iv: InventoryVector): Plaintext? { override fun getMessage(iv: InventoryVector): Plaintext? {
return single(find("iv=X'" + Strings.hex(iv.hash) + "'")) return single(find("iv=X'${Strings.hex(iv.hash)}'"))
} }
override fun getMessage(initialHash: ByteArray): Plaintext? { override fun getMessage(initialHash: ByteArray): Plaintext? {
return single(find("initial_hash=X'" + Strings.hex(initialHash) + "'")) return single(find("initial_hash=X'${Strings.hex(initialHash)}'"))
} }
override fun getMessageForAck(ackData: ByteArray): Plaintext? { override fun getMessageForAck(ackData: ByteArray): Plaintext? {
return single(find("ack_data=X'" + Strings.hex(ackData) + "' AND status='" + Plaintext.Status.SENT + "'")) return single(find("ack_data=X'${Strings.hex(ackData)}' AND status='${Plaintext.Status.SENT}'"))
} }
override fun findMessages(label: Label?): List<Plaintext> { /**
if (label == null) { * Finds messages that have a specific label, with optional offset and limit. If the limit is set to 0,
return find("id NOT IN (SELECT message_id FROM Message_Label)") * offset and limit are ignored.
*/
open fun findMessages(label: Label?, offset: Int = 0, limit: Int = 0) = if (label == null) {
find("id NOT IN (SELECT message_id FROM Message_Label)", offset, limit)
} else { } else {
return find("id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.id + ")") find("id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.id + ")", offset, limit)
} }
override fun findMessages(label: Label?) = if (label == null) {
find("id NOT IN (SELECT message_id FROM Message_Label)")
} else {
find("id IN (SELECT message_id FROM Message_Label WHERE label_id=${label.id})")
} }
override fun findMessages(status: Plaintext.Status, recipient: BitmessageAddress): List<Plaintext> { override fun findMessages(status: Plaintext.Status, recipient: BitmessageAddress): List<Plaintext> {
return find("status='" + status.name + "' AND recipient='" + recipient.address + "'") return find("status='${status.name}' AND recipient='${recipient.address}'")
} }
override fun findMessages(status: Plaintext.Status): List<Plaintext> { override fun findMessages(status: Plaintext.Status): List<Plaintext> {
return find("status='" + status.name + "'") return find("status='${status.name}'")
} }
override fun findMessages(sender: BitmessageAddress): List<Plaintext> { override fun findMessages(sender: BitmessageAddress): List<Plaintext> {
return find("sender='" + sender.address + "'") return find("sender='${sender.address}'")
} }
override fun findMessagesToResend(): List<Plaintext> { override fun findMessagesToResend(): List<Plaintext> {
return find("status='" + Plaintext.Status.SENT.name + "'" + return find("status='${Plaintext.Status.SENT.name}' AND next_try < ${UnixTime.now}")
" AND next_try < " + UnixTime.now)
} }
override fun findResponses(parent: Plaintext): List<Plaintext> { override fun findResponses(parent: Plaintext): List<Plaintext> {
if (parent.inventoryVector == null) { if (parent.inventoryVector == null) {
return emptyList() return emptyList()
} }
return find("iv IN (SELECT child FROM Message_Parent" return find("iv IN (SELECT child FROM Message_Parent WHERE parent=X'${Strings.hex(parent.inventoryVector!!.hash)}')")
+ " WHERE parent=X'" + Strings.hex(parent.inventoryVector!!.hash) + "')")
} }
override fun getConversation(conversationId: UUID): List<Plaintext> { override fun getConversation(conversationId: UUID): List<Plaintext> {
return find("conversation=X'" + conversationId.toString().replace("-", "") + "'") return find("conversation=X'${conversationId.toString().replace("-", "")}'")
} }
override fun getLabels(): List<Label> { override fun getLabels(): List<Label> {
@ -113,20 +119,23 @@ abstract class AbstractMessageRepository : MessageRepository, InternalContext.Co
} }
override fun getLabels(vararg types: Label.Type): List<Label> { override fun getLabels(vararg types: Label.Type): List<Label> {
return findLabels("type IN (" + join(*types) + ")") return findLabels("type IN (${join(*types)})")
} }
protected abstract fun findLabels(where: String): List<Label> protected abstract fun findLabels(where: String): List<Label>
protected fun <T> single(collection: Collection<T>): T? { protected fun <T> single(collection: Collection<T>): T? {
when (collection.size) { return when (collection.size) {
0 -> return null 0 -> null
1 -> return collection.iterator().next() 1 -> collection.iterator().next()
else -> throw ApplicationException("This shouldn't happen, found " + collection.size + else -> throw ApplicationException("This shouldn't happen, found ${collection.size} items, one or none was expected")
" items, one or none was expected")
} }
} }
protected abstract fun find(where: String): List<Plaintext> /**
* Finds messages that mach the given where statement, with optional offset and limit. If the limit is set to 0,
* offset and limit are ignored.
*/
protected abstract fun find(where: String, offset: Int = 0, limit: Int = 0): List<Plaintext>
} }

View File

@ -25,22 +25,29 @@ import ch.dissem.bitmessage.entity.valueobject.Label
open class DefaultLabeler : Labeler, InternalContext.ContextHolder { open class DefaultLabeler : Labeler, InternalContext.ContextHolder {
private lateinit var ctx: InternalContext private lateinit var ctx: InternalContext
var listener: ((message: Plaintext, added: Collection<Label>, removed: Collection<Label>) -> Unit)? = null
override fun setContext(context: InternalContext) { override fun setContext(context: InternalContext) {
ctx = context ctx = context
} }
override fun setLabels(msg: Plaintext) { override fun setLabels(msg: Plaintext) {
msg.status = RECEIVED msg.status = RECEIVED
val labelsToAdd =
if (msg.type == BROADCAST) { if (msg.type == BROADCAST) {
msg.addLabels(ctx.messageRepository.getLabels(Label.Type.INBOX, Label.Type.BROADCAST, Label.Type.UNREAD)) ctx.messageRepository.getLabels(Label.Type.INBOX, Label.Type.BROADCAST, Label.Type.UNREAD)
} else { } else {
msg.addLabels(ctx.messageRepository.getLabels(Label.Type.INBOX, Label.Type.UNREAD)) ctx.messageRepository.getLabels(Label.Type.INBOX, Label.Type.UNREAD)
} }
msg.addLabels(labelsToAdd)
listener?.invoke(msg, labelsToAdd, emptyList())
} }
override fun markAsDraft(msg: Plaintext) { override fun markAsDraft(msg: Plaintext) {
msg.status = DRAFT msg.status = DRAFT
msg.addLabels(ctx.messageRepository.getLabels(Label.Type.DRAFT)) val labelsToAdd = ctx.messageRepository.getLabels(Label.Type.DRAFT)
msg.addLabels(labelsToAdd)
listener?.invoke(msg, labelsToAdd, emptyList())
} }
override fun markAsSending(msg: Plaintext) { override fun markAsSending(msg: Plaintext) {
@ -49,14 +56,20 @@ open class DefaultLabeler : Labeler, InternalContext.ContextHolder {
} else { } else {
msg.status = DOING_PROOF_OF_WORK msg.status = DOING_PROOF_OF_WORK
} }
val labelsToRemove = msg.labels.filter { it.type == Label.Type.DRAFT }
msg.removeLabel(Label.Type.DRAFT) msg.removeLabel(Label.Type.DRAFT)
msg.addLabels(ctx.messageRepository.getLabels(Label.Type.OUTBOX)) val labelsToAdd = ctx.messageRepository.getLabels(Label.Type.OUTBOX)
msg.addLabels(labelsToAdd)
listener?.invoke(msg, labelsToAdd, labelsToRemove)
} }
override fun markAsSent(msg: Plaintext) { override fun markAsSent(msg: Plaintext) {
msg.status = SENT msg.status = SENT
val labelsToRemove = msg.labels.filter { it.type == Label.Type.OUTBOX }
msg.removeLabel(Label.Type.OUTBOX) msg.removeLabel(Label.Type.OUTBOX)
msg.addLabels(ctx.messageRepository.getLabels(Label.Type.SENT)) val labelsToAdd = ctx.messageRepository.getLabels(Label.Type.SENT)
msg.addLabels(labelsToAdd)
listener?.invoke(msg, labelsToAdd, labelsToRemove)
} }
override fun markAsAcknowledged(msg: Plaintext) { override fun markAsAcknowledged(msg: Plaintext) {
@ -64,19 +77,28 @@ open class DefaultLabeler : Labeler, InternalContext.ContextHolder {
} }
override fun markAsRead(msg: Plaintext) { override fun markAsRead(msg: Plaintext) {
val labelsToRemove = msg.labels.filter { it.type == Label.Type.UNREAD }
msg.removeLabel(Label.Type.UNREAD) msg.removeLabel(Label.Type.UNREAD)
listener?.invoke(msg, emptyList(), labelsToRemove)
} }
override fun markAsUnread(msg: Plaintext) { override fun markAsUnread(msg: Plaintext) {
msg.addLabels(ctx.messageRepository.getLabels(Label.Type.UNREAD)) val labelsToAdd = ctx.messageRepository.getLabels(Label.Type.UNREAD)
msg.addLabels(labelsToAdd)
listener?.invoke(msg, labelsToAdd, emptyList())
} }
override fun delete(msg: Plaintext) { override fun delete(msg: Plaintext) {
val labelsToRemove = msg.labels.filterNot { it.type == Label.Type.TRASH }
msg.labels.clear() msg.labels.clear()
msg.addLabels(ctx.messageRepository.getLabels(Label.Type.TRASH)) val labelsToAdd = ctx.messageRepository.getLabels(Label.Type.TRASH)
msg.addLabels(labelsToAdd)
listener?.invoke(msg, labelsToAdd, labelsToRemove)
} }
override fun archive(msg: Plaintext) { override fun archive(msg: Plaintext) {
val labelsToRemove = msg.labels.toSet()
msg.labels.clear() msg.labels.clear()
listener?.invoke(msg, emptyList(), labelsToRemove)
} }
} }

View File

@ -318,7 +318,13 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
} }
} }
selector.close() selector.close()
} catch (_: ClosedSelectorException) { } catch (e: Exception) {
// There are various exceptions that may occur when the selector can't be bound:
// ClosedSelectorException, BindException, NullPointerException, SocketException,
// ClosedChannelException
// I'm not sure if I give a damn, or what to do about it. Crashing the application
// isn't nice though.
LOG.error(e.message, e)
} }
}) })
} }

View File

@ -126,14 +126,15 @@ class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRep
return 0 return 0
} }
override fun find(where: String): List<Plaintext> { override fun find(where: String, offset: Int, limit: Int): List<Plaintext> {
val result = LinkedList<Plaintext>() val result = LinkedList<Plaintext>()
val limit = if (limit == 0) "" else "LIMIT $limit OFFSET $offset"
try { try {
config.getConnection().use { connection -> config.getConnection().use { connection ->
connection.createStatement().use { stmt -> connection.createStatement().use { stmt ->
stmt.executeQuery( stmt.executeQuery(
"""SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, initial_hash, status, ttl, retries, next_try, conversation """SELECT id, iv, type, sender, recipient, data, ack_data, sent, received, initial_hash, status, ttl, retries, next_try, conversation
FROM Message WHERE $where""").use { rs -> FROM Message WHERE $where $limit""").use { rs ->
while (rs.next()) { while (rs.next()) {
val iv = rs.getBytes("iv") val iv = rs.getBytes("iv")
val data = rs.getBinaryStream("data") val data = rs.getBinaryStream("data")
@ -224,7 +225,8 @@ class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRep
} }
private fun updateParents(connection: Connection, message: Plaintext) { private fun updateParents(connection: Connection, message: Plaintext) {
if (message.inventoryVector == null || message.parents.isEmpty()) { val childIV = message.inventoryVector?.hash
if (childIV == null || message.parents.isEmpty()) {
// There are no parents to save yet (they are saved in the extended data, that's enough for now) // There are no parents to save yet (they are saved in the extended data, that's enough for now)
return return
} }
@ -233,13 +235,12 @@ class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRep
ps.setBytes(1, message.initialHash) ps.setBytes(1, message.initialHash)
ps.executeUpdate() ps.executeUpdate()
} }
val childIV = message.inventoryVector!!.hash
// save new parents // save new parents
var order = 0 var order = 0
connection.prepareStatement("INSERT INTO Message_Parent VALUES (?, ?, ?, ?)").use { ps -> connection.prepareStatement("INSERT INTO Message_Parent VALUES (?, ?, ?, ?)").use { ps ->
for (parentIV in message.parents) { for (parentIV in message.parents) {
val parent = getMessage(parentIV) getMessage(parentIV)?.let { parent ->
mergeConversations(connection, parent!!.conversationId, message.conversationId) mergeConversations(connection, parent.conversationId, message.conversationId)
order++ order++
ps.setBytes(1, parentIV.hash) ps.setBytes(1, parentIV.hash)
ps.setBytes(2, childIV) ps.setBytes(2, childIV)
@ -249,6 +250,7 @@ class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRep
} }
} }
} }
}
private fun insert(connection: Connection, message: Plaintext) { private fun insert(connection: Connection, message: Plaintext) {
connection.prepareStatement( connection.prepareStatement(
@ -334,18 +336,17 @@ class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRep
} }
override fun findConversations(label: Label?): List<UUID> { override fun findConversations(label: Label?): List<UUID> {
val where: String val where = if (label == null) {
if (label == null) { "id NOT IN (SELECT message_id FROM Message_Label)"
where = "id NOT IN (SELECT message_id FROM Message_Label)"
} else { } else {
where = "id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.id + ")" "id IN (SELECT message_id FROM Message_Label WHERE label_id=${label.id})"
} }
val result = LinkedList<UUID>() val result = LinkedList<UUID>()
try { try {
config.getConnection().use { connection -> config.getConnection().use { connection ->
connection.createStatement().use { stmt -> connection.createStatement().use { stmt ->
stmt.executeQuery( stmt.executeQuery(
"SELECT DISTINCT conversation FROM Message WHERE " + where).use { rs -> "SELECT DISTINCT conversation FROM Message WHERE $where").use { rs ->
while (rs.next()) { while (rs.next()) {
result.add(rs.getObject(1) as UUID) result.add(rs.getObject(1) as UUID)
} }