Some basics seem to work now like storing messages, addresses, inventory vectors etc.

This commit is contained in:
Christian Basler 2015-04-14 19:47:53 +02:00
parent 751ed0c637
commit 32de01bbf5
23 changed files with 556 additions and 85 deletions
demo/src/main/java/ch/dissem/bitmessage/demo
domain/src
inventory
networking/src/main/java/ch/dissem/bitmessage/networking

View File

@ -18,10 +18,11 @@ package ch.dissem.bitmessage.demo;
import ch.dissem.bitmessage.Context;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.inventory.SimpleAddressRepository;
import ch.dissem.bitmessage.inventory.SimpleInventory;
import ch.dissem.bitmessage.inventory.DatabaseRepository;
import ch.dissem.bitmessage.networking.NetworkNode;
import ch.dissem.bitmessage.ports.NetworkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
@ -31,14 +32,18 @@ import java.io.InputStreamReader;
* Created by chris on 06.04.15.
*/
public class Main {
private final static Logger LOG = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws IOException {
NetworkNode networkNode = new NetworkNode();
Context.init(new SimpleInventory(), new SimpleAddressRepository(), networkNode, 48444);
DatabaseRepository repo = new DatabaseRepository();
Context.init(repo, repo, networkNode, 48444);
Context.getInstance().addStream(1);
networkNode.setListener(new NetworkHandler.MessageListener() {
@Override
public void receive(ObjectPayload payload) {
// TODO
// LOG.info("message received: " + payload);
// System.out.print('.');
}
});
networkNode.start();

View File

@ -21,8 +21,6 @@ import ch.dissem.bitmessage.ports.Inventory;
import ch.dissem.bitmessage.ports.NetworkHandler;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeSet;
/**
@ -41,6 +39,9 @@ public class Context {
private int port;
private long networkNonceTrialsPerByte = 1000;
private long networkExtraBytes = 1000;
private Context(Inventory inventory, AddressRepository addressRepo,
NetworkHandler networkHandler, int port) {
this.inventory = inventory;
@ -85,4 +86,12 @@ public class Context {
public void removeStream(long stream) {
streams.remove(stream);
}
public long getNetworkNonceTrialsPerByte() {
return networkNonceTrialsPerByte;
}
public long getNetworkExtraBytes() {
return networkExtraBytes;
}
}

View File

@ -18,7 +18,9 @@ package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.utils.Bytes;
import ch.dissem.bitmessage.utils.Encode;
import ch.dissem.bitmessage.utils.Security;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -35,7 +37,7 @@ public class ObjectMessage implements MessagePayload {
* The object's version
*/
private long version;
private long streamNumber;
private long stream;
private ObjectPayload payload;
private byte[] payloadBytes;
@ -45,7 +47,7 @@ public class ObjectMessage implements MessagePayload {
expiresTime = builder.expiresTime;
objectType = builder.objectType;
version = builder.version;
streamNumber = builder.streamNumber;
stream = builder.streamNumber;
payload = builder.payload;
}
@ -58,6 +60,10 @@ public class ObjectMessage implements MessagePayload {
return nonce;
}
public void setNonce(byte[] nonce) {
this.nonce = nonce;
}
public long getExpiresTime() {
return expiresTime;
}
@ -66,34 +72,33 @@ public class ObjectMessage implements MessagePayload {
return payload;
}
public InventoryVector getInventoryVector() {
// TODO
return null;
public long getStream() {
return stream;
}
public InventoryVector getInventoryVector() throws IOException {
return new InventoryVector(Bytes.truncate(Security.doubleSha512(nonce, getPayloadBytesWithoutNonce()), 32));
}
@Override
public void write(OutputStream stream) throws IOException {
stream.write(nonce);
stream.write(getPayloadBytes());
stream.write(getPayloadBytesWithoutNonce());
}
public byte[] getPayloadBytes() throws IOException {
public byte[] getPayloadBytesWithoutNonce() throws IOException {
if (payloadBytes == null) {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
Encode.int64(expiresTime, stream);
Encode.int32(objectType, stream);
Encode.varInt(version, stream);
Encode.varInt(streamNumber, stream);
Encode.varInt(this.stream, stream);
payload.write(stream);
payloadBytes = stream.toByteArray();
}
return payloadBytes;
}
public void setNonce(byte[] nonce) {
this.nonce = nonce;
}
public static final class Builder {
private byte[] nonce;
private long expiresTime;

View File

@ -69,7 +69,7 @@ public class Version implements MessagePayload {
* The stream numbers that the emitting node is interested in. Sending nodes must not include more than 160000
* stream numbers.
*/
private final long[] streamNumbers;
private final long[] streams;
private Version(Builder builder) {
version = builder.version;
@ -79,7 +79,7 @@ public class Version implements MessagePayload {
addrFrom = builder.addrFrom;
nonce = builder.nonce;
userAgent = builder.userAgent;
streamNumbers = builder.streamNumbers;
streams = builder.streamNumbers;
}
public int getVersion() {
@ -111,7 +111,7 @@ public class Version implements MessagePayload {
}
public long[] getStreams() {
return streamNumbers;
return streams;
}
@Override
@ -128,7 +128,7 @@ public class Version implements MessagePayload {
addrFrom.write(stream, true);
Encode.int64(nonce, stream);
Encode.varString(userAgent, stream);
Encode.varIntList(streamNumbers, stream);
Encode.varIntList(streams, stream);
}

View File

@ -20,6 +20,17 @@ package ch.dissem.bitmessage.entity.payload;
* Public keys for signing and encryption, the answer to a 'getpubkey' request.
*/
public interface Pubkey extends ObjectPayload {
// bits 0 through 29 are yet undefined
/**
* Receiving node expects that the RIPE hash encoded in their address preceedes the encrypted message data of msg
* messages bound for them.
*/
int FEATURE_INCLUDE_DESTINATION = 30;
/**
* If true, the receiving node does send acknowledgements (rather than dropping them).
*/
int FEATURE_DOES_ACK = 31;
long getVersion();
byte[] getSigningKey();

View File

@ -17,6 +17,7 @@
package ch.dissem.bitmessage.entity.valueobject;
import ch.dissem.bitmessage.entity.Streamable;
import ch.dissem.bitmessage.utils.Strings;
import java.io.IOException;
import java.io.OutputStream;
@ -30,6 +31,10 @@ public class InventoryVector implements Streamable {
*/
private final byte[] hash;
public byte[] getHash() {
return hash;
}
public InventoryVector(byte[] hash) {
this.hash = hash;
}
@ -38,4 +43,9 @@ public class InventoryVector implements Streamable {
public void write(OutputStream stream) throws IOException {
stream.write(hash);
}
@Override
public String toString() {
return Strings.hex(hash).toString();
}
}

View File

@ -56,10 +56,26 @@ public class NetworkAddress implements Streamable {
port = builder.port;
}
public byte[] getIPv6() {
return ipv6;
}
public int getPort() {
return port;
}
public long getServices() {
return services;
}
public long getStream() {
return stream;
}
public long getTime() {
return time;
}
public InetAddress toInetAddress() {
try {
return InetAddress.getByAddress(ipv6);

View File

@ -17,6 +17,7 @@
package ch.dissem.bitmessage.factory;
import ch.dissem.bitmessage.entity.NetworkMessage;
import ch.dissem.bitmessage.entity.ObjectMessage;
import ch.dissem.bitmessage.entity.payload.*;
import ch.dissem.bitmessage.utils.Decode;
import org.slf4j.Logger;
@ -24,6 +25,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
/**
* Creates {@link NetworkMessage} objects from {@link InputStream InputStreams}
@ -31,8 +33,24 @@ import java.io.InputStream;
public class Factory {
public static final Logger LOG = LoggerFactory.getLogger(Factory.class);
public static NetworkMessage getNetworkMessage(int version, InputStream stream) throws IOException {
public static NetworkMessage getNetworkMessage(int version, InputStream stream) throws SocketTimeoutException {
try {
return new V3MessageFactory().read(stream);
} catch (SocketTimeoutException e) {
throw e;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
return null;
}
}
public static ObjectMessage getObjectMessage(int version, InputStream stream, int length) {
try {
return new V3MessageFactory().readObject(stream, length);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
return null;
}
}
static ObjectPayload getObjectPayload(long objectType, long version, long streamNumber, InputStream stream, int length) throws IOException {
@ -46,11 +64,12 @@ public class Factory {
return parseMsg((int) version, streamNumber, stream, length);
case 3: // broadcast
return parseBroadcast((int) version, streamNumber, stream, length);
}
default:
LOG.error("This should not happen, someone broke something in the code!");
}
}
// fallback: just store the message - we don't really care what it is
LOG.error("Unexpected object type: " + objectType);
LOG.warn("Unexpected object type: " + objectType);
return new GenericPayload(streamNumber, Decode.bytes(stream, length));
}

View File

@ -20,6 +20,7 @@ import ch.dissem.bitmessage.entity.*;
import ch.dissem.bitmessage.entity.payload.ObjectPayload;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.utils.AccessCounter;
import ch.dissem.bitmessage.utils.Decode;
import ch.dissem.bitmessage.utils.Security;
import org.slf4j.Logger;
@ -68,21 +69,22 @@ class V3MessageFactory {
case "getdata":
return parseGetData(stream);
case "object":
return parseObject(stream, length);
return readObject(stream, length);
default:
LOG.debug("Unknown command: " + command);
return null;
}
}
private ObjectMessage parseObject(InputStream stream, int length) throws IOException {
byte nonce[] = Decode.bytes(stream, 8);
long expiresTime = Decode.int64(stream);
long objectType = Decode.uint32(stream);
long version = Decode.varInt(stream);
long streamNumber = Decode.varInt(stream);
public ObjectMessage readObject(InputStream stream, int length) throws IOException {
AccessCounter counter = new AccessCounter();
byte nonce[] = Decode.bytes(stream, 8, counter);
long expiresTime = Decode.int64(stream, counter);
long objectType = Decode.uint32(stream, counter);
long version = Decode.varInt(stream, counter);
long streamNumber = Decode.varInt(stream, counter);
ObjectPayload payload = Factory.getObjectPayload(objectType, version, streamNumber, stream, length);
ObjectPayload payload = Factory.getObjectPayload(objectType, version, streamNumber, stream, length-counter.length());
return new ObjectMessage.Builder()
.nonce(nonce)

View File

@ -27,11 +27,11 @@ import java.util.List;
public interface Inventory {
List<InventoryVector> getInventory(long... streams);
List<InventoryVector> getMissing(List<InventoryVector> offer);
List<InventoryVector> getMissing(List<InventoryVector> offer, long... streams);
ObjectMessage getObject(InventoryVector vector);
void storeObject(ObjectMessage object);
void storeObject(int version, ObjectMessage object);
void cleanup();
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.utils;
/**
* Created by chris on 13.04.15.
*/
public class AccessCounter {
private int count;
public static void inc(AccessCounter counter) {
if (counter != null) counter.inc();
}
public static void inc(AccessCounter counter, int count) {
if (counter != null) counter.inc(count);
}
private void inc() {
count++;
}
private void inc(int length) {
count += length;
}
public int length() {
return count;
}
}

View File

@ -63,4 +63,10 @@ public class Bytes {
System.arraycopy(source, 0, result, size - source.length, source.length);
return result;
}
public static byte[] truncate(byte[] source, int size) {
byte[] result = new byte[size];
System.arraycopy(source, 0, result, 0, size);
return result;
}
}

View File

@ -20,17 +20,28 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import static ch.dissem.bitmessage.utils.AccessCounter.inc;
/**
* This class handles decoding simple types from byte stream, according to
* https://bitmessage.org/wiki/Protocol_specification#Common_structures
*/
public class Decode {
public static byte[] bytes(InputStream stream, int count) throws IOException {
return bytes(stream, count, null);
}
public static byte[] bytes(InputStream stream, int count, AccessCounter counter) throws IOException {
byte[] result = new byte[count];
int off = 0;
while (off < count) {
off += stream.read(result, off, count - off);
int read = stream.read(result, off, count - off);
if (read < 0) {
throw new IOException("Unexpected end of stream, wanted to read " + count + " bytes but only got " + off);
}
off += read;
}
inc(counter, count);
return result;
}
@ -45,14 +56,19 @@ public class Decode {
}
public static long varInt(InputStream stream) throws IOException {
return varInt(stream, null);
}
public static long varInt(InputStream stream, AccessCounter counter) throws IOException {
int first = stream.read();
inc(counter);
switch (first) {
case 0xfd:
return uint16(stream);
return uint16(stream, counter);
case 0xfe:
return uint32(stream);
return uint32(stream, counter);
case 0xff:
return int64(stream);
return int64(stream, counter);
default:
return first;
}
@ -63,10 +79,20 @@ public class Decode {
}
public static int uint16(InputStream stream) throws IOException {
return uint16(stream, null);
}
public static int uint16(InputStream stream, AccessCounter counter) throws IOException {
inc(counter, 2);
return stream.read() * 256 + stream.read();
}
public static long uint32(InputStream stream) throws IOException {
return uint32(stream, null);
}
public static long uint32(InputStream stream, AccessCounter counter) throws IOException {
inc(counter, 4);
return stream.read() * 16777216L + stream.read() * 65536L + stream.read() * 256L + stream.read();
}
@ -75,6 +101,11 @@ public class Decode {
}
public static long int64(InputStream stream) throws IOException {
return int64(stream, null);
}
public static long int64(InputStream stream, AccessCounter counter) throws IOException {
inc(counter, 8);
return ByteBuffer.wrap(bytes(stream, 8)).getLong();
}

View File

@ -22,7 +22,8 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static ch.dissem.bitmessage.utils.AccessCounter.inc;
/**
* This class handles encoding simple types from byte stream, according to
@ -37,41 +38,69 @@ public class Encode {
}
public static void varInt(long value, OutputStream stream) throws IOException {
varInt(value, stream, null);
}
public static void varInt(long value, OutputStream stream, AccessCounter counter) throws IOException {
if (value < 0) {
// This is due to the fact that Java doesn't really support unsigned values.
// Please be aware that this might be an error due to a smaller negative value being cast to long.
// Normally, negative values shouldn't occur within the protocol, and I large enough longs
// to being recognized as negatives aren't realistic.
stream.write(0xff);
int64(value, stream);
inc(counter);
int64(value, stream, counter);
} else if (value < 0xfd) {
int8(value, stream);
int8(value, stream, counter);
} else if (value <= 0xffffL) {
stream.write(0xfd);
int16(value, stream);
inc(counter);
int16(value, stream, counter);
} else if (value <= 0xffffffffL) {
stream.write(0xfe);
int32(value, stream);
inc(counter);
int32(value, stream, counter);
} else {
stream.write(0xff);
int64(value, stream);
inc(counter);
int64(value, stream, counter);
}
}
public static void int8(long value, OutputStream stream) throws IOException {
int8(value, stream, null);
}
public static void int8(long value, OutputStream stream, AccessCounter counter) throws IOException {
stream.write((int) value);
inc(counter);
}
public static void int16(long value, OutputStream stream) throws IOException {
int16(value, stream, null);
}
public static void int16(long value, OutputStream stream, AccessCounter counter) throws IOException {
stream.write(ByteBuffer.allocate(4).putInt((int) value).array(), 2, 2);
inc(counter, 2);
}
public static void int32(long value, OutputStream stream) throws IOException {
int32(value, stream, null);
}
public static void int32(long value, OutputStream stream, AccessCounter counter) throws IOException {
stream.write(ByteBuffer.allocate(4).putInt((int) value).array());
inc(counter, 4);
}
public static void int64(long value, OutputStream stream) throws IOException {
int64(value, stream, null);
}
public static void int64(long value, OutputStream stream, AccessCounter counter) throws IOException {
stream.write(ByteBuffer.allocate(8).putLong(value).array());
inc(counter, 8);
}
public static void varString(String value, OutputStream stream) throws IOException {

View File

@ -69,20 +69,12 @@ public class Security {
}
public static void doProofOfWork(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) throws IOException {
// payload = embeddedTime + encodedObjectVersion + encodedStreamNumber + encrypted
byte[] payload = object.getPayloadBytes();
// payloadLength = the length of payload, in bytes, + 8 (to account for the nonce which we will append later)
// TTL = the number of seconds in between now and the object expiresTime.
// initialHash = hash(payload)
byte[] initialHash = getInitialHash(object);
byte[] target = getProofOfWorkTarget(object, nonceTrialsPerByte, extraBytes);
if (target.length < 8) {
target = Bytes.expand(target, 8);
}
// also start with nonce = 0 where nonce is 8 bytes in length and can be hashed as if it is a string.
byte[] nonce = new byte[8];
MessageDigest mda = null;
MessageDigest mda;
try {
mda = MessageDigest.getInstance("SHA-512");
} catch (NoSuchAlgorithmException e) {
@ -100,30 +92,25 @@ public class Security {
* @throws IOException if proof of work doesn't check out
*/
public static void checkProofOfWork(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) throws IOException {
// nonce = the first 8 bytes of payload
byte[] nonce = object.getNonce();
byte[] initialHash = getInitialHash(object);
// resultHash = hash(hash( nonce || initialHash ))
byte[] resultHash = Security.doubleSha512(nonce, initialHash);
// POWValue = the first eight bytes of resultHash converted to an integer
byte[] powValue = bytes(resultHash, 8);
if (Bytes.lt(getProofOfWorkTarget(object, nonceTrialsPerByte, extraBytes), powValue)) {
if (Bytes.lt(
getProofOfWorkTarget(object, nonceTrialsPerByte, extraBytes),
Security.doubleSha512(object.getNonce(), getInitialHash(object)),
8)) {
throw new IOException("Insufficient proof of work");
}
}
private static byte[] getInitialHash(ObjectMessage object) throws IOException {
return Security.sha512(object.getPayloadBytes());
return Security.sha512(object.getPayloadBytesWithoutNonce());
}
private static byte[] getProofOfWorkTarget(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) throws IOException {
BigInteger TTL = BigInteger.valueOf(object.getExpiresTime() - (System.currentTimeMillis() / 1000));
LOG.debug("TTL: " + TTL + "s");
BigInteger numerator = TWO.pow(64);
BigInteger powLength = BigInteger.valueOf(object.getPayloadBytes().length + extraBytes);
BigInteger powLength = BigInteger.valueOf(object.getPayloadBytesWithoutNonce().length + extraBytes);
BigInteger denominator = BigInteger.valueOf(nonceTrialsPerByte).multiply(powLength.add(powLength.multiply(TTL).divide(BigInteger.valueOf(2).pow(16))));
return numerator.divide(denominator).toByteArray();
return Bytes.expand(numerator.divide(denominator).toByteArray(), 8);
}
private static byte[] hash(String algorithm, byte[]... data) {
@ -141,10 +128,4 @@ public class Security {
throw new RuntimeException(e);
}
}
private static byte[] bytes(byte[] data, int count) {
byte[] result = new byte[count];
System.arraycopy(data, 0, result, 0, count);
return result;
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.utils;
/**
* Created by chris on 13.04.15.
*/
public class Strings {
public static CharSequence join(byte[]... objects) {
StringBuilder streamList = new StringBuilder();
for (int i = 0; i < objects.length; i++) {
if (i > 0) streamList.append(", ");
streamList.append(hex(objects[i]));
}
return streamList;
}
public static CharSequence join(long... objects) {
StringBuilder streamList = new StringBuilder();
for (int i = 0; i < objects.length; i++) {
if (i > 0) streamList.append(", ");
streamList.append(objects[i]);
}
return streamList;
}
public static CharSequence join(Object... objects) {
StringBuilder streamList = new StringBuilder();
for (int i = 0; i < objects.length; i++) {
if (i > 0) streamList.append(", ");
streamList.append(objects[i]);
}
return streamList;
}
public static CharSequence hex(byte[] bytes) {
StringBuilder hex = new StringBuilder(bytes.length + 2);
hex.append("0x");
for (byte b : bytes) {
hex.append(String.format("%02x", b));
}
return hex;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.utils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class StringsTest {
@Test
public void ensureJoinWorksWithLongArray() {
long[] test = {1L, 2L};
assertEquals("1, 2", Strings.join(test).toString());
}
@Test
public void testHexString() {
assertEquals("0x48656c6c6f21", Strings.hex("Hello!".getBytes()));
}
}

View File

@ -9,5 +9,7 @@ repositories {
dependencies {
compile project(':domain')
testCompile group: 'junit', name: 'junit', version: '4.11'
compile 'com.h2database:h2:1.4.187'
compile 'org.flywaydb:flyway-core:3.2.1'
testCompile 'junit:junit:4.11'
}

View File

@ -0,0 +1,183 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.inventory;
import ch.dissem.bitmessage.entity.ObjectMessage;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.AddressRepository;
import ch.dissem.bitmessage.ports.Inventory;
import org.flywaydb.core.Flyway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.sql.*;
import java.util.LinkedList;
import java.util.List;
import static ch.dissem.bitmessage.utils.Strings.join;
/**
* Stores everything in a database
*/
public class DatabaseRepository implements Inventory, AddressRepository {
private static final Logger LOG = LoggerFactory.getLogger(DatabaseRepository.class);
private static final String DB_URL = "jdbc:h2:~/jabit";
private static final String DB_USER = "sa";
private static final String DB_PWD = null;
public DatabaseRepository() {
Flyway flyway = new Flyway();
flyway.setDataSource(DB_URL, DB_USER, null);
flyway.migrate();
}
@Override
public List<NetworkAddress> getKnownAddresses(int limit, long... streams) {
List<NetworkAddress> result = new LinkedList<>();
try {
Statement stmt = getConnection().createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM Node WHERE Stream IN (" + join(streams) + ")");
while (rs.next()) {
// result.add(new NetworkAddress.Builder()
// .ipv6(rs.getBytes("ip"))
// .port(rs.getByte("port"))
// .services(rs.getLong("services"))
// .stream(rs.getLong("stream"))
// .time(rs.getLong("time"))
// .build());
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
if (result.isEmpty()) {
// FIXME: this is for testing purposes, remove it!
result.add(new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build());
}
return result;
}
@Override
public void offerAddresses(List<NetworkAddress> addresses) {
try {
Connection connection = getConnection();
PreparedStatement exists = connection.prepareStatement("SELECT port FROM Node WHERE ip = ? AND port = ?");
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO Node (ip, port, services, stream, time) VALUES (?, ?, ?, ?, ?)");
PreparedStatement update = connection.prepareStatement(
"UPDATE Node SET services = ?, stream = ?, time = ? WHERE ip = ? AND port = ?");
for (NetworkAddress node : addresses) {
exists.setBytes(1, node.getIPv6());
exists.setInt(2, node.getPort());
if (exists.executeQuery().next()) {
update.setLong(1, node.getServices());
update.setLong(2, node.getStream());
update.setLong(3, node.getTime());
update.setBytes(4, node.getIPv6());
update.setInt(5, node.getPort());
update.executeUpdate();
} else {
insert.setBytes(1, node.getIPv6());
insert.setInt(2, node.getPort());
insert.setLong(3, node.getServices());
insert.setLong(4, node.getStream());
insert.setLong(5, node.getTime());
insert.executeUpdate();
}
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
@Override
public List<InventoryVector> getInventory(long... streams) {
List<InventoryVector> result = new LinkedList<>();
try {
Statement stmt = getConnection().createStatement();
ResultSet rs = stmt.executeQuery("SELECT hash FROM Inventory WHERE Stream IN (" + join(streams) + ")");
while (rs.next()) {
result.add(new InventoryVector(rs.getBytes("hash")));
}
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
return result;
}
@Override
public List<InventoryVector> getMissing(List<InventoryVector> offer, long... streams) {
offer.removeAll(getInventory(streams));
return offer;
}
@Override
public ObjectMessage getObject(InventoryVector vector) {
try {
Statement stmt = getConnection().createStatement();
ResultSet rs = stmt.executeQuery("SELECT data, version FROM Inventory WHERE hash = " + vector);
Blob data = rs.getBlob("data");
return Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override
public void storeObject(int version, ObjectMessage object) {
try {
PreparedStatement ps = getConnection().prepareStatement("INSERT INTO Inventory (hash, stream, expires, data, version) VALUES (?, ?, ?, ?, ?)");
InventoryVector iv = object.getInventoryVector();
LOG.error("Storing object " + iv);
ps.setBytes(1, iv.getHash());
ps.setLong(2, object.getStream());
ps.setLong(3, object.getExpiresTime());
ByteArrayOutputStream os = new ByteArrayOutputStream();
object.write(os);
ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray());
ps.setBlob(4, is);
ps.setInt(5, version);
ps.executeUpdate();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
@Override
public void cleanup() {
try {
getConnection().createStatement().executeUpdate("DELETE FROM Inventory WHERE time < " + (System.currentTimeMillis() / 1000));
} catch (SQLException e) {
LOG.debug(e.getMessage(), e);
}
}
private Connection getConnection() {
try {
return DriverManager.getConnection(DB_URL, DB_USER, DB_PWD);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -34,7 +34,7 @@ public class SimpleInventory implements Inventory {
}
@Override
public List<InventoryVector> getMissing(List<InventoryVector> offer) {
public List<InventoryVector> getMissing(List<InventoryVector> offer, long... streams) {
return offer;
}
@ -44,7 +44,7 @@ public class SimpleInventory implements Inventory {
}
@Override
public void storeObject(ObjectMessage object) {
public void storeObject(int version, ObjectMessage object) {
throw new NotImplementedException();
}

View File

@ -0,0 +1,9 @@
CREATE TABLE Node (
ip BINARY(16) NOT NULL,
port INT NOT NULL,
services BIGINT NOT NULL,
stream BIGINT NOT NULL,
time BIGINT NOT NULL,
PRIMARY KEY (ip, port)
);

View File

@ -0,0 +1,7 @@
CREATE TABLE Inventory (
hash BINARY(32) NOT NULL PRIMARY KEY,
stream BIGINT NOT NULL,
expires BIGINT NOT NULL,
data BLOB NOT NULL,
version INT NOT NULL
);

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.NetworkHandler.MessageListener;
import ch.dissem.bitmessage.utils.Security;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,7 +67,7 @@ public class Connection implements Runnable {
this.out = socket.getOutputStream();
this.listener = listener;
this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build();
this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).build();
this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build();
}
public State getState() {
@ -124,8 +125,6 @@ public class Connection implements Runnable {
send(msg);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
@ -134,23 +133,34 @@ public class Connection implements Runnable {
switch (messagePayload.getCommand()) {
case INV:
Inv inv = (Inv) messagePayload;
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory());
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
LOG.debug("Received inventory with " + inv.getInventory().size() + " elements, of which are "
+ missing.size() + " missing.");
send(new GetData.Builder().inventory(missing).build());
break;
case GETDATA:
GetData getData = (GetData) messagePayload;
for (InventoryVector iv : getData.getInventory()) {
ObjectMessage om = ctx.getInventory().getObject(iv);
sendingQueue.offer(om);
}
// for (InventoryVector iv : getData.getInventory()) {
// ObjectMessage om = ctx.getInventory().getObject(iv);
// sendingQueue.offer(om);
// }
LOG.error("Node requests data!!!! This shouldn't happen, the hash is done wrong!!!");
break;
case OBJECT:
ObjectMessage objectMessage = (ObjectMessage) messagePayload;
ctx.getInventory().storeObject(objectMessage);
try {
LOG.debug("Received object " + objectMessage.getInventoryVector());
Security.checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
ctx.getInventory().storeObject(version, objectMessage);
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
}
// It's probably pointless, but let the listener decide if we accept the message for the client.
listener.receive(objectMessage.getPayload());
break;
case ADDR:
Addr addr = (Addr) messagePayload;
LOG.debug("Received " + addr.getAddresses().size() + " addresses.");
ctx.getAddressRepository().offerAddresses(addr.getAddresses());
break;
case VERACK: