Java 类io.netty.channel.pool.AbstractChannelPoolMap 实例源码

项目:nettyRpc    文件:ClientConnectionPool.java   
/**
 * Initializes the client connection pool.
 *
 * <p>Creates the shared {@code Bootstrap} and event loop group, installs a pool map that builds
 * one {@code FixedChannelPool} (at most 2 connections) per remote address on demand, and then
 * creates the pool for every address returned by {@code serverListConfig.getAddressList()}.
 */
public void init() {
    bootstrap = new Bootstrap();
    eventLoopGroup = new NioEventLoopGroup();
    bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new LoggingHandler());
    // All pools share a single event loop group; for a client this should not be a problem.
    poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
        @Override
        protected FixedChannelPool newPool(InetSocketAddress key) {
            // Set the remote address on a copy: newPool may run concurrently for different
            // keys, and mutating the shared bootstrap would let one key's address leak
            // into another key's pool.
            return new FixedChannelPool(bootstrap.clone().remoteAddress(key), new FixedChannelPoolHandler(), 2);
        }
    };
    // Create the pool for every configured server up front. This only instantiates the pools;
    // the connections themselves are opened when a channel is first acquired.
    serverListConfig.getAddressList().forEach(address -> poolMap.get(address));
}
项目:nettyRpc    文件:ClientConnectionPool.java   
/**
 * Picks the next channel pool in round-robin order.
 *
 * <p>Each call advances the shared counter by one and selects the pool whose position in the
 * pool map's iteration order equals the counter modulo the number of pools.
 *
 * @return the selected pool, or {@code null} when the pool map is empty (or shrinks while it
 *         is being iterated)
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private FixedChannelPool choose() {
    AbstractChannelPoolMap pools = (AbstractChannelPoolMap) poolMap;
    int size = pools.size();
    if (size == 0) {
        // Nothing to choose from; also avoids a division by zero below.
        return null;
    }
    // Use the value returned by incrementAndGet() directly so concurrent callers each get a
    // distinct ticket, and floorMod so the index stays in [0, size) after the counter overflows.
    int index = Math.floorMod(counter.incrementAndGet(), size);
    Iterator<Map.Entry<InetSocketAddress, FixedChannelPool>> it = pools.iterator();
    for (int i = 0; it.hasNext(); i++) {
        // Always advance the iterator; skipping next() would loop forever on the first entry.
        FixedChannelPool pool = it.next().getValue();
        if (i == index) {
            return pool;
        }
    }
    return null;
}
项目:nettyRpc    文件:ClientConnectionPool.java   
/**
 * Shuts the connection pool down: closes every per-address channel pool held in the pool map,
 * then starts a graceful shutdown of the shared event loop group.
 */
@SuppressWarnings("unchecked")
public void close() {
    final AbstractChannelPoolMap pools = (AbstractChannelPoolMap) poolMap;
    for (Iterator<Map.Entry<InetSocketAddress, FixedChannelPool>> cursor = pools.iterator(); cursor.hasNext(); ) {
        final FixedChannelPool pool = cursor.next().getValue();
        pool.close();
    }
    eventLoopGroup.shutdownGracefully();
}
项目:util4j    文件:PoolTest.java   
/**
 * Demonstrates a per-address channel pool map: builds one pool per remote address, acquires a
 * channel from the first pool asynchronously and releases it again once it is available.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        final Bootstrap cb = new Bootstrap();
        cb.group(group).channel(NioSocketChannel.class);
        final InetSocketAddress addr1 = new InetSocketAddress("10.0.0.10", 8888);
        InetSocketAddress addr2 = new InetSocketAddress("10.0.0.11", 8888);

        // Pool map: one connection pool per remote address.
        ChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
            @Override
            protected SimpleChannelPool newPool(InetSocketAddress key) {
                // Configure a copy so the shared bootstrap is never mutated per key.
                return new SimpleChannelPool(cb.clone().remoteAddress(key), new TestChannelPoolHandler());
            }
        };

        final SimpleChannelPool pool1 = poolMap.get(addr1); // pool for connections to addr1
        final SimpleChannelPool pool2 = poolMap.get(addr2); // pool for connections to addr2
        Future<Channel> f1 = pool1.acquire(); // asynchronously acquire one connection
        f1.addListener(new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> f) {
                if (f.isSuccess()) {
                    Channel ch = f.getNow();
                    // ch is one of the channels connected to addr1;
                    // use it to send a message, e.g. ch.write(msg)
                    // Return the channel to the pool once done with it.
                    pool1.release(ch);
                } else {
                    // Report the failure instead of dropping it silently.
                    System.err.println("Failed to acquire channel for " + addr1 + ": " + f.cause());
                }
            }
        });

    }
项目:HeliosStreams    文件:HubManager.java   
/**
 * Creates the hub manager.
 *
 * <p>Registers a JVM shutdown hook that calls {@code close()}, resolves the TSDB endpoint
 * addresses from the URLs reported by {@code tsdbEndpoint.getUpServers()}, and sets up the
 * event loop group, the bootstrap, the per-address channel pool map and the channel group.
 *
 * @param properties configuration passed to the metrics meta service
 */
private HubManager(final Properties properties) {
        Runtime.getRuntime().addShutdownHook(new Thread(){
            public void run() { try { close(); } catch (Exception x) {/* No Op */} }
        });
        log.info(">>>>> Initializing HubManager...");
        metricMetaService = new MetricsMetaAPIImpl(properties);
        tsdbEndpoint = TSDBEndpoint.getEndpoint(metricMetaService.getSqlWorker());
        for(String url: tsdbEndpoint.getUpServers()) {
            final URL tsdbUrl = URLHelper.toURL(url);
            // URL.getPort() returns -1 when the URL carries no explicit port, which
            // InetSocketAddress rejects; fall back to the protocol's default port.
            final int explicitPort = tsdbUrl.getPort();
            final int port = explicitPort != -1 ? explicitPort : tsdbUrl.getDefaultPort();
            tsdbAddresses.add(new InetSocketAddress(tsdbUrl.getHost(), port));
        }
        endpointCount = tsdbAddresses.size(); 
        endpointSequence = new AtomicInteger(endpointCount);
        // Two event loop threads per available processor, run on the meta service's fork-join pool.
        group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, metricMetaService.getForkJoinPool());
        bootstrap = new Bootstrap();
        bootstrap
            .handler(channelInitializer)
            .group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator())
            .option(ChannelOption.ALLOCATOR, BufferManager.getInstance());
        final ChannelPoolHandler poolHandler = this;
        poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
            @Override
            protected SimpleChannelPool newPool(final InetSocketAddress key) {
                // Each pool gets its own bootstrap bound to the pool's remote address,
                // configured the same way as the shared bootstrap above.
                final Bootstrap b = new Bootstrap().handler(channelInitializer)
                .group(group)
                .remoteAddress(key)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator())
                .option(ChannelOption.ALLOCATOR, BufferManager.getInstance());              
                return new SimpleChannelPool(b, poolHandler);
            }
        };
        eventExecutor = new DefaultEventExecutor(metricMetaService.getForkJoinPool());
        channelGroup = new DefaultChannelGroup("MetricHubChannelGroup", eventExecutor);

        // No pool is requested from poolMap here, so no channels are acquired
        // during construction.

        log.info("<<<<< HubManager Initialized.");
    }