The POW callback is now a service and its state stored.

The proof of work engine therefore just has to remember its initial hash making server based POW easier.
This commit is contained in:
Christian Basler 2015-12-08 20:27:32 +01:00
parent 991a0e5f86
commit ab6a3c56dd
25 changed files with 289 additions and 103 deletions

View File

@ -22,10 +22,7 @@ import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.payload.Pubkey;
import ch.dissem.bitmessage.networking.DefaultNetworkHandler;
import ch.dissem.bitmessage.ports.MemoryNodeRegistry;
import ch.dissem.bitmessage.repository.JdbcAddressRepository;
import ch.dissem.bitmessage.repository.JdbcConfig;
import ch.dissem.bitmessage.repository.JdbcInventory;
import ch.dissem.bitmessage.repository.JdbcMessageRepository;
import ch.dissem.bitmessage.repository.*;
import ch.dissem.bitmessage.security.bc.BouncySecurity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,6 +47,7 @@ public class Application {
.inventory(new JdbcInventory(jdbcConfig))
.nodeRegistry(new MemoryNodeRegistry())
.messageRepo(new JdbcMessageRepository(jdbcConfig))
.powRepo(new JdbcProofOfWorkRepository(jdbcConfig))
.networkHandler(new DefaultNetworkHandler())
.security(new BouncySecurity())
.port(48444)

View File

@ -51,6 +51,7 @@ public class Main {
.inventory(new JdbcInventory(jdbcConfig))
.nodeRegistry(new MemoryNodeRegistry())
.messageRepo(new JdbcMessageRepository(jdbcConfig))
.powRepo(new JdbcProofOfWorkRepository(jdbcConfig))
.networkHandler(new DefaultNetworkHandler())
.security(new BouncySecurity())
.port(48444)

View File

@ -75,7 +75,7 @@ public class BitmessageContext {
}
public AddressRepository addresses() {
return ctx.getAddressRepo();
return ctx.getAddressRepository();
}
public MessageRepository messages() {
@ -90,7 +90,7 @@ public class BitmessageContext {
ctx.getNetworkExtraBytes(),
features
));
ctx.getAddressRepo().save(identity);
ctx.getAddressRepository().save(identity);
pool.submit(new Runnable() {
@Override
public void run() {
@ -102,6 +102,7 @@ public class BitmessageContext {
public void addDistributedMailingList(String address, String alias) {
// TODO
throw new RuntimeException("not implemented");
}
public void broadcast(final BitmessageAddress from, final String subject, final String message) {
@ -120,9 +121,7 @@ public class BitmessageContext {
from,
from,
Factory.getBroadcast(from, msg),
+2 * DAY,
0,
0
+2 * DAY
);
msg.setStatus(SENT);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.BROADCAST, Label.Type.SENT));
@ -159,9 +158,7 @@ public class BitmessageContext {
from,
to,
new Msg(msg),
+2 * DAY,
ctx.getNonceTrialsPerByte(to),
ctx.getExtraBytes(to)
+2 * DAY
);
msg.setStatus(SENT);
msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT));
@ -176,9 +173,7 @@ public class BitmessageContext {
requestingIdentity,
address,
new GetPubkey(address),
+28 * DAY,
ctx.getNetworkNonceTrialsPerByte(),
ctx.getNetworkExtraBytes()
+28 * DAY
);
}
@ -220,7 +215,7 @@ public class BitmessageContext {
}
public void addContact(BitmessageAddress contact) {
ctx.getAddressRepo().save(contact);
ctx.getAddressRepository().save(contact);
tryToFindMatchingPubkey(contact);
if (contact.getPubkey() == null) {
ctx.requestPubkey(contact);
@ -237,7 +232,7 @@ public class BitmessageContext {
v4Pubkey.decrypt(address.getPublicDecryptionKey());
if (object.isSignatureValid(v4Pubkey)) {
address.setPubkey(v4Pubkey);
ctx.getAddressRepo().save(address);
ctx.getAddressRepository().save(address);
break;
} else {
LOG.info("Found pubkey for " + address + " but signature is invalid");
@ -246,7 +241,7 @@ public class BitmessageContext {
} else {
if (Arrays.equals(pubkey.getRipe(), address.getRipe())) {
address.setPubkey(pubkey);
ctx.getAddressRepo().save(address);
ctx.getAddressRepository().save(address);
break;
}
}
@ -258,7 +253,7 @@ public class BitmessageContext {
public void addSubscribtion(BitmessageAddress address) {
address.setSubscribed(true);
ctx.getAddressRepo().save(address);
ctx.getAddressRepository().save(address);
tryToFindBroadcastsForAddress(address);
}
@ -292,6 +287,7 @@ public class BitmessageContext {
NetworkHandler networkHandler;
AddressRepository addressRepo;
MessageRepository messageRepo;
ProofOfWorkRepository proofOfWorkRepository;
ProofOfWorkEngine proofOfWorkEngine;
Security security;
MessageCallback messageCallback;
@ -333,6 +329,11 @@ public class BitmessageContext {
return this;
}
public Builder powRepo(ProofOfWorkRepository proofOfWorkRepository) {
this.proofOfWorkRepository = proofOfWorkRepository;
return this;
}
public Builder security(Security security) {
this.security = security;
return this;
@ -374,6 +375,7 @@ public class BitmessageContext {
nonNull("networkHandler", networkHandler);
nonNull("addressRepo", addressRepo);
nonNull("messageRepo", messageRepo);
nonNull("proofOfWorkRepo", proofOfWorkRepository);
if (proofOfWorkEngine == null) {
proofOfWorkEngine = new MultiThreadedPOWEngine();
}

View File

@ -69,7 +69,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
}
protected void receive(ObjectMessage object, GetPubkey getPubkey) {
BitmessageAddress identity = ctx.getAddressRepo().findIdentity(getPubkey.getRipeTag());
BitmessageAddress identity = ctx.getAddressRepository().findIdentity(getPubkey.getRipeTag());
if (identity != null && identity.getPrivateKey() != null) {
LOG.info("Got pubkey request for identity " + identity);
// FIXME: only send pubkey if it wasn't sent in the last 28 days
@ -82,17 +82,17 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
try {
if (pubkey instanceof V4Pubkey) {
V4Pubkey v4Pubkey = (V4Pubkey) pubkey;
address = ctx.getAddressRepo().findContact(v4Pubkey.getTag());
address = ctx.getAddressRepository().findContact(v4Pubkey.getTag());
if (address != null) {
v4Pubkey.decrypt(address.getPublicDecryptionKey());
}
} else {
address = ctx.getAddressRepo().findContact(pubkey.getRipe());
address = ctx.getAddressRepository().findContact(pubkey.getRipe());
}
if (address != null) {
address.setPubkey(pubkey);
LOG.info("Got pubkey for contact " + address);
ctx.getAddressRepo().save(address);
ctx.getAddressRepository().save(address);
List<Plaintext> messages = ctx.getMessageRepository().findMessages(Plaintext.Status.PUBKEY_REQUESTED, address);
LOG.info("Sending " + messages.size() + " messages for contact " + address);
for (Plaintext msg : messages) {
@ -102,9 +102,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
msg.getFrom(),
msg.getTo(),
new Msg(msg),
+2 * DAY,
ctx.getNonceTrialsPerByte(msg.getTo()),
ctx.getExtraBytes(msg.getTo())
+2 * DAY
);
msg.setStatus(SENT);
ctx.getMessageRepository().save(msg);
@ -115,7 +113,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
}
protected void receive(ObjectMessage object, Msg msg) throws IOException {
for (BitmessageAddress identity : ctx.getAddressRepo().getIdentities()) {
for (BitmessageAddress identity : ctx.getAddressRepository().getIdentities()) {
try {
msg.decrypt(identity.getPrivateKey().getPrivateEncryptionKey());
msg.getPlaintext().setTo(identity);
@ -136,7 +134,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
protected void receive(ObjectMessage object, Broadcast broadcast) throws IOException {
byte[] tag = broadcast instanceof V5Broadcast ? ((V5Broadcast) broadcast).getTag() : null;
for (BitmessageAddress subscription : ctx.getAddressRepo().getSubscriptions(broadcast.getVersion())) {
for (BitmessageAddress subscription : ctx.getAddressRepository().getSubscriptions(broadcast.getVersion())) {
if (tag != null && !Arrays.equals(tag, subscription.getTag())) {
continue;
}

View File

@ -48,9 +48,11 @@ public class InternalContext {
private final NetworkHandler networkHandler;
private final AddressRepository addressRepository;
private final MessageRepository messageRepository;
private final ProofOfWorkRepository proofOfWorkRepository;
private final ProofOfWorkEngine proofOfWorkEngine;
private final MessageCallback messageCallback;
private final CustomCommandHandler customCommandHandler;
private final ProofOfWorkService proofOfWorkService;
private final TreeSet<Long> streams = new TreeSet<>();
private final int port;
@ -67,6 +69,8 @@ public class InternalContext {
this.networkHandler = builder.networkHandler;
this.addressRepository = builder.addressRepo;
this.messageRepository = builder.messageRepo;
this.proofOfWorkRepository = builder.proofOfWorkRepository;
this.proofOfWorkService = new ProofOfWorkService();
this.proofOfWorkEngine = builder.proofOfWorkEngine;
this.clientNonce = security.randomNonce();
this.messageCallback = builder.messageCallback;
@ -88,7 +92,9 @@ public class InternalContext {
streams.add(1L);
}
init(security, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine);
init(security, inventory, nodeRegistry, networkHandler, addressRepository, messageRepository,
proofOfWorkRepository, proofOfWorkService, proofOfWorkEngine,
messageCallback, customCommandHandler);
for (BitmessageAddress identity : addressRepository.getIdentities()) {
streams.add(identity.getStream());
}
@ -118,7 +124,7 @@ public class InternalContext {
return networkHandler;
}
public AddressRepository getAddressRepo() {
public AddressRepository getAddressRepository() {
return addressRepository;
}
@ -126,6 +132,10 @@ public class InternalContext {
return messageRepository;
}
public ProofOfWorkRepository getProofOfWorkRepository() {
return proofOfWorkRepository;
}
public ProofOfWorkEngine getProofOfWorkEngine() {
return proofOfWorkEngine;
}
@ -147,22 +157,12 @@ public class InternalContext {
return networkNonceTrialsPerByte;
}
public long getNonceTrialsPerByte(BitmessageAddress address) {
long nonceTrialsPerByte = address.getPubkey().getNonceTrialsPerByte();
return networkNonceTrialsPerByte > nonceTrialsPerByte ? networkNonceTrialsPerByte : nonceTrialsPerByte;
}
public long getNetworkExtraBytes() {
return networkExtraBytes;
}
public long getExtraBytes(BitmessageAddress address) {
long extraBytes = address.getPubkey().getExtraBytes();
return networkExtraBytes > extraBytes ? networkExtraBytes : extraBytes;
}
public void send(final BitmessageAddress from, BitmessageAddress to, final ObjectPayload payload,
final long timeToLive, final long nonceTrialsPerByte, final long extraBytes) {
final long timeToLive) {
try {
if (to == null) to = from;
long expires = UnixTime.now(+timeToLive);
@ -181,22 +181,7 @@ public class InternalContext {
object.encrypt(to.getPubkey());
}
messageCallback.proofOfWorkStarted(payload);
security.doProofOfWork(object, nonceTrialsPerByte, extraBytes,
new ProofOfWorkEngine.Callback() {
@Override
public void onNonceCalculated(byte[] nonce) {
object.setNonce(nonce);
messageCallback.proofOfWorkCompleted(payload);
if (payload instanceof PlaintextHolder) {
Plaintext plaintext = ((PlaintextHolder) payload).getPlaintext();
plaintext.setInventoryVector(object.getInventoryVector());
messageRepository.save(plaintext);
}
inventory.storeObject(object);
networkHandler.offer(object.getInventoryVector());
messageCallback.messageOffered(payload, object.getInventoryVector());
}
});
proofOfWorkService.doProofOfWork(to, object);
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -214,18 +199,8 @@ public class InternalContext {
response.sign(identity.getPrivateKey());
response.encrypt(security.createPublicKey(identity.getPublicDecryptionKey()));
messageCallback.proofOfWorkStarted(identity.getPubkey());
security.doProofOfWork(response, networkNonceTrialsPerByte, networkExtraBytes,
new ProofOfWorkEngine.Callback() {
@Override
public void onNonceCalculated(byte[] nonce) {
response.setNonce(nonce);
messageCallback.proofOfWorkCompleted(identity.getPubkey());
inventory.storeObject(response);
networkHandler.offer(response.getInventoryVector());
// TODO: save that the pubkey was just sent, and on which stream!
messageCallback.messageOffered(identity.getPubkey(), response.getInventoryVector());
}
});
// TODO: remember that the pubkey is just about to be sent, and on which stream!
proofOfWorkService.doProofOfWork(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -240,17 +215,7 @@ public class InternalContext {
.payload(new GetPubkey(contact))
.build();
messageCallback.proofOfWorkStarted(response.getPayload());
security.doProofOfWork(response, networkNonceTrialsPerByte, networkExtraBytes,
new ProofOfWorkEngine.Callback() {
@Override
public void onNonceCalculated(byte[] nonce) {
response.setNonce(nonce);
messageCallback.proofOfWorkCompleted(response.getPayload());
inventory.storeObject(response);
networkHandler.offer(response.getInventoryVector());
messageCallback.messageOffered(response.getPayload(), response.getInventoryVector());
}
});
proofOfWorkService.doProofOfWork(response);
}
public long getClientNonce() {

View File

@ -0,0 +1,62 @@
package ch.dissem.bitmessage;
import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.ObjectMessage;
import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.PlaintextHolder;
import ch.dissem.bitmessage.ports.MessageRepository;
import ch.dissem.bitmessage.ports.ProofOfWorkEngine;
import ch.dissem.bitmessage.ports.ProofOfWorkRepository;
import ch.dissem.bitmessage.ports.Security;
import static ch.dissem.bitmessage.utils.Singleton.security;
/**
* @author Christian Basler
*/
public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalContext.ContextHolder {
private Security security;
private InternalContext ctx;
private ProofOfWorkRepository powRepo;
private MessageRepository messageRepo;
public void doProofOfWork(ObjectMessage object) {
doProofOfWork(null, object);
}
public void doProofOfWork(BitmessageAddress recipient, ObjectMessage object) {
long nonceTrialsPerByte = recipient == null ? 0 : recipient.getPubkey().getNonceTrialsPerByte();
long extraBytes = recipient == null ? 0 : recipient.getPubkey().getExtraBytes();
powRepo.putObject(object, nonceTrialsPerByte, extraBytes);
if (object.getPayload() instanceof PlaintextHolder){
Plaintext plaintext = ((PlaintextHolder) object.getPayload()).getPlaintext();
plaintext.setInitialHash(security.getInitialHash(object));
messageRepo.save(plaintext);
}
security.doProofOfWork(object, nonceTrialsPerByte, extraBytes, this);
}
@Override
public void onNonceCalculated(byte[] initialHash, byte[] nonce) {
ObjectMessage object = powRepo.getObject(initialHash);
object.setNonce(nonce);
// messageCallback.proofOfWorkCompleted(payload);
Plaintext plaintext = messageRepo.getMessage(initialHash);
if (plaintext != null) {
plaintext.setInventoryVector(object.getInventoryVector());
messageRepo.save(plaintext);
}
ctx.getInventory().storeObject(object);
ctx.getNetworkHandler().offer(object.getInventoryVector());
// messageCallback.messageOffered(payload, object.getInventoryVector());
}
@Override
public void setContext(InternalContext ctx) {
this.ctx = ctx;
this.security = security();
this.powRepo = ctx.getProofOfWorkRepository();
this.messageRepo = ctx.getMessageRepository();
}
}

View File

@ -16,26 +16,36 @@
package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.utils.AccessCounter;
import ch.dissem.bitmessage.utils.Encode;
import java.io.*;
import static ch.dissem.bitmessage.utils.Decode.bytes;
import static ch.dissem.bitmessage.utils.Decode.varString;
/**
* @author Christian Basler
*/
public class CustomMessage implements MessagePayload {
public static final String COMMAND_ERROR = "ERROR";
private final String command;
private final byte[] data;
public CustomMessage() {
public CustomMessage(String command) {
this.command = command;
this.data = null;
}
public CustomMessage(byte[] data) {
public CustomMessage(String command, byte[] data) {
this.command = command;
this.data = data;
}
public static MessagePayload read(InputStream in, int length) throws IOException {
return new CustomMessage(bytes(in, length));
AccessCounter counter = new AccessCounter();
return new CustomMessage(varString(in, counter), bytes(in, length - counter.length()));
}
@Override
@ -56,6 +66,7 @@ public class CustomMessage implements MessagePayload {
@Override
public void write(OutputStream out) throws IOException {
if (data != null) {
Encode.varString(command, out);
out.write(data);
} else {
throw new RuntimeException("Tried to write custom message without data. " +
@ -63,9 +74,13 @@ public class CustomMessage implements MessagePayload {
}
}
public boolean isError() {
return COMMAND_ERROR.equals(command);
}
public static CustomMessage error(String message) {
try {
return new CustomMessage(("ERROR\n" + message).getBytes("UTF-8"));
return new CustomMessage(COMMAND_ERROR, message.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}

View File

@ -156,7 +156,11 @@ public class ObjectMessage implements MessagePayload {
@Override
public void write(OutputStream out) throws IOException {
if (nonce != null) {
out.write(nonce);
} else {
out.write(new byte[8]);
}
out.write(getPayloadBytesWithoutNonce());
}

View File

@ -44,6 +44,7 @@ public class Plaintext implements Streamable {
private Long received;
private Set<Label> labels;
private byte[] initialHash;
private Plaintext(Builder builder) {
id = builder.id;
@ -260,6 +261,14 @@ public class Plaintext implements Streamable {
}
}
public void setInitialHash(byte[] initialHash) {
this.initialHash = initialHash;
}
public byte[] getInitialHash() {
return initialHash;
}
public enum Encoding {
IGNORE(0), TRIVIAL(1), SIMPLE(2);

View File

@ -34,6 +34,8 @@ import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.SecureRandom;
import static ch.dissem.bitmessage.utils.Numbers.max;
/**
* Implements everything that isn't directly dependent on either Spongy- or Bouncycastle.
*/
@ -95,8 +97,8 @@ public abstract class AbstractSecurity implements Security, InternalContext.Cont
public void doProofOfWork(ObjectMessage object, long nonceTrialsPerByte,
long extraBytes, ProofOfWorkEngine.Callback callback) {
try {
if (nonceTrialsPerByte < 1000) nonceTrialsPerByte = 1000;
if (extraBytes < 1000) extraBytes = 1000;
nonceTrialsPerByte = max(nonceTrialsPerByte, context.getNetworkNonceTrialsPerByte());
extraBytes = max(extraBytes, context.getNetworkExtraBytes());
byte[] initialHash = getInitialHash(object);
@ -117,7 +119,8 @@ public abstract class AbstractSecurity implements Security, InternalContext.Cont
}
}
private byte[] getInitialHash(ObjectMessage object) throws IOException {
@Override
public byte[] getInitialHash(ObjectMessage object) {
return sha512(object.getPayloadBytesWithoutNonce());
}

View File

@ -30,6 +30,8 @@ public interface MessageRepository {
int countUnread(Label label);
Plaintext getMessage(byte[] initialHash);
List<Plaintext> findMessages(Label label);
List<Plaintext> findMessages(Status status);

View File

@ -102,7 +102,7 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine {
synchronized (callback) {
if (!Thread.interrupted()) {
try {
callback.onNonceCalculated(nonce);
callback.onNonceCalculated(initialHash, nonce);
} finally {
semaphore.release();
for (Worker w : workers) {
@ -128,12 +128,12 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine {
}
@Override
public void onNonceCalculated(byte[] nonce) {
public void onNonceCalculated(byte[] initialHash, byte[] nonce) {
synchronized (this) {
if (waiting) {
LOG.info("Nonce calculated in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
waiting = false;
callback.onNonceCalculated(nonce);
callback.onNonceCalculated(initialHash, nonce);
}
}
}

View File

@ -35,6 +35,6 @@ public interface ProofOfWorkEngine {
/**
* @param nonce 8 bytes nonce
*/
void onNonceCalculated(byte[] nonce);
void onNonceCalculated(byte[] initialHash, byte[] nonce);
}
}

View File

@ -0,0 +1,16 @@
package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.entity.ObjectMessage;
/**
* Objects that proof of work is currently being done for.
*
* @author Christian Basler
*/
public interface ProofOfWorkRepository {
ObjectMessage getObject(byte[] initialHash);
void putObject(ObjectMessage object, long nonceTrialsPerByte, long extraBytes);
void removeObject(ObjectMessage object);
}

View File

@ -134,6 +134,8 @@ public interface Security {
void checkProofOfWork(ObjectMessage object, long nonceTrialsPerByte, long extraBytes)
throws IOException;
byte[] getInitialHash(ObjectMessage object);
/**
* Calculates the MAC for a message (data)
*

View File

@ -40,6 +40,6 @@ public class SimplePOWEngine implements ProofOfWorkEngine {
mda.update(nonce);
mda.update(initialHash);
} while (Bytes.lt(target, mda.digest(mda.digest()), 8));
callback.onNonceCalculated(nonce);
callback.onNonceCalculated(initialHash, nonce);
}
}

View File

@ -130,9 +130,13 @@ public class Decode {
}
public static String varString(InputStream stream) throws IOException {
int length = (int) varInt(stream);
return varString(stream, null);
}
public static String varString(InputStream stream, AccessCounter counter) throws IOException {
int length = (int) varInt(stream, counter);
// FIXME: technically, it says the length in characters, but I think this one might be correct
// otherwise it will get complicated, as we'll need to read UTF-8 char by char...
return new String(bytes(stream, length), "utf-8");
return new String(bytes(stream, length, counter), "utf-8");
}
}

View File

@ -0,0 +1,10 @@
package ch.dissem.bitmessage.utils;
/**
* Created by chrig on 07.12.2015.
*/
public class Numbers {
public static long max(long a, long b) {
return a > b ? a : b;
}
}

View File

@ -43,7 +43,7 @@ public class ProofOfWorkEngineTest extends TestBase {
engine.calculateNonce(initialHash, target,
new ProofOfWorkEngine.Callback() {
@Override
public void onNonceCalculated(byte[] nonce) {
public void onNonceCalculated(byte[] initialHash, byte[] nonce) {
waiter1.setValue(nonce);
}
});
@ -59,7 +59,7 @@ public class ProofOfWorkEngineTest extends TestBase {
engine.calculateNonce(initialHash2, target2,
new ProofOfWorkEngine.Callback() {
@Override
public void onNonceCalculated(byte[] nonce) {
public void onNonceCalculated(byte[] initialHash, byte[] nonce) {
waiter2.setValue(nonce);
}
});

View File

@ -36,17 +36,20 @@ import static ch.dissem.bitmessage.utils.Singleton.security;
* @author Christian Basler
*/
public class CryptoCustomMessage<T extends Streamable> extends CustomMessage {
public static final String COMMAND = "ENCRYPTED";
private final Reader<T> dataReader;
private CryptoBox container;
private BitmessageAddress sender;
private T data;
public CryptoCustomMessage(T data) throws IOException {
super(COMMAND);
this.data = data;
this.dataReader = null;
}
private CryptoCustomMessage(CryptoBox container, Reader<T> dataReader) {
super(COMMAND);
this.container = container;
this.dataReader = dataReader;
}

View File

@ -22,6 +22,7 @@ import ch.dissem.bitmessage.entity.Plaintext;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.ports.MessageRepository;
import ch.dissem.bitmessage.utils.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -108,6 +109,20 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
return 0;
}
@Override
public Plaintext getMessage(byte[] initialHash) {
List<Plaintext> plaintexts = find("initial_hash=X'" + Strings.hex(initialHash) + "'");
switch (plaintexts.size()) {
case 0:
return null;
case 1:
return plaintexts.get(0);
default:
throw new RuntimeException("This shouldn't happen, found " + plaintexts.size() +
" messages, one or none was expected");
}
}
@Override
public List<Plaintext> findMessages(Label label) {
return find("id IN (SELECT message_id FROM Message_Label WHERE label_id=" + label.getId() + ")");
@ -141,8 +156,8 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
long id = rs.getLong("id");
builder.id(id);
builder.IV(new InventoryVector(iv));
builder.from(ctx.getAddressRepo().getAddress(rs.getString("sender")));
builder.to(ctx.getAddressRepo().getAddress(rs.getString("recipient")));
builder.from(ctx.getAddressRepository().getAddress(rs.getString("sender")));
builder.to(ctx.getAddressRepository().getAddress(rs.getString("recipient")));
builder.sent(rs.getLong("sent"));
builder.received(rs.getLong("received"));
builder.status(Plaintext.Status.valueOf(rs.getString("status")));
@ -173,12 +188,12 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
public void save(Plaintext message) {
// save from address if necessary
if (message.getId() == null) {
BitmessageAddress savedAddress = ctx.getAddressRepo().getAddress(message.getFrom().getAddress());
BitmessageAddress savedAddress = ctx.getAddressRepository().getAddress(message.getFrom().getAddress());
if (savedAddress == null || savedAddress.getPrivateKey() == null) {
if (savedAddress != null && savedAddress.getAlias() != null) {
message.getFrom().setAlias(savedAddress.getAlias());
}
ctx.getAddressRepo().save(message.getFrom());
ctx.getAddressRepository().save(message.getFrom());
}
}
@ -219,7 +234,7 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
private void insert(Connection connection, Plaintext message) throws SQLException, IOException {
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO Message (iv, type, sender, recipient, data, sent, received, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"INSERT INTO Message (iv, type, sender, recipient, data, sent, received, status, initial_hash) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
Statement.RETURN_GENERATED_KEYS);
ps.setBytes(1, message.getInventoryVector() != null ? message.getInventoryVector().getHash() : null);
ps.setString(2, message.getType().name());
@ -229,6 +244,7 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
ps.setLong(6, message.getSent());
ps.setLong(7, message.getReceived());
ps.setString(8, message.getStatus() != null ? message.getStatus().name() : null);
ps.setBytes(9, message.getInitialHash());
ps.executeUpdate();

View File

@ -0,0 +1,69 @@
package ch.dissem.bitmessage.repository;
import ch.dissem.bitmessage.entity.ObjectMessage;
import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.ports.ProofOfWorkRepository;
import ch.dissem.bitmessage.utils.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import static ch.dissem.bitmessage.utils.Singleton.security;
/**
* @author Christian Basler
*/
public class JdbcProofOfWorkRepository extends JdbcHelper implements ProofOfWorkRepository {
private static final Logger LOG = LoggerFactory.getLogger(JdbcProofOfWorkRepository.class);
public JdbcProofOfWorkRepository(JdbcConfig config) {
super(config);
}
@Override
public ObjectMessage getObject(byte[] initialHash) {
try (Connection connection = config.getConnection()) {
PreparedStatement ps = connection.prepareStatement("SELECT data, version FROM POW WHERE initial_hash=?");
ps.setBytes(1, initialHash);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
Blob data = rs.getBlob("data");
return Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length());
} else {
throw new RuntimeException("Object requested that we don't have. Initial hash: " + Strings.hex(initialHash));
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override
public void putObject(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) {
try (Connection connection = config.getConnection()) {
PreparedStatement ps = connection.prepareStatement("INSERT INTO POW (initial_hash, data, version) VALUES (?, ?, ?)");
ps.setBytes(1, security().getInitialHash(object));
writeBlob(ps, 2, object);
ps.setLong(3, object.getVersion());
ps.executeUpdate();
} catch (SQLException e) {
LOG.debug("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e);
throw new RuntimeException(e);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override
public void removeObject(ObjectMessage object) {
try (Connection connection = config.getConnection()) {
PreparedStatement ps = connection.prepareStatement("DELETE FROM POW WHERE initial_hash=?");
ps.setBytes(1, security().getInitialHash(object));
ps.executeUpdate();
} catch (SQLException e) {
LOG.debug(e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,2 @@
ALTER TABLE Message ADD COLUMN initial_hash BINARY(64);
ALTER TABLE Message ADD CONSTRAINT initial_hash_unique UNIQUE(initial_hash);

View File

@ -0,0 +1,5 @@
CREATE TABLE POW (
initial_hash BINARY(64) PRIMARY KEY,
data BLOB NOT NULL,
version BIGINT NOT NULL
);

View File

@ -91,7 +91,7 @@ public class SecurityTest {
security.doProofOfWork(objectMessage, 1000, 1000,
new ProofOfWorkEngine.Callback() {
@Override
public void onNonceCalculated(byte[] nonce) {
public void onNonceCalculated(byte[] initialHash, byte[] nonce) {
waiter.setValue(nonce);
}
});