private Bootstrap newBootstrap() { Bootstrap boot = new Bootstrap(); boot.channel(NettyPlatformIndependent.channelClass()); boot.option(ChannelOption.TCP_NODELAY, true); // replace by heart beat boot.option(ChannelOption.SO_KEEPALIVE, false); // default is pooled direct // ByteBuf(io.netty.util.internal.PlatformDependent.DIRECT_BUFFER_PREFERRED) boot.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // 32kb(for massive long connections, See // http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points) // 64kb(RocketMq remoting default value) boot.option(ChannelOption.SO_SNDBUF, 32 * 1024); boot.option(ChannelOption.SO_RCVBUF, 32 * 1024); // temporary settings, need more tests boot.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024)); //default is true, reduce thread context switching boot.option(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, true); return boot; }
private ServerBootstrap newServerBootstrap() { ServerBootstrap serverBoot = new ServerBootstrap(); serverBoot.channel(NettyPlatformIndependent.serverChannelClass()); // connections wait for accept serverBoot.option(ChannelOption.SO_BACKLOG, 1024); serverBoot.option(ChannelOption.SO_REUSEADDR, true); // replace by heart beat serverBoot.childOption(ChannelOption.SO_KEEPALIVE, false); serverBoot.childOption(ChannelOption.TCP_NODELAY, true); serverBoot.childOption(ChannelOption.SO_SNDBUF, 32 * 1024); serverBoot.childOption(ChannelOption.SO_RCVBUF, 32 * 1024); // temporary settings, need more tests serverBoot.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024)); serverBoot.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); //default is true, reduce thread context switching serverBoot.childOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, true); return serverBoot; }
private Bootstrap bootstrapClient(Endpoint endpoint) { Bootstrap bootstrap = new Bootstrap(); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024)); bootstrap.option(ChannelOption.SO_SNDBUF, 1048576); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000); bootstrap.group(clientGroup); // TODO: Make this faster: // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0 bootstrap.channel(clientChannelClass); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.remoteAddress(endpoint.host(), endpoint.port()); if (enableNettyTls) { bootstrap.handler(new SslClientCommunicationChannelInitializer()); } else { bootstrap.handler(new BasicChannelInitializer()); } return bootstrap; }
/**
 * Builds a client {@link Bootstrap} aimed at the given endpoint. The remote address is
 * formed from {@code endpoint.host().toInetAddress()} and {@code endpoint.port()}.
 * The TLS initializer is installed when {@code enableNettyTls} is set, the basic one otherwise.
 *
 * @param endpoint remote endpoint to connect to
 * @return the configured bootstrap (not yet connected)
 */
private Bootstrap bootstrapClient(Endpoint endpoint) {
    final int lowWaterMarkBytes = 10 * 32 * 1024;
    final int highWaterMarkBytes = 10 * 64 * 1024;

    final Bootstrap client = new Bootstrap();

    // Buffering and timeout options.
    client.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.WRITE_BUFFER_WATER_MARK,
                    new WriteBufferWaterMark(lowWaterMarkBytes, highWaterMarkBytes))
            .option(ChannelOption.SO_SNDBUF, 1048576)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);

    // Transport wiring.
    client.group(clientGroup)
            .channel(clientChannelClass)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .remoteAddress(endpoint.host().toInetAddress(), endpoint.port());

    // Pipeline setup depends on whether TLS is enabled.
    if (!enableNettyTls) {
        client.handler(new BasicChannelInitializer());
    } else {
        client.handler(new SslClientCommunicationChannelInitializer());
    }
    return client;
}
/**
 * Binds the server to {@code this.address} and stores the resulting listening channel in
 * {@code this.channel}. Each accepted channel starts with {@code PROTOCOL_STATE} set to
 * {@code ProtocolState.HANDSHAKE} and uses this object as its child handler.
 *
 * <p>If binding fails, the two event loop groups created here are shut down before the
 * failure propagates, so their threads do not outlive the failed start.
 *
 * @throws Exception if the bind fails or the calling thread is interrupted while waiting
 */
public void start() throws Exception {
    EventLoopGroup bossGroup = Netty.createEventLoopGroup();
    EventLoopGroup workerGroup = Netty.createEventLoopGroup();

    boolean bound = false;
    try {
        this.channel = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(Netty.getServerChannel())
                .option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                        new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 10))
                .childAttr(PROTOCOL_STATE, ProtocolState.HANDSHAKE)
                .childHandler(this)
                .bind(this.address)
                .sync().channel();
        bound = true;
    } finally {
        if (!bound) {
            // The groups are only reachable from this method; release them on failure.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    logger.log(Level.INFO, "Listening on {0}", this.channel);
}
/**
 * Creates the {@code serverBootstrap}, binds it to the requested ports and registers a JVM
 * shutdown hook that stops the server.
 *
 * <p>The write buffer water mark is set as a child option so that it applies to accepted
 * connections; set as a plain option it would only configure the listening channel.
 *
 * @param proxyConfiguration proxy settings passed to the channel initializer
 * @param localPorts ports to bind; when {@code null} or empty, a single binding to port
 *                   {@code 0} is requested
 */
private void createServerBootstrap(final ProxyConfiguration proxyConfiguration, final Integer... localPorts) {
    List<Integer> portBindings = singletonList(0);
    if (localPorts != null && localPorts.length > 0) {
        portBindings = Arrays.asList(localPorts);
    }

    serverBootstrap = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .channel(NioServerSocketChannel.class)
        .childOption(ChannelOption.AUTO_READ, true)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
        .childHandler(new MockServerUnificationInitializer(MockServer.this, httpStateHandler, proxyConfiguration))
        .childAttr(REMOTE_SOCKET, remoteSocket)
        .childAttr(PROXYING, remoteSocket != null);

    bindToPorts(portBindings);

    // Stop the server when the JVM exits.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        public void run() {
            MockServer.super.stop();
        }
    }));
}
/**
 * Applies the connection-server socket options on top of the parent defaults: accept
 * backlog, optional send/receive buffer sizes for accepted channels, and the write buffer
 * water marks for accepted channels.
 *
 * @param b the server bootstrap being configured
 */
@Override
protected void initOptions(ServerBootstrap b) {
    super.initOptions(b);

    b.option(ChannelOption.SO_BACKLOG, 1024);

    /*
     * TCP-level send and receive buffer sizes, which correspond to ChannelOption.SO_SNDBUF
     * and ChannelOption.SO_RCVBUF in Netty. They should be sized according to the pushed
     * message size; for massive numbers of long-lived connections 32K is usually a good
     * choice. Only positive configured values are applied.
     */
    if (snd_buf.connect_server > 0) {
        b.childOption(ChannelOption.SO_SNDBUF, snd_buf.connect_server);
    }
    if (rcv_buf.connect_server > 0) {
        b.childOption(ChannelOption.SO_RCVBUF, rcv_buf.connect_server);
    }

    /*
     * This pitfall is not really a pitfall, just work that was skipped out of laziness.
     * While a system is small we usually process synchronously; once it grows, one
     * optimization is to go asynchronous, which is a good way to raise throughput.
     * Compared with asynchronous processing, however, synchronous processing has a natural
     * negative-feedback mechanism: when the backend slows down the frontend slows down too,
     * so the system regulates itself. Asynchronous processing is like a burst dam: the flood
     * flows unimpeded, and without effective rate limiting it easily overwhelms the backend.
     * Knocking the backend over at once is not even the worst case; the worst case is leaving
     * it half dead. The backend then becomes extremely slow, and a frontend that uses
     * unbounded resources may end up killing itself.
     *
     * The pitfall described here concerns Netty's ChannelOutboundBuffer. It buffers data
     * that Netty writes to a channel in order to improve network throughput (one buffer per
     * channel). Its initial size is 32 (32 entries, not bytes), and it doubles every time it
     * fills up, growing without limit. Most of the time this is harmless, but when the peer
     * is very slow (that is, the peer processes TCP packets slowly, for example under very
     * high load) and we keep writing, the buffer keeps growing until something breaks (in our
     * case the machine started swapping and the process was finally killed by the Linux OOM
     * killer). It is a pitfall because we usually check whether a channel is active before
     * writing to it, but tend to overlook this slow-peer case.
     *
     * How is it solved? Although ChannelOutboundBuffer is unbounded, it can be given a high
     * and a low water mark. When the buffered size exceeds the high water mark, the
     * channel's isWritable becomes false; when it falls below the low water mark, isWritable
     * becomes true again. Applications should therefore check isWritable and stop writing
     * while it is false. Both water marks are byte counts; the defaults are 64K (high) and
     * 32K (low). Plan them according to the number of connections to support and the
     * available system resources.
     */
    final WriteBufferWaterMark childWaterMark =
            new WriteBufferWaterMark(connect_server_low, connect_server_high);
    b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, childWaterMark);
}
private void executeBootstrap(ScheduledExecutor scheduledExecutor, int port, boolean useWebSocket, boolean useSsl) throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); Class<? extends ServerChannel> serverChannelClass; if (Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode())) { serverChannelClass = EpollServerSocketChannel.class; } else { serverChannelClass = NioServerSocketChannel.class; } bootstrap = bootstrap.group(bossGroup, workerGroup).channel(serverChannelClass); bootstrap.option(ChannelOption.TCP_NODELAY, true); if (scheduledExecutor != null) { bootstrap.handler(scheduledExecutor); } bootstrap.childHandler(new MqttChannelInitializer(useWebSocket, useSsl)); bootstrap.childOption(ChannelOption.TCP_NODELAY, true) // setting buffer size can improve I/O .childOption(ChannelOption.SO_SNDBUF, 1048576).childOption(ChannelOption.SO_RCVBUF, 1048576) // recommended in // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#11.0 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024)) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.bind(port).sync(); }
/**
 * Populates a {@code ChannelOptions} instance through every setter and feeds each resulting
 * entry to both {@code option(...)} and {@code childOption(...)} of a {@link ServerBootstrap}.
 * There are no explicit assertions: the test passes when none of these calls throws.
 */
@Test
public void testOptionsHaveCorrectTypes() throws Exception {
    final ServerBootstrap serverBootstrap = new ServerBootstrap();
    final ChannelOptions channelOptions = new ChannelOptions();

    // Generic channel options.
    channelOptions.setAllocator(new PooledByteBufAllocator());
    channelOptions.setRecvBufAllocator(new AdaptiveRecvByteBufAllocator());
    channelOptions.setConnectTimeout(1);
    channelOptions.setWriteSpinCount(1);
    channelOptions.setWriteBufferWaterMark(new WriteBufferWaterMark(8192, 32768));
    channelOptions.setAllowHalfClosure(true);
    channelOptions.setAutoRead(true);

    // Socket-level (SO_*) options.
    channelOptions.setSoBroadcast(true);
    channelOptions.setSoKeepAlive(true);
    channelOptions.setSoReuseAddr(true);
    channelOptions.setSoSndBuf(8192);
    channelOptions.setSoRcvBuf(8192);
    channelOptions.setSoLinger(0);
    channelOptions.setSoBacklog(0);
    channelOptions.setSoTimeout(0);

    // IP and TCP options.
    channelOptions.setIpTos(0);
    channelOptions.setIpMulticastAddr(getLoopbackAddress());
    channelOptions.setIpMulticastIf(getNetworkInterfaces().nextElement());
    channelOptions.setIpMulticastTtl(300);
    channelOptions.setIpMulticastLoopDisabled(true);
    channelOptions.setTcpNodelay(true);

    final Map<ChannelOption, Object> collected = channelOptions.get();
    for (final Map.Entry<ChannelOption, Object> optionEntry : collected.entrySet()) {
        final ChannelOption key = optionEntry.getKey();
        final Object value = optionEntry.getValue();
        serverBootstrap.option(key, value);
        serverBootstrap.childOption(key, value);
    }
}
/**
 * Builds and validates the client-side {@link Bootstrap} for outbound connections.
 *
 * @param topChannel the owning channel, passed to the channel initializer
 * @return the validated bootstrap
 */
private @NotNull Bootstrap bootstrap(@NotNull TChannel topChannel) {
    final Bootstrap client = new Bootstrap();
    client.group(this.childGroup);

    // Pick the socket channel implementation matching the configured transport.
    if (useEpoll) {
        client.channel(EpollSocketChannel.class);
    } else {
        client.channel(NioSocketChannel.class);
    }

    client.handler(this.channelInitializer(false, topChannel));
    client.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    final WriteBufferWaterMark waterMark =
            new WriteBufferWaterMark(WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_HIGH_WATER_MARK);
    client.option(ChannelOption.WRITE_BUFFER_WATER_MARK, waterMark);

    return client.validate();
}
/**
 * Builds and validates the {@link ServerBootstrap} used to accept inbound connections.
 *
 * <p>The write buffer water mark is registered as a child option so that it governs the
 * accepted connections; as a plain option it would only configure the listening channel.
 *
 * @param topChannel the owning channel, passed to the child channel initializer
 * @return the validated server bootstrap
 */
private @NotNull ServerBootstrap serverBootstrap(@NotNull TChannel topChannel) {
    return new ServerBootstrap()
        .group(this.bossGroup, this.childGroup)
        .channel(useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.INFO))
        .option(ChannelOption.SO_BACKLOG, 128)
        .childHandler(this.channelInitializer(true, topChannel))
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(
            ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_HIGH_WATER_MARK)
        )
        .validate();
}
private CompletableFuture<Void> startAcceptingConnections() { CompletableFuture<Void> future = new CompletableFuture<>(); ServerBootstrap b = new ServerBootstrap(); b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024)); b.option(ChannelOption.SO_RCVBUF, 1048576); b.childOption(ChannelOption.SO_KEEPALIVE, true); b.childOption(ChannelOption.TCP_NODELAY, true); b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); b.group(serverGroup, clientGroup); b.channel(serverChannelClass); if (enableNettyTls) { b.childHandler(new SslServerCommunicationChannelInitializer()); } else { b.childHandler(new BasicChannelInitializer()); } b.option(ChannelOption.SO_BACKLOG, 128); // Bind and start to accept incoming connections. b.bind(localEndpoint.port()).addListener(f -> { if (f.isSuccess()) { log.info("{} accepting incoming connections on port {}", localEndpoint.host(), localEndpoint.port()); future.complete(null); } else { log.warn("{} failed to bind to port {} due to {}", localEndpoint.host(), localEndpoint.port(), f.cause()); future.completeExceptionally(f.cause()); } }); return future; }
private Bootstrap buildBootstrap() { Bootstrap bootstrap = new Bootstrap(); // TODO(CK): move all of these constants out into Config bootstrap .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option( ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024)) .option(ChannelOption.AUTO_READ, true) .group(group()) .channel(channel()); return configure(bootstrap); }
/**
 * Creates a client {@link Bootstrap} using the worker group and channel type supplied by
 * the given configuration, together with fixed socket and buffering options.
 *
 * @param channelConfig source of the event loop group and channel class
 * @return the configured bootstrap
 */
public Bootstrap buildBootstrap(ChannelConfiguration channelConfig) {
    final WriteBufferWaterMark waterMark = new WriteBufferWaterMark(8 * 1024, 32 * 1024);

    final Bootstrap client = new Bootstrap();
    client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500);
    client.option(ChannelOption.SO_REUSEADDR, true);
    client.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    client.option(ChannelOption.WRITE_BUFFER_WATER_MARK, waterMark);
    client.option(ChannelOption.TCP_NODELAY, true);
    client.group(channelConfig.workerGroup());
    client.channel(channelConfig.channel());
    return client;
}
private void startAcceptingConnections() throws InterruptedException { ServerBootstrap b = new ServerBootstrap(); b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024)); b.option(ChannelOption.SO_RCVBUF, 1048576); b.childOption(ChannelOption.SO_KEEPALIVE, true); b.childOption(ChannelOption.TCP_NODELAY, true); b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); b.group(serverGroup, clientGroup); b.channel(serverChannelClass); if (enableNettyTls) { b.childHandler(new SslServerCommunicationChannelInitializer()); } else { b.childHandler(new BasicChannelInitializer()); } b.option(ChannelOption.SO_BACKLOG, 128); b.childOption(ChannelOption.SO_KEEPALIVE, true); // Bind and start to accept incoming connections. b.bind(localEndpoint.port()).sync().addListener(future -> { if (future.isSuccess()) { log.info("{} accepting incoming connections on port {}", localEndpoint.host(), localEndpoint.port()); } else { log.warn("{} failed to bind to port {} due to {}", localEndpoint.host(), localEndpoint.port(), future.cause()); } }); }
/**
 * Sets the write buffer water mark on this channel configuration.
 *
 * <p>Re-declares the inherited setter with the narrower return type
 * {@code UkcpClientChannelConfig}, so the result can be used without a cast.
 *
 * @param writeBufferWaterMark the water mark to apply
 * @return a {@code UkcpClientChannelConfig}; presumably this instance, for chaining
 *         (the implementation is not visible here, confirm against implementors)
 */
@Override UkcpClientChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
/**
 * Sets the write buffer water mark on this channel configuration.
 *
 * <p>Re-declares the inherited setter with the narrower return type
 * {@code UkcpServerChannelConfig}, so the result can be used without a cast.
 *
 * @param writeBufferWaterMark the water mark to apply
 * @return a {@code UkcpServerChannelConfig}; presumably this instance, for chaining
 *         (the implementation is not visible here, confirm against implementors)
 */
@Override UkcpServerChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
/**
 * Delegates to the superclass setter and returns its result typed as
 * {@code ServerSocketChannelConfig}.
 *
 * @param writeBufferWaterMark the water mark to apply
 * @return the value returned by the superclass setter, cast to {@code ServerSocketChannelConfig}
 */
public ServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
    final Object updated = super.setWriteBufferWaterMark(writeBufferWaterMark);
    return (ServerSocketChannelConfig) updated;
}
@Override public synchronized void start(){ if(this.isOn()) return; try{ try { ServerBootstrap boot = new ServerBootstrap(); if(cfg.isNativeTransport()){ boot.channel(EpollServerSocketChannel.class); }else{ boot.channel(NioServerSocketChannel.class); } boot.group(bossGroup, workerGroup) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024)) .childOption(ChannelOption.SO_SNDBUF, cfg.getSendBuffer()) .childOption(ChannelOption.SO_RCVBUF, cfg.getReceiveBuffer()) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(init); //bind to the main port boot.bind(cfg.getPort()).sync(); //bind to the redirect port (e.g. 80 will redirect to 443) for(Integer port : cfg.getRedirectPorts()){ ChannelFuture f = boot.bind(port); f.sync(); } this.on.set(true); String version = BundleUtils.getVersion(Activator.bundle); String name = BundleUtils.getName(Activator.bundle); cfg.print(name + " ready and listening on " + cfg.getPort() + " running version " + version); } finally { } }catch(Exception e){ LOG.log(Level.SEVERE,null, e); } }
/**
 * Records the write buffer water mark by putting it into {@code options} under
 * {@link ChannelOption#WRITE_BUFFER_WATER_MARK}.
 *
 * @param mark the water mark to store
 */
public void setWriteBufferWaterMark(final WriteBufferWaterMark mark) {
    final ChannelOption<WriteBufferWaterMark> key = ChannelOption.WRITE_BUFFER_WATER_MARK;
    options.put(key, mark);
}
public static boolean start(String address, int port, ChannelHandler handler) { boolean isEpoll = Epoll.isAvailable(); int cpuNum = Runtime.getRuntime().availableProcessors(); if (isEpoll) { bossGroup = new EpollEventLoopGroup(cpuNum); workGroup = new EpollEventLoopGroup(cpuNum * 2 + 1); } else { bossGroup = new NioEventLoopGroup(cpuNum); workGroup = new NioEventLoopGroup(cpuNum * 2 + 1); } try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup); bootstrap.channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class); bootstrap.childHandler(handler); // TIME_WAIT时可重用端口,服务器关闭后可立即重启,此时任何非期 // 望数据到达,都可能导致服务程序反应混乱,不过这只是一种可能,事实上很不可能 bootstrap.option(ChannelOption.SO_REUSEADDR, true); // 设置了ServerSocket类的SO_RCVBUF选项,就相当于设置了Socket对象的接收缓冲区大小,4KB bootstrap.option(ChannelOption.SO_RCVBUF, 1024 * 8); // 请求连接的最大队列长度,如果backlog参数的值大于操作系统限定的队列的最大长度,那么backlog参数无效 bootstrap.option(ChannelOption.SO_BACKLOG, 128); // 使用内存池的缓冲区重用机制 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // 当客户端发生断网或断电等非正常断开的现象,如果服务器没有设置SO_KEEPALIVE选项,则会一直不关闭SOCKET。具体的时间由OS配置 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 在调用close方法后,将阻塞n秒,让未完成发送的数据尽量发出,netty中这部分操作调用方法异步进行。我们的游戏业务没有这种需要,所以设置为0 bootstrap.childOption(ChannelOption.SO_LINGER, 0); // 数据包不缓冲,立即发出 bootstrap.childOption(ChannelOption.TCP_NODELAY, true); // 发送缓冲大小,默认8192 bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 8); WriteBufferWaterMark writeMark = new WriteBufferWaterMark(1024 * 64, 1024 * 128); bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeMark); channelFuture = bootstrap.bind(new InetSocketAddress(address, port)); channelFuture.sync(); Log.info( "Socket service start success address : " + address + ", port :" + port + ", isEpoll = " + isEpoll); return true; } catch (Exception e) { Log.error("init netty exception address = " + address + ", port = " + port, e); return false; } }
public static boolean start(String address, int port, ChannelHandler handler) { boolean isEpoll = Epoll.isAvailable(); int cpuNum = Runtime.getRuntime().availableProcessors(); if (isEpoll) { bossGroup = new EpollEventLoopGroup(cpuNum); workGroup = new EpollEventLoopGroup(cpuNum * 2 + 1); } else { bossGroup = new NioEventLoopGroup(cpuNum); workGroup = new NioEventLoopGroup(cpuNum * 2 + 1); } try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup); bootstrap.channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class); bootstrap.childHandler(handler); // TIME_WAIT时可重用端口,服务器关闭后可立即重启,此时任何非期 // 望数据到达,都可能导致服务程序反应混乱,不过这只是一种可能,事实上很不可能 bootstrap.option(ChannelOption.SO_REUSEADDR, true); // 设置了ServerSocket类的SO_RCVBUF选项,就相当于设置了Socket对象的接收缓冲区大小,4KB bootstrap.option(ChannelOption.SO_RCVBUF, 1024 * 8); // 请求连接的最大队列长度,如果backlog参数的值大于操作系统限定的队列的最大长度,那么backlog参数无效 bootstrap.option(ChannelOption.SO_BACKLOG, 128); // 使用内存池的缓冲区重用机制 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // 当客户端发生断网或断电等非正常断开的现象,如果服务器没有设置SO_KEEPALIVE选项,则会一直不关闭SOCKET。具体的时间由OS配置 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 在调用close方法后,将阻塞n秒,让未完成发送的数据尽量发出,netty中这部分操作调用方法异步进行。我们的游戏业务没有这种需要,所以设置为0 bootstrap.childOption(ChannelOption.SO_LINGER, 0); // 数据包不缓冲,立即发出 bootstrap.childOption(ChannelOption.TCP_NODELAY, true); // 发送缓冲大小,默认8192 bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 8); bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 64, 1024 * 128)); channelFuture = bootstrap.bind(new InetSocketAddress(address, port)); channelFuture.sync(); Log.info("Socket service start success address : " + address + ", port :" + port + ", isEpoll = " + isEpoll); return true; } catch (Exception e) { Log.error("init netty exception address = " + address + ", port = " + port, e); return false; } }
@Test(timeout = 60000) public void childChannelOptions() throws Exception { final int originalLowWaterMark = 2097169; final int originalHighWaterMark = 2097211; Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>(); channelOptions.put(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(originalLowWaterMark, originalHighWaterMark)); final AtomicInteger lowWaterMark = new AtomicInteger(0); final AtomicInteger highWaterMark = new AtomicInteger(0); final CountDownLatch countDownLatch = new CountDownLatch(1); InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, NioServerSocketChannel.class, channelOptions, null, // no boss group null, // no event group new ProtocolNegotiators.PlaintextNegotiator(), Collections.<ServerStreamTracer.Factory>emptyList(), TransportTracer.getDefaultFactory(), 1, // ignore 1, // ignore 1, // ignore 1, // ignore 1, // ignore 1, 1, // ignore 1, 1, // ignore true, 0); // ignore ns.start(new ServerListener() { @Override public ServerTransportListener transportCreated(ServerTransport transport) { Channel channel = ((NettyServerTransport)transport).channel(); WriteBufferWaterMark writeBufferWaterMark = channel.config() .getOption(ChannelOption.WRITE_BUFFER_WATER_MARK); lowWaterMark.set(writeBufferWaterMark.low()); highWaterMark.set(writeBufferWaterMark.high()); countDownLatch.countDown(); return null; } @Override public void serverShutdown() {} }); Socket socket = new Socket(); socket.connect(new InetSocketAddress("localhost", ns.getPort()), /* timeout= */ 8000); countDownLatch.await(); socket.close(); assertThat(lowWaterMark.get()).isEqualTo(originalLowWaterMark); assertThat(highWaterMark.get()).isEqualTo(originalHighWaterMark); ns.shutdown(); }