protected InboundMessageQueuer connect(Peer peer, VersionMessage versionMessage) throws Exception { checkArgument(versionMessage.hasBlockChain()); final AtomicBoolean doneConnecting = new AtomicBoolean(false); final Thread thisThread = Thread.currentThread(); peer.addDisconnectedEventListener(new PeerDisconnectedEventListener() { @Override public void onPeerDisconnected(Peer p, int peerCount) { synchronized (doneConnecting) { if (!doneConnecting.get()) thisThread.interrupt(); } } }); if (clientType == ClientType.NIO_CLIENT_MANAGER || clientType == ClientType.BLOCKING_CLIENT_MANAGER) channels.openConnection(new InetSocketAddress("127.0.0.1", 2000), peer); else if (clientType == ClientType.NIO_CLIENT) new NioClient(new InetSocketAddress("127.0.0.1", 2000), peer, 100); else if (clientType == ClientType.BLOCKING_CLIENT) new BlockingClient(new InetSocketAddress("127.0.0.1", 2000), peer, 100, SocketFactory.getDefault(), null); else throw new RuntimeException(); // Claim we are connected to a different IP that what we really are, so tx confidence broadcastBy sets work InboundMessageQueuer writeTarget = newPeerWriteTargetQueue.take(); writeTarget.peer = peer; // Complete handshake with the peer - send/receive version(ack)s, receive bloom filter checkState(!peer.getVersionHandshakeFuture().isDone()); writeTarget.sendMessage(versionMessage); writeTarget.sendMessage(new VersionAck()); try { checkState(writeTarget.nextMessageBlocking() instanceof VersionMessage); checkState(writeTarget.nextMessageBlocking() instanceof VersionAck); peer.getVersionHandshakeFuture().get(); synchronized (doneConnecting) { doneConnecting.set(true); } Thread.interrupted(); // Clear interrupted bit in case it was set before we got into the CS } catch (InterruptedException e) { // We were disconnected before we got back version/verack } return writeTarget; }