Added option to save labels
and other improvements and fixes used for exports and imports
This commit is contained in:
parent
cf6b3e2603
commit
c8dfc3b459
@ -1,5 +1,5 @@
|
|||||||
buildscript {
|
buildscript {
|
||||||
ext.kotlin_version = '1.1.3-2'
|
ext.kotlin_version = '1.1.4-2'
|
||||||
repositories {
|
repositories {
|
||||||
mavenCentral()
|
mavenCentral()
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@ import ch.dissem.bitmessage.entity.Plaintext.Status.PUBKEY_REQUESTED
|
|||||||
import ch.dissem.bitmessage.entity.payload.*
|
import ch.dissem.bitmessage.entity.payload.*
|
||||||
import ch.dissem.bitmessage.entity.valueobject.InventoryVector
|
import ch.dissem.bitmessage.entity.valueobject.InventoryVector
|
||||||
import ch.dissem.bitmessage.exception.DecryptionFailedException
|
import ch.dissem.bitmessage.exception.DecryptionFailedException
|
||||||
|
import ch.dissem.bitmessage.ports.AlreadyStoredException
|
||||||
import ch.dissem.bitmessage.ports.Labeler
|
import ch.dissem.bitmessage.ports.Labeler
|
||||||
import ch.dissem.bitmessage.ports.NetworkHandler
|
import ch.dissem.bitmessage.ports.NetworkHandler
|
||||||
import ch.dissem.bitmessage.utils.Strings.hex
|
import ch.dissem.bitmessage.utils.Strings.hex
|
||||||
@ -65,7 +66,7 @@ open class DefaultMessageListener(
|
|||||||
|
|
||||||
protected fun receive(objectMessage: ObjectMessage, getPubkey: GetPubkey) {
|
protected fun receive(objectMessage: ObjectMessage, getPubkey: GetPubkey) {
|
||||||
val identity = ctx.addressRepository.findIdentity(getPubkey.ripeTag)
|
val identity = ctx.addressRepository.findIdentity(getPubkey.ripeTag)
|
||||||
if (identity != null && identity.privateKey != null && !identity.isChan) {
|
if (identity?.privateKey != null && !identity.isChan) {
|
||||||
LOG.info("Got pubkey request for identity " + identity)
|
LOG.info("Got pubkey request for identity " + identity)
|
||||||
// FIXME: only send pubkey if it wasn't sent in the last TTL.pubkey() days
|
// FIXME: only send pubkey if it wasn't sent in the last TTL.pubkey() days
|
||||||
ctx.sendPubkey(identity, objectMessage.stream)
|
ctx.sendPubkey(identity, objectMessage.stream)
|
||||||
@ -90,7 +91,6 @@ open class DefaultMessageListener(
|
|||||||
}
|
}
|
||||||
} catch (_: DecryptionFailedException) {
|
} catch (_: DecryptionFailedException) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun updatePubkey(address: BitmessageAddress, pubkey: Pubkey) {
|
private fun updatePubkey(address: BitmessageAddress, pubkey: Pubkey) {
|
||||||
@ -157,6 +157,7 @@ open class DefaultMessageListener(
|
|||||||
|
|
||||||
msg.inventoryVector = iv
|
msg.inventoryVector = iv
|
||||||
labeler.setLabels(msg)
|
labeler.setLabels(msg)
|
||||||
|
try {
|
||||||
ctx.messageRepository.save(msg)
|
ctx.messageRepository.save(msg)
|
||||||
listener.receive(msg)
|
listener.receive(msg)
|
||||||
|
|
||||||
@ -166,6 +167,9 @@ open class DefaultMessageListener(
|
|||||||
ctx.networkHandler.offer(it.inventoryVector)
|
ctx.networkHandler.offer(it.inventoryVector)
|
||||||
} ?: LOG.debug("ack message expected")
|
} ?: LOG.debug("ack message expected")
|
||||||
}
|
}
|
||||||
|
} catch (e: AlreadyStoredException) {
|
||||||
|
LOG.trace("Message was already received before.", e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
|
@ -25,7 +25,8 @@ data class Label(
|
|||||||
/**
|
/**
|
||||||
* RGBA representation for the color.
|
* RGBA representation for the color.
|
||||||
*/
|
*/
|
||||||
var color: Int = 0
|
var color: Int = 0,
|
||||||
|
var ord: Int = 1000
|
||||||
) : Serializable {
|
) : Serializable {
|
||||||
|
|
||||||
var id: Any? = null
|
var id: Any? = null
|
||||||
|
@ -0,0 +1,23 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2017 Christian Basler
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package ch.dissem.bitmessage.ports
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Should be thrown if a received and decrypted message can't be stored because it has already been received and stored.
|
||||||
|
* (So it's not announced again to the client.)
|
||||||
|
*/
|
||||||
|
class AlreadyStoredException(message: String? = null, cause: Throwable? = null) : Exception(message, cause)
|
@ -28,6 +28,8 @@ interface MessageRepository {
|
|||||||
|
|
||||||
fun getLabels(vararg types: Label.Type): List<Label>
|
fun getLabels(vararg types: Label.Type): List<Label>
|
||||||
|
|
||||||
|
fun save(label: Label)
|
||||||
|
|
||||||
fun countUnread(label: Label?): Int
|
fun countUnread(label: Label?): Int
|
||||||
|
|
||||||
fun getAllMessages(): List<Plaintext>
|
fun getAllMessages(): List<Plaintext>
|
||||||
|
@ -218,7 +218,7 @@ class BitmessageContextTest {
|
|||||||
verify(ctx.internals.proofOfWorkRepository, timeout(10000)).putObject(
|
verify(ctx.internals.proofOfWorkRepository, timeout(10000)).putObject(
|
||||||
argThat { payload.type == ObjectType.MSG }, eq(1000L), eq(1000L))
|
argThat { payload.type == ObjectType.MSG }, eq(1000L), eq(1000L))
|
||||||
assertEquals(2, testPowRepo.added)
|
assertEquals(2, testPowRepo.added)
|
||||||
verify(ctx.messages, timeout(10000).atLeastOnce()).save(argThat { type == Type.MSG })
|
verify(ctx.messages, timeout(10000).atLeastOnce()).save(argThat<Plaintext> { type == Type.MSG })
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -228,7 +228,7 @@ class BitmessageContextTest {
|
|||||||
"Subject", "Message")
|
"Subject", "Message")
|
||||||
verify(testPowRepo, timeout(10000).atLeastOnce())
|
verify(testPowRepo, timeout(10000).atLeastOnce())
|
||||||
.putObject(argThat { payload.type == ObjectType.GET_PUBKEY }, eq(1000L), eq(1000L))
|
.putObject(argThat { payload.type == ObjectType.GET_PUBKEY }, eq(1000L), eq(1000L))
|
||||||
verify(ctx.messages, timeout(10000).atLeastOnce()).save(argThat { type == Type.MSG })
|
verify(ctx.messages, timeout(10000).atLeastOnce()).save(argThat<Plaintext> { type == Type.MSG })
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException::class)
|
@Test(expected = IllegalArgumentException::class)
|
||||||
@ -245,7 +245,7 @@ class BitmessageContextTest {
|
|||||||
verify(ctx.internals.proofOfWorkRepository, timeout(1000).atLeastOnce())
|
verify(ctx.internals.proofOfWorkRepository, timeout(1000).atLeastOnce())
|
||||||
.putObject(argThat { payload.type == ObjectType.BROADCAST }, eq(1000L), eq(1000L))
|
.putObject(argThat { payload.type == ObjectType.BROADCAST }, eq(1000L), eq(1000L))
|
||||||
verify(testPowEngine).calculateNonce(any(), any(), any())
|
verify(testPowEngine).calculateNonce(any(), any(), any())
|
||||||
verify(ctx.messages, timeout(10000).atLeastOnce()).save(argThat { type == Type.BROADCAST })
|
verify(ctx.messages, timeout(10000).atLeastOnce()).save(argThat<Plaintext> { type == Type.BROADCAST })
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException::class)
|
@Test(expected = IllegalArgumentException::class)
|
||||||
|
@ -108,7 +108,7 @@ class DefaultMessageListenerTest : TestBase() {
|
|||||||
|
|
||||||
listener.receive(objectMessage)
|
listener.receive(objectMessage)
|
||||||
|
|
||||||
verify(ctx.messageRepository, atLeastOnce()).save(argThat { type == MSG })
|
verify(ctx.messageRepository, atLeastOnce()).save(argThat<Plaintext> { type == MSG })
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -131,6 +131,6 @@ class DefaultMessageListenerTest : TestBase() {
|
|||||||
|
|
||||||
listener.receive(objectMessage)
|
listener.receive(objectMessage)
|
||||||
|
|
||||||
verify(ctx.messageRepository, atLeastOnce()).save(argThat { type == BROADCAST })
|
verify(ctx.messageRepository, atLeastOnce()).save(argThat<Plaintext> { type == BROADCAST })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -91,7 +91,7 @@ object MessageExport {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun importLabels(input: JsonArray<Any?>): List<Label> {
|
fun importLabels(input: JsonArray<*>): List<Label> {
|
||||||
return input.filterIsInstance(JsonObject::class.java).map { json ->
|
return input.filterIsInstance(JsonObject::class.java).map { json ->
|
||||||
Label(
|
Label(
|
||||||
label = json.string("label") ?: throw IllegalArgumentException("label expected"),
|
label = json.string("label") ?: throw IllegalArgumentException("label expected"),
|
||||||
|
@ -20,22 +20,18 @@ import ch.dissem.bitmessage.entity.Plaintext
|
|||||||
import ch.dissem.bitmessage.entity.valueobject.InventoryVector
|
import ch.dissem.bitmessage.entity.valueobject.InventoryVector
|
||||||
import ch.dissem.bitmessage.entity.valueobject.Label
|
import ch.dissem.bitmessage.entity.valueobject.Label
|
||||||
import ch.dissem.bitmessage.ports.AbstractMessageRepository
|
import ch.dissem.bitmessage.ports.AbstractMessageRepository
|
||||||
|
import ch.dissem.bitmessage.ports.AlreadyStoredException
|
||||||
import ch.dissem.bitmessage.ports.MessageRepository
|
import ch.dissem.bitmessage.ports.MessageRepository
|
||||||
import ch.dissem.bitmessage.repository.JdbcHelper.Companion.writeBlob
|
import ch.dissem.bitmessage.repository.JdbcHelper.Companion.writeBlob
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import java.io.IOException
|
import java.sql.*
|
||||||
import java.sql.Connection
|
|
||||||
import java.sql.ResultSet
|
|
||||||
import java.sql.SQLException
|
|
||||||
import java.sql.Statement
|
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRepository(), MessageRepository {
|
class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRepository(), MessageRepository {
|
||||||
|
|
||||||
override fun findLabels(where: String): List<Label> {
|
override fun findLabels(where: String): List<Label> {
|
||||||
try {
|
try {
|
||||||
config.getConnection().use {
|
config.getConnection().use { connection ->
|
||||||
connection ->
|
|
||||||
return findLabels(connection, where)
|
return findLabels(connection, where)
|
||||||
}
|
}
|
||||||
} catch (e: SQLException) {
|
} catch (e: SQLException) {
|
||||||
@ -51,12 +47,61 @@ class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRep
|
|||||||
} else {
|
} else {
|
||||||
Label.Type.valueOf(typeName)
|
Label.Type.valueOf(typeName)
|
||||||
}
|
}
|
||||||
val label = Label(rs.getString("label"), type, rs.getInt("color"))
|
val label = Label(rs.getString("label"), type, rs.getInt("color"), rs.getInt("ord"))
|
||||||
label.id = rs.getLong("id")
|
label.id = rs.getLong("id")
|
||||||
|
|
||||||
return label
|
return label
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun save(label: Label) {
|
||||||
|
config.getConnection().use { connection ->
|
||||||
|
if (label.id != null) {
|
||||||
|
connection.prepareStatement("UPDATE Label SET label=?, type=?, color=?, ord=? WHERE id=?").use { ps ->
|
||||||
|
ps.setString(1, label.toString())
|
||||||
|
ps.setString(2, label.type?.name)
|
||||||
|
ps.setInt(3, label.color)
|
||||||
|
ps.setInt(4, label.ord)
|
||||||
|
ps.setInt(5, label.id as Int)
|
||||||
|
ps.executeUpdate()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
connection.autoCommit = false
|
||||||
|
var exists = false
|
||||||
|
connection.prepareStatement("SELECT COUNT(1) FROM Label WHERE label=?").use { ps ->
|
||||||
|
ps.setString(1, label.toString())
|
||||||
|
val rs = ps.executeQuery()
|
||||||
|
if (rs.next()) {
|
||||||
|
exists = rs.getInt(1) > 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (exists) {
|
||||||
|
connection.prepareStatement("UPDATE Label SET type=?, color=?, ord=? WHERE label=?").use { ps ->
|
||||||
|
ps.setString(1, label.type?.name)
|
||||||
|
ps.setInt(2, label.color)
|
||||||
|
ps.setInt(3, label.ord)
|
||||||
|
ps.setString(4, label.toString())
|
||||||
|
ps.executeUpdate()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
connection.prepareStatement("INSERT INTO Label (label, type, color, ord) VALUES (?, ?, ?, ?)").use { ps ->
|
||||||
|
ps.setString(1, label.toString())
|
||||||
|
ps.setString(2, label.type?.name)
|
||||||
|
ps.setInt(3, label.color)
|
||||||
|
ps.setInt(4, label.ord)
|
||||||
|
ps.executeUpdate()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
connection.commit()
|
||||||
|
} catch (e: Exception) {
|
||||||
|
connection.rollback()
|
||||||
|
throw e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override fun countUnread(label: Label?): Int {
|
override fun countUnread(label: Label?): Int {
|
||||||
val where = if (label == null) {
|
val where = if (label == null) {
|
||||||
""
|
""
|
||||||
@ -68,7 +113,7 @@ class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRep
|
|||||||
try {
|
try {
|
||||||
config.getConnection().use { connection ->
|
config.getConnection().use { connection ->
|
||||||
connection.createStatement().use { stmt ->
|
connection.createStatement().use { stmt ->
|
||||||
stmt.executeQuery("SELECT count(*) FROM Message WHERE $where").use { rs ->
|
stmt.executeQuery("SELECT count(1) FROM Message WHERE $where").use { rs ->
|
||||||
if (rs.next()) {
|
if (rs.next()) {
|
||||||
return rs.getInt(1)
|
return rs.getInt(1)
|
||||||
}
|
}
|
||||||
@ -127,7 +172,7 @@ class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRep
|
|||||||
val result = ArrayList<Label>()
|
val result = ArrayList<Label>()
|
||||||
try {
|
try {
|
||||||
connection.createStatement().use { stmt ->
|
connection.createStatement().use { stmt ->
|
||||||
stmt.executeQuery("SELECT id, label, type, color FROM Label WHERE $where").use { rs ->
|
stmt.executeQuery("SELECT id, label, type, color, ord FROM Label WHERE $where").use { rs ->
|
||||||
while (rs.next()) {
|
while (rs.next()) {
|
||||||
result.add(getLabel(rs))
|
result.add(getLabel(rs))
|
||||||
}
|
}
|
||||||
@ -226,12 +271,16 @@ class JdbcMessageRepository(private val config: JdbcConfig) : AbstractMessageRep
|
|||||||
ps.setObject(13, message.nextTry)
|
ps.setObject(13, message.nextTry)
|
||||||
ps.setObject(14, message.conversationId)
|
ps.setObject(14, message.conversationId)
|
||||||
|
|
||||||
|
try {
|
||||||
ps.executeUpdate()
|
ps.executeUpdate()
|
||||||
// get generated id
|
// get generated id
|
||||||
ps.generatedKeys.use { rs ->
|
ps.generatedKeys.use { rs ->
|
||||||
rs.next()
|
rs.next()
|
||||||
message.id = rs.getLong(1)
|
message.id = rs.getLong(1)
|
||||||
}
|
}
|
||||||
|
} catch (e: SQLIntegrityConstraintViolationException) {
|
||||||
|
throw AlreadyStoredException(cause = e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user