Finally fixed the bug that was haunting me for the last week.

This commit is contained in:
Christian Basler 2016-01-21 20:32:23 +01:00
parent 733335ef42
commit 9f05af8bb7
11 changed files with 89 additions and 39 deletions

View File

@ -157,6 +157,7 @@ public class BitmessageContext {
.from(from) .from(from)
.to(to) .to(to)
.message(subject, message) .message(subject, message)
.labels(messages().getLabels(Label.Type.SENT))
.build(); .build();
if (to.getPubkey() == null) { if (to.getPubkey() == null) {
tryToFindMatchingPubkey(to); tryToFindMatchingPubkey(to);

View File

@ -90,28 +90,32 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
address = ctx.getAddressRepository().findContact(pubkey.getRipe()); address = ctx.getAddressRepository().findContact(pubkey.getRipe());
} }
if (address != null) { if (address != null) {
address.setPubkey(pubkey); updatePubkey(address, pubkey);
LOG.info("Got pubkey for contact " + address);
ctx.getAddressRepository().save(address);
List<Plaintext> messages = ctx.getMessageRepository().findMessages(Plaintext.Status.PUBKEY_REQUESTED, address);
LOG.info("Sending " + messages.size() + " messages for contact " + address);
for (Plaintext msg : messages) {
msg.setStatus(DOING_PROOF_OF_WORK);
ctx.getMessageRepository().save(msg);
ctx.send(
msg.getFrom(),
msg.getTo(),
new Msg(msg),
+2 * DAY
);
msg.setStatus(SENT);
ctx.getMessageRepository().save(msg);
}
} }
} catch (DecryptionFailedException ignore) { } catch (DecryptionFailedException ignore) {
} }
} }
private void updatePubkey(BitmessageAddress address, Pubkey pubkey){
address.setPubkey(pubkey);
LOG.info("Got pubkey for contact " + address);
ctx.getAddressRepository().save(address);
List<Plaintext> messages = ctx.getMessageRepository().findMessages(Plaintext.Status.PUBKEY_REQUESTED, address);
LOG.info("Sending " + messages.size() + " messages for contact " + address);
for (Plaintext msg : messages) {
msg.setStatus(DOING_PROOF_OF_WORK);
ctx.getMessageRepository().save(msg);
ctx.send(
msg.getFrom(),
msg.getTo(),
new Msg(msg),
+2 * DAY
);
msg.setStatus(SENT);
ctx.getMessageRepository().save(msg);
}
}
protected void receive(ObjectMessage object, Msg msg) throws IOException { protected void receive(ObjectMessage object, Msg msg) throws IOException {
for (BitmessageAddress identity : ctx.getAddressRepository().getIdentities()) { for (BitmessageAddress identity : ctx.getAddressRepository().getIdentities()) {
try { try {
@ -125,6 +129,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
msg.getPlaintext().setInventoryVector(object.getInventoryVector()); msg.getPlaintext().setInventoryVector(object.getInventoryVector());
ctx.getMessageRepository().save(msg.getPlaintext()); ctx.getMessageRepository().save(msg.getPlaintext());
listener.receive(msg.getPlaintext()); listener.receive(msg.getPlaintext());
updatePubkey(msg.getPlaintext().getFrom(), msg.getPlaintext().getFrom().getPubkey());
} }
break; break;
} catch (DecryptionFailedException ignore) { } catch (DecryptionFailedException ignore) {
@ -148,6 +153,7 @@ class DefaultMessageListener implements NetworkHandler.MessageListener {
broadcast.getPlaintext().setInventoryVector(object.getInventoryVector()); broadcast.getPlaintext().setInventoryVector(object.getInventoryVector());
ctx.getMessageRepository().save(broadcast.getPlaintext()); ctx.getMessageRepository().save(broadcast.getPlaintext());
listener.receive(broadcast.getPlaintext()); listener.receive(broadcast.getPlaintext());
updatePubkey(broadcast.getPlaintext().getFrom(), broadcast.getPlaintext().getFrom().getPubkey());
} }
} catch (DecryptionFailedException ignore) { } catch (DecryptionFailedException ignore) {
} }

View File

@ -44,10 +44,10 @@ public class Inv implements MessagePayload {
} }
@Override @Override
public void write(OutputStream stream) throws IOException { public void write(OutputStream out) throws IOException {
Encode.varInt(inventory.size(), stream); Encode.varInt(inventory.size(), out);
for (InventoryVector iv : inventory) { for (InventoryVector iv : inventory) {
iv.write(stream); iv.write(out);
} }
} }

View File

@ -38,7 +38,6 @@ public class InventoryVector implements Streamable, Serializable {
InventoryVector that = (InventoryVector) o; InventoryVector that = (InventoryVector) o;
return Arrays.equals(hash, that.hash); return Arrays.equals(hash, that.hash);
} }
@Override @Override

View File

@ -44,6 +44,9 @@ class V3MessageFactory {
findMagic(in); findMagic(in);
String command = getCommand(in); String command = getCommand(in);
int length = (int) Decode.uint32(in); int length = (int) Decode.uint32(in);
if (length > 1600003) {
throw new NodeException("Payload of " + length + " bytes received, no more than 1600003 was expected.");
}
byte[] checksum = Decode.bytes(in, 4); byte[] checksum = Decode.bytes(in, 4);
byte[] payloadBytes = Decode.bytes(in, length); byte[] payloadBytes = Decode.bytes(in, length);
@ -191,10 +194,10 @@ class V3MessageFactory {
private static String getCommand(InputStream stream) throws IOException { private static String getCommand(InputStream stream) throws IOException {
byte[] bytes = new byte[12]; byte[] bytes = new byte[12];
int end = -1; int end = bytes.length;
for (int i = 0; i < bytes.length; i++) { for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte) stream.read(); bytes[i] = (byte) stream.read();
if (end == -1) { if (end == bytes.length) {
if (bytes[i] == 0) end = i; if (bytes[i] == 0) end = i;
} else { } else {
if (bytes[i] != 0) throw new IOException("'\\0' padding expected for command"); if (bytes[i] != 0) throw new IOException("'\\0' padding expected for command");

View File

@ -17,6 +17,7 @@
package ch.dissem.bitmessage.entity; package ch.dissem.bitmessage.entity;
import ch.dissem.bitmessage.entity.payload.*; import ch.dissem.bitmessage.entity.payload.*;
import ch.dissem.bitmessage.entity.valueobject.InventoryVector;
import ch.dissem.bitmessage.entity.valueobject.Label; import ch.dissem.bitmessage.entity.valueobject.Label;
import ch.dissem.bitmessage.factory.Factory; import ch.dissem.bitmessage.factory.Factory;
import ch.dissem.bitmessage.utils.TestBase; import ch.dissem.bitmessage.utils.TestBase;
@ -25,9 +26,11 @@ import org.junit.Test;
import java.io.*; import java.io.*;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG; import static ch.dissem.bitmessage.entity.Plaintext.Type.MSG;
import static ch.dissem.bitmessage.utils.Singleton.security;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class SerializationTest extends TestBase { public class SerializationTest extends TestBase {
@ -95,6 +98,23 @@ public class SerializationTest extends TestBase {
assertEquals(p1, p2); assertEquals(p1, p2);
} }
@Test
public void ensureNetworkMessageIsSerializedAndDeserializedCorrectly() throws Exception {
ArrayList<InventoryVector> ivs = new ArrayList<>(50000);
for (int i = 0; i < 50000; i++) {
ivs.add(new InventoryVector(security().randomBytes(32)));
}
Inv inv = new Inv.Builder().inventory(ivs).build();
NetworkMessage before = new NetworkMessage(inv);
ByteArrayOutputStream out = new ByteArrayOutputStream();
before.write(out);
NetworkMessage after = Factory.getNetworkMessage(3, new ByteArrayInputStream(out.toByteArray()));
Inv invAfter = (Inv) after.getPayload();
assertEquals(ivs, invAfter.getInventory());
}
private void doTest(String resourceName, int version, Class<?> expectedPayloadType) throws IOException { private void doTest(String resourceName, int version, Class<?> expectedPayloadType) throws IOException {
byte[] data = TestUtils.getBytes(resourceName); byte[] data = TestUtils.getBytes(resourceName);
InputStream in = new ByteArrayInputStream(data); InputStream in = new ByteArrayInputStream(data);

View File

@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.util.List; import java.util.List;
import java.util.Scanner; import java.util.Scanner;
@ -40,7 +41,7 @@ public class Application {
private BitmessageContext ctx; private BitmessageContext ctx;
public Application() { public Application(String syncServer, int syncPort) {
JdbcConfig jdbcConfig = new JdbcConfig(); JdbcConfig jdbcConfig = new JdbcConfig();
ctx = new BitmessageContext.Builder() ctx = new BitmessageContext.Builder()
.addressRepo(new JdbcAddressRepository(jdbcConfig)) .addressRepo(new JdbcAddressRepository(jdbcConfig))
@ -63,7 +64,9 @@ public class Application {
}) })
.build(); .build();
ctx.startup(); if (syncServer == null) {
ctx.startup();
}
scanner = new Scanner(System.in); scanner = new Scanner(System.in);
@ -75,6 +78,9 @@ public class Application {
System.out.println("c) contacts"); System.out.println("c) contacts");
System.out.println("s) subscriptions"); System.out.println("s) subscriptions");
System.out.println("m) messages"); System.out.println("m) messages");
if (syncServer != null) {
System.out.println("y) sync");
}
System.out.println("?) info"); System.out.println("?) info");
System.out.println("e) exit"); System.out.println("e) exit");
@ -99,6 +105,9 @@ public class Application {
break; break;
case "e": case "e":
break; break;
case "y":
ctx.synchronize(InetAddress.getByName(syncServer), syncPort, 120, true);
break;
default: default:
System.out.println("Unknown command. Please try again."); System.out.println("Unknown command. Please try again.");
} }

View File

@ -64,7 +64,7 @@ public class Main {
new WifImporter(ctx, options.importWIF).importAll(); new WifImporter(ctx, options.importWIF).importAll();
} }
} else { } else {
new Application(); new Application(options.syncServer, options.syncPort);
} }
} }
@ -74,5 +74,11 @@ public class Main {
@Option(name = "-export", usage = "Export to WIF file.") @Option(name = "-export", usage = "Export to WIF file.")
private File exportWIF; private File exportWIF;
@Option(name = "-syncServer", usage = "Use manual synchronization with the given server instead of starting a full node.")
private String syncServer;
@Option(name = "-syncPort", usage = "Port to use for synchronisation")
private int syncPort = 8444;
} }
} }

View File

@ -139,13 +139,13 @@ class Connection {
@SuppressWarnings("RedundantIfStatement") @SuppressWarnings("RedundantIfStatement")
private boolean syncFinished(NetworkMessage msg) { private boolean syncFinished(NetworkMessage msg) {
if (mode != SYNC){ if (mode != SYNC) {
return false; return false;
} }
if (Thread.interrupted()) { if (Thread.interrupted()) {
return true; return true;
} }
if (syncTimeout == 0 || state != ACTIVE) { if (state != ACTIVE) {
return false; return false;
} }
if (syncTimeout < UnixTime.now()) { if (syncTimeout < UnixTime.now()) {
@ -204,10 +204,11 @@ class Connection {
switch (messagePayload.getCommand()) { switch (messagePayload.getCommand()) {
case INV: case INV:
Inv inv = (Inv) messagePayload; Inv inv = (Inv) messagePayload;
int originalSize = inv.getInventory().size();
updateIvCache(inv.getInventory()); updateIvCache(inv.getInventory());
List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams); List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams);
missing.removeAll(commonRequestedObjects); missing.removeAll(commonRequestedObjects);
LOG.debug("Received inventory with " + inv.getInventory().size() + " elements, of which are " LOG.debug("Received inventory with " + originalSize + " elements, of which are "
+ missing.size() + " missing."); + missing.size() + " missing.");
send(new GetData.Builder().inventory(missing).build()); send(new GetData.Builder().inventory(missing).build());
break; break;
@ -230,8 +231,6 @@ class Connection {
security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); security().checkProofOfWork(objectMessage, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes());
ctx.getInventory().storeObject(objectMessage); ctx.getInventory().storeObject(objectMessage);
// offer object to some random nodes so it gets distributed throughout the network: // offer object to some random nodes so it gets distributed throughout the network:
// FIXME: don't do this while we catch up after initialising our first connection
// (that might be a bit tricky to do)
networkHandler.offer(objectMessage.getInventoryVector()); networkHandler.offer(objectMessage.getInventoryVector());
lastObjectTime = UnixTime.now(); lastObjectTime = UnixTime.now();
} catch (InsufficientProofOfWorkException e) { } catch (InsufficientProofOfWorkException e) {
@ -283,7 +282,9 @@ class Connection {
if (payload instanceof GetData) { if (payload instanceof GetData) {
requestedObjects.addAll(((GetData) payload).getInventory()); requestedObjects.addAll(((GetData) payload).getInventory());
} }
new NetworkMessage(payload).write(out); synchronized (this) {
new NetworkMessage(payload).write(out);
}
} catch (IOException e) { } catch (IOException e) {
LOG.error(e.getMessage(), e); LOG.error(e.getMessage(), e);
disconnect(); disconnect();
@ -342,16 +343,19 @@ class Connection {
public class ReaderRunnable implements Runnable { public class ReaderRunnable implements Runnable {
@Override @Override
public void run() { public void run() {
lastObjectTime = 0;
try (Socket socket = Connection.this.socket) { try (Socket socket = Connection.this.socket) {
initSocket(socket); initSocket(socket);
if (mode == CLIENT || mode == SYNC) { if (mode == CLIENT || mode == SYNC) {
send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build()); send(new Version.Builder().defaults().addrFrom(host).addrRecv(node).build());
} }
while (state != DISCONNECTED) { while (state != DISCONNECTED) {
if (mode != SYNC && state == ACTIVE && requestedObjects.isEmpty()) { if (mode != SYNC) {
Thread.sleep(1000); if (state == ACTIVE && requestedObjects.isEmpty() && sendingQueue.isEmpty()) {
} else { Thread.sleep(1000);
Thread.sleep(100); } else {
Thread.sleep(100);
}
} }
try { try {
NetworkMessage msg = Factory.getNetworkMessage(version, in); NetworkMessage msg = Factory.getNetworkMessage(version, in);

View File

@ -117,7 +117,8 @@ public class NetworkHandlerTest {
); );
nodeInventory.init( nodeInventory.init(
"V1Msg.payload" "V1Msg.payload",
"V4Pubkey.payload"
); );
Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, Future<?> future = networkHandler.synchronize(InetAddress.getLocalHost(), 6001,

View File

@ -256,12 +256,13 @@ public class JdbcMessageRepository extends JdbcHelper implements MessageReposito
private void update(Connection connection, Plaintext message) throws SQLException, IOException { private void update(Connection connection, Plaintext message) throws SQLException, IOException {
PreparedStatement ps = connection.prepareStatement( PreparedStatement ps = connection.prepareStatement(
"UPDATE Message SET iv=?, sent=?, received=?, status=? WHERE id=?"); "UPDATE Message SET iv=?, sent=?, received=?, status=?, initial_hash=? WHERE id=?");
ps.setBytes(1, message.getInventoryVector() != null ? message.getInventoryVector().getHash() : null); ps.setBytes(1, message.getInventoryVector() != null ? message.getInventoryVector().getHash() : null);
ps.setLong(2, message.getSent()); ps.setLong(2, message.getSent());
ps.setLong(3, message.getReceived()); ps.setLong(3, message.getReceived());
ps.setString(4, message.getStatus() != null ? message.getStatus().name() : null); ps.setString(4, message.getStatus() != null ? message.getStatus().name() : null);
ps.setLong(5, (Long) message.getId()); ps.setBytes(5, message.getInitialHash());
ps.setLong(6, (Long) message.getId());
ps.executeUpdate(); ps.executeUpdate();
} }