Fix issues with Java 7 compatibility

This commit is contained in:
Christian Basler 2018-02-16 16:57:08 +01:00
parent 18f870a4cc
commit 00e4461043
3 changed files with 35 additions and 26 deletions

View File

@ -41,7 +41,7 @@ object NodeRegistryHelper {
val line = scanner.nextLine().trim { it <= ' ' }
if (line.startsWith("[stream")) {
stream = java.lang.Long.parseLong(line.substring(8, line.lastIndexOf(']')))
streamSet = HashSet<NetworkAddress>()
streamSet = HashSet()
result.put(stream, streamSet)
} else if (streamSet != null && !line.isEmpty() && !line.startsWith("#")) {
val portIndex = line.lastIndexOf(':')

View File

@ -59,8 +59,8 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
private var selector: Selector? = null
private var serverChannel: ServerSocketChannel? = null
private val connectionQueue = ConcurrentLinkedQueue<NetworkAddress>()
private val connections = ConcurrentHashMap<Connection, SelectionKey>()
private val requestedObjects = ConcurrentHashMap<InventoryVector, Long>(10000)
private val connections: MutableMap<Connection, SelectionKey> = ConcurrentHashMap()
private val requestedObjects: MutableMap<InventoryVector, Long> = ConcurrentHashMap(10000)
private var starter: Thread? = null
@ -74,7 +74,7 @@ class NioNetworkHandler : NetworkHandler, InternalContext.ContextHolder {
channel.configureBlocking(false)
val connection = Connection(ctx, SYNC,
NetworkAddress.Builder().ip(server).port(port).stream(1).build(),
HashMap<InventoryVector, Long>(), timeoutInSeconds)
HashMap(), timeoutInSeconds)
while (channel.isConnected && !connection.isSyncFinished) {
write(channel, connection.io)
read(channel, connection.io)

View File

@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap
class JdbcInventory(config: JdbcConfig) : JdbcHelper(config), Inventory {
private val cache = ConcurrentHashMap<Long, MutableMap<InventoryVector, Long>>()
private val cache: MutableMap<Long, MutableMap<InventoryVector, Long>> = ConcurrentHashMap()
override fun getInventory(vararg streams: Long): List<InventoryVector> {
val result = LinkedList<InventoryVector>()
@ -48,14 +48,15 @@ class JdbcInventory(config: JdbcConfig) : JdbcHelper(config), Inventory {
if (result == null) {
synchronized(cache) {
if (cache[stream] == null) {
val map = ConcurrentHashMap<InventoryVector, Long>()
cache.put(stream, map)
val map: MutableMap<InventoryVector, Long> = ConcurrentHashMap()
result = map
cache[stream] = map
try {
config.getConnection().use { connection ->
connection.createStatement().use { stmt ->
stmt.executeQuery("SELECT hash, expires FROM Inventory " +
"WHERE expires > " + (now - 5 * MINUTE) + " AND stream = " + stream).use { rs ->
stmt.executeQuery(
"SELECT hash, expires FROM Inventory WHERE expires > ${now - 5 * MINUTE} AND stream = $stream"
).use { rs ->
while (rs.next()) {
map.put(InventoryVector(rs.getBytes("hash")), rs.getLong("expires"))
}
@ -71,7 +72,8 @@ class JdbcInventory(config: JdbcConfig) : JdbcHelper(config), Inventory {
return result!!
}
override fun getMissing(offer: List<InventoryVector>, vararg streams: Long): List<InventoryVector> = offer - streams.flatMap { getCache(it).keys }
override fun getMissing(offer: List<InventoryVector>, vararg streams: Long): List<InventoryVector> =
offer - streams.flatMap { getCache(it).keys }
override fun getObject(vector: InventoryVector): ObjectMessage? {
config.getConnection().use { connection ->
@ -81,7 +83,7 @@ class JdbcInventory(config: JdbcConfig) : JdbcHelper(config), Inventory {
val data = rs.getBlob("data")
return Factory.getObjectMessage(rs.getInt("version"), data.binaryStream, data.length().toInt())
} else {
LOG.info("Object requested that we don't have. IV: " + vector)
LOG.info("Object requested that we don't have. IV: $vector")
return null
}
}
@ -106,7 +108,13 @@ class JdbcInventory(config: JdbcConfig) : JdbcHelper(config), Inventory {
val result = LinkedList<ObjectMessage>()
while (rs.next()) {
val data = rs.getBlob("data")
result.add(Factory.getObjectMessage(rs.getInt("version"), data.binaryStream, data.length().toInt())!!)
result.add(
Factory.getObjectMessage(
rs.getInt("version"),
data.binaryStream,
data.length().toInt()
)!!
)
}
return result
}
@ -120,21 +128,22 @@ class JdbcInventory(config: JdbcConfig) : JdbcHelper(config), Inventory {
try {
config.getConnection().use { connection ->
connection.prepareStatement("INSERT INTO Inventory " + "(hash, stream, expires, data, type, version) VALUES (?, ?, ?, ?, ?, ?)").use { ps ->
val iv = objectMessage.inventoryVector
LOG.trace("Storing object " + iv)
ps.setBytes(1, iv.hash)
ps.setLong(2, objectMessage.stream)
ps.setLong(3, objectMessage.expiresTime)
JdbcHelper.Companion.writeBlob(ps, 4, objectMessage)
ps.setLong(5, objectMessage.type)
ps.setLong(6, objectMessage.version)
ps.executeUpdate()
getCache(objectMessage.stream).put(iv, objectMessage.expiresTime)
}
connection.prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, type, version) VALUES (?, ?, ?, ?, ?, ?)")
.use { ps ->
val iv = objectMessage.inventoryVector
LOG.trace("Storing object " + iv)
ps.setBytes(1, iv.hash)
ps.setLong(2, objectMessage.stream)
ps.setLong(3, objectMessage.expiresTime)
JdbcHelper.Companion.writeBlob(ps, 4, objectMessage)
ps.setLong(5, objectMessage.type)
ps.setLong(6, objectMessage.version)
ps.executeUpdate()
getCache(objectMessage.stream).put(iv, objectMessage.expiresTime)
}
}
} catch (e: SQLException) {
LOG.debug("Error storing object of type " + objectMessage.payload.javaClass.simpleName, e)
LOG.debug("Error storing object of type ${objectMessage.payload.javaClass.simpleName}", e)
} catch (e: Exception) {
LOG.error(e.message, e)
}
@ -148,7 +157,7 @@ class JdbcInventory(config: JdbcConfig) : JdbcHelper(config), Inventory {
try {
config.getConnection().use { connection ->
connection.createStatement().use { stmt ->
stmt.executeUpdate("DELETE FROM Inventory WHERE expires < " + (now - 5 * MINUTE))
stmt.executeUpdate("DELETE FROM Inventory WHERE expires < ${now - 5 * MINUTE}")
}
}
} catch (e: SQLException) {