Some POW improvements

This commit is contained in:
Christian Basler 2015-12-18 16:42:17 +01:00
parent 51bf3b8bd2
commit 61788802c5
8 changed files with 125 additions and 19 deletions

View File

@ -31,6 +31,8 @@ import org.slf4j.LoggerFactory;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*; import java.util.concurrent.*;
import static ch.dissem.bitmessage.entity.Plaintext.Status.*; import static ch.dissem.bitmessage.entity.Plaintext.Status.*;
@ -72,6 +74,13 @@ public class BitmessageContext {
// As this thread is used for parts that do POW, which itself uses parallel threads, only // As this thread is used for parts that do POW, which itself uses parallel threads, only
// one should be executed at any time. // one should be executed at any time.
pool = Executors.newFixedThreadPool(1); pool = Executors.newFixedThreadPool(1);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
ctx.getProofOfWorkService().doMissingProofOfWork();
}
}, 30_000); // After 30 seconds
} }
public AddressRepository addresses() { public AddressRepository addresses() {
@ -206,6 +215,19 @@ public class BitmessageContext {
} }
} }
/**
* Send a custom message to a specific node (that should implement handling for this message type) and returns
* the response, which in turn is expected to be a {@link CustomMessage}.
*
* @param server the node's address
* @param port the node's port
* @param request the request
* @return the response
*/
public CustomMessage send(InetAddress server, int port, CustomMessage request) {
return ctx.getNetworkHandler().send(server, port, request);
}
public void cleanup() { public void cleanup() {
ctx.getInventory().cleanup(); ctx.getInventory().cleanup();
} }
@ -276,6 +298,14 @@ public class BitmessageContext {
); );
} }
/**
* Returns the {@link InternalContext} - normally you wouldn't need it,
* unless you are doing something crazy with the protocol.
*/
public InternalContext internals() {
return ctx;
}
public interface Listener { public interface Listener {
void receive(Plaintext plaintext); void receive(Plaintext plaintext);
} }

View File

@ -140,6 +140,10 @@ public class InternalContext {
return proofOfWorkEngine; return proofOfWorkEngine;
} }
public ProofOfWorkService getProofOfWorkService() {
return proofOfWorkService;
}
public long[] getStreams() { public long[] getStreams() {
long[] result = new long[streams.size()]; long[] result = new long[streams.size()];
int i = 0; int i = 0;

View File

@ -20,6 +20,13 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
private ProofOfWorkRepository powRepo; private ProofOfWorkRepository powRepo;
private MessageRepository messageRepo; private MessageRepository messageRepo;
public void doMissingProofOfWork() {
for (byte[] initialHash : powRepo.getItems()) {
ProofOfWorkRepository.Item item = powRepo.getItem(initialHash);
security.doProofOfWork(item.object, item.nonceTrialsPerByte, item.extraBytes, this);
}
}
public void doProofOfWork(ObjectMessage object) { public void doProofOfWork(ObjectMessage object) {
doProofOfWork(null, object); doProofOfWork(null, object);
} }
@ -29,7 +36,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
long extraBytes = recipient == null ? 0 : recipient.getPubkey().getExtraBytes(); long extraBytes = recipient == null ? 0 : recipient.getPubkey().getExtraBytes();
powRepo.putObject(object, nonceTrialsPerByte, extraBytes); powRepo.putObject(object, nonceTrialsPerByte, extraBytes);
if (object.getPayload() instanceof PlaintextHolder){ if (object.getPayload() instanceof PlaintextHolder) {
Plaintext plaintext = ((PlaintextHolder) object.getPayload()).getPlaintext(); Plaintext plaintext = ((PlaintextHolder) object.getPayload()).getPlaintext();
plaintext.setInitialHash(security.getInitialHash(object)); plaintext.setInitialHash(security.getInitialHash(object));
messageRepo.save(plaintext); messageRepo.save(plaintext);
@ -39,7 +46,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
@Override @Override
public void onNonceCalculated(byte[] initialHash, byte[] nonce) { public void onNonceCalculated(byte[] initialHash, byte[] nonce) {
ObjectMessage object = powRepo.getObject(initialHash); ObjectMessage object = powRepo.getItem(initialHash).object;
object.setNonce(nonce); object.setNonce(nonce);
// messageCallback.proofOfWorkCompleted(payload); // messageCallback.proofOfWorkCompleted(payload);
Plaintext plaintext = messageRepo.getMessage(initialHash); Plaintext plaintext = messageRepo.getMessage(initialHash);
@ -48,6 +55,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC
messageRepo.save(plaintext); messageRepo.save(plaintext);
} }
ctx.getInventory().storeObject(object); ctx.getInventory().storeObject(object);
ctx.getProofOfWorkRepository().removeObject(initialHash);
ctx.getNetworkHandler().offer(object.getInventoryVector()); ctx.getNetworkHandler().offer(object.getInventoryVector());
// messageCallback.messageOffered(payload, object.getInventoryVector()); // messageCallback.messageOffered(payload, object.getInventoryVector());
} }

View File

@ -53,13 +53,21 @@ public class CustomMessage implements MessagePayload {
return Command.CUSTOM; return Command.CUSTOM;
} }
public byte[] getData() throws IOException { public String getCustomCommand() {
return command;
}
public byte[] getData() {
if (data != null) { if (data != null) {
return data; return data;
} else { } else {
ByteArrayOutputStream out = new ByteArrayOutputStream(); try {
write(out); ByteArrayOutputStream out = new ByteArrayOutputStream();
return out.toByteArray(); write(out);
return out.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
} }

View File

@ -2,15 +2,31 @@ package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.entity.ObjectMessage; import ch.dissem.bitmessage.entity.ObjectMessage;
import java.util.List;
/** /**
* Objects that proof of work is currently being done for. * Objects that proof of work is currently being done for.
* *
* @author Christian Basler * @author Christian Basler
*/ */
public interface ProofOfWorkRepository { public interface ProofOfWorkRepository {
ObjectMessage getObject(byte[] initialHash); Item getItem(byte[] initialHash);
List<byte[]> getItems();
void putObject(ObjectMessage object, long nonceTrialsPerByte, long extraBytes); void putObject(ObjectMessage object, long nonceTrialsPerByte, long extraBytes);
void removeObject(ObjectMessage object); void removeObject(byte[] initialHash);
class Item {
public final ObjectMessage object;
public final long nonceTrialsPerByte;
public final long extraBytes;
public Item(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) {
this.object = object;
this.nonceTrialsPerByte = nonceTrialsPerByte;
this.extraBytes = extraBytes;
}
}
} }

View File

@ -18,13 +18,13 @@ package ch.dissem.bitmessage.extensions.pow;
import ch.dissem.bitmessage.entity.BitmessageAddress; import ch.dissem.bitmessage.entity.BitmessageAddress;
import ch.dissem.bitmessage.entity.Streamable; import ch.dissem.bitmessage.entity.Streamable;
import ch.dissem.bitmessage.extensions.CryptoCustomMessage;
import ch.dissem.bitmessage.utils.Encode; import ch.dissem.bitmessage.utils.Encode;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import static ch.dissem.bitmessage.extensions.pow.ProofOfWorkRequest.Request.CALCULATE;
import static ch.dissem.bitmessage.utils.Decode.*; import static ch.dissem.bitmessage.utils.Decode.*;
/** /**
@ -34,6 +34,7 @@ public class ProofOfWorkRequest implements Streamable {
private final BitmessageAddress sender; private final BitmessageAddress sender;
private final byte[] initialHash; private final byte[] initialHash;
private final Request request; private final Request request;
private final byte[] data; private final byte[] data;
public ProofOfWorkRequest(BitmessageAddress sender, byte[] initialHash, Request request) { public ProofOfWorkRequest(BitmessageAddress sender, byte[] initialHash, Request request) {
@ -79,10 +80,23 @@ public class ProofOfWorkRequest implements Streamable {
Encode.varBytes(data, out); Encode.varBytes(data, out);
} }
public static class Reader implements CryptoCustomMessage.Reader<ProofOfWorkRequest> {
private final BitmessageAddress identity;
public Reader(BitmessageAddress identity) {
this.identity = identity;
}
@Override
public ProofOfWorkRequest read(BitmessageAddress sender, InputStream in) throws IOException {
return ProofOfWorkRequest.read(identity, in);
}
}
public enum Request { public enum Request {
CALCULATE, CALCULATE,
CALCULATING, CALCULATING,
QUERY,
COMPLETE COMPLETE
} }
} }

View File

@ -8,6 +8,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.sql.*; import java.sql.*;
import java.util.LinkedList;
import java.util.List;
import static ch.dissem.bitmessage.utils.Singleton.security; import static ch.dissem.bitmessage.utils.Singleton.security;
@ -22,14 +24,18 @@ public class JdbcProofOfWorkRepository extends JdbcHelper implements ProofOfWork
} }
@Override @Override
public ObjectMessage getObject(byte[] initialHash) { public Item getItem(byte[] initialHash) {
try (Connection connection = config.getConnection()) { try (Connection connection = config.getConnection()) {
PreparedStatement ps = connection.prepareStatement("SELECT data, version FROM POW WHERE initial_hash=?"); PreparedStatement ps = connection.prepareStatement("SELECT data, version, nonce_trials_per_byte, extra_bytes FROM POW WHERE initial_hash=?");
ps.setBytes(1, initialHash); ps.setBytes(1, initialHash);
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
if (rs.next()) { if (rs.next()) {
Blob data = rs.getBlob("data"); Blob data = rs.getBlob("data");
return Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length()); return new Item(
Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length()),
rs.getLong("nonce_trials_per_byte"),
rs.getLong("extra_bytes")
);
} else { } else {
throw new RuntimeException("Object requested that we don't have. Initial hash: " + Strings.hex(initialHash)); throw new RuntimeException("Object requested that we don't have. Initial hash: " + Strings.hex(initialHash));
} }
@ -39,13 +45,31 @@ public class JdbcProofOfWorkRepository extends JdbcHelper implements ProofOfWork
} }
} }
@Override
public List<byte[]> getItems() {
try (Connection connection = config.getConnection()) {
List<byte[]> result = new LinkedList<>();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT initial_hash FROM POW");
while (rs.next()) {
result.add(rs.getBytes("initial_hash"));
}
return result;
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override @Override
public void putObject(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) { public void putObject(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) {
try (Connection connection = config.getConnection()) { try (Connection connection = config.getConnection()) {
PreparedStatement ps = connection.prepareStatement("INSERT INTO POW (initial_hash, data, version) VALUES (?, ?, ?)"); PreparedStatement ps = connection.prepareStatement("INSERT INTO POW (initial_hash, data, version, nonce_trials_per_byte, extra_bytes) VALUES (?, ?, ?, ?, ?)");
ps.setBytes(1, security().getInitialHash(object)); ps.setBytes(1, security().getInitialHash(object));
writeBlob(ps, 2, object); writeBlob(ps, 2, object);
ps.setLong(3, object.getVersion()); ps.setLong(3, object.getVersion());
ps.setLong(4, nonceTrialsPerByte);
ps.setLong(5, extraBytes);
ps.executeUpdate(); ps.executeUpdate();
} catch (SQLException e) { } catch (SQLException e) {
LOG.debug("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e); LOG.debug("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e);
@ -57,10 +81,10 @@ public class JdbcProofOfWorkRepository extends JdbcHelper implements ProofOfWork
} }
@Override @Override
public void removeObject(ObjectMessage object) { public void removeObject(byte[] initialHash) {
try (Connection connection = config.getConnection()) { try (Connection connection = config.getConnection()) {
PreparedStatement ps = connection.prepareStatement("DELETE FROM POW WHERE initial_hash=?"); PreparedStatement ps = connection.prepareStatement("DELETE FROM POW WHERE initial_hash=?");
ps.setBytes(1, security().getInitialHash(object)); ps.setBytes(1, initialHash);
ps.executeUpdate(); ps.executeUpdate();
} catch (SQLException e) { } catch (SQLException e) {
LOG.debug(e.getMessage(), e); LOG.debug(e.getMessage(), e);

View File

@ -1,5 +1,7 @@
CREATE TABLE POW ( CREATE TABLE POW (
initial_hash BINARY(64) PRIMARY KEY, initial_hash BINARY(64) PRIMARY KEY,
data BLOB NOT NULL, data BLOB NOT NULL,
version BIGINT NOT NULL version BIGINT NOT NULL,
nonce_trials_per_byte BIGINT NOT NULL,
extra_bytes BIGINT NOT NULL
); );