/**
 * Starts a multiplexed JSON-protocol Thrift server on port 9090 and blocks
 * until the operator presses enter, then shuts the server down cleanly.
 */
public static void main(String[] args) throws TTransportException, IOException, InterruptedException {
    // Non-blocking listening socket on port 9090.
    TNonblockingServerSocket trans_svr = new TNonblockingServerSocket(9090);

    // Two services multiplexed over the same port; clients select by service name.
    TMultiplexedProcessor proc = new TMultiplexedProcessor();
    proc.registerProcessor("Message", new Message.Processor<>(new MessageHandler()));
    proc.registerProcessor("ServerTime", new ServerTime.Processor<>(new ServerTimeHandler()));

    TServer server = new TThreadedSelectorServer(
            new TThreadedSelectorServer.Args(trans_svr)
                    .processor(proc)
                    .protocolFactory(new TJSONProtocol.Factory())
                    .workerThreads(6)
                    .selectorThreads(3));

    // serve() blocks, so run the server on its own thread.
    Thread server_thread = new Thread(new RunnableServer(server), "server_thread");
    server_thread.start();

    System.out.println("[Server] press enter to shutdown> ");
    // Fix: the reader was never closed; try-with-resources releases it (and stdin)
    // which is fine here because the process is about to shut down anyway.
    try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
        br.readLine();
    }
    System.out.println("[Server] shutting down...");
    server.stop();
    server_thread.join();
    System.out.println("[Server] down, exiting");
}
public void start(CountDownLatch latch, int port) { try { TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port); //异步IO,需要使用TFramedTransport,它将分块缓存读取。 TTransportFactory transportFactory = new TFramedTransport.Factory(); //使用高密度二进制协议 TProtocolFactory proFactory = new TBinaryProtocol.Factory(); //发布多个服务 TMultiplexedProcessor processor = new TMultiplexedProcessor(); processor.registerProcessor(ClassNameUtils.getClassName(Hello.class), new Hello.Processor<>(new HelloServer())); TServer server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(serverTransport) .transportFactory(transportFactory) .protocolFactory(proFactory) .processor(processor) ); System.out.println("Starting the hello server..."); latch.countDown(); server.serve(); } catch (Exception e) { e.printStackTrace(); } }
public static void main(String[] args) { try { handler = new CalculatorHandler(); processor = new Calculator.Processor(handler); try { TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(9090); TServer server = new TThreadedSelectorServer( new TThreadedSelectorServer.Args(serverTransport).processor(processor)); System.out.println("Starting the server..."); server.serve(); } catch (TTransportException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (Exception x) { x.printStackTrace(); } }
/**
 * Builds a TThreadedSelectorServer for the HBase Thrift gateway.
 *
 * <p>Worker/selector thread counts are only applied when positive; otherwise
 * the Thrift defaults are kept. The worker pool is backed by a bounded,
 * metrics-aware executor.</p>
 */
private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory,
        TProcessor processor, TTransportFactory transportFactory, int workerThreads,
        int selectorThreads, int maxCallQueueSize, InetSocketAddress inetSocketAddress,
        ThriftMetrics metrics) throws TTransportException {
    // Non-blocking socket bound to the requested address.
    TNonblockingServerTransport socket = new TNonblockingServerSocket(inetSocketAddress);
    log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());

    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(socket);
    // Only override the Thrift defaults when explicit positive values were supplied.
    if (workerThreads > 0) {
        args.workerThreads(workerThreads);
    }
    if (selectorThreads > 0) {
        args.selectorThreads(selectorThreads);
    }
    // Bounded executor so the call queue cannot grow without limit.
    args.executorService(createExecutor(workerThreads, maxCallQueueSize, metrics));
    args.processor(processor);
    args.transportFactory(transportFactory);
    args.protocolFactory(protocolFactory);
    return new TThreadedSelectorServer(args);
}
/**
 * Creates a framed-transport, binary-protocol threaded-selector server with
 * pool sizes derived from the number of available processors.
 *
 * @param processorFactory       factory producing the service processor
 * @param port                   listening port
 * @param clientTimeoutMillisecs per-client socket timeout
 * @param maxFrameSize           maximum transport frame size in bytes
 * @param maxReadBufferSize      cap on total read-buffer memory in bytes
 * @return the configured (not yet started) server
 * @throws TTransportException if the listening socket cannot be opened
 */
public static TServer createThreadedSelectorServer(TProcessorFactory processorFactory, int port,
        int clientTimeoutMillisecs, int maxFrameSize, long maxReadBufferSize)
        throws TTransportException {
    int cores = Runtime.getRuntime().availableProcessors();
    // At least 2 workers (one per core) and at least 2 selectors (half the cores).
    int workerCount = Math.max(2, cores);
    int selectorCount = Math.max(2, cores / 2);

    TNonblockingServerTransport transport =
            new TNonblockingServerSocket(port, clientTimeoutMillisecs);
    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
    // Non-blocking servers require framed transport.
    TTransportFactory transportFactory = new TFramedTransport.Factory(maxFrameSize);

    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport)
            .processorFactory(processorFactory)
            .protocolFactory(protocolFactory)
            .transportFactory(transportFactory)
            .workerThreads(workerCount)
            .acceptPolicy(AcceptPolicy.FAIR_ACCEPT)
            .acceptQueueSizePerThread(10000)
            .selectorThreads(selectorCount);
    // Public field on the args struct, not a builder method.
    args.maxReadBufferBytes = maxReadBufferSize;
    return new TThreadedSelectorServer(args);
}
/**
 * Entry point for the CA service: reads the CA_PORT from the shared config
 * file and serves the async CA handler over framed binary Thrift.
 */
public static void main(String [] args) throws TTransportException, IOException, InterruptedException {
    Properties port_cfg = new Properties();
    // Fix: the stream was never closed; try-with-resources releases it once loaded.
    try (InputStream input = new FileInputStream("../config.properties")) {
        port_cfg.load(input);
    }
    String port_str = port_cfg.getProperty("CA_PORT");
    Integer port = Integer.valueOf(port_str);

    // Async processor wrapping the CA service handler.
    TProcessor proc = new LucidaService.AsyncProcessor(
            new CAServiceHandler.AsyncCAServiceHandler());
    TNonblockingServerTransport transport = new TNonblockingServerSocket(port);
    TThreadedSelectorServer.Args arguments = new TThreadedSelectorServer.Args(transport)
            .processor(proc)
            .protocolFactory(new TBinaryProtocol.Factory())
            .transportFactory(new TFramedTransport.Factory());
    final TThreadedSelectorServer server = new TThreadedSelectorServer(arguments);
    System.out.println("CA at port " + port_str);
    // Blocks for the lifetime of the service.
    server.serve();
}
/**
 * Entry point for question-answer.
 * Reads the QA_PORT from the shared config file and serves the async QA
 * handler over framed binary Thrift.
 *
 * @param args the argument list. Provide port numbers
 * for both sirius and qa.
 */
public static void main(String [] args) throws TTransportException, IOException, InterruptedException {
    Properties port_cfg = new Properties();
    // Fix: the stream was never closed; try-with-resources releases it once loaded.
    try (InputStream input = new FileInputStream("../../config.properties")) {
        port_cfg.load(input);
    }
    String port_str = port_cfg.getProperty("QA_PORT");
    Integer port = Integer.valueOf(port_str);

    // Async processor wrapping the QA service handler.
    TProcessor proc = new LucidaService.AsyncProcessor(
            new QAServiceHandler.AsyncQAServiceHandler());
    TNonblockingServerTransport transport = new TNonblockingServerSocket(port);
    TThreadedSelectorServer.Args arguments = new TThreadedSelectorServer.Args(transport)
            .processor(proc)
            .protocolFactory(new TBinaryProtocol.Factory())
            .transportFactory(new TFramedTransport.Factory());
    final TThreadedSelectorServer server = new TThreadedSelectorServer(arguments);
    System.out.println("QA at port " + port_str);
    // Blocks for the lifetime of the service.
    server.serve();
}
/**
 * Builds the threaded-selector Thrift server from the bean's configured
 * fields and launches the serving thread. Invoked by the container after
 * dependency injection.
 */
@PostConstruct
public void start() {
    logger.info("Starting thift server " + name + " on port " + port);
    try {
        transport = new TNonblockingServerSocket(port);
        // Roughly one selector per 16 workers, minimum one.
        TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport)
                .processor(processor)
                .workerThreads(threads)
                .selectorThreads((threads / 16) + 1)
                .protocolFactory(protocolFactory)
                .transportFactory(new TFramedTransport.Factory());
        server = new TThreadedSelectorServer(args);
        // The actual serve() call runs on this pre-built thread.
        thread.start();
    } catch (TTransportException e) {
        throw new RuntimeException("Unable to start thrift server " + e, e);
    }
}
/**
 * Builds an async echo test server backed by a TThreadedSelectorServer,
 * wired to the given test environment's port and protocol factory.
 */
public static AsyncEchoTestServer<TThreadedSelectorServer> threadedSelectorServer(
        final TestEnvironment environment) throws TTransportException {
    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(
            new TNonblockingServerSocket(environment.getPort()));
    args.processor(getAsyncProcessor());
    args.inputProtocolFactory(environment.getProtocolFactory());
    args.outputProtocolFactory(environment.getProtocolFactory());
    TThreadedSelectorServer selectorServer = new TThreadedSelectorServer(args);

    return new AsyncEchoTestServer<TThreadedSelectorServer>(selectorServer, environment) {
        @Override
        public SyncEchoTestClient getSynchronousClient() throws TTransportException {
            // Non-blocking servers need the framed sync client variant.
            return new SyncEchoTestClient.ClientForNonblockingServer(environment);
        }

        @Override
        public AsyncEchoTestClient getAsynchronousClient() throws IOException {
            return new AsyncEchoTestClient.Client(environment);
        }
    };
}
/**
 * Builds a sync echo test server backed by a TThreadedSelectorServer,
 * wired to the given test environment's port and protocol factory.
 */
public static SyncEchoTestServer<TThreadedSelectorServer> threadedSelectorServer(
        final TestEnvironment environment) throws TTransportException {
    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(
            new TNonblockingServerSocket(environment.getPort()));
    args.processor(getProcessor());
    args.inputProtocolFactory(environment.getProtocolFactory());
    args.outputProtocolFactory(environment.getProtocolFactory());
    TThreadedSelectorServer selectorServer = new TThreadedSelectorServer(args);

    return new SyncEchoTestServer<TThreadedSelectorServer>(selectorServer, environment) {
        @Override
        public SyncEchoTestClient getSynchronousClient() throws TTransportException {
            // Non-blocking servers need the framed sync client variant.
            return new SyncEchoTestClient.ClientForNonblockingServer(environment);
        }

        @Override
        public AsyncEchoTestClient getAsynchronousClient() throws IOException {
            return new AsyncEchoTestClient.Client(environment);
        }
    };
}
// Initializes the master's sub-services: web UI, RPC handler, and the
// threaded-selector Thrift server. Must run before the server starts serving.
private void setup() throws IOException, TTransportException {
    // NOTE(review): in ZooKeeper mode the edit-log processor is stopped first —
    // presumably because this master is transitioning from standby; confirm.
    if (mZookeeperMode) {
        mEditLogProcessor.stop();
    }
    mMasterInfo.init();
    // Web UI bound to the master host on the configured web port.
    mWebServer = new UIWebServer("Tachyon Master Server", new InetSocketAddress(
        mMasterAddress.getHostName(), mWebPort), mMasterInfo);
    mMasterServiceHandler = new MasterServiceHandler(mMasterInfo);
    MasterService.Processor<MasterServiceHandler> masterServiceProcessor =
        new MasterService.Processor<MasterServiceHandler>(mMasterServiceHandler);
    // Non-blocking RPC socket plus a threaded-selector server sized from the
    // configured selector/worker thread counts and accept-queue depth.
    mServerTNonblockingServerSocket = new TNonblockingServerSocket(mMasterAddress);
    mMasterServiceServer = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(
        mServerTNonblockingServerSocket).processor(masterServiceProcessor)
        .selectorThreads(mSelectorThreads).acceptQueueSizePerThread(mAcceptQueueSizePerThread)
        .workerThreads(mWorkerThreads));
    mIsStarted = true;
}
/**
 * Creates a new server.
 *
 * @param port a listening port
 * @param processor a processor
 * @param executorService an executor service used for the worker pool
 * @throws TTransportException if the non-blocking server socket cannot be opened
 */
public Bmv2ControlPlaneThriftServer(int port, TProcessor processor,
        ExecutorService executorService) throws TTransportException {
    // super(...) must be the first statement, so the fully-configured Args is
    // built inline: pool sizes and accept-queue depth come from class constants.
    super(new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port))
        .workerThreads(MAX_WORKER_THREADS)
        .selectorThreads(MAX_SELECTOR_THREADS)
        .acceptQueueSizePerThread(ACCEPT_QUEUE_LEN)
        .executorService(executorService)
        .processor(processor));
}
@Override protected void joinThreads() throws InterruptedException { // Wait until the io threads exit. acceptThread.join(); for (TThreadedSelectorServer.SelectorThread thread : selectorThreads) { thread.join(); } }
// Registers all configured services on a multiplexed processor and starts
// either a Netty-based server or a framed threaded-selector server,
// depending on the server arguments.
public void init() {
    try {
        TMultiplexedProcessor processor = new TMultiplexedProcessor();
        for (ServiceArgs service : serverArgs.getServices()) {
            String className = service.getService();
            // Strip the generated "$Processor" suffix so the service is
            // registered under the plain interface class name.
            if (className.endsWith("$Processor")) {
                className = className.substring(0, className.indexOf("$Processor"));
            }
            processor.registerProcessor(className, service.getProcessor());
        }
        if (serverArgs.getNettyServerArgs() != null) {
            // Netty transport configured: use the Netty-backed server.
            this.server = new TNettyServer(serverArgs.getNettyServerArgs().ip(serverArgs.getHost()).port(serverArgs.getPort()));
        } else {
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(new InetSocketAddress(serverArgs.getHost(), serverArgs.getPort()));
            // Async I/O requires TFramedTransport, which reads whole frames via chunked buffering.
            TTransportFactory transportFactory = new TFramedTransport.Factory();
            // Dense binary wire protocol.
            TProtocolFactory proFactory = new TBinaryProtocol.Factory();
            // Use this for a multithreaded key
            this.server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(serverTransport)
                .transportFactory(transportFactory)
                .protocolFactory(proFactory)
                .processor(processor)
            );
        }
        log.info("Starting the Thrift key...");
        // Event handler performs registry (service discovery) bookkeeping.
        this.server.setServerEventHandler(new TrpcRegistryEventHandler(serverArgs));
        this.server.serve();
        // NOTE(review): reached only after serve() returns; for the Netty server
        // this additionally waits for the close to complete.
        if (this.serverArgs.getNettyServerArgs() != null) {
            ((TNettyServer) this.server).waitForClose();
        }
    } catch (Exception e) {
        log.error("publish thrift key error", e);
    }
}
/**
 * Initializes the Thrift server.
 *
 * @param serverTransport
 *            the server transport (must be a {@link TNonblockingServerSocket})
 */
@Override
protected void initServer(TServerTransport serverTransport) {
    ThriftServerConfiguration configuration = getThriftServerConfiguration();
    // Use the multi-threaded half-sync/half-async (threaded-selector) model.
    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args((TNonblockingServerSocket) serverTransport)
        .transportFactory(configuration.getTransportFactory())
        .protocolFactory(configuration.getProtocolFactory());
    // Optional custom worker pool from the configuration.
    if (configuration.getExecutorService() != null)
        args.executorService(configuration.getExecutorService());
    // The args aspect hook lets the configuration adjust args before the server is built.
    server = new TThreadedSelectorServer(
        configuration.getServerArgsAspect().TThreadedSelectorServerArgsAspect(args).processor(getProcessor()));
    if (configuration.getServerEventHandler() != null)
        server.setServerEventHandler(configuration.getServerEventHandler());
}
/**
 * Entry point for ensemble: connects to the command center, then serves the
 * async Thrift handler on port 9090 over framed binary transport.
 */
public static void setup(Controller controller) throws TTransportException, IOException, InterruptedException {
    connectToCMD();
    // Async handler wrapped in the generated async processor.
    TProcessor processor = new LucidaService.AsyncProcessor(
            new ThriftHandler.AsyncThriftHandler(controller));
    TNonblockingServerTransport socket = new TNonblockingServerSocket(9090);
    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(socket);
    args.processor(processor);
    args.protocolFactory(new TBinaryProtocol.Factory());
    args.transportFactory(new TFramedTransport.Factory());
    final TThreadedSelectorServer selectorServer = new TThreadedSelectorServer(args);
    System.out.println("ensemble at port 9090");
    // Blocks for the lifetime of the service.
    selectorServer.serve();
}
/**
 * Serves the Galen remote-API processor on the given port; any startup or
 * serving failure is printed and the method returns.
 */
public static void runService(GalenApiRemoteService.Processor processor, int serverPort) {
    try {
        // Non-blocking transport listening on the requested port.
        TNonblockingServerTransport socket = new TNonblockingServerSocket(serverPort);
        TThreadedSelectorServer.Args serverArgs =
                new TThreadedSelectorServer.Args(socket).processor(processor);
        // Blocks until the server is stopped.
        new TThreadedSelectorServer(serverArgs).serve();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
// Builds the Thrift server, installs a JVM shutdown hook that tears down
// listeners, the Spring context, the server, and outstanding queries in that
// order, then blocks in serve().
public void serve() {
    TThreadedSelectorServer.Args serverArgs = createServerArgs();
    // NOTE(review): createServerArgs() appears to signal "do not serve" by
    // returning null — confirm against its implementation.
    if (serverArgs == null) return;
    server = new ThriftServer(serverArgs, "server-selector-%d", servingListeners);
    // Make sure we at least try to clean up a little bit when the VM is shut down.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            logger.info("Shutting down server...");
            if (gracefulShutdownListeners != null) {
                logger.debug("Executing graceful shutdown listeners...");
                new ShutdownUtil(applicationContext).callShutdownListeners();
            }
            // Order matters: close beans before stopping the server, then
            // shut down whatever queries are still running.
            logger.debug("Stopping all local beans...");
            applicationContext.close();
            logger.debug("Stopping thrift server...");
            server.stop();
            logger.debug("Shutting down everything of remaining queries...");
            executorManager.shutdownEverythingOfAllQueries();
            logger.info("Shutdown complete.");
        }
    }, "shutdown-thread"));
    // Empty bind string means "all interfaces"; log accordingly.
    if ("".equals(bind)) logger.info("Listening for incoming requests on port {}...", port);
    else logger.info("Listening for incoming requests on port {} (bound to {})...", port, bind);
    // Blocks until the server is stopped (normally from the shutdown hook above).
    server.serve();
}
/**
 * Builds a framed, binary-protocol threaded-selector server for the given
 * processor on the configured remote port, using the configured selector and
 * worker pool sizes.
 */
@Override
protected TServer getServer(TProcessor processor) throws TTransportException {
    LOGGER.debug("Setting Server with {} selector threads and {} worker threads",
            numSelectorThreads, numWorkerThreads);
    TNonblockingServerSocket socket = new TNonblockingServerSocket(remotePort);
    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(socket)
            .transportFactory(new TFramedTransport.Factory())
            .protocolFactory(new TBinaryProtocol.Factory())
            .processor(processor)
            .selectorThreads(numSelectorThreads)
            .workerThreads(numWorkerThreads);
    return new TThreadedSelectorServer(serverArgs);
}
/**
 * Hook invoked with the server args before the server is constructed; the
 * default implementation returns them unchanged. Override to customize the
 * {@link TThreadedSelectorServer.Args}.
 */
public TThreadedSelectorServer.Args TThreadedSelectorServerArgsAspect(TThreadedSelectorServer.Args args) {
    return args;
}
// Delegates to the async factory to build a TThreadedSelectorServer-backed
// echo test server for this environment.
@Override
protected EchoTestServer<TThreadedSelectorServer> createEchoServer(TestEnvironment environment)
    throws TTransportException {
    return AsyncEchoTestServerFactory.threadedSelectorServer(environment);
}
// Delegates to the sync factory to build a TThreadedSelectorServer-backed
// echo test server for this environment.
@Override
protected EchoTestServer<TThreadedSelectorServer> createEchoServer(TestEnvironment environment)
    throws TTransportException {
    return SyncEchoTestServerFactory.threadedSelectorServer(environment);
}
/**
 * Helper method to create a new framed-transport, threaded-selector
 * {@link TServer}.
 *
 * <p>
 * Note: if {@code numSelectorThreads < 1}, the {@link TServer} is created
 * with 2 selector threads; if {@code numWorkerThreads < 1}, it is created
 * with 8 worker threads.
 * </p>
 *
 * @param processorFactory
 * @param protocolFactory
 * @param port
 * @param numSelectorThreads
 * @param numWorkerThreads
 * @param clientTimeoutMillisecs
 * @param maxFrameSize
 * @param maxReadBufferSize
 * @return the configured, not-yet-started server
 * @throws TTransportException
 */
public static TServer createThreadedSelectorServer(TProcessorFactory processorFactory,
        TProtocolFactory protocolFactory, int port, int numSelectorThreads, int numWorkerThreads,
        int clientTimeoutMillisecs, int maxFrameSize, long maxReadBufferSize)
        throws TTransportException {
    // Fall back to fixed pool sizes when the caller passes non-positive values.
    int selectorCount = numSelectorThreads < 1 ? 2 : numSelectorThreads;
    int workerCount = numWorkerThreads < 1 ? 8 : numWorkerThreads;

    TNonblockingServerTransport transport =
            new TNonblockingServerSocket(port, clientTimeoutMillisecs);
    // Non-blocking servers require framed transport.
    TTransportFactory transportFactory = new TFramedTransport.Factory(maxFrameSize);
    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(transport)
            .processorFactory(processorFactory)
            .protocolFactory(protocolFactory)
            .transportFactory(transportFactory)
            .workerThreads(workerCount)
            .acceptPolicy(AcceptPolicy.FAIR_ACCEPT)
            .acceptQueueSizePerThread(10000)
            .selectorThreads(selectorCount);
    // Public field on the args struct, not a builder method.
    serverArgs.maxReadBufferBytes = maxReadBufferSize;
    return new TThreadedSelectorServer(serverArgs);
}
/**
 * Creates a {@link TThreadedSelectorServer} server.
 *
 * <p>
 * Similar to {@link THsHaServer} but uses 2 thread pools: one for handling
 * network I/O (e.g. accepting client connections), one for handling messages
 * like {@link THsHaServer}.
 * </p>
 *
 * @param processorFactory
 * @param protocolFactory
 * @param port port number on which the Thrift server will listen
 * @param clientTimeoutMillisecs supply {@code <=0} for the default client timeout
 * @param maxFrameSize max size (in bytes) of a transport frame; {@code <=0}
 *            selects the default (1Mb)
 * @param maxReadBufferSize max size (in bytes) of read buffer; {@code <=0}
 *            selects the default (16Mb)
 * @param numSelectorThreads number of selector threads; {@code <=0} selects
 *            the default ({@code 2})
 * @param numWorkerThreads number of worker threads; {@code <=0} selects the
 *            default ({@code Math.max(4, Runtime.getRuntime().availableProcessors())})
 * @return the configured, not-yet-started server
 * @throws TTransportException
 */
public static TThreadedSelectorServer createThreadedSelectorServer(
        TProcessorFactory processorFactory, TProtocolFactory protocolFactory, int port,
        int clientTimeoutMillisecs, int maxFrameSize, long maxReadBufferSize,
        int numSelectorThreads, int numWorkerThreads) throws TTransportException {
    // Normalize non-positive arguments to the documented defaults.
    clientTimeoutMillisecs =
            clientTimeoutMillisecs > 0 ? clientTimeoutMillisecs : DEFAULT_CLIENT_TIMEOUT_MS;
    maxFrameSize = maxFrameSize > 0 ? maxFrameSize : DEFAULT_MAX_FRAMESIZE;
    maxReadBufferSize =
            maxReadBufferSize > 0 ? maxReadBufferSize : DEFAULT_TOTAL_MAX_READ_BUFFERSIZE;
    numSelectorThreads =
            numSelectorThreads > 0 ? numSelectorThreads : DEFAULT_NUM_SELECTOR_THREADS;
    numWorkerThreads = numWorkerThreads > 0 ? numWorkerThreads : DEFAULT_NUM_WORKER_THREADS;

    TNonblockingServerTransport transport =
            new TNonblockingServerSocket(port, clientTimeoutMillisecs);
    // Non-blocking servers require framed transport.
    TTransportFactory transportFactory = new TFramedTransport.Factory(maxFrameSize);
    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(transport)
            .processorFactory(processorFactory)
            .protocolFactory(protocolFactory)
            .transportFactory(transportFactory)
            .workerThreads(numWorkerThreads)
            .acceptPolicy(AcceptPolicy.FAIR_ACCEPT)
            .acceptQueueSizePerThread(100000)
            .selectorThreads(numSelectorThreads);
    // Public field on the args struct, not a builder method.
    serverArgs.maxReadBufferBytes = maxReadBufferSize;
    return new TThreadedSelectorServer(serverArgs);
}
/**
 * Creates a {@link TThreadedSelectorServer} server.
 *
 * <p>
 * Similar to {@link THsHaServer} but uses 2 thread pools: one for handling
 * network I/O (e.g. accepting client connections), one for handling messages
 * like {@link THsHaServer}.
 * </p>
 *
 * @param processorFactory
 * @param protocolFactory
 * @param port port number on which the Thrift server will listen
 * @param clientTimeoutMillisecs supply {@code <=0} for the default client timeout
 * @param maxFrameSize max size (in bytes) of a transport frame; {@code <=0}
 *            selects the default (1Mb)
 * @param maxReadBufferSize max size (in bytes) of read buffer; {@code <=0}
 *            selects the default (16Mb)
 * @param numSelectorThreads number of selector threads; {@code <=0} selects
 *            the default ({@code 2})
 * @param numWorkerThreads number of worker threads; {@code <=0} selects the
 *            default ({@code Math.max(2, Runtime.getRuntime().availableProcessors())})
 * @return the configured, not-yet-started server
 * @throws TTransportException
 */
public static TThreadedSelectorServer createThreadedSelectorServer(
        TProcessorFactory processorFactory, TProtocolFactory protocolFactory, int port,
        int clientTimeoutMillisecs, int maxFrameSize, long maxReadBufferSize,
        int numSelectorThreads, int numWorkerThreads) throws TTransportException {
    // Normalize non-positive arguments to the documented defaults.
    clientTimeoutMillisecs =
            clientTimeoutMillisecs > 0 ? clientTimeoutMillisecs : DEFAULT_CLIENT_TIMEOUT_MS;
    maxFrameSize = maxFrameSize > 0 ? maxFrameSize : DEFAULT_MAX_FRAMESIZE;
    maxReadBufferSize =
            maxReadBufferSize > 0 ? maxReadBufferSize : DEFAULT_TOTAL_MAX_READ_BUFFERSIZE;
    numSelectorThreads =
            numSelectorThreads > 0 ? numSelectorThreads : DEFAULT_NUM_SELECTOR_THREADS;
    numWorkerThreads = numWorkerThreads > 0 ? numWorkerThreads : DEFAULT_NUM_WORKER_THREADS;

    TNonblockingServerTransport transport =
            new TNonblockingServerSocket(port, clientTimeoutMillisecs);
    // Non-blocking servers require framed transport.
    TTransportFactory transportFactory = new TFramedTransport.Factory(maxFrameSize);
    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(transport)
            .processorFactory(processorFactory)
            .protocolFactory(protocolFactory)
            .transportFactory(transportFactory)
            .workerThreads(numWorkerThreads)
            .acceptPolicy(AcceptPolicy.FAIR_ACCEPT)
            .acceptQueueSizePerThread(100000)
            .selectorThreads(numSelectorThreads);
    // Public field on the args struct, not a builder method.
    serverArgs.maxReadBufferBytes = maxReadBufferSize;
    return new TThreadedSelectorServer(serverArgs);
}