/**
 * Creates the inbound handler that reacts to discovery datagrams.
 *
 * <p>A datagram is acted upon only when it holds more than four readable bytes, starts with
 * {@code Utils.MAGIC_BYTES}, carries the {@code DISCOVERY} message type, names the same cluster
 * as {@code thisNode} and advertises an address different from {@code thisNode.address()}.
 * In that case {@code onDiscoveryMessage} is invoked with the decoded address and a copy of
 * {@code seedNodeInfo} is written back to the datagram's sender. Every other datagram,
 * including one whose type code does not map to a known {@code MessageTYpe}, is ignored.
 *
 * @param thisNode     the local node whose cluster and address are compared against the message
 * @param seedNodeInfo the response payload; a fresh copy is made for every reply
 * @return a new handler instance
 */
private SimpleChannelInboundHandler<DatagramPacket> createListenerHandler(SeedNode thisNode, ByteBuf seedNodeInfo) {
    return new SimpleChannelInboundHandler<DatagramPacket>() {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
            ByteBuf buf = msg.content();

            if (buf.readableBytes() <= 4 || buf.readInt() != Utils.MAGIC_BYTES) {
                return;
            }

            // The type code comes straight off the wire, so it must be range-checked before it
            // is used as an index; readByte() is signed and may also be negative.
            int typeCode = buf.readByte();
            MessageTYpe[] knownTypes = MessageTYpe.values();
            if (typeCode < 0 || typeCode >= knownTypes.length) {
                return;
            }

            if (knownTypes[typeCode] != MessageTYpe.DISCOVERY) {
                return;
            }

            String cluster = decodeUtf(buf);
            InetSocketAddress address = decodeAddress(buf);

            if (thisNode.cluster().equals(cluster) && !address.equals(thisNode.address())) {
                onDiscoveryMessage(address);

                DatagramPacket response = new DatagramPacket(seedNodeInfo.copy(), msg.sender());
                ctx.writeAndFlush(response);
            }
        }
    };
}
/**
 * Builds a client bootstrap whose channels get an HTTP client codec, an object aggregator
 * (limit {@code Integer.MAX_VALUE}) and a placeholder handler registered under the name
 * {@code "clientResponseHandler"}.
 *
 * <p>The placeholder throws on any inbound message; it is expected to be replaced before a
 * call is made.
 *
 * @return the configured bootstrap, backed by a newly created {@code NioEventLoopGroup}
 */
public static Bootstrap createNettyHttpClientBootstrap() {
    ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                .addLast(new HttpClientCodec())
                .addLast(new HttpObjectAggregator(Integer.MAX_VALUE))
                .addLast("clientResponseHandler", new SimpleChannelInboundHandler<HttpObject>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
                        throw new RuntimeException("Client response handler was not setup before the call");
                    }
                });
        }
    };

    return new Bootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioSocketChannel.class)
        .handler(initializer);
}
public static CompletableFuture<NettyHttpClientResponse> setupNettyHttpClientResponseHandler( Channel ch, Consumer<ChannelPipeline> pipelineAdjuster ) { CompletableFuture<NettyHttpClientResponse> responseFromServerFuture = new CompletableFuture<>(); ch.pipeline().replace("clientResponseHandler", "clientResponseHandler", new SimpleChannelInboundHandler<HttpObject>() { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (msg instanceof FullHttpResponse) { // Store the proxyServer response for asserting on later. responseFromServerFuture.complete(new NettyHttpClientResponse((FullHttpResponse) msg)); } else { // Should never happen. throw new RuntimeException("Received unexpected message type: " + msg.getClass()); } } }); if (pipelineAdjuster != null) pipelineAdjuster.accept(ch.pipeline()); return responseFromServerFuture; }
/**
 * Routes an exception that reached this handler.
 *
 * <p>If the channel carries a non-null {@code LAST_REQUEST_SENT} attribute, is a
 * {@code SocketChannel}, and the pipeline contains a {@code SimpleChannelInboundHandler},
 * the cause is wrapped in a {@code ResponseException} for that request and passed to that
 * handler's {@code channelRead}. In every other case a {@code TransportException} wrapping
 * the cause is thrown.
 *
 * <p>The routing is performed regardless of the log level; only the log statement itself is
 * guarded by {@code log.isDebugEnabled()}.
 *
 * @param ctx   the context of this handler
 * @param cause the exception that was caught
 * @throws Exception a {@code TransportException} when the cause cannot be routed, or whatever
 *                   the response handler's {@code channelRead} throws
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    if (log.isDebugEnabled()) {
        log.error("Unhandled exception caught within the pipeline {} for Channel {}, Id: {}", cause, ctx.channel(), ctx.channel().id());
    }

    if (ctx.channel().hasAttr(ChannelAttributes.LAST_REQUEST_SENT)) {
        AbstractRequest request = ctx.channel().attr(ChannelAttributes.LAST_REQUEST_SENT).get();

        if (request != null && SocketChannel.class.isAssignableFrom(ctx.channel().getClass())) {
            SimpleChannelInboundHandler<?> responseRouter = ctx.pipeline().get(SimpleChannelInboundHandler.class);

            // Without a router there is nobody to deliver the failure to; fall through to the
            // generic TransportException instead of failing with a NullPointerException.
            if (responseRouter != null) {
                Throwable ex = new ResponseException(request, cause);
                responseRouter.channelRead(ctx, ex);
                return;
            }
        }
    }

    throw new TransportException(cause);
}
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); SSLEngine engine = context.createSSLEngine(); engine.setUseClientMode(true); SslHandler sslHandler = new SslHandler(engine); //pipeline.addLast(sslHandler); pipeline.addLast(new SimpleChannelInboundHandler<Object>() { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } }); //pipeline.addLast(new HttpRequestDecoder()); //pipeline.addLast(new HttpResponseEncoder()); //pipeline.addLast(new HttpContentCompressor()); //pipeline.addLast(new HTTPClientHandler()); }
@Bean(name = "channelInitializer") public ChannelInitializer<SocketChannel> initializerFactory(final ApplicationContext contxt) { return new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { SimpleChannelInboundHandler<?> channelInboundHandler = contxt.getBean(NettyHttpChannelHandler.class); ChannelPipeline pipeline = ch.pipeline(); // HTTP pipeline.addLast(new HttpRequestDecoder()); pipeline.addLast(new HttpResponseEncoder()); pipeline.addLast(new HttpContentCompressor()); pipeline.addLast(new ChunkedWriteHandler()); // 设置处理的Handler pipeline.addLast(channelInboundHandler); } }; }
/**
 * Opens a serial channel on the given port and forwards every decoded line to the handler.
 *
 * <p>Inbound bytes are split into lines by a {@code LineBasedFrameDecoder}
 * (limit {@code Integer.MAX_VALUE}), converted by a {@code StringDecoder} and then passed to
 * {@code stringHandler.channelRead}. The constructor waits uninterruptibly for the connect
 * attempt to finish.
 *
 * @param port          the serial port name used to build the {@code JsscDeviceAddress}
 * @param stringHandler the receiver of each decoded line
 */
public SimpleLineBasedSerialChannel(String port, final SimpleStringChannelHandler stringHandler) {
    group = new OioEventLoopGroup();

    final ChannelInitializer<JsscChannel> initializer = new ChannelInitializer<JsscChannel>() {
        @Override
        public void initChannel(JsscChannel ch) throws Exception {
            ch.pipeline().addLast(
                new LineBasedFrameDecoder(Integer.MAX_VALUE),
                new StringDecoder(),
                new SimpleChannelInboundHandler<String>() {
                    @Override
                    protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, String line) throws Exception {
                        stringHandler.channelRead(ctx, line);
                    }
                }
            );
        }
    };

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group);
    bootstrap.channel(JsscChannel.class);
    bootstrap.handler(initializer);

    f = bootstrap.connect(new JsscDeviceAddress(port)).syncUninterruptibly();
}
/**
 * Prepares the server bootstrap for the TCP listening point.
 *
 * <p>Each accepted channel gets a SIP stream decoder, a SIP encoder and the supplied handler,
 * registered as {@code "decoder"}, {@code "encoder"} and {@code "handler"}. The bootstrap is
 * only configured here; it is not bound.
 *
 * @param handler the handler that receives the decoded SIP message events
 * @return the configured, unbound server bootstrap
 */
private ServerBootstrap createTCPListeningPoint(final SimpleChannelInboundHandler<SipMessageEvent> handler) {
    final ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(final SocketChannel ch) throws Exception {
            ch.pipeline()
                .addLast("decoder", new SipMessageStreamDecoder())
                .addLast("encoder", new SipMessageEncoder())
                .addLast("handler", handler);
        }
    };

    final ServerBootstrap tcpBootstrap = new ServerBootstrap();
    tcpBootstrap.group(this.bossGroup, this.workerGroup);
    tcpBootstrap.channel(NioServerSocketChannel.class);
    tcpBootstrap.childHandler(childInitializer);
    tcpBootstrap.option(ChannelOption.SO_BACKLOG, 128);
    tcpBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
    tcpBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    tcpBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    return tcpBootstrap;
}
/**
 * Creates a channel initializer that encodes and decodes length-prefixed protocol buffer
 * messages.
 *
 * <p>Frames carry a 4-byte length prefix; frames larger than 10 MB are not supported.</p>
 *
 * <p>The handler runs on the I/O thread, so blocking work should be moved to a separate
 * thread.</p>
 *
 * @param defaultInstance an instance of the message type to decode
 * @param handler         the handler implementing the application logic
 * @param <M>             the type of the supported protocol buffer message
 * @return the initializer that installs the framing, protobuf and application handlers
 */
public static final <M extends Message> ChannelInitializer<Channel> protoBuf(
        final M defaultInstance,
        final SimpleChannelInboundHandler<M> handler) {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline()
                .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4))
                .addLast("protobufDecoder", new ProtobufDecoder(defaultInstance))
                .addLast("frameEncoder", new LengthFieldPrepender(4))
                .addLast("protobufEncoder", new ProtobufEncoder())
                .addLast("applicationHandler", handler);
        }
    };
}
/**
 * Creates a server-side channel initializer for HTTP over SSL/TLS.
 *
 * <p>Each channel gets an {@code SslHandler} backed by a server-mode engine from the given
 * SSL context, followed by the HTTP server codec, an aggregator limited to 10 MB and the
 * supplied handler.</p>
 *
 * @param handler    the handler implementing the business logic
 * @param sslContext the SSL context that secures the link to the client
 * @return the initializer that installs the secure HTTP server pipeline
 */
public static final ChannelInitializer<Channel> secureHttpServer(
        final SimpleChannelInboundHandler<HttpRequest> handler,
        final SSLContext sslContext) {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            final ChannelPipeline pipeline = channel.pipeline();

            final SSLEngine serverEngine = sslContext.createSSLEngine();
            serverEngine.setUseClientMode(false);

            pipeline
                .addLast("ssl", new SslHandler(serverEngine))
                .addLast("httpCodec", new HttpServerCodec())
                .addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024))
                .addLast("httpServerHandler", handler);
        }
    };
}
/**
 * Creates a client-side channel initializer for HTTP over SSL/TLS.
 *
 * <p>Each channel gets an {@code SslHandler} backed by a client-mode engine from the given
 * SSL context, followed by the HTTP client codec, an aggregator limited to 10 MB and the
 * supplied handler.</p>
 *
 * @param handler    the handler implementing the business logic
 * @param sslContext the SSL context that secures the link to the server
 * @return the initializer that installs the secure HTTP client pipeline
 */
public static final ChannelInitializer<Channel> secureHttpClient(
        final SimpleChannelInboundHandler<HttpResponse> handler,
        final SSLContext sslContext) {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            final ChannelPipeline pipeline = channel.pipeline();

            final SSLEngine clientEngine = sslContext.createSSLEngine();
            clientEngine.setUseClientMode(true);

            pipeline
                .addLast("ssl", new SslHandler(clientEngine))
                .addLast("httpCodec", new HttpClientCodec())
                .addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024))
                .addLast("httpClientHandler", handler);
        }
    };
}
/**
 * Installs the proxy handler for the connecting client, followed by a catch-all handler that
 * logs any inbound message it receives.
 *
 * <p>The client address is derived from the channel's remote address (host name and port).
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    final InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress();
    final Address clientAddress = new Address(remote.getHostName(), remote.getPort());

    final SimpleChannelInboundHandler<Object> unhandledLogger = new SimpleChannelInboundHandler<Object>() {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object inbound) throws Exception {
            LOGGER.info("[Client ({})] => Unhandled inbound: {}", clientAddress, inbound);
        }
    };

    channel.pipeline().addLast(master.proxyHandler(clientAddress), unhandledLogger);
}
public EpollConnDroppingServer(final int port, final int dropEveryRequest) throws InterruptedException { dispatchGroup = new EpollEventLoopGroup(); workerGroup = new EpollEventLoopGroup(); final ServerBootstrap bootstrap = new ServerBootstrap() .group(dispatchGroup, workerGroup) .channel(EpollServerSocketChannel.class) .childHandler( new ChannelInitializer<SocketChannel>() { @Override public final void initChannel(final SocketChannel ch) { if(dropEveryRequest > 0) { ch.pipeline().addLast( new SimpleChannelInboundHandler<Object>() { @Override protected final void channelRead0( final ChannelHandlerContext ctx, final Object msg ) throws Exception { if(0 == reqCounter.incrementAndGet() % dropEveryRequest) { final Channel conn = ctx.channel(); System.out.println("Dropping the connection " + conn); conn.close(); } } } ); } } } ); bindFuture = bootstrap.bind(port).sync(); }
/**
 * Starts an NIO-based server that closes the connection on every N-th inbound message.
 *
 * <p>The drop handler is installed only when {@code dropEveryRequest} is positive. A value of
 * zero would otherwise make the modulo in the handler fail with an
 * {@code ArithmeticException} on every inbound message. The constructor waits until the bind
 * operation has completed.
 *
 * @param port             the port to bind to
 * @param dropEveryRequest the interval, counted across all inbound messages, at which the
 *                         receiving connection is closed; non-positive installs no handler
 * @throws InterruptedException if interrupted while waiting for the bind to complete
 */
public NioConnDroppingServer(final int port, final int dropEveryRequest)
throws InterruptedException {
    dispatchGroup = new NioEventLoopGroup();
    workerGroup = new NioEventLoopGroup();
    final ServerBootstrap bootstrap = new ServerBootstrap()
        .group(dispatchGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(
            new ChannelInitializer<SocketChannel>() {
                @Override
                public final void initChannel(final SocketChannel ch) {
                    // Guard against modulo by zero (and meaningless negative intervals).
                    if(dropEveryRequest > 0) {
                        ch.pipeline().addLast(
                            new SimpleChannelInboundHandler<Object>() {
                                @Override
                                protected final void channelRead0(
                                    final ChannelHandlerContext ctx, final Object msg
                                ) throws Exception {
                                    if(0 == reqCounter.incrementAndGet() % dropEveryRequest) {
                                        final Channel conn = ctx.channel();
                                        System.out.println("Dropping the connection " + conn);
                                        conn.close();
                                    }
                                }
                            }
                        );
                    }
                }
            }
        );
    bindFuture = bootstrap.bind(port).sync();
}
protected FullHttpResponse executeHttpRequest(final FullHttpRequest request) throws InterruptedException, ConnectException { ThreadContext.put(KEY_TEST_STEP_ID, stepId); ThreadContext.put(KEY_CLASS_NAME, CLS_NAME); final Channel channel = getUnpooledConnection(); try { final ChannelPipeline pipeline = channel.pipeline(); Loggers.MSG.debug( "{}: execute the HTTP request using the channel {} w/ pipeline: {}", stepId, channel.hashCode(), pipeline ); pipeline.removeLast(); // remove the API specific handler final SynchronousQueue<FullHttpResponse> fullRespSync = new SynchronousQueue<>(); pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); pipeline.addLast( new SimpleChannelInboundHandler<HttpObject>() { @Override protected final void channelRead0( final ChannelHandlerContext ctx, final HttpObject msg ) throws Exception { if(msg instanceof FullHttpResponse) { fullRespSync.put(((FullHttpResponse) msg).retain()); } } } ); channel.writeAndFlush(request).sync(); return fullRespSync.take(); } finally { channel.close(); } }
/**
 * Binds a broadcast-enabled NIO datagram channel to the given port.
 *
 * @param loopGroup the event loop group that serves the channel
 * @param handler   the handler that receives the inbound datagrams
 * @param port      the local port to bind to
 * @return the future of the bind operation
 */
public static ChannelFuture newBootstrapUDP(EventLoopGroup loopGroup, SimpleChannelInboundHandler<DatagramPacket> handler, int port) {
    Bootstrap udpBootstrap = new Bootstrap();
    udpBootstrap.group(loopGroup);
    udpBootstrap.channel(NioDatagramChannel.class);
    udpBootstrap.option(ChannelOption.SO_BROADCAST, true);
    udpBootstrap.handler(handler);
    return udpBootstrap.bind(port);
}
/**
 * Starts an HTTP server for the given handler by delegating to the overload that takes a
 * channel initializer.
 *
 * <p>The pipeline built for each channel consists of a request decoder, an aggregator limited
 * to 65536 bytes, a response encoder, a chunked-write handler and finally the supplied
 * handler.
 *
 * @param ip      the address passed on to the delegate overload
 * @param port    the port passed on to the delegate overload
 * @param handler the handler that processes the aggregated requests
 */
public static void newHttpServerBootstrap(String ip, int port, SimpleChannelInboundHandler<? extends FullHttpRequest> handler) {
    ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                .addLast("decoder", new HttpRequestDecoder())
                .addLast("aggregator", new HttpObjectAggregator(65536))
                .addLast("encoder", new HttpResponseEncoder())
                .addLast("chunkedWriter", new ChunkedWriteHandler())
                .addLast("handler", handler);
        }
    };

    newHttpServerBootstrap(ip, port, channelInitializer);
}
/**
 * Creates a UDP server that hands inbound datagrams to the given handler.
 *
 * @param enableEpoll passed through to the superclass constructor
 * @param numThreads  passed through to the superclass constructor
 * @param addresses   passed through to the superclass constructor
 * @param handler     the datagram handler stored by this server
 */
public UDPConsumingServer(
        boolean enableEpoll,
        int numThreads,
        List<InetSocketAddress> addresses,
        SimpleChannelInboundHandler<DatagramPacket> handler) {
    super(enableEpoll, numThreads, addresses);

    this.handler = handler;
}
/**
 * Creates the handler that resolves pending write promises.
 *
 * <p>For each inbound message the promise registered in {@code resultMap} under the message
 * id is completed successfully with a new {@code WriteResult}. A message without a registered
 * promise is only reported on standard output.
 *
 * @return a new reader handler
 */
private SimpleChannelInboundHandler<ChicagoMessage> newReader() {
    return new SimpleChannelInboundHandler<ChicagoMessage>() {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ChicagoMessage msg) throws Exception {
            final Promise<WriteResult> pending = resultMap.get(msg.id);

            if (pending == null) {
                System.out.println("Couldn't find result for id " + msg.id);
                return;
            }

            System.out.println("Got result for id " + msg.id);
            pending.setSuccess(new WriteResult());
        }
    };
}
@Test public void testSslRequest() throws InterruptedException { CountDownLatch receivedResponse = new CountDownLatch(2); final ConcurrentLinkedQueue<HttpObject> responses = new ConcurrentLinkedQueue<>(); ChannelHandler responseHandler = new SimpleChannelInboundHandler<HttpObject>() { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) { responses.add(msg); receivedResponse.countDown(); } }; ClientConfig config = ClientConfig.fromConfig("xio.h1TestClient"); XioClientBootstrap bootstrap = new XioClientBootstrap(config) .channelConfig(ChannelConfiguration.clientConfig(1)) .handler(responseHandler); HttpClientBuilder builder = new HttpClientBuilder(bootstrap); URL url = server.url("/hello-world").url(); HttpClient client = builder.endpointForUrl(url).build(); client.write(Http.get("/hello-world")); Uninterruptibles.awaitUninterruptibly(receivedResponse); // block // check request RecordedRequest request1 = server.takeRequest(); assertEquals("/hello-world", request1.getPath()); // check response assertEquals(HttpResponseStatus.OK, ((HttpResponse) responses.poll()).status()); }
/**
 * Installs the client-side handler directly after the codec in the client channel's pipeline.
 *
 * <p>If a client handler was registered before, the pipeline entry named
 * {@code "ClientHandler"} is removed first.
 *
 * @param handler the handler to install and remember for the client side
 */
@SideOnly(Side.CLIENT)
public void setClientHandler(SimpleChannelInboundHandler<T> handler) {
    final FMLEmbeddedChannel clientChannel = channels.get(Side.CLIENT);
    final String codecName = clientChannel.findChannelHandlerNameForType(codec.getClass());

    final boolean alreadyInstalled = handlers.get(Side.CLIENT) != null;
    if (alreadyInstalled) {
        clientChannel.pipeline().remove("ClientHandler");
    }

    clientChannel.pipeline().addAfter(codecName, "ClientHandler", handler);
    handlers.put(Side.CLIENT, handler);
}
/**
 * Installs the server-side handler directly after the codec in the server channel's pipeline.
 *
 * <p>If a server handler was registered before, the pipeline entry named
 * {@code "ServerHandler"} is removed first.
 *
 * @param handler the handler to install and remember for the server side
 */
public void setServerHandler(SimpleChannelInboundHandler<T> handler) {
    final FMLEmbeddedChannel serverChannel = channels.get(Side.SERVER);
    final String codecName = serverChannel.findChannelHandlerNameForType(codec.getClass());

    final boolean alreadyInstalled = handlers.get(Side.SERVER) != null;
    if (alreadyInstalled) {
        serverChannel.pipeline().remove("ServerHandler");
    }

    serverChannel.pipeline().addAfter(codecName, "ServerHandler", handler);
    handlers.put(Side.SERVER, handler);
}
public ListenableFuture<CommandReply> sendRequest(CommandRpcRequest<?> request, InetSocketAddress remoteAddress) { SettableFuture<CommandReply> replyMessageFuture = SettableFuture.create(); ChannelFuture connectFuture = client.connect(remoteAddress); connectFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { future.channel().pipeline().addLast(new SimpleChannelInboundHandler<CommandReply>() { @Override protected void channelRead0(ChannelHandlerContext ctx, CommandReply msg) throws Exception { replyMessageFuture.set(msg); ctx.channel().close(); } }); // connected is fine, flush message: future.channel().writeAndFlush(request); } else { replyMessageFuture.setException(future.cause()); future.channel().close(); } } }); return replyMessageFuture; }
/**
 * Prepares the transport: resolves the bind and public interfaces in the configuration and
 * creates the Netty server object for the configured bind interface and port.
 *
 * <p>Both interface values are passed through {@code replaceProperty} together with the
 * matching common property, which falls back to {@code LOCALHOST} when it is not set.
 *
 * @param context the transport context providing the configuration and the common properties
 * @throws TransportLifecycleException declared by the overridden method
 */
@Override
protected void init(SpecificTransportContext<AvroTcpConfig> context) throws TransportLifecycleException {
    final AvroTcpConfig config = context.getConfiguration();

    config.setBindInterface(
        replaceProperty(
            config.getBindInterface(),
            BIND_INTERFACE_PROP_NAME,
            context.getCommonProperties().getProperty(BIND_INTERFACE_PROP_NAME, LOCALHOST)));

    config.setPublicInterface(
        replaceProperty(
            config.getPublicInterface(),
            PUBLIC_INTERFACE_PROP_NAME,
            context.getCommonProperties().getProperty(PUBLIC_INTERFACE_PROP_NAME, LOCALHOST)));

    final KaaTcpCommandFactory commandFactory = new KaaTcpCommandFactory();

    this.netty = new AbstractNettyServer(config.getBindInterface(), config.getBindPort()) {

        @Override
        protected ChannelInitializer<SocketChannel> configureInitializer() throws Exception {
            return new AbstractKaaTcpServerInitializer() {

                @Override
                protected KaaTcpDecoder getDecoder() {
                    return new KaaTcpDecoder(commandFactory);
                }

                @Override
                protected SimpleChannelInboundHandler<AbstractKaaTcpCommandProcessor> getMainHandler(
                        UUID uuid) {
                    return new TcpHandler(uuid, TcpTransport.this.handler);
                }
            };
        }
    };
}
/**
 * Connects synchronously, sends a GET for {@code /}, waits up to three seconds for a full
 * HTTP response and then verifies the recorded traces in their expected order.
 */
@Test
public void listenerTest() throws Exception {
    final CountDownLatch responseLatch = new CountDownLatch(1);

    Bootstrap clientBootstrap = client();
    Channel channel = clientBootstrap.connect(webServer.getHostname(), webServer.getListeningPort()).sync().channel();

    channel.pipeline().addLast(new SimpleChannelInboundHandler<FullHttpResponse>() {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
            responseLatch.countDown();
        }
    });

    HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
    channel.writeAndFlush(request);

    boolean responded = responseLatch.await(3000, TimeUnit.MILLISECONDS);
    Assert.assertTrue(responded);

    PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
    verifier.printCache();

    verifier.verifyTrace(
        event("NETTY",
            Bootstrap.class.getMethod("connect", SocketAddress.class),
            annotation("netty.address", webServer.getHostAndPort())));
    verifier.verifyTrace(
        event("NETTY", "io.netty.channel.DefaultChannelPipeline.writeAndFlush(java.lang.Object)"));
    verifier.verifyTrace(
        event("ASYNC", "Asynchronous Invocation"));
    verifier.verifyTrace(
        event("NETTY_HTTP",
            "io.netty.handler.codec.http.HttpObjectEncoder.encode(io.netty.channel.ChannelHandlerContext, java.lang.Object, java.util.List)",
            annotation("http.url", "/")));
}
/**
 * Connects asynchronously, sends a GET for {@code /} from the connect listener, waits up to
 * three seconds for any inbound message and then verifies the recorded traces in their
 * expected order.
 */
@Test
public void writeTest() throws Exception {
    final CountDownLatch responseLatch = new CountDownLatch(1);

    Bootstrap clientBootstrap = client();

    clientBootstrap.connect(webServer.getHostname(), webServer.getListeningPort()).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Channel channel = future.channel();
            channel.pipeline().addLast(new SimpleChannelInboundHandler() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                    responseLatch.countDown();
                }
            });

            HttpRequest request = new DefaultFullHttpRequest(
                    HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
            future.channel().writeAndFlush(request);
        }
    });

    boolean responded = responseLatch.await(3000, TimeUnit.MILLISECONDS);
    Assert.assertTrue(responded);

    PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
    verifier.printCache();

    verifier.verifyTrace(
        event("NETTY",
            Bootstrap.class.getMethod("connect", SocketAddress.class),
            annotation("netty.address", webServer.getHostAndPort())));
    verifier.verifyTrace(
        event("NETTY", "io.netty.channel.DefaultChannelPromise.addListener(io.netty.util.concurrent.GenericFutureListener)"));
    verifier.verifyTrace(
        event("ASYNC", "Asynchronous Invocation"));
    verifier.verifyTrace(
        event("NETTY_INTERNAL", "io.netty.util.concurrent.DefaultPromise.notifyListenersNow()"));
    verifier.verifyTrace(
        event("NETTY_INTERNAL", "io.netty.util.concurrent.DefaultPromise.notifyListener0(io.netty.util.concurrent.Future, io.netty.util.concurrent.GenericFutureListener)"));
    verifier.verifyTrace(
        event("NETTY", "io.netty.channel.DefaultChannelPipeline.writeAndFlush(java.lang.Object)"));
    verifier.verifyTrace(
        event("NETTY_HTTP",
            "io.netty.handler.codec.http.HttpObjectEncoder.encode(io.netty.channel.ChannelHandlerContext, java.lang.Object, java.util.List)",
            annotation("http.url", "/")));
}
private void start() throws Exception { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(parent, child) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("codec", new HttpServerCodec()); pipeline.addLast("httpHandler", new SimpleChannelInboundHandler<Object>() { @Override protected void channelRead0(ChannelHandlerContext context, Object message) throws Exception { if (message instanceof HttpRequest) { // Keep track of processed requests HttpRequest request = (HttpRequest) message; requests.add(request); } if (message instanceof HttpContent) { // Write an empty JSON response back to the client FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.copiedBuffer("{}", CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "application/json; charset=UTF-8"); response.headers().set(CONTENT_LENGTH, response.content().readableBytes()); context.writeAndFlush(response); } } }); } }); channel = bootstrap.bind(port).syncUninterruptibly().channel(); }
/**
 * Creates a SIP stack whose UDP and TCP listening points both use the same handler.
 *
 * <p>Only the bootstraps are prepared here, via {@code createUDPListeningPoint} and
 * {@code createTCPListeningPoint}.
 *
 * @param handler the handler that receives the SIP message events on both transports
 * @param ip      the address stored for this stack
 * @param port    the port stored for this stack
 */
public SimpleSipStack(
        final SimpleChannelInboundHandler<SipMessageEvent> handler,
        final String ip,
        final int port) {
    this.ip = ip;
    this.port = port;

    // The same handler instance is handed to both transports.
    this.bootstrap = createUDPListeningPoint(handler);
    this.serverBootstrap = createTCPListeningPoint(handler);
}
/**
 * Prepares the bootstrap for the UDP listening point.
 *
 * <p>The datagram channel gets a SIP datagram decoder, a SIP encoder and the supplied
 * handler, registered as {@code "decoder"}, {@code "encoder"} and {@code "handler"}. The
 * bootstrap is only configured here; it is not bound.
 *
 * @param handler the handler that receives the decoded SIP message events
 * @return the configured, unbound bootstrap
 */
private Bootstrap createUDPListeningPoint(final SimpleChannelInboundHandler<SipMessageEvent> handler) {
    final ChannelInitializer<DatagramChannel> initializer = new ChannelInitializer<DatagramChannel>() {
        @Override
        protected void initChannel(final DatagramChannel ch) throws Exception {
            ch.pipeline()
                .addLast("decoder", new SipMessageDatagramDecoder())
                .addLast("encoder", new SipMessageEncoder())
                .addLast("handler", handler);
        }
    };

    final Bootstrap udpBootstrap = new Bootstrap();
    udpBootstrap.group(this.udpGroup);
    udpBootstrap.channel(NioDatagramChannel.class);
    udpBootstrap.handler(initializer);
    return udpBootstrap;
}
/**
 * Creates a channel initializer suited to decode and process HTTP requests.
 *
 * <p>The handler must be sharable; otherwise the argument check fails. Each channel gets the
 * HTTP server codec, an aggregator limited to 10 MB and the supplied handler.</p>
 *
 * @param handler the sharable handler implementing the application logic
 * @return the initializer that installs the HTTP server pipeline
 */
public static final ChannelInitializer<Channel> httpServer(
        final SimpleChannelInboundHandler<HttpRequest> handler) {
    Preconditions.checkArgument(handler.isSharable());

    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline()
                .addLast("httpCodec", new HttpServerCodec())
                .addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024))
                .addLast("httpServerHandler", handler);
        }
    };
}
/**
 * Creates a channel initializer suited to decode and process HTTP responses.
 *
 * <p>Each channel gets the HTTP client codec, an aggregator limited to 10 MB and the supplied
 * handler.</p>
 *
 * @param handler the handler implementing the application logic
 * @return the initializer that installs the HTTP client pipeline
 */
public static final ChannelInitializer<Channel> httpClient(
        final SimpleChannelInboundHandler<HttpResponse> handler) {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline()
                .addLast("httpCodec", new HttpClientCodec())
                .addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024))
                .addLast("httpClientHandler", handler);
        }
    };
}