/** * starts server to handle discovery-request from client-channel * * @throws Exception */ public void startServer() throws Exception { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.group(acceptorGroup, workerGroup); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup)); EventLoopUtil.enableTriggeredMode(bootstrap); bootstrap.childHandler(new ServiceChannelInitializer(this, config, false)); // Bind and start to accept incoming connections. bootstrap.bind(config.getServicePort()).sync(); LOG.info("Started Pulsar Discovery service on port {}", config.getServicePort()); if (config.isTlsEnabled()) { ServerBootstrap tlsBootstrap = bootstrap.clone(); tlsBootstrap.childHandler(new ServiceChannelInitializer(this, config, true)); tlsBootstrap.bind(config.getServicePortTls()).sync(); LOG.info("Started Pulsar Discovery TLS service on port {}", config.getServicePortTls()); } }
public void start() throws Exception { this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath, pulsar.getConfiguration().getClusterName()); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.group(acceptorGroup, workerGroup); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup)); EventLoopUtil.enableTriggeredMode(bootstrap); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); bootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, false)); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), port)).sync(); log.info("Started Pulsar Broker service on port {}", port); if (serviceConfig.isTlsEnabled()) { ServerBootstrap tlsBootstrap = bootstrap.clone(); tlsBootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, true)); tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), tlsPort)).sync(); log.info("Started Pulsar Broker TLS service on port {}", tlsPort); } // start other housekeeping functions this.startStatsUpdater(); this.startInactivityMonitor(); this.startMessageExpiryMonitor(); this.startBacklogQuotaChecker(); // register listener to capture zk-latency ClientCnxnAspect.addListener(zkStatsListener); ClientCnxnAspect.registerExecutor(pulsar.getExecutor()); }
public void start() throws Exception { localZooKeeperConnectionService = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), proxyConfig.getZookeeperServers(), proxyConfig.getZookeeperSessionTimeoutMs()); localZooKeeperConnectionService.start(new ShutdownService() { @Override public void shutdown(int exitCode) { LOG.error("Lost local ZK session. Shutting down the proxy"); Runtime.getRuntime().halt(-1); } }); discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory()); this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache); ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(proxyConfig); authenticationService = new AuthenticationService(serviceConfiguration); authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.group(acceptorGroup, workerGroup); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup)); EventLoopUtil.enableTriggeredMode(bootstrap); bootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, false)); // Bind and start to accept incoming connections. bootstrap.bind(proxyConfig.getServicePort()).sync(); LOG.info("Started Pulsar Proxy at {}", serviceUrl); if (proxyConfig.isTlsEnabledInProxy()) { ServerBootstrap tlsBootstrap = bootstrap.clone(); tlsBootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, true)); tlsBootstrap.bind(proxyConfig.getServicePortTls()).sync(); LOG.info("Started Pulsar TLS Proxy on port {}", proxyConfig.getWebServicePortTls()); } }
/**
 * Populates every setter of {@code ChannelOptions} and applies each resulting entry to a
 * {@code ServerBootstrap}, once as a parent option and once as a child option. There are no explicit
 * assertions: the test passes when none of these calls throws.
 */
@Test
public void testOptionsHaveCorrectTypes() throws Exception {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    ChannelOptions channelOptions = new ChannelOptions();

    // Buffer allocation and generic channel behaviour.
    channelOptions.setAllocator(new PooledByteBufAllocator());
    channelOptions.setRecvBufAllocator(new AdaptiveRecvByteBufAllocator());
    channelOptions.setConnectTimeout(1);
    channelOptions.setWriteSpinCount(1);
    channelOptions.setWriteBufferWaterMark(new WriteBufferWaterMark(8192, 32768));
    channelOptions.setAllowHalfClosure(true);
    channelOptions.setAutoRead(true);

    // Socket-level (SO_*) options.
    channelOptions.setSoBroadcast(true);
    channelOptions.setSoKeepAlive(true);
    channelOptions.setSoReuseAddr(true);
    channelOptions.setSoSndBuf(8192);
    channelOptions.setSoRcvBuf(8192);
    channelOptions.setSoLinger(0);
    channelOptions.setSoBacklog(0);
    channelOptions.setSoTimeout(0);

    // IP, multicast and TCP options.
    channelOptions.setIpTos(0);
    channelOptions.setIpMulticastAddr(getLoopbackAddress());
    channelOptions.setIpMulticastIf(getNetworkInterfaces().nextElement());
    channelOptions.setIpMulticastTtl(300);
    channelOptions.setIpMulticastLoopDisabled(true);
    channelOptions.setTcpNodelay(true);

    Map<ChannelOption, Object> configured = channelOptions.get();
    for (Map.Entry<ChannelOption, Object> configuredOption : configured.entrySet()) {
        ChannelOption key = configuredOption.getKey();
        Object value = configuredOption.getValue();
        serverBootstrap.option(key, value);
        serverBootstrap.childOption(key, value);
    }
}
public FileMsgSender(String zkAddrs, String zkNode) { this.zkNode = zkNode; this.zkClient = ZKClientCache.get(zkAddrs); this.bootstrap = new Bootstrap(); bootstrap.group(GROUP); bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new FileMsgSendInitializer()); }
private boolean bindUdpPort(final InetAddress addr, final int port) { EventLoopGroup group = new NioEventLoopGroup(); bootstrapUDP = new Bootstrap(); bootstrapUDP.group(group).channel(NioDatagramChannel.class) .handler(new DatagramHandler(this, Transport.UDP)); bootstrapUDP.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1500, 1500, RECV_BUFFER_SIZE)); bootstrapUDP.option(ChannelOption.SO_RCVBUF, RECV_BUFFER_SIZE); bootstrapUDP.option(ChannelOption.SO_SNDBUF, SEND_BUFFER_SIZE); // bootstrap.setOption("trafficClass", trafficClass); // bootstrap.setOption("soTimeout", soTimeout); // bootstrap.setOption("broadcast", broadcast); bootstrapUDP.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS); bootstrapUDP.option(ChannelOption.SO_REUSEADDR, true); try { InetSocketAddress iAddr = new InetSocketAddress(addr, port); udpChannel = (DatagramChannel) bootstrapUDP.bind(iAddr).sync().channel(); //addLocalSocket(iAddr, c); logger.info("Successfully bound to ip:port {}:{}", addr, port); } catch (InterruptedException e) { logger.error("Problem when trying to bind to {}:{}", addr.getHostAddress(), port); return false; } return true; }
@Override public void init(Object object) { logger.info("初始化Netty 开始"); PropertiesWrapper propertiesWrapper = (PropertiesWrapper) object; int defaultValue = Runtime.getRuntime().availableProcessors() * 2; bossGroupNum = propertiesWrapper.getIntProperty(SystemEnvironment.NETTY_BOSS_GROUP_NUM, defaultValue); workerGroupNum = propertiesWrapper.getIntProperty(SystemEnvironment.NETTY_WORKER_GROUP_NUM, defaultValue); backlog = propertiesWrapper.getIntProperty(SystemEnvironment.NETTY_BACKLOG, BACKLOG); name = propertiesWrapper.getProperty(SystemEnvironment.NETTY_SERVER_NAME, "NETTY_SERVER"); int port = propertiesWrapper.getIntProperty(SystemEnvironment.TCP_PROT, D_PORT); Thread thread = new Thread(new Runnable() { public void run() { bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(bossGroupNum); workerGroup = new NioEventLoopGroup(workerGroupNum); bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, backlog) .option(ChannelOption.SO_REUSEADDR, Boolean.valueOf(true)) // .option(ChannelOption.TCP_NODELAY, // Boolean.valueOf(true)) // .option(ChannelOption.SO_KEEPALIVE, // Boolean.valueOf(true)) .childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new NettyServerInitializer()); ChannelFuture f; try { f = bootstrap.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "Netty-Start-Thread"); thread.start(); logger.info("初始化Netty 线程启动"); super.setActive(); }
private HubManager(final Properties properties) { Runtime.getRuntime().addShutdownHook(new Thread(){ public void run() { try { close(); } catch (Exception x) {/* No Op */} } }); log.info(">>>>> Initializing HubManager..."); metricMetaService = new MetricsMetaAPIImpl(properties); tsdbEndpoint = TSDBEndpoint.getEndpoint(metricMetaService.getSqlWorker()); for(String url: tsdbEndpoint.getUpServers()) { final URL tsdbUrl = URLHelper.toURL(url); tsdbAddresses.add(new InetSocketAddress(tsdbUrl.getHost(), tsdbUrl.getPort())); } endpointCount = tsdbAddresses.size(); endpointSequence = new AtomicInteger(endpointCount); group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, metricMetaService.getForkJoinPool()); bootstrap = new Bootstrap(); bootstrap .handler(channelInitializer) .group(group) .channel(NioSocketChannel.class) .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator()) .option(ChannelOption.ALLOCATOR, BufferManager.getInstance()); final ChannelPoolHandler poolHandler = this; poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() { @Override protected SimpleChannelPool newPool(final InetSocketAddress key) { final Bootstrap b = new Bootstrap().handler(channelInitializer) .group(group) .remoteAddress(key) .channel(NioSocketChannel.class) .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator()) .option(ChannelOption.ALLOCATOR, BufferManager.getInstance()); return new SimpleChannelPool(b, poolHandler); } }; eventExecutor = new DefaultEventExecutor(metricMetaService.getForkJoinPool()); channelGroup = new DefaultChannelGroup("MetricHubChannelGroup", eventExecutor); // tsdbAddresses.parallelStream().forEach(addr -> { // final Set<Channel> channels = Collections.synchronizedSet(new HashSet<Channel>(3)); // IntStream.of(1,2,3).parallel().forEach(i -> { // final ChannelPool pool = poolMap.get(addr); // try {channels.add(pool.acquire().awaitUninterruptibly().get()); // } catch (Exception e) {} 
// log.info("Acquired [{}] Channels", channels.size()); // channels.parallelStream().forEach(ch -> pool.release(ch)); // }); // }); log.info("<<<<< HubManager Initialized."); }
private void connectAndSend(Integer command, String message) { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.option(ChannelOption.MAX_MESSAGES_PER_READ, CommonConstants.MAX_MESSAGES_PER_READ); // (4) // b.option(ChannelOption.RCVBUF_ALLOCATOR, // new FixedRecvByteBufAllocator(CommonConstants.MAX_RECEIVED_BUFFER_SIZE)); b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator( CommonConstants.MIN_RECEIVED_BUFFER_SIZE, CommonConstants.RECEIVED_BUFFER_SIZE, CommonConstants.MAX_RECEIVED_BUFFER_SIZE)); if (command == null || message == null) { b.handler(new ChannelInitializerImpl()); } else { b.handler(new ChannelInitializerImpl(command, message)); } // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. //f.channel().closeFuture().sync(); boolean completed = f.channel().closeFuture().await(timeout, TimeUnit.SECONDS); // (6) if (!completed) { String PRSCODE_PREFIX = "<" + PRS_CODE + ">"; String PRSCODE_AFFIX = "</" + PRS_CODE + ">"; String prsCode = "[Unkonwn prscode]"; if (message == null) { } else { int start = message.indexOf(PRSCODE_PREFIX); int end = message.indexOf(PRSCODE_AFFIX) + PRSCODE_AFFIX.length(); prsCode = (start == -1 || end == -1) ? prsCode : message.substring(start, end); } Logger.getLogger(P2PTunnel.class.getName()).log(Level.WARNING, "[{0}] operation exceeds {1}seconds and is timeout, channel is forcily closed", new Object[]{prsCode, timeout}); //forcily close channel to avoid connection leak //acutally if no forcible channel close calling, connection still will be closed. 
//but for comprehensive consideration, I call channel close again f.channel().close().sync(); } } catch (InterruptedException ex) { Logger.getLogger(P2PTunnel.class.getName()).log(Level.SEVERE, "channel management hits a problem, due to\n{0}", ex); } finally { workerGroup.shutdownGracefully(); } }