/** * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * @param buf the {@link ByteBuf} from which the bytes should be written */ private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception { int readableBytes = buf.readableBytes(); if (readableBytes == 0) { in.remove(); return true; } if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) { int writtenBytes = doWriteBytes(buf, writeSpinCount); in.removeBytes(writtenBytes); return writtenBytes == readableBytes; } else { ByteBuffer[] nioBuffers = buf.nioBuffers(); return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, writeSpinCount); } }
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { // The outbound buffer contains only one message or it contains a file region. Object msg = in.current(); if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (!writeBytes(in, buf, writeSpinCount)) { // was not able to write everything so break here we will get notified later again once // the network stack can handle more writes. return false; } } else if (msg instanceof DefaultFileRegion) { DefaultFileRegion region = (DefaultFileRegion) msg; if (!writeFileRegion(in, region, writeSpinCount)) { // was not able to write everything so break here we will get notified later again once // the network stack can handle more writes. return false; } } else { // Should never reach here. throw new Error(); } return true; }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { Object msg = in.current(); if (msg == null) { // nothing left to write break; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; while (buf.isReadable()) { doWriteBytes(buf); } in.remove(); } else if (msg instanceof FileRegion) { doWriteFileRegion((FileRegion) msg); in.remove(); } else { in.remove(new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg))); } } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { final Object o = in.current(); if (o == null) { break; } if (o instanceof CanFrame) { socket.send((CanFrame) o); in.remove(); } else { throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(o)); } } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { final Object o = in.current(); if (o == null) { break; } // TODO if (o instanceof CanFrame) { socket.send((CanFrame) o); in.remove(); } else { throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(o)); } } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { Object msg = in.current(false); if (msg == null) { // nothing left to write break; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; while (buf.isReadable()) { doWriteBytes(buf); } in.remove(); } else if (msg instanceof FileRegion) { doWriteFileRegion((FileRegion) msg); in.remove(); } else { in.remove(new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg))); } } }
@Override protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer buffer) throws Exception { DatagramPacket dpacket = (DatagramPacket) msg; InetSocketAddress recipient = dpacket.recipient(); ByteBuf byteBuf = dpacket.content(); int readableBytes = byteBuf.readableBytes(); if (readableBytes == 0) { return true; } ByteBuffer internalNioBuffer = byteBuf.internalNioBuffer(byteBuf.readerIndex(), readableBytes); return javaChannel().send(internalNioBuffer, recipient) > 0; }
@Override protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { // expects a message final UdtMessage message = (UdtMessage) msg; final ByteBuf byteBuf = message.content(); final int messageSize = byteBuf.readableBytes(); final long writtenBytes; if (byteBuf.nioBufferCount() == 1) { writtenBytes = javaChannel().write(byteBuf.nioBuffer()); } else { writtenBytes = javaChannel().write(byteBuf.nioBuffers()); } // did not write the message if (writtenBytes <= 0 && messageSize > 0) { return false; } // wrote message completely if (writtenBytes != messageSize) { throw new Error( "Provider error: failed to write message. Provider library should be upgraded."); } return true; }
@Override protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { final SocketAddress remoteAddress; final ByteBuf data; if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg; remoteAddress = envelope.recipient(); data = envelope.content(); } else { data = (ByteBuf) msg; remoteAddress = null; } final int dataLen = data.readableBytes(); if (dataLen == 0) { return true; } final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen); final int writtenBytes; if (remoteAddress != null) { writtenBytes = javaChannel().send(nioData, remoteAddress); } else { writtenBytes = javaChannel().write(nioData); } return writtenBytes > 0; }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { Object msg = in.current(); if (msg == null) { break; } ReferenceCountUtil.retain(msg); outboundMessages.add(msg); in.remove(); } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { Object msg = in.current(); if (msg == null) { // nothing left to write break; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); while (readableBytes > 0) { doWriteBytes(buf); int newReadableBytes = buf.readableBytes(); in.progress(readableBytes - newReadableBytes); readableBytes = newReadableBytes; } in.remove(); } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; long transfered = region.transfered(); doWriteFileRegion(region); in.progress(region.transfered() - transfered); in.remove(); } else { in.remove(new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg))); } } }
@Override protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { SctpMessage packet = (SctpMessage) msg; ByteBuf data = packet.content(); int dataLen = data.readableBytes(); if (dataLen == 0) { return true; } ByteBufAllocator alloc = alloc(); boolean needsCopy = data.nioBufferCount() != 1; if (!needsCopy) { if (!data.isDirect() && alloc.isDirectBufferPooled()) { needsCopy = true; } } ByteBuffer nioData; if (!needsCopy) { nioData = data.nioBuffer(); } else { data = alloc.directBuffer(dataLen).writeBytes(data); nioData = data.nioBuffer(); } final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier()); mi.streamNumber(packet.streamIdentifier()); final int writtenBytes = javaChannel().send(nioData, mi); return writtenBytes > 0; }
/** * Returns a {@link NativeDatagramPacketArray} which is filled with the flushed messages of * {@link ChannelOutboundBuffer}. */ static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception { NativeDatagramPacketArray array = ARRAY.get(); array.count = 0; buffer.forEachFlushedMessage(array); return array; }
/** * Write a {@link DefaultFileRegion} * * @param region the {@link DefaultFileRegion} from which the bytes should be written * @return amount the amount of written bytes */ private boolean writeFileRegion( ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception { final long regionCount = region.count(); if (region.transfered() >= regionCount) { in.remove(); return true; } final long baseOffset = region.position(); boolean done = false; long flushedAmount = 0; for (int i = writeSpinCount - 1; i >= 0; i--) { final long offset = region.transfered(); final long localFlushedAmount = Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset); if (localFlushedAmount == 0) { break; } flushedAmount += localFlushedAmount; if (region.transfered() >= regionCount) { done = true; break; } } if (flushedAmount > 0) { in.progress(flushedAmount); } if (done) { in.remove(); } else { // Returned EAGAIN need to set EPOLLOUT setFlag(Native.EPOLLOUT); } return done; }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = config().getWriteSpinCount(); for (;;) { final int msgCount = in.size(); if (msgCount == 0) { // Wrote all messages. clearFlag(Native.EPOLLOUT); break; } // Do gathering write if the outbounf buffer entries start with more than one ByteBuf. if (msgCount > 1 && in.current() instanceof ByteBuf) { if (!doWriteMultiple(in, writeSpinCount)) { break; } // We do not break the loop here even if the outbound buffer was flushed completely, // because a user might have triggered another write and flush when we notify his or her // listeners. } else { // msgCount == 1 if (!doWriteSingle(in, writeSpinCount)) { break; } } } }
@Override protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { Object msg = in.current(); if (msg instanceof FileDescriptor && Native.sendFd(fd().intValue(), ((FileDescriptor) msg).intValue()) > 0) { // File descriptor was written, so remove it. in.remove(); return true; } return super.doWriteSingle(in, writeSpinCount); }
/** * Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}. */ static IovArray get(ChannelOutboundBuffer buffer) throws Exception { IovArray array = ARRAY.get(); array.clear(); buffer.forEachFlushedMessage(array); return array; }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (; ; ) { Object msg = in.current(); if (msg == null) { // Wrote all messages. if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break; } boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { if (doWriteMessage(msg, in)) { done = true; break; } } if (done) { in.remove(); } else { // Did not write all messages. if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break; } } }
@Override protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { SctpMessage packet = (SctpMessage) msg; ByteBuf data = packet.content(); int dataLen = data.readableBytes(); if (dataLen == 0) { return true; } ByteBufAllocator alloc = alloc(); boolean needsCopy = data.nioBufferCount() != 1; if (!needsCopy) { if (!data.isDirect() && alloc.isDirectBufferPooled()) { needsCopy = true; } } ByteBuffer nioData; if (!needsCopy) { nioData = data.nioBuffer(); } else { data = alloc.directBuffer(dataLen).writeBytes(data); nioData = data.nioBuffer(); } final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier()); mi.streamNumber(packet.streamIdentifier()); final int writtenBytes = javaChannel().send(nioData, mi); boolean done = writtenBytes > 0; if (needsCopy) { if (!done) { in.current(new SctpMessage(mi, data)); } else { in.current(data); } } return done; }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { Object msg = in.current(false); if (msg == null) { break; } ReferenceCountUtil.retain(msg); outboundMessages.add(msg); in.remove(); } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break; } boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { if (doWriteMessage(msg, in)) { done = true; break; } } if (done) { in.remove(); } else { // Did not write all messages. if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break; } } }
private boolean hasPendingWriteBytes() { // On close the outboundBuffer is made null. After that point // adding messages and flushes to outboundBuffer is not allowed. ChannelOutboundBuffer outBuffer = this.unsafe.outboundBuffer(); return outBuffer != null && outBuffer.totalPendingWriteBytes() > 0; }
@Override protected void doWrite(ChannelOutboundBuffer arg0) throws Exception { }
@Override protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); }
void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) { ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer(); if (cob != null) { cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable); } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { final Object o = in.current(); if (o == null) { break; } final ByteBuf data; final SocketAddress remoteAddress; if (o instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o; remoteAddress = envelope.recipient(); data = envelope.content(); } else { data = (ByteBuf) o; remoteAddress = null; } final int length = data.readableBytes(); if (remoteAddress != null) { tmpPacket.setSocketAddress(remoteAddress); } if (data.hasArray()) { tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length); } else { byte[] tmp = new byte[length]; data.getBytes(data.readerIndex(), tmp); tmpPacket.setData(tmp); } try { socket.send(tmpPacket); in.remove(); } catch (IOException e) { // Continue on write error as a DatagramChannel can write to multiple remote peers // // See https://github.com/netty/netty/issues/2665 in.remove(e); } } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { if (state < 2) { throw new NotYetConnectedException(); } if (state > 2) { throw new ClosedChannelException(); } final LocalChannel peer = this.peer; final ChannelPipeline peerPipeline = peer.pipeline(); final EventLoop peerLoop = peer.eventLoop(); if (peerLoop == eventLoop()) { for (;;) { Object msg = in.current(); if (msg == null) { break; } peer.inboundBuffer.add(msg); ReferenceCountUtil.retain(msg); in.remove(); } finishPeerRead(peer, peerPipeline); } else { // Use a copy because the original msgs will be recycled by AbstractChannel. final Object[] msgsCopy = new Object[in.size()]; for (int i = 0; i < msgsCopy.length; i ++) { msgsCopy[i] = ReferenceCountUtil.retain(in.current()); in.remove(); } peerLoop.execute(new Runnable() { @Override public void run() { Collections.addAll(peer.inboundBuffer, msgsCopy); finishPeerRead(peer, peerPipeline); } }); } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break; } try { boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { if (doWriteMessage(msg, in)) { done = true; break; } } if (done) { in.remove(); } else { // Did not write all messages. if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break; } } catch (IOException e) { if (continueOnWriteError()) { in.remove(e); } else { throw e; } } } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { if (!writeSelector.isOpen()) { return; } final int size = in.size(); final int selectedKeys = writeSelector.select(SO_TIMEOUT); if (selectedKeys > 0) { final Set<SelectionKey> writableKeys = writeSelector.selectedKeys(); if (writableKeys.isEmpty()) { return; } Iterator<SelectionKey> writableKeysIt = writableKeys.iterator(); int written = 0; for (;;) { if (written == size) { // all written return; } writableKeysIt.next(); writableKeysIt.remove(); SctpMessage packet = (SctpMessage) in.current(); if (packet == null) { return; } ByteBuf data = packet.content(); int dataLen = data.readableBytes(); ByteBuffer nioData; if (data.nioBufferCount() != -1) { nioData = data.nioBuffer(); } else { nioData = ByteBuffer.allocate(dataLen); data.getBytes(data.readerIndex(), nioData); nioData.flip(); } final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier()); mi.streamNumber(packet.streamIdentifier()); ch.send(nioData, mi); written ++; in.remove(); if (!writableKeysIt.hasNext()) { return; } } } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. clearFlag(Native.EPOLLOUT); break; } try { // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+ if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) { NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in); int cnt = array.count(); if (cnt >= 1) { // Try to use gathering writes via sendmmsg(...) syscall. int offset = 0; NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets(); while (cnt > 0) { int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt); if (send == 0) { // Did not write all messages. setFlag(Native.EPOLLOUT); return; } for (int i = 0; i < send; i++) { in.remove(); } cnt -= send; offset += send; } continue; } } boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { if (doWriteMessage(msg)) { done = true; break; } } if (done) { in.remove(); } else { // Did not write all messages. setFlag(Native.EPOLLOUT); break; } } catch (IOException e) { // Continue on write error as a DatagramChannel can write to multiple remote peers // // See https://github.com/netty/netty/issues/2665 in.remove(e); } } }
private boolean writeBytesMultiple( ChannelOutboundBuffer in, IovArray array, int writeSpinCount) throws IOException { long expectedWrittenBytes = array.size(); final long initialExpectedWrittenBytes = expectedWrittenBytes; int cnt = array.count(); assert expectedWrittenBytes != 0; assert cnt != 0; boolean done = false; int offset = 0; int end = offset + cnt; for (int i = writeSpinCount - 1; i >= 0; i--) { long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt); if (localWrittenBytes == 0) { break; } expectedWrittenBytes -= localWrittenBytes; if (expectedWrittenBytes == 0) { // Written everything, just break out here (fast-path) done = true; break; } do { long bytes = array.processWritten(offset, localWrittenBytes); if (bytes == -1) { // incomplete write break; } else { offset++; cnt--; localWrittenBytes -= bytes; } } while (offset < end && localWrittenBytes > 0); } if (!done) { setFlag(Native.EPOLLOUT); } in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); return done; }