/**
 * Validates an outbound message and prepares it for writing.
 *
 * <p>Only {@link SctpMessage} instances are accepted. A message whose content reports
 * {@code isDirect()} and {@code nioBufferCount() == 1} is returned unchanged; any other
 * message is rebuilt with the same protocol and stream identifiers around the buffer
 * returned by {@code newDirectBuffer(m, buf)}.
 *
 * @param msg the outbound message to check
 * @return the message that should actually be written
 * @throws UnsupportedOperationException if {@code msg} is not an {@link SctpMessage}
 */
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
    if (msg instanceof SctpMessage) {
        SctpMessage m = (SctpMessage) msg;
        ByteBuf buf = m.content();
        if (buf.isDirect() && buf.nioBufferCount() == 1) {
            return m;
        }

        return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), newDirectBuffer(m, buf));
    }

    // The trailing ')' closes the "(expected: ..." part of the message, which was left open before.
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) +
            " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')');
}
/**
 * Prints every inbound message to standard output. When the message is an
 * {@link SctpMessage}, its content is additionally printed as UTF-8 text and the
 * message is written back through the context.
 *
 * @param ctx the handler context used for the write
 * @param msg the inbound message
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    System.out.println(msg);

    if (!(msg instanceof SctpMessage)) {
        return;
    }

    SctpMessage received = (SctpMessage) msg;
    String text = received.content().toString(CharsetUtil.UTF_8);
    System.out.println(text);
    ctx.write(received);
}
/**
 * Lets {@link SctpMessage} instances through untouched and rejects everything else.
 *
 * @param msg the outbound message to check
 * @return {@code msg} itself when it is an {@link SctpMessage}
 * @throws UnsupportedOperationException for any other message type
 */
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
    if (!(msg instanceof SctpMessage)) {
        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
    }
    return msg;
}
/**
 * Receives at most one SCTP message from the underlying channel.
 *
 * <p>A buffer is obtained from the (lazily created) receive-buffer handle, filled by
 * {@code receive(...)}, and wrapped in an {@link SctpMessage} that is appended to
 * {@code buf}. Whatever the outcome, the number of readable bytes in the buffer is
 * recorded on the handle, and the buffer is released unless it was handed over to a message.
 *
 * @param buf the list that receives the decoded message
 * @return {@code 1} when a message was added, {@code 0} when {@code receive} returned {@code null}
 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SctpChannel channel = javaChannel();

    RecvByteBufAllocator.Handle handle = this.allocHandle;
    if (handle == null) {
        handle = config().getRecvByteBufAllocator().newHandle();
        this.allocHandle = handle;
    }

    ByteBuf target = handle.allocate(config().getAllocator());
    boolean releaseTarget = true;
    try {
        ByteBuffer nio = target.internalNioBuffer(target.writerIndex(), target.writableBytes());
        int startPosition = nio.position();

        MessageInfo info = channel.receive(nio, null, notificationHandler);
        if (info == null) {
            return 0;
        }

        // Advance the writer index by however far receive() moved the NIO buffer's position.
        int newWriterIndex = target.writerIndex() + nio.position() - startPosition;
        buf.add(new SctpMessage(info, target.writerIndex(newWriterIndex)));
        releaseTarget = false;
        return 1;
    } catch (Throwable cause) {
        PlatformDependent.throwException(cause);
        return -1;
    } finally {
        int bytesRead = target.readableBytes();
        handle.record(bytesRead);
        if (releaseTarget) {
            target.release();
        }
    }
}
/**
 * Sends one {@link SctpMessage} through the underlying SCTP channel.
 *
 * <p>The payload is sent directly when it exposes exactly one NIO buffer and no copy is
 * preferred; otherwise it is first copied into a temporary direct buffer. The copy is
 * made with an index-based {@code writeBytes}, so the reader index of the original
 * payload is left untouched and a retry after an unsuccessful send still sees the full
 * payload. The temporary buffer is always released once the send attempt has finished.
 *
 * @param msg the message to write; must be an {@link SctpMessage}
 * @param in  the outbound buffer the message came from (not used by this implementation)
 * @return {@code true} when the payload was empty or {@code send} reported more than zero
 *         bytes written, {@code false} otherwise
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    SctpMessage packet = (SctpMessage) msg;
    ByteBuf data = packet.content();
    int dataLen = data.readableBytes();
    if (dataLen == 0) {
        return true;
    }

    ByteBufAllocator alloc = alloc();
    // Copy when the payload is not a single NIO buffer, or when it is a heap buffer
    // and the allocator reports pooled direct buffers.
    boolean needsCopy = data.nioBufferCount() != 1
            || !data.isDirect() && alloc.isDirectBufferPooled();

    ByteBuf copy = null;
    try {
        ByteBuffer nioData;
        if (needsCopy) {
            copy = alloc.directBuffer(dataLen);
            // Index-based copy: does not move the reader index of the original payload.
            copy.writeBytes(data, data.readerIndex(), dataLen);
            nioData = copy.nioBuffer();
        } else {
            nioData = data.nioBuffer();
        }

        final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
        mi.payloadProtocolID(packet.protocolIdentifier());
        mi.streamNumber(packet.streamIdentifier());

        final int writtenBytes = javaChannel().send(nioData, mi);
        return writtenBytes > 0;
    } finally {
        // The temporary direct buffer is owned by this method only; release it to avoid a leak.
        if (copy != null) {
            copy.release();
        }
    }
}
/**
 * Accepts a message only when the superclass check passes and the typed
 * {@code acceptInboundMessage(SctpMessage)} overload accepts it as well.
 *
 * @param msg the inbound message
 * @return {@code true} when both checks accept the message
 */
@Override
public final boolean acceptInboundMessage(Object msg) throws Exception {
    return super.acceptInboundMessage(msg) && acceptInboundMessage((SctpMessage) msg);
}
/**
 * Emits the retained content of a complete {@link SctpMessage}.
 *
 * @param ctx the handler context (unused)
 * @param msg the inbound SCTP message
 * @param out the list that receives the message content
 * @throws CodecException if the message is not complete
 */
@Override
protected void decode(ChannelHandlerContext ctx, SctpMessage msg, List<Object> out) throws Exception {
    if (msg.isComplete()) {
        out.add(msg.content().retain());
        return;
    }

    String handlerName = SctpMessageCompletionHandler.class.getSimpleName();
    throw new CodecException(String.format("Received SctpMessage is not complete, please add %s in the " +
            "pipeline before this handler", handlerName));
}
@Override protected void decode(ChannelHandlerContext ctx, SctpMessage msg, List<Object> out) throws Exception { final ByteBuf byteBuf = msg.content(); final int protocolIdentifier = msg.protocolIdentifier(); final int streamIdentifier = msg.streamIdentifier(); final boolean isComplete = msg.isComplete(); ByteBuf frag; if (fragments.containsKey(streamIdentifier)) { frag = fragments.remove(streamIdentifier); } else { frag = Unpooled.EMPTY_BUFFER; } if (isComplete && !frag.isReadable()) { //data chunk is not fragmented out.add(msg); } else if (!isComplete && frag.isReadable()) { //more message to complete fragments.put(streamIdentifier, Unpooled.wrappedBuffer(frag, byteBuf)); } else if (isComplete && frag.isReadable()) { //last message to complete fragments.remove(streamIdentifier); SctpMessage assembledMsg = new SctpMessage( protocolIdentifier, streamIdentifier, Unpooled.wrappedBuffer(frag, byteBuf)); out.add(assembledMsg); } else { //first incomplete message fragments.put(streamIdentifier, byteBuf); } byteBuf.retain(); }
/**
 * Accepts only complete {@link SctpMessage} instances.
 *
 * @param msg the inbound message
 * @return {@code true} for a complete {@link SctpMessage}, {@code false} for any
 *         message that is not an {@link SctpMessage}
 * @throws CodecException if {@code msg} is an {@link SctpMessage} that is not complete
 */
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
    if (!(msg instanceof SctpMessage)) {
        return false;
    }

    if (((SctpMessage) msg).isComplete()) {
        return true;
    }

    String handlerName = SctpMessageCompletionHandler.class.getSimpleName();
    throw new CodecException(String.format("Received SctpMessage is not complete, please add %s in " +
            "the pipeline before this handler", handlerName));
}
/**
 * Reads a single SCTP message from the underlying channel into a freshly allocated buffer.
 *
 * <p>On success the buffer's writer index is moved forward by the number of bytes that
 * {@code receive(...)} placed into it, and an {@link SctpMessage} wrapping the buffer is
 * added to {@code buf}. The readable byte count is always recorded on the receive-buffer
 * handle, and the buffer is released whenever it was not passed on to a message.
 *
 * @param buf the list that receives the decoded message
 * @return {@code 1} when a message was added, {@code 0} when {@code receive} returned {@code null}
 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SctpChannel sctp = javaChannel();

    RecvByteBufAllocator.Handle recvHandle = this.allocHandle;
    if (recvHandle == null) {
        recvHandle = config().getRecvByteBufAllocator().newHandle();
        this.allocHandle = recvHandle;
    }

    ByteBuf readBuf = recvHandle.allocate(config().getAllocator());
    boolean mustRelease = true;
    try {
        ByteBuffer view = readBuf.internalNioBuffer(readBuf.writerIndex(), readBuf.writableBytes());
        int before = view.position();

        MessageInfo info = sctp.receive(view, null, notificationHandler);
        if (info == null) {
            return 0;
        }

        int received = view.position() - before;
        buf.add(new SctpMessage(info, readBuf.writerIndex(readBuf.writerIndex() + received)));
        mustRelease = false;
        return 1;
    } catch (Throwable cause) {
        PlatformDependent.throwException(cause);
        return -1;
    } finally {
        int bytesRead = readBuf.readableBytes();
        recvHandle.record(bytesRead);
        if (mustRelease) {
            readBuf.release();
        }
    }
}
/**
 * Sends one {@link SctpMessage} through the underlying SCTP channel.
 *
 * <p>An empty payload is treated as written. Otherwise the payload is sent either
 * directly or, when a copy is required, from a direct buffer obtained from the allocator.
 * When a copy was made, the current entry of {@code in} is replaced afterwards through
 * {@code in.current(...)}.
 *
 * @param msg the message to write; cast to {@link SctpMessage}
 * @param in  the outbound buffer whose current entry is replaced when a copy was made
 * @return {@code true} when the payload was empty or {@code send} reported more than
 *         zero bytes written
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    SctpMessage packet = (SctpMessage) msg;
    ByteBuf data = packet.content();
    int dataLen = data.readableBytes();
    if (dataLen == 0) {
        return true;
    }

    ByteBufAllocator alloc = alloc();
    // A copy is needed when the payload is not exactly one NIO buffer ...
    boolean needsCopy = data.nioBufferCount() != 1;
    if (!needsCopy) {
        // ... or when it is not direct and the allocator reports pooled direct buffers.
        if (!data.isDirect() && alloc.isDirectBufferPooled()) {
            needsCopy = true;
        }
    }
    ByteBuffer nioData;
    if (!needsCopy) {
        nioData = data.nioBuffer();
    } else {
        // From here on 'data' refers to the copy, not to packet.content().
        // NOTE(review): writeBytes(ByteBuf) presumably advances the source's reader index — confirm.
        data = alloc.directBuffer(dataLen).writeBytes(data);
        nioData = data.nioBuffer();
    }

    final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
    mi.payloadProtocolID(packet.protocolIdentifier());
    mi.streamNumber(packet.streamIdentifier());

    final int writtenBytes = javaChannel().send(nioData, mi);

    boolean done = writtenBytes > 0;
    if (needsCopy) {
        // NOTE(review): in.current(...) presumably swaps the entry tracked by the outbound
        // buffer so that the copy (rather than the original) is retried/released — verify
        // against ChannelOutboundBuffer. If send(...) throws, this hand-over is skipped.
        if (!done) {
            // Not written: keep a message built from the same MessageInfo and the copied data.
            in.current(new SctpMessage(mi, data));
        } else {
            // Written: hand the copied buffer itself to the outbound buffer.
            in.current(data);
        }
    }
    return done;
}
/**
 * Writes and flushes an {@link SctpMessage} built from {@code firstMessage}, with both
 * identifier arguments set to {@code 0}.
 *
 * @param ctx the handler context used for the write
 */
@Override
public void channelActive(ChannelHandlerContext ctx) {
    SctpMessage greeting = new SctpMessage(0, 0, firstMessage);
    ctx.writeAndFlush(greeting);
}
/**
 * Writes two {@link SctpMessage}s, built from {@code firstMessage} and
 * {@code secondMessage} with both identifier arguments set to {@code 0}, and then
 * flushes once.
 *
 * @param ctx the handler context used for the writes
 */
@Override
public void channelActive(ChannelHandlerContext ctx) {
    SctpMessage first = new SctpMessage(0, 0, firstMessage);
    ctx.write(first);

    SctpMessage second = new SctpMessage(0, 0, secondMessage);
    ctx.write(second);

    ctx.flush();
}
/**
 * Reads SCTP messages after waiting on the read selector.
 *
 * <p>Returns {@code 0} right away when the selector is closed or nothing was selected
 * within {@code SO_TIMEOUT}. Otherwise one receive is attempted per selected key; each
 * received message is wrapped in an {@link SctpMessage} and appended to {@code msgs}.
 * Reading stops early as soon as {@code receive} returns {@code null}. The selected-key
 * set is cleared before the method returns.
 *
 * @param msgs the list that receives the decoded messages
 * @return the number of messages added to {@code msgs}
 */
@Override
protected int doReadMessages(List<Object> msgs) throws Exception {
    if (!readSelector.isOpen()) {
        return 0;
    }

    int count = 0;

    if (readSelector.select(SO_TIMEOUT) <= 0) {
        return count;
    }

    Set<SelectionKey> readableKeys = readSelector.selectedKeys();
    try {
        for (SelectionKey ignored : readableKeys) {
            RecvByteBufAllocator.Handle handle = this.allocHandle;
            if (handle == null) {
                handle = config().getRecvByteBufAllocator().newHandle();
                this.allocHandle = handle;
            }

            ByteBuf target = handle.allocate(config().getAllocator());
            boolean releaseTarget = true;
            try {
                ByteBuffer nio = target.nioBuffer(target.writerIndex(), target.writableBytes());
                MessageInfo info = ch.receive(nio, null, notificationHandler);
                if (info == null) {
                    return count;
                }

                // After flip(), remaining() is the number of bytes receive() put into the buffer.
                nio.flip();
                int newWriterIndex = target.writerIndex() + nio.remaining();
                msgs.add(new SctpMessage(info, target.writerIndex(newWriterIndex)));
                releaseTarget = false;
                count++;
            } catch (Throwable cause) {
                PlatformDependent.throwException(cause);
            } finally {
                int bytesRead = target.readableBytes();
                handle.record(bytesRead);
                if (releaseTarget) {
                    target.release();
                }
            }
        }
    } finally {
        readableKeys.clear();
    }
    return count;
}
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { if (!writeSelector.isOpen()) { return; } final int size = in.size(); final int selectedKeys = writeSelector.select(SO_TIMEOUT); if (selectedKeys > 0) { final Set<SelectionKey> writableKeys = writeSelector.selectedKeys(); if (writableKeys.isEmpty()) { return; } Iterator<SelectionKey> writableKeysIt = writableKeys.iterator(); int written = 0; for (;;) { if (written == size) { // all written return; } writableKeysIt.next(); writableKeysIt.remove(); SctpMessage packet = (SctpMessage) in.current(); if (packet == null) { return; } ByteBuf data = packet.content(); int dataLen = data.readableBytes(); ByteBuffer nioData; if (data.nioBufferCount() != -1) { nioData = data.nioBuffer(); } else { nioData = ByteBuffer.allocate(dataLen); data.getBytes(data.readerIndex(), nioData); nioData.flip(); } final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier()); mi.streamNumber(packet.streamIdentifier()); ch.send(nioData, mi); written ++; in.remove(); if (!writableKeysIt.hasNext()) { return; } } } }
/**
 * Wraps an outbound {@link ByteBuf} in an {@link SctpMessage} that carries this handler's
 * protocol and stream identifiers.
 *
 * <p>The identifiers are passed in the order {@code (protocolIdentifier, streamIdentifier)},
 * matching the other {@code new SctpMessage(...)} call sites in this code base; passing
 * them the other way round would send the payload with the two values exchanged.
 *
 * @param ctx the handler context (unused)
 * @param msg the payload; retained before being wrapped
 * @param out the list that receives the resulting {@link SctpMessage}
 */
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
    out.add(new SctpMessage(protocolIdentifier, streamIdentifier, msg.retain()));
}
/**
 * Checks whether a message carries this handler's protocol and stream identifiers.
 *
 * @param msg the inbound SCTP message
 * @return {@code true} when both identifiers of {@code msg} equal the ones of this handler
 */
protected boolean acceptInboundMessage(SctpMessage msg) {
    if (msg.protocolIdentifier() != protocolIdentifier) {
        return false;
    }
    return msg.streamIdentifier() == streamIdentifier;
}
/**
 * Writes and flushes an {@link SctpMessage} built from {@code firstMessage}, with both
 * identifier arguments set to {@code 0}.
 *
 * @param ctx the handler context used for the write
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    SctpMessage initial = new SctpMessage(0, 0, firstMessage);
    ctx.writeAndFlush(initial);
}