public NetworkDispatcher connectToLocal(SocketAddress address) { NetworkDispatcher dispatch = new NetworkDispatcher(this, NetworkSide.CLIENT); final EventLoopGroup boss = new DefaultEventLoopGroup(); final Bootstrap b = new Bootstrap() .group(boss) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(dispatch); } }) .channel(LocalChannel.class); //Connect and wait until done b.connect(address).syncUninterruptibly(); return dispatch; }
@Test public void disposeLaterDefers() { DefaultPoolResources.Pool pool = new DefaultPoolResources.Pool( new Bootstrap(), (b, handler, checker) -> channelPool, null, new DefaultEventLoopGroup()); DefaultPoolResources poolResources = new DefaultPoolResources("test", (b, handler, checker) -> channelPool); //"register" our fake Pool poolResources.channelPools.put( InetSocketAddress.createUnresolved("localhost", 80), pool); Mono<Void> disposer = poolResources.disposeLater(); assertThat(closed.get()).as("pool closed by disposeLater()").isEqualTo(0); disposer.subscribe(); assertThat(closed.get()).as("pool closed by disposer subscribe()").isEqualTo(1); }
@Test public void disposeOnlyOnce() { DefaultPoolResources.Pool pool = new DefaultPoolResources.Pool( new Bootstrap(), (b, handler, checker) -> channelPool, null, new DefaultEventLoopGroup()); DefaultPoolResources poolResources = new DefaultPoolResources("test", (b, handler, checker) -> channelPool); //"register" our fake Pool poolResources.channelPools.put( InetSocketAddress.createUnresolved("localhost", 80), pool); poolResources.dispose(); assertThat(closed.get()).as("pool closed by dispose()").isEqualTo(1); Mono<Void> disposer = poolResources.disposeLater(); disposer.subscribe(); poolResources.disposeLater().subscribe(); poolResources.dispose(); assertThat(closed.get()).as("pool closed only once").isEqualTo(1); }
@Test public void stressTest() { final EventLoopGroup group = new DefaultEventLoopGroup(1024); final EventLoopScheduler s = new EventLoopScheduler(group); final List<Entry> acquiredEntries = new ArrayList<>(); stressTest(s, acquiredEntries, 0.8); stressTest(s, acquiredEntries, 0.5); stressTest(s, acquiredEntries, 0.2); // Release all acquired entries to make sure activeRequests are all 0. acquiredEntries.forEach(Entry::release); final List<Entry> entries = s.entries(endpoint); for (Entry e : entries) { assertThat(e.activeRequests()).withFailMessage("All entries must have 0 activeRequests.").isZero(); } assertThat(entries.get(0).id()).isZero(); }
@BeforeClass public static void init() { // Configure a test server group = new DefaultEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(group) .channel(LocalServerChannel.class) .childHandler(new ChannelInitializer<LocalChannel>() { @Override public void initChannel(LocalChannel ch) throws Exception { ch.pipeline().addLast(new ChannelHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // Discard ReferenceCountUtil.release(msg); } }); } }); localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress(); }
@Test public void testTryWithResourcesShouldCloseAllClustersButNotEventLoopIfProvided() throws Exception { EventLoopGroup eventLoop = new DefaultEventLoopGroup(); BoundCluster cluster; MockClient client; try (Server server = Server.builder() .withAddressResolver(localAddressResolver) .withEventLoopGroup(eventLoop, LocalServerChannel.class) .build()) { cluster = server.register(ClusterSpec.builder().withNodes(5)); BoundNode node = cluster.node(0); SocketAddress address = node.getAddress(); client = new MockClient(eventLoop); client.connect(address); } // event loop should not have been closed. assertThat(eventLoop.isShutdown()).isFalse(); // timer should have since a custom one was not provided. try { cluster .getServer() .timer .newTimeout( timeout -> { // noop }, 1, TimeUnit.SECONDS); fail("Expected IllegalStateException"); } catch (IllegalStateException ise) { // expected } eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS); }
@Test public void testTryWithResourcesShouldCloseAllClustersButNotEventLoopAndTimerIfProvided() throws Exception { EventLoopGroup eventLoop = new DefaultEventLoopGroup(); Timer timer = new HashedWheelTimer(); BoundCluster cluster; MockClient client; try (Server server = Server.builder() .withAddressResolver(localAddressResolver) .withTimer(timer) .withEventLoopGroup(eventLoop, LocalServerChannel.class) .build()) { cluster = server.register(ClusterSpec.builder().withNodes(5)); BoundNode node = cluster.node(0); SocketAddress address = node.getAddress(); client = new MockClient(eventLoop); client.connect(address); } // event loop should not have been closed. assertThat(eventLoop.isShutdown()).isFalse(); // timer should not have since a custom one was not provided. cluster .getServer() .timer .newTimeout( timeout -> { // noop }, 1, TimeUnit.SECONDS); eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS); timer.stop(); }
private void init() { LOG.log(Level.SEVERE, "init {0}", this); this.thredpool = Executors.newCachedThreadPool(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, "hdb-client"); t.setDaemon(true); return t; } }); this.networkGroup = NetworkUtils.isEnableEpoolNative() ? new EpollEventLoopGroup(0, thredpool) : new NioEventLoopGroup(0, thredpool); this.localEventsGroup = new DefaultEventLoopGroup(); String mode = configuration.getString(ClientConfiguration.PROPERTY_MODE, ClientConfiguration.PROPERTY_MODE_LOCAL); switch (mode) { case ClientConfiguration.PROPERTY_MODE_LOCAL: case ClientConfiguration.PROPERTY_MODE_STANDALONE: this.clientSideMetadataProvider = new StaticClientSideMetadataProvider( configuration.getString(ClientConfiguration.PROPERTY_SERVER_ADDRESS, ClientConfiguration.PROPERTY_SERVER_ADDRESS_DEFAULT), configuration.getInt(ClientConfiguration.PROPERTY_SERVER_PORT, ClientConfiguration.PROPERTY_SERVER_PORT_DEFAULT), configuration.getBoolean(ClientConfiguration.PROPERTY_SERVER_SSL, ClientConfiguration.PROPERTY_SERVER_SSL_DEFAULT) ); break; case ClientConfiguration.PROPERTY_MODE_CLUSTER: this.clientSideMetadataProvider = new ZookeeperClientSideMetadataProvider( configuration.getString(ClientConfiguration.PROPERTY_ZOOKEEPER_ADDRESS, ClientConfiguration.PROPERTY_ZOOKEEPER_ADDRESS_DEFAULT), configuration.getInt(ClientConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT, ClientConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT_DEFAULT), configuration.getString(ClientConfiguration.PROPERTY_ZOOKEEPER_PATH, ClientConfiguration.PROPERTY_ZOOKEEPER_PATH_DEFAULT) ); break; default: throw new IllegalStateException(mode); } }
@Test(timeout = 15000) public void testSocketReuse() throws InterruptedException { ServerBootstrap serverBootstrap = new ServerBootstrap(); LocalHander serverHandler = new LocalHander("SERVER"); serverBootstrap .group(new DefaultEventLoopGroup(), new DefaultEventLoopGroup()) .channel(LocalServerChannel.class) .childHandler(serverHandler); Bootstrap clientBootstrap = new Bootstrap(); LocalHander clientHandler = new LocalHander("CLIENT"); clientBootstrap .group(new DefaultEventLoopGroup()) .channel(LocalChannel.class) .remoteAddress(new LocalAddress(LOCAL_CHANNEL)).handler(clientHandler); serverBootstrap.bind(new LocalAddress(LOCAL_CHANNEL)).sync(); int count = 100; for (int i = 1; i < count + 1; i ++) { Channel ch = clientBootstrap.connect().sync().channel(); // SPIN until we get what we are looking for. int target = i * messageCountPerRun; while (serverHandler.count.get() != target || clientHandler.count.get() != target) { Thread.sleep(50); } close(ch, clientHandler); } assertEquals(count * 2 * messageCountPerRun, serverHandler.count.get() + clientHandler.count.get()); }
@BeforeMethod public void startEventLoop() { eventLoop = new DefaultEventLoopGroup(); }
@Override protected EventLoopGroup group() { return new DefaultEventLoopGroup(); }
@Test public void testConnect() throws ExecutionException { EventLoopGroup group = new DefaultEventLoopGroup(); Connector connector = new Connector(address) { @Override protected List<Map.Entry<String, ChannelHandler>> payloadHandlers() { return Arrays.asList(); } @Override protected EventLoopGroup group() { return group; } @Override protected Class<? extends Channel> channel() { return LocalChannel.class; } }; ListenableFuture<Channel> future = connector.connect(); CountDownLatch done = new CountDownLatch(1); Futures.addCallback( future, new FutureCallback<Channel>() { @Override public void onSuccess(Channel ch) { assertTrue(true); done.countDown(); } @Override public void onFailure(Throwable throwable) { done.countDown(); assertTrue(false); } }); Uninterruptibles.awaitUninterruptibly(done); // block }
@BeforeClass public static void createExecutor() { executor = new DefaultEventLoopGroup(2); }
@BeforeClass public static void createGroup() { group = new DefaultEventLoopGroup(2); }
@Test public void testLocalAddressReuse() throws Exception { for (int i = 0; i < 2; i ++) { EventLoopGroup clientGroup = new DefaultEventLoopGroup(); EventLoopGroup serverGroup = new DefaultEventLoopGroup(); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); cb.group(clientGroup) .channel(LocalChannel.class) .handler(new TestHandler()); sb.group(serverGroup) .channel(LocalServerChannel.class) .childHandler(new ChannelInitializer<LocalChannel>() { @Override public void initChannel(LocalChannel ch) throws Exception { ch.pipeline().addLast(new TestHandler()); } }); // Start server Channel sc = sb.bind(addr).sync().channel(); final CountDownLatch latch = new CountDownLatch(1); // Connect to the server final Channel cc = cb.connect(addr).sync().channel(); cc.eventLoop().execute(new Runnable() { @Override public void run() { // Send a message event up the pipeline. cc.pipeline().fireChannelRead("Hello, World"); latch.countDown(); } }); latch.await(); // Close the channel cc.close().sync(); serverGroup.shutdownGracefully(); clientGroup.shutdownGracefully(); sc.closeFuture().sync(); assertNull(String.format( "Expected null, got channel '%s' for local address '%s'", LocalChannelRegistry.get(addr), addr), LocalChannelRegistry.get(addr)); serverGroup.terminationFuture().sync(); clientGroup.terminationFuture().sync(); } }
@Test public void testWriteFailsFastOnClosedChannel() throws Exception { EventLoopGroup clientGroup = new DefaultEventLoopGroup(); EventLoopGroup serverGroup = new DefaultEventLoopGroup(); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); cb.group(clientGroup) .channel(LocalChannel.class) .handler(new TestHandler()); sb.group(serverGroup) .channel(LocalServerChannel.class) .childHandler(new ChannelInitializer<LocalChannel>() { @Override public void initChannel(LocalChannel ch) throws Exception { ch.pipeline().addLast(new TestHandler()); } }); // Start server sb.bind(addr).sync(); // Connect to the server final Channel cc = cb.connect(addr).sync().channel(); // Close the channel and write something. cc.close().sync(); try { cc.writeAndFlush(new Object()).sync(); fail("must raise a ClosedChannelException"); } catch (Exception e) { assertThat(e, is(instanceOf(ClosedChannelException.class))); // Ensure that the actual write attempt on a closed channel was never made by asserting that // the ClosedChannelException has been created by AbstractUnsafe rather than transport implementations. if (e.getStackTrace().length > 0) { assertThat( e.getStackTrace()[0].getClassName(), is(AbstractChannel.class.getName() + "$AbstractUnsafe")); e.printStackTrace(); } } serverGroup.shutdownGracefully(); clientGroup.shutdownGracefully(); serverGroup.terminationFuture().sync(); clientGroup.terminationFuture().sync(); }
@Test(timeout = 10000) public void testBindDeadLock() throws Exception { EventLoopGroup groupA = new DefaultEventLoopGroup(1); EventLoopGroup groupB = new DefaultEventLoopGroup(1); try { ChannelHandler dummyHandler = new DummyHandler(); final Bootstrap bootstrapA = new Bootstrap(); bootstrapA.group(groupA); bootstrapA.channel(LocalChannel.class); bootstrapA.handler(dummyHandler); final Bootstrap bootstrapB = new Bootstrap(); bootstrapB.group(groupB); bootstrapB.channel(LocalChannel.class); bootstrapB.handler(dummyHandler); List<Future<?>> bindFutures = new ArrayList<Future<?>>(); // Try to bind from each other. for (int i = 0; i < 1024; i ++) { bindFutures.add(groupA.next().submit(new Runnable() { @Override public void run() { bootstrapB.bind(LocalAddress.ANY); } })); bindFutures.add(groupB.next().submit(new Runnable() { @Override public void run() { bootstrapA.bind(LocalAddress.ANY); } })); } for (Future<?> f: bindFutures) { f.sync(); } } finally { groupA.shutdownGracefully(); groupB.shutdownGracefully(); groupA.terminationFuture().sync(); groupB.terminationFuture().sync(); } }
@Test(timeout = 10000) public void testConnectDeadLock() throws Exception { EventLoopGroup groupA = new DefaultEventLoopGroup(1); EventLoopGroup groupB = new DefaultEventLoopGroup(1); try { ChannelHandler dummyHandler = new DummyHandler(); final Bootstrap bootstrapA = new Bootstrap(); bootstrapA.group(groupA); bootstrapA.channel(LocalChannel.class); bootstrapA.handler(dummyHandler); final Bootstrap bootstrapB = new Bootstrap(); bootstrapB.group(groupB); bootstrapB.channel(LocalChannel.class); bootstrapB.handler(dummyHandler); List<Future<?>> bindFutures = new ArrayList<Future<?>>(); // Try to connect from each other. for (int i = 0; i < 1024; i ++) { bindFutures.add(groupA.next().submit(new Runnable() { @Override public void run() { bootstrapB.connect(LocalAddress.ANY); } })); bindFutures.add(groupB.next().submit(new Runnable() { @Override public void run() { bootstrapA.connect(LocalAddress.ANY); } })); } for (Future<?> f: bindFutures) { f.sync(); } } finally { groupA.shutdownGracefully(); groupB.shutdownGracefully(); groupA.terminationFuture().sync(); groupB.terminationFuture().sync(); } }