/** * This code is more complicated than you would think because we might require multiple * transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting. * * The contract is that the caller will ensure position is properly set to the total number * of bytes transferred so far (i.e. value returned by transfered()). */ @Override public long transferTo(final WritableByteChannel target, final long position) throws IOException { Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position."); // Bytes written for header in this call. long writtenHeader = 0; if (header.readableBytes() > 0) { writtenHeader = copyByteBuf(header, target); totalBytesTransferred += writtenHeader; if (header.readableBytes() > 0) { return writtenHeader; } } // Bytes written for body in this call. long writtenBody = 0; if (body instanceof FileRegion) { writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength); } else if (body instanceof ByteBuf) { writtenBody = copyByteBuf((ByteBuf) body, target); } totalBytesTransferred += writtenBody; return writtenHeader + writtenBody; }
/**
 * Sends a one-way {@code CHECK_TRANSACTION_STATE} request over {@code channel}, using the
 * selected mapped buffer as the message body.
 *
 * The buffer is released when the write completes (successfully or not), or immediately if
 * building or submitting the write throws.
 *
 * @param channel channel the request is written to
 * @param requestHeader custom header of the check request
 * @param selectMappedBufferResult buffer holding the message; released by this method
 */
public void checkProducerTransactionState(
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final SelectMappedBufferResult selectMappedBufferResult) {
    final RemotingCommand checkRequest =
        RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    checkRequest.markOnewayRPC();

    try {
        final FileRegion transfer = new OneMessageTransfer(
            checkRequest.encodeHeader(selectMappedBufferResult.getSize()),
            selectMappedBufferResult);

        // Releases the buffer once the write has finished, then reports a failed write.
        final ChannelFutureListener releaseWhenDone = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                selectMappedBufferResult.release();
                if (future.isSuccess()) {
                    return;
                }
                log.error("invokeProducer failed,", future.cause());
            }
        };

        channel.writeAndFlush(transfer).addListener(releaseWhenDone);
    } catch (Throwable e) {
        log.error("invokeProducer exception", e);
        selectMappedBufferResult.release();
    }
}
/**
 * Handles a file request: {@code msg} is the path of the file to send.
 *
 * Replies with an error line when the path does not exist or is not a regular file; otherwise
 * writes a "file &lt;length&gt;" line, the file content as a {@link FileRegion}, and a
 * terminating {@code CR}.
 *
 * @param cxt context used to write the reply
 * @param msg path of the requested file
 * @throws Exception if the file cannot be opened or its length cannot be read
 */
@Override
protected void messageReceived(ChannelHandlerContext cxt, String msg) throws Exception {
    File file = new File(msg);
    if (!file.exists()) {
        cxt.writeAndFlush("File not found: " + file + CR);
        return;
    }
    if (!file.isFile()) {
        cxt.writeAndFlush("No file " + file + CR);
        // Stop here: falling through would try to open a directory (or other non-file).
        return;
    }

    cxt.writeAndFlush("file " + file.length() + CR);
    RandomAccessFile randomAccessFile = new RandomAccessFile(msg, "r");
    boolean handedOff = false;
    try {
        FileRegion fileRegion =
            new DefaultFileRegion(randomAccessFile.getChannel(), 0, randomAccessFile.length());
        cxt.write(fileRegion);
        handedOff = true;
    } finally {
        // Only close here when the region never reached the pipeline. Once written, the
        // transfer may still be pending, so the file must stay open; DefaultFileRegion closes
        // its FileChannel when the region is released.
        if (!handedOff) {
            randomAccessFile.close();
        }
    }
    cxt.writeAndFlush(CR);
}
/**
 * {@inheritDoc}
 * <p>Records the written message in {@code channelWrites}. A {@link FileRegion} is drained
 * through an in-memory {@link Pipe} and recorded as a {@code byte[]} of {@code count()} bytes;
 * if that fails, the exception itself is recorded instead. Any other non-null message is
 * recorded as-is. {@code null} messages are ignored.
 *
 * @param message the message being written
 * @return always {@code null}
 * @see io.netty.channel.ChannelOutboundInvoker#write(java.lang.Object)
 */
@Override
public ChannelFuture write(Object message) {
    if (message != null) {
        if (message instanceof FileRegion) {
            Pipe pipe = null;
            try {
                pipe = Pipe.open();
                FileRegion fr = (FileRegion) message;
                long bytesToRead = fr.count();
                fr.transferTo(pipe.sink(), 0L);
                // Close the write end so the read loop below sees end-of-stream instead of
                // blocking when fewer than count() bytes were transferred.
                pipe.sink().close();
                byte[] content = new byte[(int) bytesToRead];
                ByteBuffer buffer = ByteBuffer.wrap(content);
                // A single read may return only part of the data; keep reading until the
                // array is full or the pipe is exhausted.
                while (buffer.hasRemaining() && pipe.source().read(buffer) >= 0) {
                    // keep draining
                }
                channelWrites.add(content);
            } catch (Exception ex) {
                log.error("Failed to read content from pipe", ex);
                channelWrites.add(ex);
            } finally {
                // Release both pipe ends; they hold OS resources that are not freed otherwise.
                if (pipe != null) {
                    try {
                        pipe.sink().close();
                    } catch (Exception closeEx) {
                        log.error("Failed to close pipe", closeEx);
                    }
                    try {
                        pipe.source().close();
                    } catch (Exception closeEx) {
                        log.error("Failed to close pipe", closeEx);
                    }
                }
            }
        } else {
            channelWrites.add(message);
        }
        log.info("Received Channel Write [{}]  type:[{}]", message, message.getClass().getName());
    }
    return null;
}
/**
 * Verifies that {@link FileRegionEncoder} turns an outbound {@link FileRegion} into a
 * {@link ByteBuf} carrying exactly the bytes of the backing file.
 *
 * @throws IOException if there is an error.
 */
@Test
public void testEncode() throws IOException {
    // Random payload backing the file region.
    final int dataLength = 1 << 10;
    final byte[] expected = new byte[dataLength];
    new Random(System.currentTimeMillis()).nextBytes(expected);

    final File tempFile = File.createTempFile(UUID.randomUUID().toString(), ".data");
    tempFile.deleteOnExit();
    write(tempFile, expected);

    final EmbeddedChannel channel = new EmbeddedChannel(new FileRegionEncoder());
    final FileRegion region = new DefaultFileRegion(tempFile, 0, dataLength);

    // Nothing transferred yet, and the region spans the whole payload.
    Assert.assertEquals(0, region.transfered());
    Assert.assertEquals(dataLength, region.count());

    Assert.assertTrue(channel.writeOutbound(region));

    final ByteBuf encoded = (ByteBuf) channel.readOutbound();
    final byte[] actual = new byte[encoded.readableBytes()];
    encoded.getBytes(0, actual);
    Assert.assertArrayEquals("Data should be identical", expected, actual);
}
/**
 * Handles a file request: {@code msg} is the path of the file to send.
 *
 * Replies with an error line when the path does not exist or is not a regular file; otherwise
 * writes a "&lt;file&gt; &lt;length&gt;" line, the file content as a {@link FileRegion}, and a
 * terminating {@code CR}.
 *
 * @param ctx context used to write the reply
 * @param msg path of the requested file
 * @throws Exception if the file cannot be opened or its length cannot be read
 */
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
    File file = new File(msg);
    if (!file.exists()) {
        ctx.writeAndFlush("File not found: " + file + CR);
        return;
    }
    if (!file.isFile()) {
        ctx.writeAndFlush("Not a file : " + file + CR);
        return;
    }

    ctx.write(file + " " + file.length() + CR);
    RandomAccessFile randomAccessFile = new RandomAccessFile(msg, "r");
    boolean handedOff = false;
    try {
        FileRegion region = new DefaultFileRegion(
            randomAccessFile.getChannel(), 0, randomAccessFile.length());
        ctx.write(region);
        handedOff = true;
    } finally {
        // Only close here when the region never reached the pipeline. Once written, the
        // transfer may still be pending, so the file must stay open; DefaultFileRegion closes
        // its FileChannel when the region is released.
        if (!handedOff) {
            randomAccessFile.close();
        }
    }
    ctx.writeAndFlush(CR);
}
/**
 * Writes {@code region} to the output stream through a lazily created channel wrapper,
 * repeating {@code transferTo} until the whole region has been written or the region reports
 * end-of-file.
 *
 * @param region the region to write
 * @throws NotYetConnectedException if no output stream is set
 * @throws Exception if the transfer fails
 */
@Override
protected void doWriteFileRegion(FileRegion region) throws Exception {
    final OutputStream stream = this.os;
    if (stream == null) {
        throw new NotYetConnectedException();
    }
    // Create the channel view of the stream on first use.
    if (outChannel == null) {
        outChannel = Channels.newChannel(stream);
    }

    long total = 0;
    do {
        final long chunk = region.transferTo(outChannel, total);
        if (chunk == -1) {
            // The region ended early; let checkEOF decide whether that is an error.
            checkEOF(region);
            return;
        }
        total += chunk;
    } while (total < region.count());
}
/**
 * Validates and normalises an outbound message.
 *
 * A direct {@link ByteBuf} and any {@link FileRegion} pass through unchanged; a non-direct
 * {@link ByteBuf} is replaced by the result of {@code newDirectBuffer}.
 *
 * @param msg the outbound message
 * @return the message to actually write
 * @throws UnsupportedOperationException if {@code msg} is neither a ByteBuf nor a FileRegion
 */
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        final ByteBuf buffer = (ByteBuf) msg;
        return buffer.isDirect() ? msg : newDirectBuffer(buffer);
    }

    if (!(msg instanceof FileRegion)) {
        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    return msg;
}
/**
 * Handles a file request: {@code msg} is the path of the file to send.
 *
 * Replies with an error line when the path does not exist or is not a regular file; otherwise
 * writes a "&lt;file&gt; &lt;length&gt;" line, the file content as a {@link FileRegion}, and a
 * terminating {@code CR}.
 *
 * @param ctx context used to write the reply
 * @param msg path of the requested file
 * @throws Exception if the file cannot be opened or its length cannot be read
 */
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
    File file = new File(msg);
    if (!file.exists()) {
        ctx.writeAndFlush("file not found: " + file + CR);
        return;
    }
    if (!file.isFile()) {
        ctx.writeAndFlush("not a file :" + file + CR);
        return;
    }

    ctx.write(file + " " + file.length() + CR);
    RandomAccessFile raf = new RandomAccessFile(file, "r");
    boolean handedOff = false;
    try {
        FileRegion fileRegion = new DefaultFileRegion(raf.getChannel(), 0, raf.length());
        ctx.write(fileRegion);
        handedOff = true;
    } finally {
        // Only close here when the region never reached the pipeline. Once written, the
        // transfer may still be pending, so the file must stay open; DefaultFileRegion closes
        // its FileChannel when the region is released.
        if (!handedOff) {
            raf.close();
        }
    }
    ctx.writeAndFlush(CR);
}
/**
 * Handles a file request: {@code msg} is the path of the file to send.
 *
 * Replies with an error line when the path does not exist or is not a regular file; otherwise
 * writes a "&lt;file&gt; &lt;length&gt;" line, the file content as a {@link FileRegion}, and a
 * terminating newline.
 *
 * @param ctx context used to write the reply
 * @param msg path of the requested file
 * @throws Exception if the file cannot be opened
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    File file = new File(msg);
    if (!file.exists()) {
        ctx.writeAndFlush("File not found: " + file + '\n');
        return;
    }
    if (!file.isFile()) {
        ctx.writeAndFlush("Not a file: " + file + '\n');
        return;
    }

    ctx.write(file + " " + file.length() + '\n');
    FileInputStream fis = new FileInputStream(file);
    boolean handedOff = false;
    try {
        FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length());
        ctx.write(region);
        handedOff = true;
    } finally {
        // Only close here when the region never reached the pipeline. Once written, the
        // transfer may still be pending, so the stream must stay open; DefaultFileRegion
        // closes its FileChannel when the region is released.
        if (!handedOff) {
            fis.close();
        }
    }
    ctx.writeAndFlush("\n");
}
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { Object msg = in.current(); if (msg == null) { // nothing left to write break; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; while (buf.isReadable()) { doWriteBytes(buf); } in.remove(); } else if (msg instanceof FileRegion) { doWriteFileRegion((FileRegion) msg); in.remove(); } else { in.remove(new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg))); } } }
/**
 * Handles a file request: {@code msg} is the path of the file to send.
 *
 * Replies with an error line when the path does not exist or is not a regular file; otherwise
 * writes a "&lt;file&gt; &lt;length&gt;" line, the file content as a {@link FileRegion}, and a
 * terminating newline.
 *
 * @param ctx context used to write the reply
 * @param msg path of the requested file
 * @throws Exception if the file cannot be opened
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    File file = new File(msg);
    if (!file.exists()) {
        ctx.writeAndFlush("File not found: " + file + '\n');
        return;
    }
    if (!file.isFile()) {
        ctx.writeAndFlush("Not a file: " + file + '\n');
        return;
    }

    ctx.write(file + " " + file.length() + '\n');
    FileInputStream fis = new FileInputStream(file);
    boolean handedOff = false;
    try {
        FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length());
        ctx.write(region);
        handedOff = true;
    } finally {
        // Only close here when the region never reached the pipeline. Once written, the
        // transfer may still be pending, so the stream must stay open; DefaultFileRegion
        // closes its FileChannel when the region is released.
        if (!handedOff) {
            fis.close();
        }
    }
    ctx.writeAndFlush("\n");
}
/**
 * Handles a file request: {@code msg} is the path of the file to send.
 *
 * Replies with an error line when the path does not exist or is not a regular file; otherwise
 * writes a "&lt;file&gt; &lt;length&gt;" line, the file content as a {@link FileRegion}, and a
 * terminating newline.
 *
 * @param ctx context used to write the reply
 * @param msg path of the requested file
 * @throws Exception if the file cannot be opened
 */
@Override
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
    File file = new File(msg);
    if (!file.exists()) {
        ctx.writeAndFlush("File not found: " + file + '\n');
        return;
    }
    if (!file.isFile()) {
        ctx.writeAndFlush("Not a file: " + file + '\n');
        return;
    }

    ctx.write(file + " " + file.length() + '\n');
    FileInputStream fis = new FileInputStream(file);
    boolean handedOff = false;
    try {
        FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length());
        ctx.write(region);
        handedOff = true;
    } finally {
        // Only close here when the region never reached the pipeline. Once written, the
        // transfer may still be pending, so the stream must stay open; DefaultFileRegion
        // closes its FileChannel when the region is released.
        if (!handedOff) {
            fis.close();
        }
    }
    ctx.writeAndFlush("\n");
}
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { Object msg = in.current(false); if (msg == null) { // nothing left to write break; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; while (buf.isReadable()) { doWriteBytes(buf); } in.remove(); } else if (msg instanceof FileRegion) { doWriteFileRegion((FileRegion) msg); in.remove(); } else { in.remove(new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg))); } } }
/**
 * Encodes a memcache message and/or its content into {@code out}.
 *
 * A {@code MemcacheMessage} is encoded via {@code encodeMessage}; it is rejected while content
 * of a previous message is still expected. Content ({@code MemcacheContent}, {@link ByteBuf}
 * or {@link FileRegion}) is passed on retained when non-empty, or replaced by an empty buffer.
 * After content, more content is expected unless the message is a {@code LastMemcacheContent}.
 *
 * @param ctx the handler context
 * @param msg the message to encode
 * @param out receives the encoded objects
 * @throws IllegalStateException if a new message arrives while more content is expected
 */
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
    if (msg instanceof MemcacheMessage) {
        if (expectingMoreContent) {
            throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
        }

        @SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" })
        final M typedMessage = (M) msg;
        out.add(encodeMessage(ctx, typedMessage));
    }

    final boolean carriesContent =
        msg instanceof MemcacheContent || msg instanceof ByteBuf || msg instanceof FileRegion;
    if (!carriesContent) {
        return;
    }

    final Object encodedContent = contentLength(msg) > 0 ? encodeAndRetain(msg) : Unpooled.EMPTY_BUFFER;
    out.add(encodedContent);

    expectingMoreContent = !(msg instanceof LastMemcacheContent);
}
/**
 * Looks up one message by the offset in the request header and streams it back to the caller.
 *
 * When the message is found, the response header and the message buffer are written directly
 * to the channel and {@code null} is returned; the buffer is released when that write
 * completes, or at once if submitting it throws. When no message is found, a
 * {@code SYSTEM_ERROR} response is returned instead.
 *
 * @param ctx context whose channel receives the message
 * @param request the incoming view-message request
 * @return {@code null} when the message was written to the channel, otherwise the error response
 * @throws RemotingCommandException if the request header cannot be decoded
 */
public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ViewMessageRequestHeader requestHeader =
        (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);

    response.setOpaque(request.getOpaque());

    final SelectMappedBufferResult messageBuffer =
        this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());

    // Nothing stored at that offset: report the error through the regular response path.
    if (messageBuffer == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
        return response;
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);

    try {
        final FileRegion transfer =
            new OneMessageTransfer(response.encodeHeader(messageBuffer.getSize()), messageBuffer);

        final ChannelFutureListener releaseWhenDone = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                messageBuffer.release();
                if (future.isSuccess()) {
                    return;
                }
                log.error("Transfer one message from page cache failed, ", future.cause());
            }
        };

        ctx.channel().writeAndFlush(transfer).addListener(releaseWhenDone);
    } catch (Throwable e) {
        log.error("", e);
        messageBuffer.release();
    }

    // The response has already been written to the channel above.
    return null;
}
/**
 * The broker proactively checks back the producer's transaction state (one-way request).
 *
 * The selected mapped buffer is sent as the message body and is released when the write
 * completes, or immediately if building or submitting the write throws.
 *
 * @param channel channel the request is written to
 * @param requestHeader custom header of the check request
 * @param selectMappedBufferResult buffer holding the message; released by this method
 */
public void checkProducerTransactionState(
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final SelectMappedBufferResult selectMappedBufferResult) {
    final RemotingCommand checkRequest =
        RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    checkRequest.markOnewayRPC();

    try {
        final FileRegion transfer = new OneMessageTransfer(
            checkRequest.encodeHeader(selectMappedBufferResult.getSize()),
            selectMappedBufferResult);

        // Releases the buffer once the write has finished, then reports a failed write.
        final ChannelFutureListener releaseWhenDone = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                selectMappedBufferResult.release();
                if (future.isSuccess()) {
                    return;
                }
                log.error("invokeProducer failed,", future.cause());
            }
        };

        channel.writeAndFlush(transfer).addListener(releaseWhenDone);
    } catch (Throwable e) {
        log.error("invokeProducer exception", e);
        selectMappedBufferResult.release();
    }
}
public void checkProducerTransactionState(// final Channel channel,// final CheckTransactionStateRequestHeader requestHeader,// final SelectMapedBufferResult selectMapedBufferResult// ) { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader); request.markOnewayRPC(); try { FileRegion fileRegion = new OneMessageTransfer(request.encodeHeader(selectMapedBufferResult.getSize()), selectMapedBufferResult); channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { selectMapedBufferResult.release(); if (!future.isSuccess()) { log.error("invokeProducer failed,", future.cause()); } } }); } catch (Throwable e) { log.error("invokeProducer exception", e); selectMapedBufferResult.release(); } }
/**
 * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each
 * written message that can be handled by this encoder.
 *
 * The region is drained through a {@link WritableByteChannel} adapter that appends every
 * chunk to {@code out}; {@code transferTo} is repeated until the region reports that all
 * {@code count()} bytes have been transferred.
 *
 * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link
 * io.netty.handler.codec.MessageToByteEncoder} belongs to
 * @param msg the message to encode
 * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written
 * @throws Exception is thrown if an error occurs
 */
@Override
protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception {
    WritableByteChannel writableByteChannel = new WritableByteChannel() {
        @Override
        public int write(ByteBuffer src) throws IOException {
            // The channel contract requires the number of bytes consumed from src, not the
            // capacity of the destination; transferTo implementations rely on this value to
            // track their progress.
            int written = src.remaining();
            out.writeBytes(src);
            return written;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() throws IOException {
        }
    };

    long toTransfer = msg.count();

    // Keep transferring from the region's current progress until nothing is left.
    for (long transferred = msg.transfered(); toTransfer - transferred > 0; transferred = msg.transfered()) {
        msg.transferTo(writableByteChannel, transferred);
    }
}
/**
 * Accounts for the size of an outbound message, then passes it to {@code doWrite} and to the
 * superclass {@code write}.
 *
 * Only {@link ByteBuf} (readable bytes) and {@link FileRegion} (count) messages add to
 * {@code writtenBytes}; other message types are forwarded without being counted.
 *
 * @param ctx the handler context
 * @param msg the outbound message
 * @param promise the promise of the write
 * @throws Exception if forwarding the write fails
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    final boolean isBuffer = msg instanceof ByteBuf;
    if (isBuffer) {
        final ByteBuf buffer = (ByteBuf) msg;
        writtenBytes.addAndGet(buffer.readableBytes());
    } else if (msg instanceof FileRegion) {
        final FileRegion region = (FileRegion) msg;
        writtenBytes.addAndGet(region.count());
    }

    doWrite(ctx, msg, promise);
    super.write(ctx, msg, promise);
}
/**
 * Returns the retained payload of a content message.
 *
 * @param msg a {@link ByteBuf}, an {@code HttpContent} (its content buffer is used) or a
 *            {@link FileRegion}
 * @return the retained buffer or region
 * @throws IllegalStateException if {@code msg} is of any other type
 */
private static Object encodeAndRetain(Object msg) {
    final Object retained;
    if (msg instanceof ByteBuf) {
        retained = ((ByteBuf) msg).retain();
    } else if (msg instanceof HttpContent) {
        retained = ((HttpContent) msg).content().retain();
    } else if (msg instanceof FileRegion) {
        retained = ((FileRegion) msg).retain();
    } else {
        throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
    }
    return retained;
}
/**
 * Determines the content length of a content message.
 *
 * @param msg an {@code HttpContent}, a {@link ByteBuf} or a {@link FileRegion}
 * @return the readable bytes of the buffer, or the region's count
 * @throws IllegalStateException if {@code msg} is of any other type
 */
private static long contentLength(Object msg) {
    final long length;
    if (msg instanceof HttpContent) {
        length = ((HttpContent) msg).content().readableBytes();
    } else if (msg instanceof ByteBuf) {
        length = ((ByteBuf) msg).readableBytes();
    } else if (msg instanceof FileRegion) {
        length = ((FileRegion) msg).count();
    } else {
        throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
    }
    return length;
}
/**
 * Checks the chunked encoding of a file region whose size is written as the chunk header
 * {@code 80000000}: response head, chunk-size line, the region itself, the chunk trailer and
 * finally the last-chunk marker.
 */
@Test
public void testLargeFileRegionChunked() throws Exception {
    final EmbeddedChannel channel = new EmbeddedChannel(new HttpResponseEncoder());

    // Response head announcing chunked transfer encoding.
    final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    response.headers().set(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
    assertTrue(channel.writeOutbound(response));

    final ByteBuf head = (ByteBuf) channel.readOutbound();
    assertEquals("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n", head.toString(CharsetUtil.US_ASCII));
    head.release();

    // The region is emitted as: size line, the region object itself, CRLF trailer.
    assertTrue(channel.writeOutbound(FILE_REGION));

    final ByteBuf chunkSizeLine = (ByteBuf) channel.readOutbound();
    assertEquals("80000000\r\n", chunkSizeLine.toString(CharsetUtil.US_ASCII));
    chunkSizeLine.release();

    final FileRegion passedThrough = (FileRegion) channel.readOutbound();
    assertSame(FILE_REGION, passedThrough);
    passedThrough.release();

    final ByteBuf chunkTrailer = (ByteBuf) channel.readOutbound();
    assertEquals("\r\n", chunkTrailer.toString(CharsetUtil.US_ASCII));
    chunkTrailer.release();

    // Terminating zero-length chunk.
    assertTrue(channel.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT));

    final ByteBuf lastChunk = (ByteBuf) channel.readOutbound();
    assertEquals("0\r\n\r\n", lastChunk.toString(CharsetUtil.US_ASCII));
    lastChunk.release();

    assertFalse(channel.finish());
}
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { Object msg = in.current(); if (msg == null) { // nothing left to write break; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); while (readableBytes > 0) { doWriteBytes(buf); int newReadableBytes = buf.readableBytes(); in.progress(readableBytes - newReadableBytes); readableBytes = newReadableBytes; } in.remove(); } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; long transfered = region.transfered(); doWriteFileRegion(region); in.progress(region.transfered() - transfered); in.remove(); } else { in.remove(new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg))); } } }
/**
 * Validates an outbound message: {@link ByteBuf} and {@link FileRegion} pass through
 * unchanged, everything else is rejected.
 *
 * @param msg the outbound message
 * @return {@code msg} itself
 * @throws UnsupportedOperationException if {@code msg} is neither a ByteBuf nor a FileRegion
 */
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
    final boolean supported = msg instanceof ByteBuf || msg instanceof FileRegion;
    if (!supported) {
        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    return msg;
}
/** * Broker主动回查Producer事务状态,Oneway */ public void checkProducerTransactionState(// final Channel channel,// final CheckTransactionStateRequestHeader requestHeader,// final SelectMapedBufferResult selectMapedBufferResult// ) { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader); request.markOnewayRPC(); try { FileRegion fileRegion = new OneMessageTransfer(request.encodeHeader(selectMapedBufferResult.getSize()), selectMapedBufferResult); channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { selectMapedBufferResult.release(); if (!future.isSuccess()) { log.error("invokeProducer failed,", future.cause()); } } }); } catch (Throwable e) { log.error("invokeProducer exception", e); selectMapedBufferResult.release(); } }
/**
 * Determines the content length of a content message.
 *
 * A {@link FileRegion} reports its size as a {@code long}; a size above
 * {@link Integer#MAX_VALUE} is clamped to {@code Integer.MAX_VALUE} rather than wrapped by a
 * narrowing cast, so a large region never appears empty or negative. Callers that need the
 * exact size of such a region should read {@code FileRegion.count()} directly.
 *
 * @param msg an {@code HttpContent}, a {@link ByteBuf} or a {@link FileRegion}
 * @return the readable bytes of the buffer, or the (possibly clamped) count of the region
 * @throws IllegalStateException if {@code msg} is of any other type
 */
private static int contentLength(Object msg) {
    if (msg instanceof HttpContent) {
        return ((HttpContent) msg).content().readableBytes();
    }
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof FileRegion) {
        // A plain (int) cast would wrap around for counts above Integer.MAX_VALUE.
        return (int) Math.min(((FileRegion) msg).count(), Integer.MAX_VALUE);
    }
    throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
}
void sendFile(File file) throws Exception { long length = file.length(); LOG.debug("Got request of sending file {} of length {}.", file, length); Message handshake = MessageBuilder.buildFileHeader(length); byte[] bytes = handshake.toByteArray(); // Sends HANDSHAKE first before transferring actual file data, the // HANDSHAKE will tell the peer's channel to prepare for the file // transferring. channel.writeAndFlush(Unpooled.wrappedBuffer(bytes)).sync(); ChannelHandler prepender = channel.pipeline().get("frameEncoder"); // Removes length prepender, we don't need this handler for file // transferring. channel.pipeline().remove(prepender); // Adds ChunkedWriteHandler for file transferring. ChannelHandler cwh = new ChunkedWriteHandler(); channel.pipeline().addLast(cwh); // Begins file transferring. RandomAccessFile raf = new RandomAccessFile(file, "r"); if (channel.pipeline().get(SslHandler.class) != null) { // Zero-Copy file transferring is not supported for ssl. channel.writeAndFlush(new ChunkedFile(raf, 0, length, 8912)); } else { // Use Zero-Copy file transferring in non-ssl mode. FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, length); channel.writeAndFlush(region); } // Restores pipeline to original state. channel.pipeline().remove(cwh); channel.pipeline().addLast("frameEncoder", prepender); }
/**
 * Determine the content length of the given object.
 *
 * A {@link FileRegion} reports its size as a {@code long}; a size above
 * {@link Integer#MAX_VALUE} is clamped to {@code Integer.MAX_VALUE} rather than wrapped by a
 * narrowing cast, so a large region never appears empty or negative.
 *
 * @param msg the object to determine the length of.
 * @return the determined content length.
 * @throws IllegalStateException if {@code msg} is not a supported content type
 */
private static int contentLength(Object msg) {
    if (msg instanceof MemcacheContent) {
        return ((MemcacheContent) msg).content().readableBytes();
    }
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof FileRegion) {
        // A plain (int) cast would wrap around for counts above Integer.MAX_VALUE.
        return (int) Math.min(((FileRegion) msg).count(), Integer.MAX_VALUE);
    }
    throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
}