Improved connection management, preventing multiple connections to the same node, and improved broadcast handling.

This commit is contained in:
Christian Basler 2015-06-12 06:57:20 +02:00
parent fe93c95f40
commit bd5bf76904
9 changed files with 172 additions and 20 deletions

View File

@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static ch.dissem.bitmessage.entity.Plaintext.Status.*;
@ -109,7 +110,6 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
ctx.getAddressRepo().save(address);
}
} catch (DecryptionFailedException ignore) {
LOG.debug(ignore.getMessage(), ignore);
}
}
@ -129,21 +129,25 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
}
break;
} catch (DecryptionFailedException ignore) {
LOG.trace(ignore.getMessage(), ignore);
}
}
}
protected void receive(ObjectMessage object, Broadcast broadcast) throws IOException {
// TODO this should work fine as-is, but checking the tag might be more efficient
// V5Broadcast v5 = broadcast instanceof V5Broadcast ? (V5Broadcast) broadcast : null;
for (BitmessageAddress subscription : ctx.getAddressRepo().getSubscriptions()) {
byte[] tag = broadcast instanceof V5Broadcast ? ((V5Broadcast) broadcast).getTag() : null;
for (BitmessageAddress subscription : ctx.getAddressRepo().getSubscriptions(broadcast.getVersion())) {
if (tag != null && !Arrays.equals(tag, subscription.getTag())) {
continue;
}
try {
broadcast.decrypt(subscription.getPublicDecryptionKey());
if (!object.isSignatureValid(broadcast.getPlaintext().getFrom().getPubkey())) {
LOG.warn("Broadcast with IV " + object.getInventoryVector() + " was successfully decrypted, but signature check failed. Ignoring.");
} else {
broadcast.getPlaintext().setStatus(RECEIVED);
broadcast.getPlaintext().addLabels(ctx.getMessageRepository().getLabels(Label.Type.INBOX, Label.Type.UNREAD));
broadcast.getPlaintext().setInventoryVector(object.getInventoryVector());
ctx.getMessageRepository().save(broadcast.getPlaintext());
listener.receive(broadcast.getPlaintext());
}
} catch (DecryptionFailedException ignore) {

View File

@ -31,6 +31,9 @@ public interface AddressRepository {
List<BitmessageAddress> getIdentities();
List<BitmessageAddress> getSubscriptions();
List<BitmessageAddress> getSubscriptions(long broadcastVersion);
/**
* Returns all Bitmessage addresses that have no private key.
*/

View File

@ -0,0 +1,57 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
public class Collections {
private final static Random RANDOM = new Random();
/**
* Returns a random subset of the given collection, or a copy of the collection if it's not larger than count.
* The randomness
*/
public static <T> List<T> selectRandom(int count, Collection<T> collection) {
ArrayList<T> result = new ArrayList<>(count);
if (collection.size() <= count) {
result.addAll(collection);
} else {
double collectionRest = collection.size();
double resultRest = count;
int skipMax = (int) Math.ceil(collectionRest / resultRest);
int skip = RANDOM.nextInt(skipMax);
for (T item : collection) {
collectionRest--;
if (skip > 0) {
skip--;
} else {
result.add(item);
resultRest--;
if (resultRest == 0){
break;
}
skipMax = (int) Math.ceil(collectionRest / resultRest);
skip = RANDOM.nextInt(skipMax);
}
}
}
return result;
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2015 Christian Basler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.dissem.bitmessage.utils;
import org.junit.Test;
import java.util.LinkedList;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class CollectionsTest {
@Test
public void ensureSelectRandomReturnsMaximumPossibleItems() throws Exception {
List<Integer> list = new LinkedList<>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
assertEquals(9, Collections.selectRandom(9, list).size());
}
}

View File

@ -29,10 +29,13 @@ import ch.dissem.bitmessage.utils.Security;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
@ -161,7 +164,7 @@ public class Connection implements Runnable {
listener.receive(objectMessage);
ctx.getInventory().storeObject(objectMessage);
} catch (InsufficientProofOfWorkException e) {
// DebugUtils.saveToFile(objectMessage);
LOG.warn(e.getMessage());
} catch (IOException e) {
LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e);
DebugUtils.saveToFile(objectMessage);
@ -217,5 +220,18 @@ public class Connection implements Runnable {
.build());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Connection that = (Connection) o;
return Objects.equals(node, that.node);
}
@Override
public int hashCode() {
return Objects.hash(node);
}
public enum State {SERVER, CLIENT, ACTIVE, DISCONNECTED}
}

View File

@ -21,6 +21,7 @@ import ch.dissem.bitmessage.InternalContext.ContextHolder;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.NetworkAddress;
import ch.dissem.bitmessage.ports.NetworkHandler;
import ch.dissem.bitmessage.utils.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,8 +88,8 @@ public class NetworkNode implements NetworkHandler, ContextHolder {
}
}
}
if (connections.size() < 1) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(8, ctx.getStreams());
if (connections.size() < 8) {
List<NetworkAddress> addresses = ctx.getNodeRegistry().getKnownAddresses(8 - connections.size(), ctx.getStreams());
for (NetworkAddress address : addresses) {
try {
startConnection(new Connection(ctx, CLIENT, new Socket(address.toInetAddress(), address.getPort()), listener));
@ -96,7 +97,6 @@ public class NetworkNode implements NetworkHandler, ContextHolder {
LOG.debug(e.getMessage(), e);
}
}
// FIXME: prevent connecting twice to the same node
}
try {
Thread.sleep(30000);
@ -131,6 +131,10 @@ public class NetworkNode implements NetworkHandler, ContextHolder {
private void startConnection(Connection c) {
synchronized (connections) {
// prevent connecting twice to the same node
if (connections.contains(c)) {
return;
}
connections.add(c);
}
pool.execute(c);
@ -138,12 +142,10 @@ public class NetworkNode implements NetworkHandler, ContextHolder {
@Override
public void offer(final InventoryVector iv) {
// TODO:
// - should offer to (random) 8 nodes during 8 seconds (if possible)
// - should probably offer later if no connection available at the moment?
synchronized (connections) {
LOG.debug(connections.size() + " connections available to offer " + iv);
for (Connection connection : connections) {
List<Connection> random8 = Collections.selectRandom(8, this.connections);
for (Connection connection : random8) {
connection.offer(iv);
}
}

View File

@ -75,6 +75,15 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito
return find("subscribed = '1'");
}
@Override
public List<BitmessageAddress> getSubscriptions(long broadcastVersion) {
if (broadcastVersion > 4) {
return find("subscribed = '1' AND version > 3");
} else {
return find("subscribed = '1' AND version <= 3");
}
}
@Override
public List<BitmessageAddress> getContacts() {
return find("private_key IS NULL");
@ -155,12 +164,13 @@ public class JdbcAddressRepository extends JdbcHelper implements AddressReposito
private void insert(BitmessageAddress address) throws IOException, SQLException {
try (Connection connection = config.getConnection()) {
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO Address (address, alias, public_key, private_key, subscribed) VALUES (?, ?, ?, ?, ?)");
"INSERT INTO Address (address, version, alias, public_key, private_key, subscribed) VALUES (?, ?, ?, ?, ?, ?)");
ps.setString(1, address.getAddress());
ps.setString(2, address.getAlias());
writePubkey(ps, 3, address.getPubkey());
writeBlob(ps, 4, address.getPrivateKey());
ps.setBoolean(5, address.isSubscribed());
ps.setLong(2, address.getVersion());
ps.setString(3, address.getAlias());
writePubkey(ps, 4, address.getPubkey());
writeBlob(ps, 5, address.getPrivateKey());
ps.setBoolean(6, address.isSubscribed());
ps.executeUpdate();
}
}

View File

@ -1,5 +1,6 @@
CREATE TABLE Address (
address VARCHAR(40) NOT NULL PRIMARY KEY,
version BIGINT NOT NULL,
alias VARCHAR(255),
public_key BLOB,
private_key BLOB,

View File

@ -81,8 +81,26 @@ public class JdbcAddressRepositoryTest {
@Test
public void testGetSubscriptions() throws Exception {
addSubscription("BM-2cXxfcSetKnbHJX2Y85rSkaVpsdNUZ5q9h");
addSubscription("BM-2D9Vc5rFxxR5vTi53T9gkLfemViHRMVLQZ");
addSubscription("BM-2D9QKN4teYRvoq2fyzpiftPh9WP9qggtzh");
List<BitmessageAddress> subscriptions = repo.getSubscriptions();
assertEquals(0, subscriptions.size());
assertEquals(3, subscriptions.size());
}
@Test
public void testGetSubscriptionsForVersion() throws Exception {
addSubscription("BM-2cXxfcSetKnbHJX2Y85rSkaVpsdNUZ5q9h");
addSubscription("BM-2D9Vc5rFxxR5vTi53T9gkLfemViHRMVLQZ");
addSubscription("BM-2D9QKN4teYRvoq2fyzpiftPh9WP9qggtzh");
List<BitmessageAddress> subscriptions;
subscriptions = repo.getSubscriptions(5);
assertEquals(1, subscriptions.size());
subscriptions = repo.getSubscriptions(4);
assertEquals(2, subscriptions.size());
}
@Test
@ -123,4 +141,10 @@ public class JdbcAddressRepositoryTest {
assertNotNull(address);
assertNotNull(address.getPrivateKey());
}
private void addSubscription(String address) {
BitmessageAddress subscription = new BitmessageAddress(address);
subscription.setSubscribed(true);
repo.save(subscription);
}
}