public void initChannel(AbstractChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } // Add the text line codec combination first, pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); // the encoder and decoder are static as these are sharable pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // and then business logic. pipeline.addLast(iotNettyTcpHandler); }
/**
 * Registers three named handlers on the channel's pipeline, in order:
 * {@code "bytesDecoder"}, {@code "framer"} (a {@code LineBasedFrameDecoder} built with
 * {@code (1024, true, true)}) and {@code "linehandler"} (a fresh {@code StringMetricHandler}).
 *
 * {@inheritDoc}
 * @see io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.Channel)
 */
@Override
protected void initChannel(final AbstractChannel ch) throws Exception {
    ch.pipeline()
            .addLast("bytesDecoder", bytesDecoder)
            .addLast("framer", new LineBasedFrameDecoder(1024, true, true))
            .addLast("linehandler", new StringMetricHandler());
}
/**
 * Builds a channel initializer that appends the decoders supplied by {@code hf} and then
 * the session obtained from {@code sessionFactory} to each initialized channel's pipeline.
 *
 * @param sessionFactory factory asked for a session via {@code getSession(ch, slf)}
 * @param hf handler factory providing the decoders
 * @param slf session listener factory handed to {@code sessionFactory}
 * @return a new initializer performing the setup described above
 */
public static ChannelInitializer<AbstractChannel> createChannelWithDecoder(
        @Nonnull final BmpSessionFactory sessionFactory,
        @Nonnull final BmpHandlerFactory hf,
        @Nonnull final BmpSessionListenerFactory slf) {
    final ChannelInitializer<AbstractChannel> decoderInitializer =
            new ChannelInitializer<AbstractChannel>() {
                @Override
                protected void initChannel(final AbstractChannel ch) throws Exception {
                    // Decoders first, then the session handler.
                    ch.pipeline()
                            .addLast(hf.getDecoders())
                            .addLast(sessionFactory.getSession(ch, slf));
                }
            };
    return decoderInitializer;
}
/**
 * Builds a channel initializer that appends the encoders supplied by {@code hf} and then
 * the session obtained from {@code sessionFactory} to each initialized channel's pipeline.
 *
 * @param sessionFactory factory asked for a session via {@code getSession(ch, slf)}
 * @param hf handler factory providing the encoders
 * @param slf session listener factory handed to {@code sessionFactory}
 * @return a new initializer performing the setup described above
 */
public static ChannelInitializer<AbstractChannel> createChannelWithEncoder(
        @Nonnull final BmpSessionFactory sessionFactory,
        @Nonnull final BmpHandlerFactory hf,
        @Nonnull final BmpSessionListenerFactory slf) {
    final ChannelInitializer<AbstractChannel> encoderInitializer =
            new ChannelInitializer<AbstractChannel>() {
                @Override
                protected void initChannel(final AbstractChannel ch) throws Exception {
                    // Encoders first, then the session handler.
                    ch.pipeline()
                            .addLast(hf.getEncoders())
                            .addLast(sessionFactory.getSession(ch, slf));
                }
            };
    return encoderInitializer;
}
@Test public void testWriteFailsFastOnClosedChannel() throws Exception { EventLoopGroup clientGroup = new LocalEventLoopGroup(); EventLoopGroup serverGroup = new LocalEventLoopGroup(); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); cb.group(clientGroup) .channel(LocalChannel.class) .handler(new TestHandler()); sb.group(serverGroup) .channel(LocalServerChannel.class) .childHandler(new ChannelInitializer<LocalChannel>() { @Override public void initChannel(LocalChannel ch) throws Exception { ch.pipeline().addLast(new TestHandler()); } }); // Start server sb.bind(addr).sync(); // Connect to the server final Channel cc = cb.connect(addr).sync().channel(); // Close the channel and write something. cc.close().sync(); try { cc.writeAndFlush(new Object()).sync(); fail("must raise a ClosedChannelException"); } catch (Exception e) { assertThat(e, is(instanceOf(ClosedChannelException.class))); // Ensure that the actual write attempt on a closed channel was never made by asserting that // the ClosedChannelException has been created by AbstractUnsafe rather than transport implementations. if (e.getStackTrace().length > 0) { assertThat( e.getStackTrace()[0].getClassName(), is(AbstractChannel.class.getName() + "$AbstractUnsafe")); e.printStackTrace(); } } serverGroup.shutdownGracefully(); clientGroup.shutdownGracefully(); serverGroup.terminationFuture().sync(); clientGroup.terminationFuture().sync(); }
/**
 * Creates a {@code ChannelInitializer} for {@code AbstractChannel}s from the supplied factories.
 *
 * <p>NOTE(review): how the factories are used is up to the implementation and is not
 * visible from this declaration.
 *
 * @param sessionFactory the session factory; declared {@code @Nonnull}
 * @param hf the handler factory; declared {@code @Nonnull}
 * @param slf the session listener factory; declared {@code @Nonnull}
 * @return the channel initializer
 */
ChannelInitializer<AbstractChannel> create(@Nonnull BmpSessionFactory sessionFactory, @Nonnull BmpHandlerFactory hf, @Nonnull BmpSessionListenerFactory slf);
@Test public void testWriteFailsFastOnClosedChannel() throws Exception { EventLoopGroup clientGroup = new DefaultEventLoopGroup(); EventLoopGroup serverGroup = new DefaultEventLoopGroup(); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); cb.group(clientGroup) .channel(LocalChannel.class) .handler(new TestHandler()); sb.group(serverGroup) .channel(LocalServerChannel.class) .childHandler(new ChannelInitializer<LocalChannel>() { @Override public void initChannel(LocalChannel ch) throws Exception { ch.pipeline().addLast(new TestHandler()); } }); // Start server sb.bind(addr).sync(); // Connect to the server final Channel cc = cb.connect(addr).sync().channel(); // Close the channel and write something. cc.close().sync(); try { cc.writeAndFlush(new Object()).sync(); fail("must raise a ClosedChannelException"); } catch (Exception e) { assertThat(e, is(instanceOf(ClosedChannelException.class))); // Ensure that the actual write attempt on a closed channel was never made by asserting that // the ClosedChannelException has been created by AbstractUnsafe rather than transport implementations. if (e.getStackTrace().length > 0) { assertThat( e.getStackTrace()[0].getClassName(), is(AbstractChannel.class.getName() + "$AbstractUnsafe")); e.printStackTrace(); } } serverGroup.shutdownGracefully(); clientGroup.shutdownGracefully(); serverGroup.terminationFuture().sync(); clientGroup.terminationFuture().sync(); }