diff --git a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index 842611b..c2fb84f 100644 --- a/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -64,12 +64,9 @@ public class BitmessageContext { public static final int CURRENT_VERSION = 3; private final static Logger LOG = LoggerFactory.getLogger(BitmessageContext.class); - private final ExecutorService pool; - private final InternalContext ctx; private final Labeler labeler; - private final Listener listener; private final NetworkHandler.MessageListener networkListener; private final boolean sendPubkeyOnIdentityCreation; @@ -77,12 +74,7 @@ public class BitmessageContext { private BitmessageContext(Builder builder) { ctx = new InternalContext(builder); labeler = builder.labeler; - listener = builder.listener; - networkListener = new DefaultMessageListener(ctx, labeler, listener); - - // As this thread is used for parts that do POW, which itself uses parallel threads, only - // one should be executed at any time. - pool = Executors.newFixedThreadPool(1); + networkListener = new DefaultMessageListener(ctx, labeler, builder.listener); sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation; @@ -116,12 +108,7 @@ public class BitmessageContext { )); ctx.getAddressRepository().save(identity); if (sendPubkeyOnIdentityCreation) { - pool.submit(new Runnable() { - @Override - public void run() { - ctx.sendPubkey(identity, identity.getStream()); - } - }); + ctx.sendPubkey(identity, identity.getStream()); } return identity; } @@ -177,35 +164,33 @@ public class BitmessageContext { if (msg.getFrom() == null || msg.getFrom().getPrivateKey() == null) { throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key."); } - pool.submit(new Runnable() { - @Override - public void run() { - BitmessageAddress to = msg.getTo(); - if (to != null) { - if (to.getPubkey() == null) { - LOG.info("Public key is missing from recipient. Requesting."); - ctx.requestPubkey(to); - } - if (to.getPubkey() == null) { - msg.setStatus(PUBKEY_REQUESTED); - msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX)); - ctx.getMessageRepository().save(msg); - } - } - if (to == null || to.getPubkey() != null) { - LOG.info("Sending message."); - msg.setStatus(DOING_PROOF_OF_WORK); - msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX)); - ctx.getMessageRepository().save(msg); - ctx.send( - msg.getFrom(), - to, - wrapInObjectPayload(msg), - TTL.msg() - ); - } + BitmessageAddress to = msg.getTo(); + if (to != null) { + if (to.getPubkey() == null) { + LOG.info("Public key is missing from recipient. Requesting."); + ctx.requestPubkey(to); } - }); + if (to.getPubkey() == null) { + msg.setStatus(PUBKEY_REQUESTED); + msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX)); + ctx.getMessageRepository().save(msg); + } + } + if (to == null || to.getPubkey() != null) { + LOG.info("Sending message."); + msg.setStatus(DOING_PROOF_OF_WORK); + msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX)); + ctx.getMessageRepository().save(msg); + ctx.send( + msg.getFrom(), + to, + wrapInObjectPayload(msg), + TTL.msg() + ); + msg.setStatus(SENT); + msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT)); + ctx.getMessageRepository().save(msg); + } } private ObjectPayload wrapInObjectPayload(Plaintext msg) { @@ -425,6 +410,8 @@ public class BitmessageContext { * sender can't receive your public key) in some special situations. Also note that it's probably * not a good idea to set it too low. *

+ * + * @deprecated use {@link TTL#pubkey(long)} instead. */ public Builder pubkeyTTL(long days) { if (days < 0 || days > 28 * DAY) throw new IllegalArgumentException("TTL must be between 1 and 28 days"); diff --git a/core/src/main/java/ch/dissem/bitmessage/ports/MultiThreadedPOWEngine.java b/core/src/main/java/ch/dissem/bitmessage/ports/MultiThreadedPOWEngine.java index 50164a3..fb45e50 100644 --- a/core/src/main/java/ch/dissem/bitmessage/ports/MultiThreadedPOWEngine.java +++ b/core/src/main/java/ch/dissem/bitmessage/ports/MultiThreadedPOWEngine.java @@ -25,16 +25,18 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Semaphore; +import java.util.concurrent.*; import static ch.dissem.bitmessage.utils.Bytes.inc; +import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; /** * A POW engine using all available CPU cores. */ public class MultiThreadedPOWEngine implements ProofOfWorkEngine { private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedPOWEngine.class); - private static final Semaphore semaphore = new Semaphore(1, true); + private final ExecutorService waiterPool = Executors.newSingleThreadExecutor(pool("POW-waiter").daemon().build()); + private final ExecutorService workerPool = Executors.newCachedThreadPool(pool("POW-worker").daemon().build()); /** * This method will block until all pending nonce calculations are done, but not wait for its own calculation @@ -46,42 +48,59 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine { * @param callback called with the calculated nonce as argument. The ProofOfWorkEngine implementation must make */ @Override - public void calculateNonce(byte[] initialHash, byte[] target, Callback callback) { - try { - semaphore.acquire(); - } catch (InterruptedException e) { - throw new ApplicationException(e); - } - callback = new CallbackWrapper(callback); - int cores = Runtime.getRuntime().availableProcessors(); - if (cores > 255) cores = 255; - LOG.info("Doing POW using " + cores + " cores"); - List workers = new ArrayList<>(cores); - for (int i = 0; i < cores; i++) { - Worker w = new Worker(workers, (byte) cores, i, initialHash, target, callback); - workers.add(w); - } - for (Worker w : workers) { - // Doing this in the previous loop might cause a ConcurrentModificationException in the worker - // if a worker finds a nonce while new ones are still being added. - w.start(); - } + public void calculateNonce(final byte[] initialHash, final byte[] target, final Callback callback) { + waiterPool.execute(new Runnable() { + @Override + public void run() { + long startTime = System.currentTimeMillis(); + + int cores = Runtime.getRuntime().availableProcessors(); + if (cores > 255) cores = 255; + LOG.info("Doing POW using " + cores + " cores"); + List workers = new ArrayList<>(cores); + for (int i = 0; i < cores; i++) { + Worker w = new Worker((byte) cores, i, initialHash, target); + workers.add(w); + } + List> futures = new ArrayList<>(cores); + for (Worker w : workers) { + // Doing this in the previous loop might cause a ConcurrentModificationException in the worker + // if a worker finds a nonce while new ones are still being added. + futures.add(workerPool.submit(w)); + } + try { + while (!Thread.interrupted()) { + for (Future future : futures) { + if (future.isDone()) { + callback.onNonceCalculated(initialHash, future.get()); + LOG.info("Nonce calculated in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); + for (Future f : futures) { + f.cancel(true); + } + return; + } + } + Thread.sleep(100); + } + LOG.error("POW waiter thread interrupted - this should not happen!"); + } catch (ExecutionException e) { + LOG.error(e.getMessage(), e); + } catch (InterruptedException e) { + LOG.error("POW waiter thread interrupted - this should not happen!", e); + } + } + }); } - private static class Worker extends Thread { - private final Callback callback; + private class Worker implements Callable { private final byte numberOfCores; - private final List workers; private final byte[] initialHash; private final byte[] target; private final MessageDigest mda; private final byte[] nonce = new byte[8]; - public Worker(List workers, byte numberOfCores, int core, byte[] initialHash, byte[] target, - Callback callback) { - this.callback = callback; + Worker(byte numberOfCores, int core, byte[] initialHash, byte[] target) { this.numberOfCores = numberOfCores; - this.workers = workers; this.initialHash = initialHash; this.target = target; this.nonce[7] = (byte) core; @@ -94,49 +113,16 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine { } @Override - public void run() { + public byte[] call() throws Exception { do { inc(nonce, numberOfCores); mda.update(nonce); mda.update(initialHash); if (!Bytes.lt(target, mda.digest(mda.digest()), 8)) { - synchronized (callback) { - if (!Thread.interrupted()) { - for (Worker w : workers) { - w.interrupt(); - } - // Clear interrupted flag for callback - Thread.interrupted(); - callback.onNonceCalculated(initialHash, nonce); - } - } - return; + return nonce; } } while (!Thread.interrupted()); - } - } - - public static class CallbackWrapper implements Callback { - private final Callback callback; - private final long startTime; - private boolean waiting = true; - - public CallbackWrapper(Callback callback) { - this.startTime = System.currentTimeMillis(); - this.callback = callback; - } - - @Override - public void onNonceCalculated(byte[] initialHash, byte[] nonce) { - // Prevents the callback from being called twice if two nonces are found simultaneously - synchronized (this) { - if (waiting) { - semaphore.release(); - LOG.info("Nonce calculated in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); - waiting = false; - callback.onNonceCalculated(initialHash, nonce); - } - } + return null; } } } diff --git a/core/src/main/java/ch/dissem/bitmessage/utils/ThreadFactoryBuilder.java b/core/src/main/java/ch/dissem/bitmessage/utils/ThreadFactoryBuilder.java new file mode 100644 index 0000000..36cf5b9 --- /dev/null +++ b/core/src/main/java/ch/dissem/bitmessage/utils/ThreadFactoryBuilder.java @@ -0,0 +1,65 @@ +/* + * Copyright 2016 Christian Basler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.dissem.bitmessage.utils; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class ThreadFactoryBuilder { + private final String namePrefix; + private int prio = Thread.NORM_PRIORITY; + private boolean daemon = false; + + private ThreadFactoryBuilder(String pool) { + this.namePrefix = pool + "-thread-"; + } + + + public static ThreadFactoryBuilder pool(String name) { + return new ThreadFactoryBuilder(name); + } + + public ThreadFactoryBuilder lowPrio() { + prio = Thread.MIN_PRIORITY; + return this; + } + + public ThreadFactoryBuilder daemon() { + daemon = true; + return this; + } + + public ThreadFactory build() { + SecurityManager s = System.getSecurityManager(); + final ThreadGroup group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + + return new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + t.setPriority(prio); + t.setDaemon(daemon); + return t; + } + }; + } +} diff --git a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java index 43333ac..289aa79 100644 --- a/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java +++ b/networking/src/main/java/ch/dissem/bitmessage/networking/DefaultNetworkHandler.java @@ -28,8 +28,7 @@ import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.ports.NetworkHandler; import ch.dissem.bitmessage.utils.Collections; import ch.dissem.bitmessage.utils.Property; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import ch.dissem.bitmessage.utils.ThreadFactoryBuilder; import java.io.IOException; import java.net.InetAddress; @@ -40,35 +39,27 @@ import java.util.concurrent.*; import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE; import static ch.dissem.bitmessage.utils.DebugUtils.inc; +import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; import static java.util.Collections.newSetFromMap; /** * Handles all the networky stuff. */ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { - private final static Logger LOG = LoggerFactory.getLogger(DefaultNetworkHandler.class); - public final static int NETWORK_MAGIC_NUMBER = 8; final Collection connections = new ConcurrentLinkedQueue<>(); - private final ExecutorService pool; + private final ExecutorService pool = Executors.newCachedThreadPool( + pool("network") + .lowPrio() + .daemon() + .build()); private InternalContext ctx; private ServerRunnable server; private volatile boolean running; final Set requestedObjects = newSetFromMap(new ConcurrentHashMap(50_000)); - public DefaultNetworkHandler() { - pool = Executors.newCachedThreadPool(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread thread = Executors.defaultThreadFactory().newThread(r); - thread.setPriority(Thread.MIN_PRIORITY); - return thread; - } - }); - } - @Override public void setContext(InternalContext context) { this.ctx = context;