The nonce is now set over a callback method in the POW engine. This should make some POW implementations easier.

This commit is contained in:
Christian Basler 2015-10-26 09:49:49 +01:00
parent bdc8e025c1
commit 36fe780766
9 changed files with 172 additions and 72 deletions

View File

@ -159,12 +159,13 @@ public class InternalContext {
return networkExtraBytes > extraBytes ? networkExtraBytes : extraBytes; return networkExtraBytes > extraBytes ? networkExtraBytes : extraBytes;
} }
public void send(BitmessageAddress from, BitmessageAddress to, ObjectPayload payload, long timeToLive, long nonceTrialsPerByte, long extraBytes) { public void send(final BitmessageAddress from, BitmessageAddress to, final ObjectPayload payload,
final long timeToLive, final long nonceTrialsPerByte, final long extraBytes) {
try { try {
if (to == null) to = from; if (to == null) to = from;
long expires = UnixTime.now(+timeToLive); long expires = UnixTime.now(+timeToLive);
LOG.info("Expires at " + expires); LOG.info("Expires at " + expires);
ObjectMessage object = new ObjectMessage.Builder() final ObjectMessage object = new ObjectMessage.Builder()
.stream(to.getStream()) .stream(to.getStream())
.expiresTime(expires) .expiresTime(expires)
.payload(payload) .payload(payload)
@ -178,26 +179,32 @@ public class InternalContext {
object.encrypt(to.getPubkey()); object.encrypt(to.getPubkey());
} }
messageCallback.proofOfWorkStarted(payload); messageCallback.proofOfWorkStarted(payload);
security.doProofOfWork(object, nonceTrialsPerByte, extraBytes); security.doProofOfWork(object, nonceTrialsPerByte, extraBytes,
messageCallback.proofOfWorkCompleted(payload); new ProofOfWorkEngine.Callback() {
if (payload instanceof PlaintextHolder) { @Override
Plaintext plaintext = ((PlaintextHolder) payload).getPlaintext(); public void onNonceCalculated(byte[] nonce) {
plaintext.setInventoryVector(object.getInventoryVector()); object.setNonce(nonce);
messageRepository.save(plaintext); messageCallback.proofOfWorkCompleted(payload);
} if (payload instanceof PlaintextHolder) {
inventory.storeObject(object); Plaintext plaintext = ((PlaintextHolder) payload).getPlaintext();
networkHandler.offer(object.getInventoryVector()); plaintext.setInventoryVector(object.getInventoryVector());
messageCallback.messageOffered(payload, object.getInventoryVector()); messageRepository.save(plaintext);
}
inventory.storeObject(object);
networkHandler.offer(object.getInventoryVector());
messageCallback.messageOffered(payload, object.getInventoryVector());
}
});
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
public void sendPubkey(BitmessageAddress identity, long targetStream) { public void sendPubkey(final BitmessageAddress identity, final long targetStream) {
try { try {
long expires = UnixTime.now(+28 * DAY); long expires = UnixTime.now(+28 * DAY);
LOG.info("Expires at " + expires); LOG.info("Expires at " + expires);
ObjectMessage response = new ObjectMessage.Builder() final ObjectMessage response = new ObjectMessage.Builder()
.stream(targetStream) .stream(targetStream)
.expiresTime(expires) .expiresTime(expires)
.payload(identity.getPubkey()) .payload(identity.getPubkey())
@ -205,31 +212,43 @@ public class InternalContext {
response.sign(identity.getPrivateKey()); response.sign(identity.getPrivateKey());
response.encrypt(security.createPublicKey(identity.getPublicDecryptionKey())); response.encrypt(security.createPublicKey(identity.getPublicDecryptionKey()));
messageCallback.proofOfWorkStarted(identity.getPubkey()); messageCallback.proofOfWorkStarted(identity.getPubkey());
security.doProofOfWork(response, networkNonceTrialsPerByte, networkExtraBytes); security.doProofOfWork(response, networkNonceTrialsPerByte, networkExtraBytes,
messageCallback.proofOfWorkCompleted(identity.getPubkey()); new ProofOfWorkEngine.Callback() {
inventory.storeObject(response); @Override
networkHandler.offer(response.getInventoryVector()); public void onNonceCalculated(byte[] nonce) {
// TODO: save that the pubkey was just sent, and on which stream! response.setNonce(nonce);
messageCallback.messageOffered(identity.getPubkey(), response.getInventoryVector()); messageCallback.proofOfWorkCompleted(identity.getPubkey());
inventory.storeObject(response);
networkHandler.offer(response.getInventoryVector());
// TODO: save that the pubkey was just sent, and on which stream!
messageCallback.messageOffered(identity.getPubkey(), response.getInventoryVector());
}
});
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
public void requestPubkey(BitmessageAddress contact) { public void requestPubkey(final BitmessageAddress contact) {
long expires = UnixTime.now(+2 * DAY); long expires = UnixTime.now(+2 * DAY);
LOG.info("Expires at " + expires); LOG.info("Expires at " + expires);
ObjectMessage response = new ObjectMessage.Builder() final ObjectMessage response = new ObjectMessage.Builder()
.stream(contact.getStream()) .stream(contact.getStream())
.expiresTime(expires) .expiresTime(expires)
.payload(new GetPubkey(contact)) .payload(new GetPubkey(contact))
.build(); .build();
messageCallback.proofOfWorkStarted(response.getPayload()); messageCallback.proofOfWorkStarted(response.getPayload());
security.doProofOfWork(response, networkNonceTrialsPerByte, networkExtraBytes); security.doProofOfWork(response, networkNonceTrialsPerByte, networkExtraBytes,
messageCallback.proofOfWorkCompleted(response.getPayload()); new ProofOfWorkEngine.Callback() {
inventory.storeObject(response); @Override
networkHandler.offer(response.getInventoryVector()); public void onNonceCalculated(byte[] nonce) {
messageCallback.messageOffered(response.getPayload(), response.getInventoryVector()); response.setNonce(nonce);
messageCallback.proofOfWorkCompleted(response.getPayload());
inventory.storeObject(response);
networkHandler.offer(response.getInventoryVector());
messageCallback.messageOffered(response.getPayload(), response.getInventoryVector());
}
});
} }
public long getClientNonce() { public long getClientNonce() {

View File

@ -93,7 +93,7 @@ public abstract class AbstractSecurity implements Security, InternalContext.Cont
} }
public void doProofOfWork(ObjectMessage object, long nonceTrialsPerByte, public void doProofOfWork(ObjectMessage object, long nonceTrialsPerByte,
long extraBytes) { long extraBytes, ProofOfWorkEngine.Callback callback) {
try { try {
if (nonceTrialsPerByte < 1000) nonceTrialsPerByte = 1000; if (nonceTrialsPerByte < 1000) nonceTrialsPerByte = 1000;
if (extraBytes < 1000) extraBytes = 1000; if (extraBytes < 1000) extraBytes = 1000;
@ -102,8 +102,7 @@ public abstract class AbstractSecurity implements Security, InternalContext.Cont
byte[] target = getProofOfWorkTarget(object, nonceTrialsPerByte, extraBytes); byte[] target = getProofOfWorkTarget(object, nonceTrialsPerByte, extraBytes);
byte[] nonce = context.getProofOfWorkEngine().calculateNonce(initialHash, target); context.getProofOfWorkEngine().calculateNonce(initialHash, target, callback);
object.setNonce(nonce);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -34,14 +34,15 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine {
private static Logger LOG = LoggerFactory.getLogger(MultiThreadedPOWEngine.class); private static Logger LOG = LoggerFactory.getLogger(MultiThreadedPOWEngine.class);
@Override @Override
public byte[] calculateNonce(byte[] initialHash, byte[] target) { public void calculateNonce(byte[] initialHash, byte[] target, Callback callback) {
callback = new CallbackWrapper(callback);
int cores = Runtime.getRuntime().availableProcessors(); int cores = Runtime.getRuntime().availableProcessors();
if (cores > 255) cores = 255; if (cores > 255) cores = 255;
LOG.info("Doing POW using " + cores + " cores"); LOG.info("Doing POW using " + cores + " cores");
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
List<Worker> workers = new ArrayList<>(cores); List<Worker> workers = new ArrayList<>(cores);
for (int i = 0; i < cores; i++) { for (int i = 0; i < cores; i++) {
Worker w = new Worker(workers, (byte) cores, i, initialHash, target); Worker w = new Worker(workers, (byte) cores, i, initialHash, target, callback);
workers.add(w); workers.add(w);
} }
for (Worker w : workers) { for (Worker w : workers) {
@ -49,30 +50,20 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine {
// if a worker finds a nonce while new ones are still being added. // if a worker finds a nonce while new ones are still being added.
w.start(); w.start();
} }
for (Worker w : workers) {
try {
w.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (w.isSuccessful()) {
LOG.info("Nonce calculated in " + ((System.currentTimeMillis() - time) / 1000) + " seconds");
return w.getNonce();
}
}
throw new RuntimeException("All workers ended without yielding a nonce - something is seriously broken!");
} }
private static class Worker extends Thread { private static class Worker extends Thread {
private final Callback callback;
private final byte numberOfCores; private final byte numberOfCores;
private final List<Worker> workers; private final List<Worker> workers;
private final byte[] initialHash; private final byte[] initialHash;
private final byte[] target; private final byte[] target;
private final MessageDigest mda; private final MessageDigest mda;
private final byte[] nonce = new byte[8]; private final byte[] nonce = new byte[8];
private boolean successful = false;
public Worker(List<Worker> workers, byte numberOfCores, int core, byte[] initialHash, byte[] target) { public Worker(List<Worker> workers, byte numberOfCores, int core, byte[] initialHash, byte[] target,
Callback callback) {
this.callback = callback;
this.numberOfCores = numberOfCores; this.numberOfCores = numberOfCores;
this.workers = workers; this.workers = workers;
this.initialHash = initialHash; this.initialHash = initialHash;
@ -86,14 +77,6 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine {
} }
} }
public boolean isSuccessful() {
return successful;
}
public byte[] getNonce() {
return nonce;
}
@Override @Override
public void run() { public void run() {
do { do {
@ -101,13 +84,42 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine {
mda.update(nonce); mda.update(nonce);
mda.update(initialHash); mda.update(initialHash);
if (!Bytes.lt(target, mda.digest(mda.digest()), 8)) { if (!Bytes.lt(target, mda.digest(mda.digest()), 8)) {
successful = true; synchronized (callback) {
for (Worker w : workers) { if (!Thread.interrupted()) {
w.interrupt(); try {
callback.onNonceCalculated(nonce);
} finally {
for (Worker w : workers) {
w.interrupt();
}
}
}
} }
return; return;
} }
} while (!Thread.interrupted()); } while (!Thread.interrupted());
} }
} }
public static class CallbackWrapper implements Callback {
private final Callback callback;
private final long startTime;
private boolean waiting = true;
public CallbackWrapper(Callback callback) {
this.startTime = System.currentTimeMillis();
this.callback = callback;
}
@Override
public void onNonceCalculated(byte[] nonce) {
synchronized (this) {
if (waiting) {
LOG.info("Nonce calculated in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
waiting = false;
callback.onNonceCalculated(nonce);
}
}
}
}
} }

View File

@ -24,9 +24,17 @@ public interface ProofOfWorkEngine {
* Returns a nonce, such that the first 8 bytes from sha512(sha512(nonce||initialHash)) represent a unsigned long * Returns a nonce, such that the first 8 bytes from sha512(sha512(nonce||initialHash)) represent a unsigned long
* smaller than target. * smaller than target.
* *
* @param initialHash the SHA-512 hash of the object to send, sans nonce * @param initialHash the SHA-512 hash of the object to send, sans nonce
* @param target the target, representing an unsigned long * @param target the target, representing an unsigned long
* @return 8 bytes nonce * @param callback called with the calculated nonce as argument. The ProofOfWorkEngine implementation must make
* sure this is only called once.
*/ */
byte[] calculateNonce(byte[] initialHash, byte[] target); void calculateNonce(byte[] initialHash, byte[] target, Callback callback);
interface Callback {
/**
* @param nonce 8 bytes nonce
*/
void onNonceCalculated(byte[] nonce);
}
} }

View File

@ -120,9 +120,10 @@ public interface Security {
* @param object to do the proof of work for * @param object to do the proof of work for
* @param nonceTrialsPerByte difficulty * @param nonceTrialsPerByte difficulty
* @param extraBytes bytes to add to the object size (makes it more difficult to send small messages) * @param extraBytes bytes to add to the object size (makes it more difficult to send small messages)
* @param callback to handle nonce once it's calculated
*/ */
void doProofOfWork(ObjectMessage object, long nonceTrialsPerByte, void doProofOfWork(ObjectMessage object, long nonceTrialsPerByte,
long extraBytes); long extraBytes, ProofOfWorkEngine.Callback callback);
/** /**
* @param object to be checked * @param object to be checked
@ -143,7 +144,6 @@ public interface Security {
byte[] mac(byte[] key_m, byte[] data); byte[] mac(byte[] key_m, byte[] data);
/** /**
*
* @param encrypt if true, encrypts data, otherwise tries to decrypt it. * @param encrypt if true, encrypts data, otherwise tries to decrypt it.
* @param data * @param data
* @param key_e * @param key_e

View File

@ -27,7 +27,7 @@ import static ch.dissem.bitmessage.utils.Bytes.inc;
*/ */
public class SimplePOWEngine implements ProofOfWorkEngine { public class SimplePOWEngine implements ProofOfWorkEngine {
@Override @Override
public byte[] calculateNonce(byte[] initialHash, byte[] target) { public void calculateNonce(byte[] initialHash, byte[] target, Callback callback) {
byte[] nonce = new byte[8]; byte[] nonce = new byte[8];
MessageDigest mda; MessageDigest mda;
try { try {
@ -40,6 +40,6 @@ public class SimplePOWEngine implements ProofOfWorkEngine {
mda.update(nonce); mda.update(nonce);
mda.update(initialHash); mda.update(initialHash);
} while (Bytes.lt(target, mda.digest(mda.digest()), 8)); } while (Bytes.lt(target, mda.digest(mda.digest()), 8));
return nonce; callback.onNonceCalculated(nonce);
} }
} }

View File

@ -0,0 +1,41 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.utils;
/**
* Waits for a value within a callback method to be set.
*/
public class CallbackWaiter<T> {
private volatile boolean isSet;
private volatile T value;
public void setValue(T value) {
synchronized (this) {
this.isSet = true;
this.value = value;
}
}
public T waitForValue() throws InterruptedException {
while (!isSet) {
Thread.sleep(100);
}
synchronized (this) {
return value;
}
}
}

View File

@ -17,29 +17,40 @@
package ch.dissem.bitmessage.ports; package ch.dissem.bitmessage.ports;
import ch.dissem.bitmessage.utils.Bytes; import ch.dissem.bitmessage.utils.Bytes;
import ch.dissem.bitmessage.utils.CallbackWaiter;
import ch.dissem.bitmessage.utils.TestBase;
import org.junit.Test; import org.junit.Test;
import static ch.dissem.bitmessage.utils.Singleton.security; import static ch.dissem.bitmessage.utils.Singleton.security;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class ProofOfWorkEngineTest { public class ProofOfWorkEngineTest extends TestBase {
@Test @Test
public void testSimplePOWEngine() { public void testSimplePOWEngine() throws InterruptedException {
testPOW(new SimplePOWEngine()); testPOW(new SimplePOWEngine());
} }
@Test @Test
public void testThreadedPOWEngine() { public void testThreadedPOWEngine() throws InterruptedException {
testPOW(new MultiThreadedPOWEngine()); testPOW(new MultiThreadedPOWEngine());
} }
private void testPOW(ProofOfWorkEngine engine) { private void testPOW(ProofOfWorkEngine engine) throws InterruptedException {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
byte[] initialHash = security().sha512(new byte[]{1, 3, 6, 4}); byte[] initialHash = security().sha512(new byte[]{1, 3, 6, 4});
byte[] target = {0, 0, -1, -1, -1, -1, -1, -1}; byte[] target = {0, 0, -1, -1, -1, -1, -1, -1};
byte[] nonce = engine.calculateNonce(initialHash, target); final CallbackWaiter<byte[]> waiter = new CallbackWaiter<>();
engine.calculateNonce(initialHash, target,
new ProofOfWorkEngine.Callback() {
@Override
public void onNonceCalculated(byte[] nonce) {
waiter.setValue(nonce);
}
});
byte[] nonce = waiter.waitForValue();
System.out.println("Calculating nonce took " + (System.currentTimeMillis() - time) + "ms"); System.out.println("Calculating nonce took " + (System.currentTimeMillis() - time) + "ms");
assertTrue(Bytes.lt(security().doubleSha512(nonce, initialHash), target, 8)); assertTrue(Bytes.lt(security().doubleSha512(nonce, initialHash), target, 8));
} }
} }

View File

@ -4,7 +4,9 @@ import ch.dissem.bitmessage.InternalContext;
import ch.dissem.bitmessage.entity.ObjectMessage; import ch.dissem.bitmessage.entity.ObjectMessage;
import ch.dissem.bitmessage.entity.payload.GenericPayload; import ch.dissem.bitmessage.entity.payload.GenericPayload;
import ch.dissem.bitmessage.ports.MultiThreadedPOWEngine; import ch.dissem.bitmessage.ports.MultiThreadedPOWEngine;
import ch.dissem.bitmessage.ports.ProofOfWorkEngine;
import ch.dissem.bitmessage.security.bc.BouncySecurity; import ch.dissem.bitmessage.security.bc.BouncySecurity;
import ch.dissem.bitmessage.utils.CallbackWaiter;
import ch.dissem.bitmessage.utils.Singleton; import ch.dissem.bitmessage.utils.Singleton;
import ch.dissem.bitmessage.utils.UnixTime; import ch.dissem.bitmessage.utils.UnixTime;
import org.junit.Test; import org.junit.Test;
@ -78,14 +80,22 @@ public class SecurityTest {
} }
@Test @Test
public void testDoProofOfWork() throws IOException { public void testDoProofOfWork() throws Exception {
ObjectMessage objectMessage = new ObjectMessage.Builder() ObjectMessage objectMessage = new ObjectMessage.Builder()
.nonce(new byte[8]) .nonce(new byte[8])
.expiresTime(UnixTime.now(+2 * DAY)) .expiresTime(UnixTime.now(+2 * DAY))
.objectType(0) .objectType(0)
.payload(GenericPayload.read(0, new ByteArrayInputStream(new byte[0]), 1, 0)) .payload(GenericPayload.read(0, new ByteArrayInputStream(new byte[0]), 1, 0))
.build(); .build();
security.doProofOfWork(objectMessage, 1000, 1000); final CallbackWaiter<byte[]> waiter = new CallbackWaiter<>();
security.doProofOfWork(objectMessage, 1000, 1000,
new ProofOfWorkEngine.Callback() {
@Override
public void onNonceCalculated(byte[] nonce) {
waiter.setValue(nonce);
}
});
objectMessage.setNonce(waiter.waitForValue());
security.checkProofOfWork(objectMessage, 1000, 1000); security.checkProofOfWork(objectMessage, 1000, 1000);
} }
} }