Java 类io.netty.channel.pool.ChannelPool 实例源码

项目:aws-sdk-java-v2    文件:NettyNioAsyncHttpClient.java   
private ChannelPoolMap<URI, ChannelPool> createChannelPoolMap() {
    return new SdkChannelPoolMap<URI, ChannelPool>() {
        @Override
        protected ChannelPool newPool(URI key) {
            Bootstrap bootstrap =
                    new Bootstrap()
                            .group(group)
                            .channel(resolveSocketChannelClass(group))
                            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectionTimeout())
                            .option(ChannelOption.TCP_NODELAY, true)
                            .remoteAddress(key.getHost(), key.getPort());
            SslContext sslContext = sslContext(key.getScheme());
            return new FixedChannelPool(bootstrap,
                                        // TODO expose better options for this
                                        new ChannelPipelineInitializer(sslContext),
                                        ChannelHealthChecker.ACTIVE,
                                        FixedChannelPool.AcquireTimeoutAction.FAIL,
                                        configuration.connectionAcquisitionTimeout(),
                                        configuration.maxConnectionsPerEndpoint(),
                                        10_000);
        }
    };
}
项目:riposte    文件:StreamingAsyncHttpClient.java   
protected static void releaseChannelBackToPoolIfCallIsActive(Channel ch, ChannelPool pool,
                                                             ObjectHolder<Boolean> callActiveHolder,
                                                             String contextReason,
                                                             Deque<Span> distributedTracingStack,
                                                             Map<String, String> distributedTracingMdcInfo) {
    if (callActiveHolder.heldObject) {
        if (logger.isDebugEnabled()) {
            runnableWithTracingAndMdc(
                () -> logger.debug(
                    "Marking call as inactive and releasing channel back to pool. "
                    + "channel_release_reason=\"{}\"", contextReason
                ),
                distributedTracingStack, distributedTracingMdcInfo
            ).run();
        }

        callActiveHolder.heldObject = false;
        pool.release(ch);
    }
}
项目:async-gamequery-lib    文件:NettyPooledTransport.java   
/**
 * <p>Acquires a {@link Channel} from the {@link ChannelPool}</p>
 *
 * @param message
 *         An {@link AbstractRequest} that will be used as the lookup reference for the {@link
 *         io.netty.channel.pool.ChannelPoolMap} key
 *
 * @return A {@link CompletableFuture} containing the acquired {@link Channel}
 */
@Override
public CompletableFuture<Channel> getChannel(M message) {
    final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
    //Retrieve our channel pool based on the message
    final ChannelPool pool = poolMap.get(message);

    log.debug("Acquiring channel from pool '{}' for message : {}", pool, message);

    //Acquire a channel from the pool and listen for completion
    pool.acquire().addListener((Future<Channel> future) -> {
        if (future.isSuccess()) {
            log.debug("Successfully acquired Channel from pool");
            Channel channel = future.get();
            channel.attr(ChannelAttributes.CHANNEL_POOL).set(pool);
            channelFuture.complete(channel);
        } else {
            log.debug("Failed to acquire Channel from Pool");
            channelFuture.completeExceptionally(new ConnectException(future.cause()));
        }
    });
    return channelFuture;
}
项目:reactor-netty    文件:ContextHandler.java   
/**
 * Create a new client context with optional pool support
 *
 * @param sink
 * @param options
 * @param loggingHandler
 * @param secure
 * @param providedAddress
 * @param channelOpFactory
 * @param pool
 * @param <CHANNEL>
 *
 * @return a new {@link ContextHandler} for clients
 */
public static <CHANNEL extends Channel> ContextHandler<CHANNEL> newClientContext(
        MonoSink<NettyContext> sink,
        ClientOptions options,
        LoggingHandler loggingHandler,
        boolean secure,
        SocketAddress providedAddress,
        ChannelPool pool, ChannelOperations.OnNew<CHANNEL> channelOpFactory) {
    if (pool != null) {
        return new PooledClientContextHandler<>(channelOpFactory,
                options,
                sink,
                loggingHandler,
                secure,
                providedAddress,
                pool);
    }
    return new ClientContextHandler<>(channelOpFactory,
            options,
            sink,
            loggingHandler,
            secure,
            providedAddress);
}
项目:reactor-netty    文件:TcpClient.java   
/**
 * Create a {@link ContextHandler} for {@link Bootstrap#handler()}
 *
 * @param handler user provided in/out handler
 * @param sink user provided bind handler
 * @param secure if operation should be secured
 * @param pool if channel pool
 * @param onSetup if operation has local setup callback
 *
 * @return a new {@link ContextHandler}
 */
protected ContextHandler<SocketChannel> doHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
        MonoSink<NettyContext> sink,
        boolean secure,
        SocketAddress providedAddress,
        ChannelPool pool,
        Consumer<? super Channel> onSetup) {
    return ContextHandler.newClientContext(sink,
            options,
            loggingHandler,
            secure,
            providedAddress,
            pool,
            handler == null ? EMPTY :
                    (ch, c, msg) -> ChannelOperations.bind(ch, handler, c));
}
项目:reactor-netty    文件:HttpClient.java   
@Override
protected ContextHandler<SocketChannel> doHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
        MonoSink<NettyContext> sink,
        boolean secure,
        SocketAddress providedAddress,
        ChannelPool pool,
        Consumer<? super Channel> onSetup) {
    return ContextHandler.<SocketChannel>newClientContext(sink,
            options,
            loggingHandler,
            secure,
            providedAddress,
            pool,
            handler != null ? (ch, c, msg) -> {
                if(onSetup != null){
                    onSetup.accept(ch);
                }
                return HttpClientOperations.bindHttp(ch, handler, c);
            } : EMPTY).onPipeline(this);
}
项目:aws-sdk-java-v2    文件:RequestContext.java   
public RequestContext(ChannelPool channelPool,
                      SdkHttpRequest sdkRequest,
                      SdkHttpRequestProvider requestProvider,
                      HttpRequest nettyRequest,
                      SdkHttpResponseHandler handler,
                      NettyConfiguration configuration) {
    this.channelPool = channelPool;
    this.sdkRequest = sdkRequest;
    this.requestProvider = requestProvider;
    this.nettyRequest = nettyRequest;
    this.handler = handler;
    this.configuration = configuration;
}
项目:riposte    文件:StreamingAsyncHttpClient.java   
StreamingChannel(Channel channel,
                 ChannelPool pool,
                 ObjectHolder<Boolean> callActiveHolder,
                 ObjectHolder<Boolean> downstreamLastChunkSentHolder,
                 Deque<Span> distributedTracingSpanStack,
                 Map<String, String> distributedTracingMdcInfo) {
    this.channel = channel;
    this.pool = pool;
    this.callActiveHolder = callActiveHolder;
    this.downstreamLastChunkSentHolder = downstreamLastChunkSentHolder;
    this.distributedTracingSpanStack = distributedTracingSpanStack;
    this.distributedTracingMdcInfo = distributedTracingMdcInfo;
}
项目:riposte    文件:StreamingAsyncHttpClientTest.java   
@Before
public void beforeMethod() {
    channelMock = mock(Channel.class);
    channelPoolMock = mock(ChannelPool.class);
    eventLoopMock = mock(EventLoop.class);

    contentChunkMock = mock(HttpContent.class);

    callActiveHolder = new ObjectHolder<>();
    callActiveHolder.heldObject = true;

    downstreamLastChunkSentHolder = new ObjectHolder<>();
    downstreamLastChunkSentHolder.heldObject = false;

    streamingChannelSpy = spy(new StreamingChannel(channelMock, channelPoolMock, callActiveHolder,
                                                   downstreamLastChunkSentHolder, null, null));

    writeAndFlushChannelFutureMock = mock(ChannelFuture.class);

    doReturn(eventLoopMock).when(channelMock).eventLoop();

    doReturn(writeAndFlushChannelFutureMock).when(channelMock).writeAndFlush(contentChunkMock);

    channelIsBrokenAttrMock = mock(Attribute.class);
    doReturn(channelIsBrokenAttrMock).when(channelMock).attr(CHANNEL_IS_BROKEN_ATTR);

    streamChunkChannelPromiseMock = mock(ChannelPromise.class);
    doReturn(streamChunkChannelPromiseMock).when(channelMock).newPromise();

    failedFutureMock = mock(ChannelFuture.class);
    doReturn(failedFutureMock).when(channelMock).newFailedFuture(any(Throwable.class));
}
项目:reactor-netty    文件:PooledClientContextHandler.java   
PooledClientContextHandler(ChannelOperations.OnNew<CHANNEL> channelOpFactory,
        ClientOptions options,
        MonoSink<NettyContext> sink,
        LoggingHandler loggingHandler,
        boolean secure,
        SocketAddress providedAddress,
        ChannelPool pool) {
    super(channelOpFactory, options, sink, loggingHandler, providedAddress);
    this.clientOptions = options;
    this.secure = secure;
    this.pool = pool;
    this.onReleaseEmitter = DirectProcessor.create();
}
项目:reactor-netty    文件:DefaultPoolResources.java   
@Override
public ChannelPool selectOrCreate(SocketAddress remote,
        Supplier<? extends Bootstrap> bootstrap,
        Consumer<? super Channel> onChannelCreate,
        EventLoopGroup group) {
    SocketAddress address = remote;
    for (; ; ) {
        Pool pool = channelPools.get(remote);
        if (pool != null) {
            return pool;
        }
        Bootstrap b = bootstrap.get();
        if (remote != null) {
            b = b.remoteAddress(remote);
        }
        else {
            address = b.config()
                      .remoteAddress();
        }
        if (log.isDebugEnabled()) {
            log.debug("New {} client pool for {}", name, address);
        }
        pool = new Pool(b, provider, onChannelCreate, group);
        if (channelPools.putIfAbsent(address, pool) == null) {
            return pool;
        }
        pool.close();
    }
}
项目:reactor-netty    文件:TcpResources.java   
@Override
public ChannelPool selectOrCreate(SocketAddress address,
        Supplier<? extends Bootstrap> bootstrap,
        Consumer<? super Channel> onChannelCreate,
        EventLoopGroup group) {
    return defaultPools.selectOrCreate(address, bootstrap, onChannelCreate, group);
}
项目:reactor-netty    文件:TcpClient.java   
/**
 * @param handler
 * @param address
 * @param secure
 * @param onSetup
 *
 * @return a new Mono to connect on subscribe
 */
protected Mono<NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
        InetSocketAddress address,
        boolean secure,
        Consumer<? super Channel> onSetup) {

    final BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>>
            targetHandler =
            null == handler ? ChannelOperations.noopHandler() : handler;

    return Mono.create(sink -> {
        SocketAddress remote = address != null ? address : options.getAddress();

        ChannelPool pool = null;

        PoolResources poolResources = options.getPoolResources();
        if (poolResources != null) {
            pool = poolResources.selectOrCreate(remote, options,
                    doHandler(null, sink, secure, remote, null, null),
                    options.getLoopResources().onClient(options.preferNative()));
        }

        ContextHandler<SocketChannel> contextHandler =
                doHandler(targetHandler, sink, secure, remote, pool, onSetup);
        sink.onCancel(contextHandler);

        if (pool == null) {
            Bootstrap b = options.get();
            b.remoteAddress(remote);
            b.handler(contextHandler);
            contextHandler.setFuture(b.connect());
        }
        else {
            contextHandler.setFuture(pool.acquire());
        }
    });
}
项目:RxS3    文件:RequestSender.java   
RequestSender(String s3Location, Request requestData, SubscriptionCompletionHandler completionHandler, HandlerDemultiplexer demultiplexer, ChannelPool channelPool) {
    this.s3Location = s3Location;
    this.requestData = requestData;
    this.completionHandler = completionHandler;
    this.demultiplexer = demultiplexer;
    this.channelPool = channelPool;
}
项目:aws-sdk-java-v2    文件:RequestContext.java   
ChannelPool channelPool() {
    return channelPool;
}
项目:riposte    文件:StreamingAsyncHttpClient.java   
protected ChannelPool getPooledChannelFuture(String downstreamHost, int downstreamPort) {
    return getPoolMap().get(
        resolveHostnameToInetSocketAddressWithMultiIpSupport(downstreamHost, downstreamPort)
    );
}
项目:riposte    文件:StreamingAsyncHttpClientTest.java   
private void verifyChannelReleasedBackToPool(ObjectHolder<Boolean> callActiveHolder,
                                             ChannelPool theChannelPoolMock,
                                             Channel theChannelMock) {
    assertThat(callActiveHolder.heldObject).isFalse();
    verify(theChannelPoolMock).release(theChannelMock);
}
项目:HeliosStreams    文件:HubManager.java   
/**
 * Acquires a channel pool
 * @return a channel pool
 */
protected ChannelPool channelPool() {
    final InetSocketAddress sockAddr = endpointSocketAddress();
    return poolMap.get(sockAddr);
}
项目:async-gamequery-lib    文件:NettyPooledUdpTransport.java   
@Override
public ChannelPool createChannelPool(Class<?> key) {
    log.debug("Creating Channel Pool For : {}", key.getSimpleName());
    return new ConnectionlessChannelPool(getBootstrap(), channelPoolHandler);
}
项目:async-gamequery-lib    文件:MessageChannelPoolMap.java   
@Override
protected ChannelPool newPool(K key) {
    return poolFactory.apply(key);
}
项目:async-gamequery-lib    文件:MessageChannelPoolMap.java   
@Override
public Iterator<Map.Entry<K, ChannelPool>> iterator() {
    return internalPoolMap.iterator();
}
项目:async-gamequery-lib    文件:NettyPooledTcpTransport.java   
@Override
public ChannelPool createChannelPool(InetSocketAddress key) {
    return new SimpleChannelPool(getBootstrap().remoteAddress(key), channelPoolHandler);
}
项目:reactor-netty    文件:DefaultPoolResources.java   
ChannelPool newPool(Bootstrap b,
ChannelPoolHandler handler,
ChannelHealthChecker checker);
项目:reactor-netty    文件:TcpResourcesTest.java   
@Before
public void before() {
    loopDisposed = new AtomicBoolean();
    poolDisposed = new AtomicBoolean();

    loopResources = new LoopResources() {
        @Override
        public EventLoopGroup onServer(boolean useNative) {
            return null;
        }

        @Override
        public Mono<Void> disposeLater() {
            return Mono.<Void>empty().doOnSuccess(c -> loopDisposed.set(true));
        }

        @Override
        public boolean isDisposed() {
            return loopDisposed.get();
        }
    };

    poolResources = new PoolResources() {
        @Override
        public ChannelPool selectOrCreate(SocketAddress address,
                Supplier<? extends Bootstrap> bootstrap,
                Consumer<? super Channel> onChannelCreate, EventLoopGroup group) {
            return null;
        }

        public Mono<Void> disposeLater() {
            return Mono.<Void>empty().doOnSuccess(c -> poolDisposed.set(true));
        }

        @Override
        public boolean isDisposed() {
            return poolDisposed.get();
        }
    };

    tcpResources = new TcpResources(loopResources, poolResources);
}
项目:reactor-netty    文件:HttpResourcesTest.java   
@Before
public void before() {
    loopDisposed = new AtomicBoolean();
    poolDisposed = new AtomicBoolean();

    loopResources = new LoopResources() {
        @Override
        public EventLoopGroup onServer(boolean useNative) {
            return null;
        }

        @Override
        public Mono<Void> disposeLater() {
            return Mono.<Void>empty().doOnSuccess(c -> loopDisposed.set(true));
        }

        @Override
        public boolean isDisposed() {
            return loopDisposed.get();
        }
    };

    poolResources = new PoolResources() {
        @Override
        public ChannelPool selectOrCreate(SocketAddress address,
                Supplier<? extends Bootstrap> bootstrap,
                Consumer<? super Channel> onChannelCreate, EventLoopGroup group) {
            return null;
        }

        public Mono<Void> disposeLater() {
            return Mono.<Void>empty().doOnSuccess(c -> poolDisposed.set(true));
        }

        @Override
        public boolean isDisposed() {
            return poolDisposed.get();
        }
    };

    testResources = new HttpResources(loopResources, poolResources);
}
项目:mintds-java    文件:MintDsClientHandler.java   
public MintDsClientHandler(ChannelPool channelPool) {
    this.channelPool = channelPool;
}
项目:RxS3    文件:HttpClientHandler.java   
HttpClientHandler(ChannelPool channelPool, SubscriptionCompletionHandler completionHandler) {
    this.channelPool = channelPool;
    this.completionHandler = completionHandler;
}
项目:HeliosStreams    文件:RequestCompletion.java   
/**
 * Creates a new RequestCompletion
 * @param queryCount The number of queries submitted
 * @param timeoutMs The timeout in ms.
 * @param pool the pool to return the channel to
 */
public RequestCompletion(final long timeoutMs, final ChannelPool pool) {
    this.timeoutMs = timeoutMs;
    latch = new CountDownLatch(1);
    this.pool = pool;
}
项目:async-gamequery-lib    文件:MessageChannelPoolMap.java   
/**
 * <p>Accepts two functions that will be used internally for processing the key and creation of the {@link
 * ChannelPool} instance</p>
 *
 * @param keyResolver
 *         A function that accepts an {@link AbstractMessage} as the input and returns a type of a key (as
 *         specified).
 *         This will be used to resolve keys based on the {@link AbstractMessage} argument.
 * @param poolFactory
 *         A factory function that returns a {@link ChannelPool} implementation based on the key provided.
 */
public MessageChannelPoolMap(Function<M, K> keyResolver, Function<K, ChannelPool> poolFactory) {
    this.keyResolver = keyResolver;
    this.poolFactory = poolFactory;
}
项目:async-gamequery-lib    文件:MessageChannelPoolMap.java   
/**
 * Retrieve a {@link ChannelPool} instance using {@link AbstractMessage} as the search key.
 *
 * @param message
 *         An {@link AbstractMessage} to be used to derive the actual key from
 *
 * @return A {@link ChannelPool} instance
 */
@Override
public ChannelPool get(M message) {
    return internalPoolMap.get(keyResolver.apply(message));
}
项目:async-gamequery-lib    文件:NettyPooledTransport.java   
/**
 * A factory method that creates a {@link ChannelPool} based on the key provided.
 *
 * @param key
 *         The key to be used for {@link ChannelPool} creation
 *
 * @return A {@link ChannelPool} instance
 */
abstract public ChannelPool createChannelPool(K key);
项目:reactor-netty    文件:PoolResources.java   
/**
 * Return an existing or new {@link ChannelPool}. The implementation will take care
 * of pulling {@link Bootstrap} lazily when a {@link ChannelPool} creation is actually
 * needed.
 *
 * @param address the remote address to resolve for existing or
 * new {@link ChannelPool}
 * @param bootstrap the {@link Bootstrap} supplier if a {@link ChannelPool} must be
 * created
 * @param onChannelCreate callback only when new connection is made
 * @return an existing or new {@link ChannelPool}
 */
ChannelPool selectOrCreate(SocketAddress address,
        Supplier<? extends Bootstrap> bootstrap,
        Consumer<? super Channel> onChannelCreate,
        EventLoopGroup group);