@Override public void operationComplete(ChannelProgressiveFuture cf) { // The write operation failed. If the channel was cached, it means it got asynchronously closed. // Let's retry a second time. if (!abortOnThrowable(cf.cause(), cf.channel())) { future.touch(); /** * We need to make sure we aren't in the middle of an authorization * process before publishing events as we will re-publish again the * same event after the authorization, causing unpredictable * behavior. */ boolean startPublishing = !future.getInAuth().get() && !future.getInProxyAuth().get(); if (startPublishing && asyncHandler instanceof ProgressAsyncHandler) { ProgressAsyncHandler<?> progressAsyncHandler = (ProgressAsyncHandler<?>) asyncHandler; if (notifyHeaders) { progressAsyncHandler.onHeadersWritten(); } else { progressAsyncHandler.onContentWritten(); } } } }
/**
 * Writes the wrapped input stream to the channel as a chunked stream, followed by the empty
 * last-content marker.
 *
 * <p>On the first call the future is flagged as having had its stream consumed. On later calls
 * the stream is reset when it supports mark/reset; otherwise a warning is logged and nothing is
 * written at all.
 *
 * @param channel the channel to write to
 * @param future  the response future that tracks whether the stream was already consumed
 * @throws IOException if resetting the stream fails
 */
@Override
public void write(Channel channel, NettyResponseFuture<?> future) throws IOException {
    final InputStream stream = inputStream;

    if (!future.isStreamWasAlreadyConsumed()) {
        future.setStreamWasAlreadyConsumed(true);
    } else if (stream.markSupported()) {
        stream.reset();
    } else {
        LOGGER.warn("Stream has already been consumed and cannot be reset");
        return;
    }

    channel.write(new ChunkedStream(stream), channel.newProgressivePromise())
            .addListener(new ProgressListener(future.getAsyncHandler(), future, false, getContentLength()) {
                @Override
                public void operationComplete(ChannelProgressiveFuture completed) {
                    // Close the stream before delegating to the regular completion handling.
                    closeSilently(stream);
                    super.operationComplete(completed);
                }
            });
    channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
/**
 * Logs the transfer progress of the channel at debug level.
 *
 * @param future   the future whose channel is reported
 * @param progress the progress value reported so far
 * @param total    the expected total; a negative value is treated as "unknown" and omitted
 */
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
    if (total >= 0) {
        log.debug("Channel {} transfer progress: {}, total {}", future.channel(), progress, total);
        return;
    }
    // Total is unknown: report the progress only.
    log.debug("Channel {} transfer progress: {}", future.channel(), progress);
}
/**
 * Closes {@code raf} once the transfer has finished and logs the outcome.
 *
 * <p>A completion listener is notified for failed and cancelled writes as well as successful
 * ones, so the future's result is checked before reporting: success is logged at debug level,
 * anything else is logged as a failure together with its cause. {@code raf} is closed in
 * every case, and a close error is logged rather than propagated.
 *
 * @param future the completed transfer future
 */
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
    try {
        raf.close();
    } catch (Exception e) {
        log.error("Close randomAccessFile with error", e);
    }

    if (future.isSuccess()) {
        log.debug("Channel {} transfer complete", future.channel());
    } else {
        // The trailing throwable argument is logged as the exception, not as a placeholder value.
        log.warn("Channel {} transfer failed", future.channel(), future.cause());
    }
}
/**
 * Touches the response future and, when this listener tracks content (not headers) and the
 * handler is a {@link ProgressAsyncHandler}, reports the write progress to it.
 *
 * @param f        the progressing write future (not used here)
 * @param progress the cumulative progress value reported for this write
 * @param total    the expected total; when negative, {@code expectedTotal} is reported instead
 */
@Override
public void operationProgressed(ChannelProgressiveFuture f, long progress, long total) {
    future.touch();

    if (notifyHeaders || !(asyncHandler instanceof ProgressAsyncHandler)) {
        return;
    }

    // Amount written since the previous notification.
    long delta = progress - lastProgress;
    lastProgress = progress;

    long reportedTotal = total < 0 ? expectedTotal : total;
    ((ProgressAsyncHandler<?>) asyncHandler).onContentWriteProgress(delta, progress, reportedTotal);
}
/**
 * Writes the body to the channel, followed by the empty last-content marker.
 *
 * <p>A {@link RandomAccessBody} is sent as a {@code BodyFileRegion} when no SSL handler is
 * configured on the pipeline and zero-copy is not disabled in the config. In every other case
 * the body is wrapped in a {@code BodyChunkedInput}; if the request's body generator is a
 * {@link FeedableBodyGenerator} (but not a {@link ReactiveStreamsBodyGenerator}), a feed
 * listener is installed that resumes the pipeline's {@link ChunkedWriteHandler} whenever
 * content is added.
 *
 * @param channel the channel to write to
 * @param future  the response future supplying the target request and the async handler
 * @throws IOException declared by the overridden method
 */
@Override
public void write(final Channel channel, NettyResponseFuture<?> future) throws IOException {
    final Object payload;

    if (!(body instanceof RandomAccessBody)
            || ChannelManager.isSslHandlerConfigured(channel.pipeline())
            || config.isDisableZeroCopy()) {
        payload = new BodyChunkedInput(body);

        BodyGenerator generator = future.getTargetRequest().getBodyGenerator();
        if (generator instanceof FeedableBodyGenerator && !(generator instanceof ReactiveStreamsBodyGenerator)) {
            final ChunkedWriteHandler writeHandler = channel.pipeline().get(ChunkedWriteHandler.class);
            FeedListener resumeOnFeed = new FeedListener() {
                @Override
                public void onContentAdded() {
                    writeHandler.resumeTransfer();
                }

                @Override
                public void onError(Throwable t) {
                    // Intentionally empty, as in the original implementation.
                }
            };
            ((FeedableBodyGenerator) generator).setListener(resumeOnFeed);
        }
    } else {
        payload = new BodyFileRegion((RandomAccessBody) body);
    }

    ChannelFuture pendingWrite = channel.write(payload, channel.newProgressivePromise());
    pendingWrite.addListener(new ProgressListener(future.getAsyncHandler(), future, false, getContentLength()) {
        @Override
        public void operationComplete(ChannelProgressiveFuture completed) {
            // Close the body before delegating to the regular completion handling.
            closeSilently(body);
            super.operationComplete(completed);
        }
    });
    channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
private void writeResponseWithFuture(ChannelHandlerContext ctx, DefaultHttpRequest request, HttpResponse response) { // TODO Auto-generated method stub ChannelFuture responseFuture; ChannelFuture lastresponseFuture; responseFuture = ctx.write(response,ctx.newProgressivePromise()); lastresponseFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); responseFuture.addListener(new ChannelProgressiveFutureListener() { @Override public void operationComplete(ChannelProgressiveFuture future) { System.err.println(future.channel() + " "+future.cause()+" "+future.isCancelled()+" "+future.isDone()+" "+future.isSuccess()+" "/*+future.sync()*/); } @Override public void operationProgressed(ChannelProgressiveFuture paramF, long paramLong1, long paramLong2) throws Exception { // TODO Auto-generated method stub } }); if (!HttpHeaders.isKeepAlive(request)) { lastresponseFuture.addListener(ChannelFutureListener.CLOSE); } }
@Override public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) { if (total < 0) { // total unknown log.debug("{} Transfer progress: {}", future.channel(), progress); } else { log.debug("{} Transfer progress: {}/{}", future.channel(), progress, total); } }
/**
 * Closes {@code raf} once the transfer has finished and logs the outcome.
 *
 * <p>A completion listener is notified for failed and cancelled writes as well as successful
 * ones, so the future's result is checked before reporting: success is logged at debug level,
 * anything else is logged as a failure together with its cause. {@code raf} is closed in
 * every case, and a close error is logged rather than propagated.
 *
 * @param future the completed transfer future
 */
@Override
public void operationComplete(ChannelProgressiveFuture future) {
    try {
        raf.close();
    } catch (Exception e) {
        log.error("RandomAccessFile close error", e);
    }

    if (future.isSuccess()) {
        log.debug("{} Transfer complete.", future.channel());
    } else {
        // The trailing throwable argument is logged as the exception, not as a placeholder value.
        log.warn("{} Transfer failed.", future.channel(), future.cause());
    }
}
/**
 * Called once the write is complete, i.e. either every chunk that had to be written has been
 * written, or writing the chunks failed.
 *
 * <p>A failed write is handed to {@code handleChannelWriteFailure} together with its cause.
 * A successful write completes the request; {@code completeRequest} receives {@code true}
 * when there is no request or the request is not keep-alive.
 *
 * @param future the {@link ChannelProgressiveFuture} that is being listened on.
 */
@Override
public void operationComplete(ChannelProgressiveFuture future) {
    if (!future.isSuccess()) {
        handleChannelWriteFailure(future.cause(), true);
        return;
    }

    logger.trace("Response sending complete on channel {}", ctx.channel());
    boolean keepAliveRequested = request != null && request.isKeepAlive();
    completeRequest(!keepAliveRequested);
}
/**
 * Uses {@code progress} to work out which queued chunks have been written and resolves them,
 * in queue order, with a {@code null} argument.
 *
 * @param future the {@link ChannelProgressiveFuture} that is being listened on.
 * @param progress the total number of bytes that have been written starting from the time writes were invoked
 *                 via {@link ChunkedWriteHandler}.
 * @param total the total number of bytes that need to be written i.e. the target number. Not used here: as the
 *              original documentation notes, {@link ChunkedWriteHandler} has no target number, so every call
 *              except the very last one carries a negative {@code total}.
 */
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
    logger.trace("{} bytes of response written on channel {}", progress, ctx.channel());
    while (isHeadChunkWritten(progress)) {
        chunksAwaitingCallback.poll().resolveChunk(null);
    }
}

/**
 * Tells whether the chunk at the head of {@code chunksAwaitingCallback} exists and has a
 * {@code writeCompleteThreshold} that {@code progress} has reached.
 */
private boolean isHeadChunkWritten(long progress) {
    return chunksAwaitingCallback.peek() != null
            && progress >= chunksAwaitingCallback.peek().writeCompleteThreshold;
}