Java 类io.netty.channel.ChannelOutboundBuffer 实例源码

项目:netty4.0.27Learn    文件:AbstractEpollStreamChannel.java   
/**
 * Writes bytes from the given {@link ByteBuf} to the underlying channel and updates the
 * {@link ChannelOutboundBuffer} with the outcome.
 *
 * @param in             the outbound buffer to update after the write attempt
 * @param buf            the {@link ByteBuf} from which the bytes should be written
 * @param writeSpinCount the spin count handed to the underlying write helper
 * @return {@code true} if {@code buf} had nothing to write or the write helper reported that
 *         everything was written, {@code false} otherwise
 */
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception {
    final int pending = buf.readableBytes();
    if (pending == 0) {
        // Empty buffer: just drop the entry.
        in.remove();
        return true;
    }

    // Buffers without a memory address that span several NIO buffers take the multi-buffer path.
    final boolean multiBuffer = !buf.hasMemoryAddress() && buf.nioBufferCount() != 1;
    if (multiBuffer) {
        final ByteBuffer[] parts = buf.nioBuffers();
        return writeBytesMultiple(in, parts, parts.length, pending, writeSpinCount);
    }

    final int written = doWriteBytes(buf, writeSpinCount);
    in.removeBytes(written);
    return written == pending;
}
项目:netty4.0.27Learn    文件:AbstractEpollStreamChannel.java   
/**
 * Writes the current entry of the outbound buffer, which is expected to be either a
 * {@link ByteBuf} or a {@link DefaultFileRegion}.
 *
 * @param in             the outbound buffer whose current entry is written
 * @param writeSpinCount the spin count handed to the write helpers
 * @return {@code false} if the entry could not be written completely (the caller is expected to
 *         retry once the network stack can handle more writes), {@code true} otherwise
 */
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
    // The outbound buffer contains only one message or it contains a file region.
    final Object current = in.current();
    if (current instanceof ByteBuf) {
        return writeBytes(in, (ByteBuf) current, writeSpinCount);
    }
    if (current instanceof DefaultFileRegion) {
        return writeFileRegion(in, (DefaultFileRegion) current, writeSpinCount);
    }
    // Should never reach here.
    throw new Error();
}
项目:netty4study    文件:AbstractOioByteChannel.java   
/**
 * Drains the outbound buffer: {@link ByteBuf}s are written until no longer readable,
 * {@link FileRegion}s are written via {@code doWriteFileRegion}, and any other message type is
 * removed with an {@link UnsupportedOperationException}.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object msg;
    // A null current message means there is nothing left to write.
    while ((msg = in.current()) != null) {
        if (msg instanceof ByteBuf) {
            final ByteBuf bytes = (ByteBuf) msg;
            while (bytes.isReadable()) {
                doWriteBytes(bytes);
            }
            in.remove();
            continue;
        }
        if (msg instanceof FileRegion) {
            doWriteFileRegion((FileRegion) msg);
            in.remove();
            continue;
        }
        final String reason = "unsupported message type: " + StringUtil.simpleClassName(msg);
        in.remove(new UnsupportedOperationException(reason));
    }
}
项目:jnaCan    文件:CanChannel.java   
/**
 * Sends every pending {@link CanFrame} through {@code socket} and removes it from the buffer.
 *
 * @throws UnsupportedOperationException if the current message is not a {@link CanFrame}
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object pending;
    while ((pending = in.current()) != null) {
        if (!(pending instanceof CanFrame)) {
            throw new UnsupportedOperationException(
                    "unsupported message type: " + StringUtil.simpleClassName(pending));
        }
        socket.send((CanFrame) pending);
        in.remove();
    }
}
项目:jnaCan    文件:Tp20Channel.java   
/**
 * Sends every pending {@link CanFrame} through {@code socket} and removes it from the buffer.
 *
 * @throws UnsupportedOperationException if the current message is not a {@link CanFrame}
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object pending;
    while ((pending = in.current()) != null) {
        // TODO
        if (!(pending instanceof CanFrame)) {
            throw new UnsupportedOperationException(
                    "unsupported message type: " + StringUtil.simpleClassName(pending));
        }
        socket.send((CanFrame) pending);
        in.remove();
    }
}
项目:netty-netty-5.0.0.Alpha1    文件:AbstractOioByteChannel.java   
/**
 * Drains the outbound buffer (queried via {@code in.current(false)}): {@link ByteBuf}s are
 * written until no longer readable, {@link FileRegion}s are written via
 * {@code doWriteFileRegion}, and any other message type is removed with an
 * {@link UnsupportedOperationException}.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object msg;
    // A null current message means there is nothing left to write.
    while ((msg = in.current(false)) != null) {
        if (msg instanceof ByteBuf) {
            final ByteBuf bytes = (ByteBuf) msg;
            while (bytes.isReadable()) {
                doWriteBytes(bytes);
            }
            in.remove();
            continue;
        }
        if (msg instanceof FileRegion) {
            doWriteFileRegion((FileRegion) msg);
            in.remove();
            continue;
        }
        final String reason = "unsupported message type: " + StringUtil.simpleClassName(msg);
        in.remove(new UnsupportedOperationException(reason));
    }
}
项目:UdpServerSocketChannel    文件:NioUdpServerChannel.java   
/**
 * Sends the content of a {@link DatagramPacket} to its recipient.
 *
 * @param msg    the message to write; cast to {@link DatagramPacket}
 * @param buffer the outbound buffer (not used here)
 * @return {@code true} if the packet was empty or the channel reported more than zero bytes sent
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer buffer) throws Exception {
    final DatagramPacket packet = (DatagramPacket) msg;
    final InetSocketAddress target = packet.recipient();
    final ByteBuf payload = packet.content();
    final int length = payload.readableBytes();
    if (length == 0) {
        // Nothing to send; report the message as written.
        return true;
    }
    final ByteBuffer nioView = payload.internalNioBuffer(payload.readerIndex(), length);
    return javaChannel().send(nioView, target) > 0;
}
项目:netty4.0.27Learn    文件:NioUdtMessageConnectorChannel.java   
/**
 * Writes one {@link UdtMessage} to the channel.
 *
 * @param msg the message to write; cast to {@link UdtMessage}
 * @param in  the outbound buffer (not used here)
 * @return {@code false} if a non-empty message produced no written bytes, {@code true} if the
 *         written byte count equals the message size
 * @throws Error if the message was written only partially
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    // expects a message
    final ByteBuf payload = ((UdtMessage) msg).content();
    final int expected = payload.readableBytes();

    final long sent = payload.nioBufferCount() == 1
            ? javaChannel().write(payload.nioBuffer())
            : javaChannel().write(payload.nioBuffers());

    if (expected > 0 && sent <= 0) {
        // did not write the message
        return false;
    }
    if (sent == expected) {
        // wrote message completely
        return true;
    }
    throw new Error(
            "Provider error: failed to write message. Provider library should be upgraded.");
}
项目:netty4.0.27Learn    文件:NioDatagramChannel.java   
/**
 * Writes one datagram. An {@link AddressedEnvelope} is sent to its recipient; a bare
 * {@link ByteBuf} is written without an explicit address.
 *
 * @param msg the message to write
 * @param in  the outbound buffer (not used here)
 * @return {@code true} if there was nothing to send or more than zero bytes were written
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    SocketAddress target = null;
    final ByteBuf payload;
    if (msg instanceof AddressedEnvelope) {
        @SuppressWarnings("unchecked")
        AddressedEnvelope<ByteBuf, SocketAddress> addressed = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
        target = addressed.recipient();
        payload = addressed.content();
    } else {
        payload = (ByteBuf) msg;
    }

    final int length = payload.readableBytes();
    if (length == 0) {
        return true;
    }

    final ByteBuffer nioView = payload.internalNioBuffer(payload.readerIndex(), length);
    final int sent = target == null
            ? javaChannel().write(nioView)
            : javaChannel().send(nioView, target);
    return sent > 0;
}
项目:netty4.0.27Learn    文件:EmbeddedChannel.java   
/**
 * Moves every pending message into {@code outboundMessages}, retaining it first, and removes it
 * from the outbound buffer.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object pending;
    while ((pending = in.current()) != null) {
        // Retain before in.remove() so the collected message keeps a reference.
        ReferenceCountUtil.retain(pending);
        outboundMessages.add(pending);
        in.remove();
    }
}
项目:netty4.0.27Learn    文件:AbstractOioByteChannel.java   
/**
 * Drains the outbound buffer, reporting write progress to it as bytes are written.
 * {@link ByteBuf}s are written until empty, {@link FileRegion}s are written via
 * {@code doWriteFileRegion}, and any other message type is removed with an
 * {@link UnsupportedOperationException}.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object msg;
    // A null current message means there is nothing left to write.
    while ((msg = in.current()) != null) {
        if (msg instanceof ByteBuf) {
            final ByteBuf bytes = (ByteBuf) msg;
            for (int remaining = bytes.readableBytes(); remaining > 0;) {
                doWriteBytes(bytes);
                final int after = bytes.readableBytes();
                // Report how many bytes this pass consumed.
                in.progress(remaining - after);
                remaining = after;
            }
            in.remove();
        } else if (msg instanceof FileRegion) {
            final FileRegion file = (FileRegion) msg;
            final long before = file.transfered();
            doWriteFileRegion(file);
            in.progress(file.transfered() - before);
            in.remove();
        } else {
            final String reason = "unsupported message type: " + StringUtil.simpleClassName(msg);
            in.remove(new UnsupportedOperationException(reason));
        }
    }
}
项目:netty4.0.27Learn    文件:NioSctpChannel.java   
/**
 * Sends one {@link SctpMessage} over the channel.
 *
 * <p>If the payload is not backed by exactly one NIO buffer, or it is a non-direct buffer while the
 * allocator pools direct buffers, the payload is first copied into a temporary direct buffer.
 * The copy is made without moving the payload's reader index, so a failed send can be retried
 * with the full payload, and the temporary buffer is always released before returning.
 *
 * @param msg the message to write; cast to {@link SctpMessage}
 * @param in  the outbound buffer (not used here)
 * @return {@code true} if the payload was empty or more than zero bytes were sent
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    SctpMessage packet = (SctpMessage) msg;
    ByteBuf data = packet.content();
    int dataLen = data.readableBytes();
    if (dataLen == 0) {
        return true;
    }

    ByteBufAllocator alloc = alloc();
    boolean needsCopy = data.nioBufferCount() != 1
            || (!data.isDirect() && alloc.isDirectBufferPooled());

    ByteBuf copy = null;
    try {
        final ByteBuffer nioData;
        if (needsCopy) {
            copy = alloc.directBuffer(dataLen);
            // Index-based copy: leaves data's readerIndex untouched so that a retry after an
            // unsuccessful send still sees the whole payload instead of zero readable bytes.
            copy.writeBytes(data, data.readerIndex(), dataLen);
            nioData = copy.nioBuffer();
        } else {
            nioData = data.nioBuffer();
        }

        final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
        mi.payloadProtocolID(packet.protocolIdentifier());
        mi.streamNumber(packet.streamIdentifier());

        final int writtenBytes = javaChannel().send(nioData, mi);
        return writtenBytes > 0;
    } finally {
        // The temporary direct buffer is owned by this method; release it to avoid a leak.
        if (copy != null) {
            copy.release();
        }
    }
}
项目:netty4.0.27Learn    文件:NativeDatagramPacketArray.java   
/**
 * Returns the {@link NativeDatagramPacketArray} obtained from {@code ARRAY}, reset to a count of
 * zero and then filled by iterating the flushed messages of the given
 * {@link ChannelOutboundBuffer}.
 *
 * @param buffer the outbound buffer whose flushed messages are visited
 * @return the refilled array
 */
static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception {
    final NativeDatagramPacketArray packets = ARRAY.get();
    // Reset before reuse; the instance comes from ARRAY rather than being freshly created.
    packets.count = 0;
    buffer.forEachFlushedMessage(packets);
    return packets;
}
项目:netty4.0.27Learn    文件:AbstractEpollStreamChannel.java   
/**
 * Writes a {@link DefaultFileRegion} using {@code Native.sendfile}.
 *
 * @param in             the outbound buffer that receives progress updates and entry removal
 * @param region         the {@link DefaultFileRegion} from which the bytes should be written
 * @param writeSpinCount the maximum number of {@code sendfile} attempts
 * @return {@code true} if the whole region has been transferred, {@code false} otherwise
 *         (in which case {@code EPOLLOUT} is set)
 */
private boolean writeFileRegion(
        ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception {
    final long total = region.count();
    if (region.transfered() >= total) {
        // Already fully transferred: just drop the entry.
        in.remove();
        return true;
    }

    final long base = region.position();
    long flushed = 0;
    boolean complete = false;

    for (int spin = writeSpinCount - 1; spin >= 0; --spin) {
        final long already = region.transfered();
        final long sent =
                Native.sendfile(fd().intValue(), region, base, already, total - already);
        if (sent == 0) {
            break;
        }

        flushed += sent;
        if (region.transfered() >= total) {
            complete = true;
            break;
        }
    }

    if (flushed > 0) {
        in.progress(flushed);
    }

    if (!complete) {
        // Returned EAGAIN need to set EPOLLOUT
        setFlag(Native.EPOLLOUT);
    } else {
        in.remove();
    }
    return complete;
}
项目:netty4.0.27Learn    文件:AbstractEpollStreamChannel.java   
/**
 * Writes the outbound buffer until it is empty or a write helper reports that it could not
 * finish. When the buffer is empty the {@code EPOLLOUT} flag is cleared.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    final int spinCount = config().getWriteSpinCount();
    boolean keepWriting = true;
    while (keepWriting) {
        final int pending = in.size();
        if (pending == 0) {
            // Wrote all messages.
            clearFlag(Native.EPOLLOUT);
            return;
        }

        // Do a gathering write if the outbound buffer starts with a ByteBuf and holds more than
        // one entry; otherwise write the single current entry.
        // The loop is not left after a successful write even if the buffer was flushed
        // completely, because a user might have triggered another write and flush while
        // listeners were being notified.
        final boolean gathering = pending > 1 && in.current() instanceof ByteBuf;
        keepWriting = gathering
                ? doWriteMultiple(in, spinCount)
                : doWriteSingle(in, spinCount);
    }
}
项目:netty4.0.27Learn    文件:EpollDomainSocketChannel.java   
/**
 * Writes the current entry. A {@link FileDescriptor} entry is sent via {@code Native.sendFd};
 * everything else, and a descriptor whose send did not return a positive value, is delegated to
 * the superclass.
 */
@Override
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
    final Object current = in.current();
    if (current instanceof FileDescriptor) {
        if (Native.sendFd(fd().intValue(), ((FileDescriptor) current).intValue()) > 0) {
            // File descriptor was written, so remove it.
            in.remove();
            return true;
        }
    }
    return super.doWriteSingle(in, writeSpinCount);
}
项目:netty4.0.27Learn    文件:IovArrayThreadLocal.java   
/**
 * Returns the {@link IovArray} obtained from {@code ARRAY}, cleared and then filled by iterating
 * the flushed messages of the given {@link ChannelOutboundBuffer}.
 *
 * @param buffer the outbound buffer whose flushed messages are visited
 * @return the refilled array
 */
static IovArray get(ChannelOutboundBuffer buffer) throws Exception {
    final IovArray iov = ARRAY.get();
    // Clear before reuse; the instance comes from ARRAY rather than being freshly created.
    iov.clear();
    buffer.forEachFlushedMessage(iov);
    return iov;
}
项目:netty4study    文件:NioUdtMessageConnectorChannel.java   
/**
 * Writes one {@link UdtMessage} to the channel.
 *
 * @param msg the message to write; cast to {@link UdtMessage}
 * @param in  the outbound buffer (not used here)
 * @return {@code false} if a non-empty message produced no written bytes, {@code true} if the
 *         written byte count equals the message size
 * @throws Error if the message was written only partially
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    // expects a message
    final ByteBuf payload = ((UdtMessage) msg).content();
    final int expected = payload.readableBytes();

    final long sent = payload.nioBufferCount() == 1
            ? javaChannel().write(payload.nioBuffer())
            : javaChannel().write(payload.nioBuffers());

    if (expected > 0 && sent <= 0) {
        // did not write the message
        return false;
    }
    if (sent == expected) {
        // wrote message completely
        return true;
    }
    throw new Error(
            "Provider error: failed to write message. Provider library should be upgraded.");
}
项目:netty4study    文件:EmbeddedChannel.java   
/**
 * Moves every pending message into {@code outboundMessages}, retaining it first, and removes it
 * from the outbound buffer.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object pending;
    while ((pending = in.current()) != null) {
        // Retain before in.remove() so the collected message keeps a reference.
        ReferenceCountUtil.retain(pending);
        outboundMessages.add(pending);
        in.remove();
    }
}
项目:netty4study    文件:AbstractNioMessageChannel.java   
/**
 * Writes pending messages one by one, spinning up to the configured write spin count per message.
 * When the buffer is drained {@code OP_WRITE} is removed from the interest set; when a message
 * cannot be written {@code OP_WRITE} is added and writing stops.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    final SelectionKey key = selectionKey();
    final int interestOps = key.interestOps();

    for (;;) {
        final Object current = in.current();
        if (current == null) {
            // Wrote all messages.
            if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
            }
            return;
        }

        boolean written = false;
        for (int spin = config().getWriteSpinCount() - 1; spin >= 0 && !written; spin--) {
            written = doWriteMessage(current, in);
        }

        if (!written) {
            // Did not write all messages.
            if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                key.interestOps(interestOps | SelectionKey.OP_WRITE);
            }
            return;
        }
        in.remove();
    }
}
项目:netty4study    文件:NioSctpChannel.java   
/**
 * Sends one {@link SctpMessage}. If the payload is not backed by exactly one NIO buffer, or it
 * is non-direct while the allocator pools direct buffers, it is copied into a direct buffer
 * first; in that case the result is handed back to the outbound buffer via
 * {@code in.current(...)}.
 *
 * @param msg the message to write; cast to {@link SctpMessage}
 * @param in  the outbound buffer, updated through {@code in.current(...)} when a copy was made
 * @return {@code true} if the payload was empty or more than zero bytes were sent
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    final SctpMessage packet = (SctpMessage) msg;
    ByteBuf payload = packet.content();
    final int length = payload.readableBytes();
    if (length == 0) {
        return true;
    }

    final ByteBufAllocator allocator = alloc();
    final boolean copied = payload.nioBufferCount() != 1
            || (!payload.isDirect() && allocator.isDirectBufferPooled());
    if (copied) {
        payload = allocator.directBuffer(length).writeBytes(payload);
    }
    final ByteBuffer nioView = payload.nioBuffer();

    final MessageInfo info = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
    info.payloadProtocolID(packet.protocolIdentifier());
    info.streamNumber(packet.streamIdentifier());

    final int sentBytes = javaChannel().send(nioView, info);
    final boolean sent = sentBytes > 0;

    if (copied) {
        // NOTE(review): in.current(Object) presumably swaps the buffer's current entry for the
        // copy so later handling operates on it — confirm against ChannelOutboundBuffer.
        if (sent) {
            in.current(payload);
        } else {
            in.current(new SctpMessage(info, payload));
        }
    }
    return sent;
}
项目:netty-netty-5.0.0.Alpha1    文件:NioUdtMessageConnectorChannel.java   
/**
 * Writes one {@link UdtMessage} to the channel.
 *
 * @param msg the message to write; cast to {@link UdtMessage}
 * @param in  the outbound buffer (not used here)
 * @return {@code false} if a non-empty message produced no written bytes, {@code true} if the
 *         written byte count equals the message size
 * @throws Error if the message was written only partially
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    // expects a message
    final ByteBuf payload = ((UdtMessage) msg).content();
    final int expected = payload.readableBytes();

    final long sent = payload.nioBufferCount() == 1
            ? javaChannel().write(payload.nioBuffer())
            : javaChannel().write(payload.nioBuffers());

    if (expected > 0 && sent <= 0) {
        // did not write the message
        return false;
    }
    if (sent == expected) {
        // wrote message completely
        return true;
    }
    throw new Error(
            "Provider error: failed to write message. Provider library should be upgraded.");
}
项目:netty-netty-5.0.0.Alpha1    文件:EmbeddedChannel.java   
/**
 * Moves every pending message (queried via {@code in.current(false)}) into
 * {@code outboundMessages}, retaining it first, and removes it from the outbound buffer.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object pending;
    while ((pending = in.current(false)) != null) {
        // Retain before in.remove() so the collected message keeps a reference.
        ReferenceCountUtil.retain(pending);
        outboundMessages.add(pending);
        in.remove();
    }
}
项目:netty-netty-5.0.0.Alpha1    文件:AbstractNioMessageChannel.java   
/**
 * Writes pending messages one by one, spinning up to the configured write spin count per message.
 * When the buffer is drained {@code OP_WRITE} is removed from the interest set; when a message
 * cannot be written {@code OP_WRITE} is added and writing stops.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    final SelectionKey key = selectionKey();
    final int interestOps = key.interestOps();

    for (;;) {
        final Object current = in.current();
        if (current == null) {
            // Wrote all messages.
            if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
            }
            return;
        }

        boolean written = false;
        for (int spin = config().getWriteSpinCount() - 1; spin >= 0 && !written; spin--) {
            written = doWriteMessage(current, in);
        }

        if (!written) {
            // Did not write all messages.
            if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                key.interestOps(interestOps | SelectionKey.OP_WRITE);
            }
            return;
        }
        in.remove();
    }
}
项目:netty-netty-5.0.0.Alpha1    文件:NioSctpChannel.java   
/**
 * Sends one {@link SctpMessage}. If the payload is not backed by exactly one NIO buffer, or it
 * is non-direct while the allocator pools direct buffers, it is copied into a direct buffer
 * first; in that case the result is handed back to the outbound buffer via
 * {@code in.current(...)}.
 *
 * @param msg the message to write; cast to {@link SctpMessage}
 * @param in  the outbound buffer, updated through {@code in.current(...)} when a copy was made
 * @return {@code true} if the payload was empty or more than zero bytes were sent
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    final SctpMessage packet = (SctpMessage) msg;
    ByteBuf payload = packet.content();
    final int length = payload.readableBytes();
    if (length == 0) {
        return true;
    }

    final ByteBufAllocator allocator = alloc();
    final boolean copied = payload.nioBufferCount() != 1
            || (!payload.isDirect() && allocator.isDirectBufferPooled());
    if (copied) {
        payload = allocator.directBuffer(length).writeBytes(payload);
    }
    final ByteBuffer nioView = payload.nioBuffer();

    final MessageInfo info = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
    info.payloadProtocolID(packet.protocolIdentifier());
    info.streamNumber(packet.streamIdentifier());

    final int sentBytes = javaChannel().send(nioView, info);
    final boolean sent = sentBytes > 0;

    if (copied) {
        // NOTE(review): in.current(Object) presumably swaps the buffer's current entry for the
        // copy so later handling operates on it — confirm against ChannelOutboundBuffer.
        if (sent) {
            in.current(payload);
        } else {
            in.current(new SctpMessage(info, payload));
        }
    }
    return sent;
}
项目:reactor-netty    文件:ChannelOperationsHandler.java   
/**
 * Reports whether the channel's outbound buffer still holds bytes waiting to be written.
 *
 * @return {@code true} if an outbound buffer exists and its total pending write bytes is
 *         greater than zero
 */
private boolean hasPendingWriteBytes() {
    // On close the outboundBuffer is made null. After that point
    // adding messages and flushes to outboundBuffer is not allowed.
    final ChannelOutboundBuffer pending = this.unsafe.outboundBuffer();
    if (pending == null) {
        return false;
    }
    return pending.totalPendingWriteBytes() > 0;
}
项目:AnnihilationPro    文件:NullChannel.java   
/**
 * No-op write: the body is empty, so the given outbound buffer is left untouched.
 *
 * @param arg0 the outbound buffer (ignored)
 */
@Override
protected void doWrite(ChannelOutboundBuffer arg0) throws Exception {
}
项目:AnnihilationPro    文件:NullChannel.java   
/**
 * No-op write: the body is empty, so the given outbound buffer is left untouched.
 *
 * @param arg0 the outbound buffer (ignored)
 */
@Override
protected void doWrite(ChannelOutboundBuffer arg0) throws Exception {
}
项目:netty4.0.27Learn    文件:NioUdtAcceptorChannel.java   
/**
 * Always fails: writing messages is not supported by this channel.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    throw new UnsupportedOperationException();
}
项目:netty4.0.27Learn    文件:AbstractTrafficShapingHandler.java   
/**
 * Forwards the user-defined writability flag at {@code userDefinedWritabilityIndex} to the
 * channel's outbound buffer. Does nothing if the channel has no outbound buffer.
 *
 * @param ctx      the context whose channel is updated
 * @param writable the writability value to set
 */
void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
    final ChannelOutboundBuffer outbound = ctx.channel().unsafe().outboundBuffer();
    if (outbound == null) {
        return;
    }
    outbound.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
}
项目:netty4.0.27Learn    文件:OioDatagramChannel.java   
/**
 * Sends every pending datagram through {@code socket}, reusing {@code tmpPacket} as the carrier.
 *
 * <p>An {@link AddressedEnvelope} is sent to its recipient. A bare {@link ByteBuf} carries no
 * address, so the packet's address is cleared for it; otherwise the reused packet would still
 * hold the recipient of an earlier envelope and the datagram could be sent to the wrong peer.
 *
 * <p>An {@link IOException} from the send fails only the current message; writing continues with
 * the next one.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        final Object o = in.current();
        if (o == null) {
            break;
        }

        final ByteBuf data;
        final SocketAddress remoteAddress;
        if (o instanceof AddressedEnvelope) {
            @SuppressWarnings("unchecked")
            AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o;
            remoteAddress = envelope.recipient();
            data = envelope.content();
        } else {
            data = (ByteBuf) o;
            remoteAddress = null;
        }

        final int length = data.readableBytes();
        if (remoteAddress != null) {
            tmpPacket.setSocketAddress(remoteAddress);
        } else {
            // tmpPacket is shared across iterations and calls: drop any destination left over
            // from a previous envelope so the socket falls back to its connected peer (or
            // rejects the send) instead of silently reusing a stale address.
            tmpPacket.setAddress(null);
        }
        if (data.hasArray()) {
            // Heap buffer: point the packet straight at the backing array, no copy needed.
            tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
        } else {
            // No accessible backing array: copy the readable bytes into a temporary array.
            byte[] tmp = new byte[length];
            data.getBytes(data.readerIndex(), tmp);
            tmpPacket.setData(tmp);
        }
        try {
            socket.send(tmpPacket);
            in.remove();
        } catch (IOException e) {
            // Continue on write error as a DatagramChannel can write to multiple remote peers
            //
            // See https://github.com/netty/netty/issues/2665
            in.remove(e);
        }
    }
}
项目:netty4.0.27Learn    文件:OioServerSocketChannel.java   
/**
 * Always fails: writing is not supported by this channel.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    throw new UnsupportedOperationException();
}
项目:netty4.0.27Learn    文件:NioServerSocketChannel.java   
/**
 * Always fails: writing messages is not supported by this channel.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    throw new UnsupportedOperationException();
}
项目:netty4.0.27Learn    文件:LocalChannel.java   
/**
 * Transfers all pending messages to the peer channel's inbound buffer, retaining each one.
 * If the peer runs on this channel's event loop the transfer happens inline; otherwise the
 * messages are copied into an array and handed over on the peer's event loop.
 *
 * @throws NotYetConnectedException if {@code state} is below 2
 * @throws ClosedChannelException   if {@code state} is above 2
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    if (state < 2) {
        throw new NotYetConnectedException();
    }
    if (state > 2) {
        throw new ClosedChannelException();
    }

    final LocalChannel remote = this.peer;
    final ChannelPipeline remotePipeline = remote.pipeline();
    final EventLoop remoteLoop = remote.eventLoop();

    if (remoteLoop != eventLoop()) {
        // Use a copy because the original msgs will be recycled by AbstractChannel.
        final Object[] snapshot = new Object[in.size()];
        int index = 0;
        while (index < snapshot.length) {
            snapshot[index] = ReferenceCountUtil.retain(in.current());
            in.remove();
            index++;
        }

        remoteLoop.execute(new Runnable() {
            @Override
            public void run() {
                Collections.addAll(remote.inboundBuffer, snapshot);
                finishPeerRead(remote, remotePipeline);
            }
        });
        return;
    }

    // Same event loop: hand the messages over directly.
    Object msg;
    while ((msg = in.current()) != null) {
        remote.inboundBuffer.add(msg);
        ReferenceCountUtil.retain(msg);
        in.remove();
    }
    finishPeerRead(remote, remotePipeline);
}
项目:netty4.0.27Learn    文件:AbstractNioMessageChannel.java   
/**
 * Writes pending messages one by one, spinning up to the configured write spin count per message.
 * When the buffer is drained {@code OP_WRITE} is removed from the interest set; when a message
 * cannot be written {@code OP_WRITE} is added and writing stops. An {@link IOException} fails
 * only the current message if {@code continueOnWriteError()} returns {@code true}; otherwise it
 * is rethrown.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    final SelectionKey key = selectionKey();
    final int interestOps = key.interestOps();

    for (;;) {
        final Object current = in.current();
        if (current == null) {
            // Wrote all messages.
            if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
            }
            return;
        }
        try {
            boolean written = false;
            for (int spin = config().getWriteSpinCount() - 1; spin >= 0 && !written; spin--) {
                written = doWriteMessage(current, in);
            }

            if (!written) {
                // Did not write all messages.
                if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                    key.interestOps(interestOps | SelectionKey.OP_WRITE);
                }
                return;
            }
            in.remove();
        } catch (IOException e) {
            if (!continueOnWriteError()) {
                throw e;
            }
            in.remove(e);
        }
    }
}
项目:netty4.0.27Learn    文件:OioSctpChannel.java   
/**
 * Sends pending {@link SctpMessage}s through {@code ch}, one per selected key of
 * {@code writeSelector}. Returns without writing if the selector is closed or the select call
 * (bounded by {@code SO_TIMEOUT}) reports no selected keys.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    if (!writeSelector.isOpen()) {
        return;
    }
    final int size = in.size();
    final int selectedKeys = writeSelector.select(SO_TIMEOUT);
    if (selectedKeys <= 0) {
        return;
    }
    final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
    if (writableKeys.isEmpty()) {
        return;
    }

    final Iterator<SelectionKey> keys = writableKeys.iterator();
    int written = 0;
    do {
        if (written == size) {
            // all written
            return;
        }
        // Consume one selected key per message.
        keys.next();
        keys.remove();

        final SctpMessage packet = (SctpMessage) in.current();
        if (packet == null) {
            return;
        }

        final ByteBuf data = packet.content();
        final int dataLen = data.readableBytes();
        final ByteBuffer nioData;
        if (data.nioBufferCount() == -1) {
            // Copy the readable bytes into a freshly allocated NIO buffer.
            nioData = ByteBuffer.allocate(dataLen);
            data.getBytes(data.readerIndex(), nioData);
            nioData.flip();
        } else {
            nioData = data.nioBuffer();
        }

        final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
        mi.payloadProtocolID(packet.protocolIdentifier());
        mi.streamNumber(packet.streamIdentifier());

        ch.send(nioData, mi);
        written++;
        in.remove();
    } while (keys.hasNext());
}
项目:netty4.0.27Learn    文件:OioSctpServerChannel.java   
/**
 * Always fails: writing is not supported by this channel.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    throw new UnsupportedOperationException();
}
项目:netty4.0.27Learn    文件:NioSctpServerChannel.java   
/**
 * Always fails: writing messages is not supported by this channel.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    throw new UnsupportedOperationException();
}
项目:netty4.0.27Learn    文件:EpollDatagramChannel.java   
/**
 * Writes pending datagrams until the outbound buffer is empty or the socket stops accepting
 * data. When more than one message is queued and {@code Native.IS_SUPPORTING_SENDMMSG} is set,
 * messages are sent in batches via {@code Native.sendmmsg}; otherwise each message is written
 * individually with up to the configured write spin count of attempts.
 *
 * <p>{@code EPOLLOUT} is cleared once everything is written and set when a write makes no
 * progress. An {@link IOException} fails only the current message.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
            clearFlag(Native.EPOLLOUT);
            break;
        }

        try {
            // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
            if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
                NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in);
                int cnt = array.count();

                // A count of zero falls through to the single-message path below.
                if (cnt >= 1) {
                    // Try to use gathering writes via sendmmsg(...) syscall.
                    int offset = 0;
                    NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();

                    while (cnt > 0) {
                        int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt);
                        if (send == 0) {
                            // Did not write all messages.
                            setFlag(Native.EPOLLOUT);
                            return;
                        }
                        // Drop one buffer entry per packet that was sent.
                        for (int i = 0; i < send; i++) {
                            in.remove();
                        }
                        cnt -= send;
                        offset += send;
                    }
                    // Whole batch sent: re-check the buffer from the top of the outer loop.
                    continue;
                }
            }
            // Single-message path: retry up to the configured spin count.
            boolean done = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
                if (doWriteMessage(msg)) {
                    done = true;
                    break;
                }
            }

            if (done) {
                in.remove();
            } else {
                // Did not write all messages.
                setFlag(Native.EPOLLOUT);
                break;
            }
        } catch (IOException e) {
            // Continue on write error as a DatagramChannel can write to multiple remote peers
            //
            // See https://github.com/netty/netty/issues/2665
            in.remove(e);
        }
    }
}
项目:netty4.0.27Learn    文件:AbstractEpollStreamChannel.java   
/**
 * Writes the entries of the given {@link IovArray} with {@code Native.writevAddresses}, trying up
 * to {@code writeSpinCount} times, and removes the written byte count from the outbound buffer.
 *
 * @param in             the outbound buffer from which the written bytes are removed
 * @param array          the array describing the memory regions to write
 * @param writeSpinCount the maximum number of write attempts
 * @return {@code true} if all bytes described by {@code array} were written; {@code false}
 *         otherwise, in which case {@code EPOLLOUT} is set
 * @throws IOException declared for failures of the underlying write call
 */
private boolean writeBytesMultiple(
        ChannelOutboundBuffer in, IovArray array, int writeSpinCount) throws IOException {

    long expectedWrittenBytes = array.size();
    // Remember the starting total so the amount actually written can be derived at the end.
    final long initialExpectedWrittenBytes = expectedWrittenBytes;

    int cnt = array.count();

    assert expectedWrittenBytes != 0;
    assert cnt != 0;

    boolean done = false;
    int offset = 0;
    int end = offset + cnt;
    for (int i = writeSpinCount - 1; i >= 0; i--) {
        long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt);
        if (localWrittenBytes == 0) {
            // No progress on this attempt: stop spinning.
            break;
        }
        expectedWrittenBytes -= localWrittenBytes;

        if (expectedWrittenBytes == 0) {
            // Written everything, just break out here (fast-path)
            done = true;
            break;
        }

        // Partial write: advance offset/cnt past every entry that processWritten reports as
        // consumed, so the next attempt starts at the first entry that is not fully written.
        do {
            long bytes = array.processWritten(offset, localWrittenBytes);
            if (bytes == -1) {
                // incomplete write
                break;
            } else {
                offset++;
                cnt--;
                localWrittenBytes -= bytes;
            }
        } while (offset < end && localWrittenBytes > 0);
    }
    if (!done) {
        setFlag(Native.EPOLLOUT);
    }
    // Remove exactly the number of bytes that were written across all attempts.
    in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
    return done;
}