/** * 初始化连接池 */ public void init() { bootstrap = new Bootstrap(); eventLoopGroup = new NioEventLoopGroup(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new LoggingHandler()); //所有的公用一个eventloopgroup, 对于客户端来说应该问题不大! poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() { @Override protected FixedChannelPool newPool(InetSocketAddress key) { return new FixedChannelPool(bootstrap.remoteAddress(key), new FixedChannelPoolHandler(), 2); } }; //预先建立好链接 serverListConfig.getAddressList().stream().forEach(address -> { poolMap.get(address); }); }
/** * @return */ private FixedChannelPool choose() { counter.incrementAndGet(); AbstractChannelPoolMap temp = ((AbstractChannelPoolMap) poolMap); int size = temp.size(); int index = counter.get() % size; Iterator<Map.Entry<InetSocketAddress, FixedChannelPool>> it = temp.iterator(); int i = 0; while (it.hasNext()) { i++; if (i == index) { return it.next().getValue(); } } return null; }
@SuppressWarnings("unchecked") public void close() { Iterator<Map.Entry<InetSocketAddress, FixedChannelPool>> it = ((AbstractChannelPoolMap) poolMap).iterator(); while (it.hasNext()) { it.next().getValue().close(); } eventLoopGroup.shutdownGracefully(); }
public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); final Bootstrap cb = new Bootstrap(); cb.group(group).channel(NioSocketChannel.class); InetSocketAddress addr1 = new InetSocketAddress("10.0.0.10", 8888); InetSocketAddress addr2 = new InetSocketAddress("10.0.0.11", 8888); //连接池map ChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() { @Override protected SimpleChannelPool newPool(InetSocketAddress key) { return new SimpleChannelPool(cb.remoteAddress(key), new TestChannelPoolHandler()); } }; final SimpleChannelPool pool1 = poolMap.get(addr1);//取出連接addr1地址的连接池 final SimpleChannelPool pool2 = poolMap.get(addr2);//取出連接addr2地址的连接池 Future<Channel> f1 = pool1.acquire();//获取一个连接 f1.addListener(new FutureListener<Channel>() { @Override public void operationComplete(Future<Channel> f) { if (f.isSuccess()) { Channel ch = f.getNow(); //连接地址1的某个channel //使用连接发送消息 // ch.write(msg) //用完释放 pool1.release(ch); } } }); }
private HubManager(final Properties properties) { Runtime.getRuntime().addShutdownHook(new Thread(){ public void run() { try { close(); } catch (Exception x) {/* No Op */} } }); log.info(">>>>> Initializing HubManager..."); metricMetaService = new MetricsMetaAPIImpl(properties); tsdbEndpoint = TSDBEndpoint.getEndpoint(metricMetaService.getSqlWorker()); for(String url: tsdbEndpoint.getUpServers()) { final URL tsdbUrl = URLHelper.toURL(url); tsdbAddresses.add(new InetSocketAddress(tsdbUrl.getHost(), tsdbUrl.getPort())); } endpointCount = tsdbAddresses.size(); endpointSequence = new AtomicInteger(endpointCount); group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, metricMetaService.getForkJoinPool()); bootstrap = new Bootstrap(); bootstrap .handler(channelInitializer) .group(group) .channel(NioSocketChannel.class) .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator()) .option(ChannelOption.ALLOCATOR, BufferManager.getInstance()); final ChannelPoolHandler poolHandler = this; poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() { @Override protected SimpleChannelPool newPool(final InetSocketAddress key) { final Bootstrap b = new Bootstrap().handler(channelInitializer) .group(group) .remoteAddress(key) .channel(NioSocketChannel.class) .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator()) .option(ChannelOption.ALLOCATOR, BufferManager.getInstance()); return new SimpleChannelPool(b, poolHandler); } }; eventExecutor = new DefaultEventExecutor(metricMetaService.getForkJoinPool()); channelGroup = new DefaultChannelGroup("MetricHubChannelGroup", eventExecutor); // tsdbAddresses.parallelStream().forEach(addr -> { // final Set<Channel> channels = Collections.synchronizedSet(new HashSet<Channel>(3)); // IntStream.of(1,2,3).parallel().forEach(i -> { // final ChannelPool pool = poolMap.get(addr); // try {channels.add(pool.acquire().awaitUninterruptibly().get()); // } catch (Exception e) {} // log.info("Acquired [{}] Channels", channels.size()); // channels.parallelStream().forEach(ch -> pool.release(ch)); // }); // }); log.info("<<<<< HubManager Initialized."); }