private ChannelPoolMap<URI, ChannelPool> createChannelPoolMap() { return new SdkChannelPoolMap<URI, ChannelPool>() { @Override protected ChannelPool newPool(URI key) { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(resolveSocketChannelClass(group)) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectionTimeout()) .option(ChannelOption.TCP_NODELAY, true) .remoteAddress(key.getHost(), key.getPort()); SslContext sslContext = sslContext(key.getScheme()); return new FixedChannelPool(bootstrap, // TODO expose better options for this new ChannelPipelineInitializer(sslContext), ChannelHealthChecker.ACTIVE, FixedChannelPool.AcquireTimeoutAction.FAIL, configuration.connectionAcquisitionTimeout(), configuration.maxConnectionsPerEndpoint(), 10_000); } }; }
/**
 * Releases {@code ch} back to {@code pool} if, and only if, {@code callActiveHolder} currently
 * holds {@code true}; the holder is flipped to {@code false} before the release. When debug
 * logging is enabled a message containing {@code contextReason} is logged first, wrapped with
 * the supplied tracing stack and MDC info.
 *
 * @param ch the channel to release
 * @param pool the pool the channel is released to
 * @param callActiveHolder holder of the "call is active" flag; read and then set to false
 * @param contextReason text included in the debug log as the release reason
 * @param distributedTracingStack tracing state passed to the logging wrapper
 * @param distributedTracingMdcInfo MDC state passed to the logging wrapper
 */
protected static void releaseChannelBackToPoolIfCallIsActive(Channel ch, ChannelPool pool,
                                                             ObjectHolder<Boolean> callActiveHolder,
                                                             String contextReason,
                                                             Deque<Span> distributedTracingStack,
                                                             Map<String, String> distributedTracingMdcInfo) {
    // Nothing to do when the call has already been marked inactive.
    if (!callActiveHolder.heldObject) {
        return;
    }

    if (logger.isDebugEnabled()) {
        runnableWithTracingAndMdc(
            () -> logger.debug(
                "Marking call as inactive and releasing channel back to pool. "
                + "channel_release_reason=\"{}\"", contextReason
            ),
            distributedTracingStack,
            distributedTracingMdcInfo
        ).run();
    }

    // Flip the flag before releasing, matching the original statement order.
    callActiveHolder.heldObject = false;
    pool.release(ch);
}
/** * <p>Acquires a {@link Channel} from the {@link ChannelPool}</p> * * @param message * An {@link AbstractRequest} that will be used as the lookup reference for the {@link * io.netty.channel.pool.ChannelPoolMap} key * * @return A {@link CompletableFuture} containing the acquired {@link Channel} */ @Override public CompletableFuture<Channel> getChannel(M message) { final CompletableFuture<Channel> channelFuture = new CompletableFuture<>(); //Retrieve our channel pool based on the message final ChannelPool pool = poolMap.get(message); log.debug("Acquiring channel from pool '{}' for message : {}", pool, message); //Acquire a channel from the pool and listen for completion pool.acquire().addListener((Future<Channel> future) -> { if (future.isSuccess()) { log.debug("Successfully acquired Channel from pool"); Channel channel = future.get(); channel.attr(ChannelAttributes.CHANNEL_POOL).set(pool); channelFuture.complete(channel); } else { log.debug("Failed to acquire Channel from Pool"); channelFuture.completeExceptionally(new ConnectException(future.cause())); } }); return channelFuture; }
/**
 * Create a new client context, pooled when a {@link ChannelPool} is supplied.
 *
 * <p>A {@code null} pool yields a {@code ClientContextHandler}; otherwise a
 * {@code PooledClientContextHandler} bound to that pool is returned.
 *
 * @param sink the sink forwarded to the created handler
 * @param options the client options forwarded to the created handler
 * @param loggingHandler the logging handler forwarded to the created handler
 * @param secure the secure flag forwarded to the created handler
 * @param providedAddress the address forwarded to the created handler
 * @param pool the channel pool to use, or {@code null} for a non-pooled context
 * @param channelOpFactory the channel operations factory forwarded to the created handler
 * @param <CHANNEL> the channel type
 *
 * @return a new {@link ContextHandler} for clients
 */
public static <CHANNEL extends Channel> ContextHandler<CHANNEL> newClientContext(
        MonoSink<NettyContext> sink,
        ClientOptions options,
        LoggingHandler loggingHandler,
        boolean secure,
        SocketAddress providedAddress,
        ChannelPool pool,
        ChannelOperations.OnNew<CHANNEL> channelOpFactory) {
    if (pool == null) {
        return new ClientContextHandler<>(channelOpFactory,
                options,
                sink,
                loggingHandler,
                secure,
                providedAddress);
    }
    return new PooledClientContextHandler<>(channelOpFactory,
            options,
            sink,
            loggingHandler,
            secure,
            providedAddress,
            pool);
}
/**
 * Create a {@link ContextHandler} for {@link Bootstrap#handler()}.
 *
 * <p>Delegates to {@code ContextHandler.newClientContext}. When {@code handler} is
 * {@code null} the {@code EMPTY} operations factory is used; otherwise new channels are bound
 * to {@code handler} through {@code ChannelOperations.bind}. This implementation does not read
 * {@code onSetup}.
 *
 * @param handler user provided in/out handler, may be {@code null}
 * @param sink user provided bind handler
 * @param secure if operation should be secured
 * @param providedAddress the address forwarded to the created context
 * @param pool the channel pool forwarded to the created context, may be {@code null}
 * @param onSetup local setup callback (unused here)
 *
 * @return a new {@link ContextHandler}
 */
protected ContextHandler<SocketChannel> doHandler(
        BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
        MonoSink<NettyContext> sink,
        boolean secure,
        SocketAddress providedAddress,
        ChannelPool pool,
        Consumer<? super Channel> onSetup) {
    return ContextHandler.newClientContext(
            sink,
            options,
            loggingHandler,
            secure,
            providedAddress,
            pool,
            handler != null
                    ? (ch, c, msg) -> {
                        return ChannelOperations.bind(ch, handler, c);
                    }
                    : EMPTY);
}
/**
 * Creates the client {@link ContextHandler} for HTTP and registers this instance through
 * {@code onPipeline}.
 *
 * <p>When {@code handler} is {@code null} the {@code EMPTY} operations factory is used.
 * Otherwise each new channel is first passed to {@code onSetup} (when non-null) and then bound
 * via {@code HttpClientOperations.bindHttp}.
 *
 * @param handler user provided in/out handler, may be {@code null}
 * @param sink user provided bind handler
 * @param secure if operation should be secured
 * @param providedAddress the address forwarded to the created context
 * @param pool the channel pool forwarded to the created context, may be {@code null}
 * @param onSetup optional callback invoked with each new channel before binding
 *
 * @return the handler returned by {@code onPipeline(this)}
 */
@Override
protected ContextHandler<SocketChannel> doHandler(
        BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
        MonoSink<NettyContext> sink,
        boolean secure,
        SocketAddress providedAddress,
        ChannelPool pool,
        Consumer<? super Channel> onSetup) {
    ContextHandler<SocketChannel> context = ContextHandler.<SocketChannel>newClientContext(
            sink,
            options,
            loggingHandler,
            secure,
            providedAddress,
            pool,
            handler == null
                    ? EMPTY
                    : (ch, c, msg) -> {
                        if (onSetup != null) {
                            onSetup.accept(ch);
                        }
                        return HttpClientOperations.bindHttp(ch, handler, c);
                    });
    return context.onPipeline(this);
}
/**
 * Creates a request context that stores each argument in the field of the same name.
 * No validation or copying is performed.
 *
 * @param channelPool the channel pool associated with the request
 * @param sdkRequest the SDK-level request
 * @param requestProvider the request provider
 * @param nettyRequest the Netty-level request
 * @param handler the response handler
 * @param configuration the Netty configuration
 */
public RequestContext(ChannelPool channelPool,
                      SdkHttpRequest sdkRequest,
                      SdkHttpRequestProvider requestProvider,
                      HttpRequest nettyRequest,
                      SdkHttpResponseHandler handler,
                      NettyConfiguration configuration) {
    // Request description
    this.sdkRequest = sdkRequest;
    this.nettyRequest = nettyRequest;
    this.requestProvider = requestProvider;

    // Collaborators
    this.handler = handler;
    this.channelPool = channelPool;
    this.configuration = configuration;
}
/**
 * Creates a streaming channel that stores each argument in the field of the same name.
 * No validation is performed on any argument.
 *
 * @param channel the underlying channel
 * @param pool the pool associated with the channel
 * @param callActiveHolder holder of the "call is active" flag
 * @param downstreamLastChunkSentHolder holder of the "last chunk sent" flag
 * @param distributedTracingSpanStack tracing span stack to keep with this channel
 * @param distributedTracingMdcInfo MDC info to keep with this channel
 */
StreamingChannel(Channel channel,
                 ChannelPool pool,
                 ObjectHolder<Boolean> callActiveHolder,
                 ObjectHolder<Boolean> downstreamLastChunkSentHolder,
                 Deque<Span> distributedTracingSpanStack,
                 Map<String, String> distributedTracingMdcInfo) {
    // Tracing state
    this.distributedTracingMdcInfo = distributedTracingMdcInfo;
    this.distributedTracingSpanStack = distributedTracingSpanStack;

    // Call state flags
    this.downstreamLastChunkSentHolder = downstreamLastChunkSentHolder;
    this.callActiveHolder = callActiveHolder;

    // Channel and its pool
    this.pool = pool;
    this.channel = channel;
}
/**
 * Test fixture setup: creates the mocks, the two flag holders (call active = true, last chunk
 * sent = false), a spy around a new {@code StreamingChannel} built with null tracing arguments,
 * and the stubbed answers on the channel mock.
 */
@Before
public void beforeMethod() {
    // --- plain mocks ---
    channelMock = mock(Channel.class);
    channelPoolMock = mock(ChannelPool.class);
    eventLoopMock = mock(EventLoop.class);
    contentChunkMock = mock(HttpContent.class);

    // --- flag holders ---
    callActiveHolder = new ObjectHolder<>();
    callActiveHolder.heldObject = true;

    downstreamLastChunkSentHolder = new ObjectHolder<>();
    downstreamLastChunkSentHolder.heldObject = false;

    // --- object under test ---
    streamingChannelSpy = spy(
        new StreamingChannel(
            channelMock, channelPoolMock, callActiveHolder, downstreamLastChunkSentHolder, null, null
        )
    );

    // --- stubbing on the channel mock ---
    writeAndFlushChannelFutureMock = mock(ChannelFuture.class);
    doReturn(eventLoopMock).when(channelMock).eventLoop();
    doReturn(writeAndFlushChannelFutureMock).when(channelMock).writeAndFlush(contentChunkMock);

    channelIsBrokenAttrMock = mock(Attribute.class);
    doReturn(channelIsBrokenAttrMock).when(channelMock).attr(CHANNEL_IS_BROKEN_ATTR);

    streamChunkChannelPromiseMock = mock(ChannelPromise.class);
    doReturn(streamChunkChannelPromiseMock).when(channelMock).newPromise();

    failedFutureMock = mock(ChannelFuture.class);
    doReturn(failedFutureMock).when(channelMock).newFailedFuture(any(Throwable.class));
}
/**
 * Creates a pooled client context handler.
 *
 * <p>Passes {@code channelOpFactory}, {@code options}, {@code sink}, {@code loggingHandler} and
 * {@code providedAddress} to the superclass constructor, stores {@code options},
 * {@code secure} and {@code pool} on this instance, and creates a fresh
 * {@code DirectProcessor} as the release emitter.
 *
 * @param channelOpFactory the channel operations factory
 * @param options the client options
 * @param sink the sink passed to the superclass
 * @param loggingHandler the logging handler passed to the superclass
 * @param secure the secure flag
 * @param providedAddress the address passed to the superclass
 * @param pool the channel pool used by this handler
 */
PooledClientContextHandler(ChannelOperations.OnNew<CHANNEL> channelOpFactory,
        ClientOptions options,
        MonoSink<NettyContext> sink,
        LoggingHandler loggingHandler,
        boolean secure,
        SocketAddress providedAddress,
        ChannelPool pool) {
    super(channelOpFactory, options, sink, loggingHandler, providedAddress);
    this.pool = pool;
    this.secure = secure;
    this.clientOptions = options;
    this.onReleaseEmitter = DirectProcessor.create();
}
/**
 * Returns the cached pool for the target address, creating and caching a new one when none
 * exists yet.
 *
 * <p>The cache key is {@code remote} when it is non-null; otherwise it is the remote address
 * configured on the supplied {@link Bootstrap}. The same key is used for both lookup and
 * insertion, so a pool stored under a bootstrap-configured address is found again on the next
 * call. The bootstrap supplier is only invoked when it is needed: to resolve the key when
 * {@code remote} is null, or to build a new pool after a cache miss.
 *
 * <p>If another thread inserts a pool for the same key first, the pool created here is closed
 * and the loop retries the lookup.
 *
 * @param remote the remote address, or {@code null} to use the bootstrap's configured address
 * @param bootstrap supplier of the {@link Bootstrap} used to create a new pool
 * @param onChannelCreate callback handed to the newly created pool
 * @param group event loop group handed to the newly created pool
 * @return the existing or newly created pool
 */
@Override
public ChannelPool selectOrCreate(SocketAddress remote,
        Supplier<? extends Bootstrap> bootstrap,
        Consumer<? super Channel> onChannelCreate,
        EventLoopGroup group) {
    SocketAddress address = remote;
    Bootstrap b = null;

    if (address == null) {
        // Without an explicit remote the key can only come from the bootstrap configuration,
        // so it must be resolved before the first lookup.
        b = bootstrap.get();
        address = b.config()
                   .remoteAddress();
    }

    for (; ; ) {
        // Look up with the same key that putIfAbsent uses below.
        Pool pool = channelPools.get(address);
        if (pool != null) {
            return pool;
        }

        if (b == null) {
            b = bootstrap.get();
            if (remote != null) {
                b = b.remoteAddress(remote);
            }
        }

        if (log.isDebugEnabled()) {
            log.debug("New {} client pool for {}", name, address);
        }

        pool = new Pool(b, provider, onChannelCreate, group);
        if (channelPools.putIfAbsent(address, pool) == null) {
            return pool;
        }

        // Lost the race: discard our pool and fetch a fresh bootstrap if we need to retry.
        pool.close();
        b = null;
    }
}
/**
 * Forwards the call unchanged to {@code defaultPools.selectOrCreate} and returns its result.
 *
 * @param address the remote address
 * @param bootstrap the {@link Bootstrap} supplier
 * @param onChannelCreate the channel creation callback
 * @param group the event loop group
 * @return the pool returned by {@code defaultPools}
 */
@Override
public ChannelPool selectOrCreate(SocketAddress address,
        Supplier<? extends Bootstrap> bootstrap,
        Consumer<? super Channel> onChannelCreate,
        EventLoopGroup group) {
    final ChannelPool selected =
            defaultPools.selectOrCreate(address, bootstrap, onChannelCreate, group);
    return selected;
}
/**
 * Builds a {@link Mono} that connects when subscribed.
 *
 * <p>On subscription the target address is {@code address}, or {@code options.getAddress()}
 * when {@code address} is {@code null}. If the options expose pool resources, a pool is
 * selected or created for that address and the connection is obtained with
 * {@code pool.acquire()}; otherwise a {@link Bootstrap} from the options is configured with
 * the address and the context handler and {@code connect()} is called. In both cases the
 * resulting future is handed to the context handler, which is also registered as the sink's
 * cancel callback.
 *
 * @param handler the in/out handler; {@code null} is replaced by
 * {@code ChannelOperations.noopHandler()}
 * @param address the remote address, or {@code null} to use the address from the options
 * @param secure the secure flag forwarded to {@code doHandler}
 * @param onSetup the setup callback forwarded to {@code doHandler}
 *
 * @return a new Mono to connect on subscribe
 */
protected Mono<NettyContext> newHandler(
        BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
        InetSocketAddress address,
        boolean secure,
        Consumer<? super Channel> onSetup) {

    final BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> targetHandler;
    if (handler == null) {
        targetHandler = ChannelOperations.noopHandler();
    }
    else {
        targetHandler = handler;
    }

    return Mono.create(sink -> {
        final SocketAddress remote = address == null ? options.getAddress() : address;

        final PoolResources poolResources = options.getPoolResources();
        final ChannelPool pool;
        if (poolResources == null) {
            pool = null;
        }
        else {
            // A handler-less context is passed as the pool's channel-creation callback.
            pool = poolResources.selectOrCreate(remote,
                    options,
                    doHandler(null, sink, secure, remote, null, null),
                    options.getLoopResources()
                           .onClient(options.preferNative()));
        }

        ContextHandler<SocketChannel> contextHandler =
                doHandler(targetHandler, sink, secure, remote, pool, onSetup);
        sink.onCancel(contextHandler);

        if (pool != null) {
            contextHandler.setFuture(pool.acquire());
            return;
        }

        // No pooling: open a dedicated connection.
        Bootstrap b = options.get();
        b.remoteAddress(remote);
        b.handler(contextHandler);
        contextHandler.setFuture(b.connect());
    });
}
/**
 * Creates a request sender that stores each argument in the field of the same name.
 *
 * @param s3Location the S3 location string
 * @param requestData the request data
 * @param completionHandler the subscription completion handler
 * @param demultiplexer the handler demultiplexer
 * @param channelPool the channel pool
 */
RequestSender(String s3Location,
              Request requestData,
              SubscriptionCompletionHandler completionHandler,
              HandlerDemultiplexer demultiplexer,
              ChannelPool channelPool) {
    // Infrastructure
    this.channelPool = channelPool;
    this.demultiplexer = demultiplexer;
    this.completionHandler = completionHandler;

    // Request description
    this.requestData = requestData;
    this.s3Location = s3Location;
}
/**
 * @return the value of the {@code channelPool} field
 */
ChannelPool channelPool() {
    final ChannelPool current = channelPool;
    return current;
}
/**
 * Looks up the {@link ChannelPool} for a downstream host and port.
 *
 * <p>The host and port are first passed to
 * {@code resolveHostnameToInetSocketAddressWithMultiIpSupport}; its result is the key used
 * against {@code getPoolMap()}.
 *
 * @param downstreamHost the downstream host name
 * @param downstreamPort the downstream port
 * @return the pool the pool map returns for the resolved address
 */
protected ChannelPool getPooledChannelFuture(String downstreamHost, int downstreamPort) {
    final ChannelPool pool = getPoolMap().get(
        resolveHostnameToInetSocketAddressWithMultiIpSupport(downstreamHost, downstreamPort)
    );
    return pool;
}
/**
 * Test helper: asserts that the call-active flag is false and that
 * {@code release(theChannelMock)} was invoked on the pool mock.
 *
 * @param callActiveHolder holder whose flag must be false
 * @param theChannelPoolMock pool mock expected to have received the release call
 * @param theChannelMock channel expected to have been released
 */
private void verifyChannelReleasedBackToPool(
    ObjectHolder<Boolean> callActiveHolder,
    ChannelPool theChannelPoolMock,
    Channel theChannelMock
) {
    // The call must have been marked inactive...
    assertThat(callActiveHolder.heldObject).isFalse();

    // ...and the channel handed back to its pool.
    verify(theChannelPoolMock).release(theChannelMock);
}
/**
 * Looks up the channel pool for the address returned by {@code endpointSocketAddress()}.
 * @return the pool the pool map returns for that address
 */
protected ChannelPool channelPool() {
    return poolMap.get(endpointSocketAddress());
}
/**
 * Logs the simple name of {@code key} at debug level and returns a new
 * {@code ConnectionlessChannelPool} built from {@code getBootstrap()} and the channel pool
 * handler. The key is used for logging only.
 *
 * @param key
 *         the class the pool is being created for
 *
 * @return a new {@code ConnectionlessChannelPool}
 */
@Override
public ChannelPool createChannelPool(Class<?> key) {
    final String keyName = key.getSimpleName();
    log.debug("Creating Channel Pool For : {}", keyName);

    final ChannelPool pool = new ConnectionlessChannelPool(getBootstrap(), channelPoolHandler);
    return pool;
}
/**
 * Creates the pool for {@code key} by applying {@code poolFactory} to it.
 *
 * @param key
 *         the key a pool is needed for
 *
 * @return the pool produced by the factory
 */
@Override
protected ChannelPool newPool(K key) {
    final ChannelPool created = poolFactory.apply(key);
    return created;
}
/**
 * @return the iterator returned by {@code internalPoolMap.iterator()}
 */
@Override
public Iterator<Map.Entry<K, ChannelPool>> iterator() {
    final Iterator<Map.Entry<K, ChannelPool>> entries = internalPoolMap.iterator();
    return entries;
}
/**
 * Returns a new {@code SimpleChannelPool} whose bootstrap is {@code getBootstrap()} with its
 * remote address set to {@code key}, using the channel pool handler.
 *
 * @param key
 *         the remote address the pool connects to
 *
 * @return a new {@code SimpleChannelPool}
 */
@Override
public ChannelPool createChannelPool(InetSocketAddress key) {
    final ChannelPool pool =
            new SimpleChannelPool(getBootstrap().remoteAddress(key), channelPoolHandler);
    return pool;
}
/**
 * Factory method returning a {@link ChannelPool} for the given arguments.
 * NOTE(review): the name suggests a new pool is created on every call - confirm against
 * implementations.
 *
 * @param b the {@link Bootstrap} supplied to the pool
 * @param handler the {@link ChannelPoolHandler} supplied to the pool
 * @param checker the {@link ChannelHealthChecker} supplied to the pool
 * @return a {@link ChannelPool}
 */
ChannelPool newPool(Bootstrap b, ChannelPoolHandler handler, ChannelHealthChecker checker);
/**
 * Test fixture setup: installs stub {@code LoopResources} and {@code PoolResources} whose
 * {@code disposeLater()} flips the matching {@link AtomicBoolean} on success, then wraps both
 * in a new {@code TcpResources}.
 */
@Before
public void before() {
    loopDisposed = new AtomicBoolean();
    poolDisposed = new AtomicBoolean();

    // Loop resources stub: no event loop group, disposal tracked through loopDisposed.
    loopResources = new LoopResources() {
        @Override
        public boolean isDisposed() {
            return loopDisposed.get();
        }

        @Override
        public Mono<Void> disposeLater() {
            return Mono.<Void>empty()
                       .doOnSuccess(ignored -> loopDisposed.set(true));
        }

        @Override
        public EventLoopGroup onServer(boolean useNative) {
            return null;
        }
    };

    // Pool resources stub: never provides a pool, disposal tracked through poolDisposed.
    poolResources = new PoolResources() {
        @Override
        public boolean isDisposed() {
            return poolDisposed.get();
        }

        public Mono<Void> disposeLater() {
            return Mono.<Void>empty()
                       .doOnSuccess(ignored -> poolDisposed.set(true));
        }

        @Override
        public ChannelPool selectOrCreate(SocketAddress address,
                Supplier<? extends Bootstrap> bootstrap,
                Consumer<? super Channel> onChannelCreate,
                EventLoopGroup group) {
            return null;
        }
    };

    tcpResources = new TcpResources(loopResources, poolResources);
}
/**
 * Test fixture setup: installs stub {@code LoopResources} and {@code PoolResources} whose
 * {@code disposeLater()} flips the matching {@link AtomicBoolean} on success, then wraps both
 * in a new {@code HttpResources}.
 */
@Before
public void before() {
    loopDisposed = new AtomicBoolean();
    poolDisposed = new AtomicBoolean();

    // Loop resources stub: no event loop group, disposal tracked through loopDisposed.
    loopResources = new LoopResources() {
        @Override
        public boolean isDisposed() {
            return loopDisposed.get();
        }

        @Override
        public Mono<Void> disposeLater() {
            return Mono.<Void>empty()
                       .doOnSuccess(ignored -> loopDisposed.set(true));
        }

        @Override
        public EventLoopGroup onServer(boolean useNative) {
            return null;
        }
    };

    // Pool resources stub: never provides a pool, disposal tracked through poolDisposed.
    poolResources = new PoolResources() {
        @Override
        public boolean isDisposed() {
            return poolDisposed.get();
        }

        public Mono<Void> disposeLater() {
            return Mono.<Void>empty()
                       .doOnSuccess(ignored -> poolDisposed.set(true));
        }

        @Override
        public ChannelPool selectOrCreate(SocketAddress address,
                Supplier<? extends Bootstrap> bootstrap,
                Consumer<? super Channel> onChannelCreate,
                EventLoopGroup group) {
            return null;
        }
    };

    testResources = new HttpResources(loopResources, poolResources);
}
/**
 * Creates a handler that keeps a reference to the given channel pool.
 *
 * @param channelPool the channel pool stored on this handler
 */
public MintDsClientHandler(ChannelPool channelPool) {
    super();
    this.channelPool = channelPool;
}
/**
 * Creates a handler that stores the given channel pool and completion handler.
 *
 * @param channelPool the channel pool stored on this handler
 * @param completionHandler the subscription completion handler stored on this handler
 */
HttpClientHandler(ChannelPool channelPool,
                  SubscriptionCompletionHandler completionHandler) {
    this.completionHandler = completionHandler;
    this.channelPool = channelPool;
}
/**
 * Creates a new RequestCompletion with a latch initialised to a count of one.
 * @param timeoutMs The timeout in ms.
 * @param pool the pool to return the channel to
 */
public RequestCompletion(final long timeoutMs, final ChannelPool pool) {
    this.pool = pool;
    this.timeoutMs = timeoutMs;
    this.latch = new CountDownLatch(1);
}
/**
 * <p>Creates a pool map driven by two functions: one that derives a key from a message and one
 * that builds a {@link ChannelPool} for a key. Both are stored as given.</p>
 *
 * @param keyResolver
 *         maps a message of type {@code M} to its key of type {@code K}
 * @param poolFactory
 *         produces the {@link ChannelPool} for a given key
 */
public MessageChannelPoolMap(Function<M, K> keyResolver,
                             Function<K, ChannelPool> poolFactory) {
    this.poolFactory = poolFactory;
    this.keyResolver = keyResolver;
}
/**
 * Resolves the key for {@code message} through {@code keyResolver} and returns what the
 * internal pool map holds for that key.
 *
 * @param message
 *         the message the key is derived from
 *
 * @return the {@link ChannelPool} the internal pool map returns for the derived key
 */
@Override
public ChannelPool get(M message) {
    final K key = keyResolver.apply(message);
    return internalPoolMap.get(key);
}
/**
 * Factory hook that subclasses implement to supply the {@link ChannelPool} for a key.
 *
 * @param key
 *         the key the {@link ChannelPool} is created for
 *
 * @return a {@link ChannelPool} instance
 */
abstract public ChannelPool createChannelPool(K key);
/**
 * Return an existing or new {@link ChannelPool}. The implementation will take care
 * of pulling {@link Bootstrap} lazily when a {@link ChannelPool} creation is actually
 * needed.
 *
 * @param address the remote address to resolve for existing or
 * new {@link ChannelPool}
 * @param bootstrap the {@link Bootstrap} supplier if a {@link ChannelPool} must be
 * created
 * @param onChannelCreate callback only when new connection is made
 * @param group the {@link EventLoopGroup} made available to the implementation
 * (presumably used when a new pool is created - confirm against implementations)
 * @return an existing or new {@link ChannelPool}
 */
ChannelPool selectOrCreate(SocketAddress address, Supplier<? extends Bootstrap> bootstrap, Consumer<? super Channel> onChannelCreate, EventLoopGroup group);