Java 类org.apache.thrift.server.TThreadedSelectorServer 实例源码

项目:ThriftBook    文件:MultiServiceServer.java   
/**
 * Runs a multiplexed Thrift server on port 9090 in a background thread and
 * stops it after a line has been read from standard input.
 *
 * @param args command-line arguments (not read)
 * @throws TTransportException if the non-blocking server socket cannot be created
 * @throws IOException if reading from standard input fails
 * @throws InterruptedException if interrupted while joining the server thread
 */
public static void main(String[] args) throws TTransportException, IOException, InterruptedException {
    TNonblockingServerSocket listenSocket = new TNonblockingServerSocket(9090);

    // Both services are registered on one multiplexed processor, each under its own name.
    TMultiplexedProcessor multiplexer = new TMultiplexedProcessor();
    multiplexer.registerProcessor("Message", new Message.Processor<>(new MessageHandler()));
    multiplexer.registerProcessor("ServerTime", new ServerTime.Processor<>(new ServerTimeHandler()));

    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket)
            .processor(multiplexer)
            .protocolFactory(new TJSONProtocol.Factory())
            .workerThreads(6)
            .selectorThreads(3);
    TServer server = new TThreadedSelectorServer(serverArgs);

    // RunnableServer is declared elsewhere; presumably it calls server.serve() -- confirm.
    Thread serverThread = new Thread(new RunnableServer(server), "server_thread");
    serverThread.start();

    System.out.println("[Server] press enter to shutdown> ");
    BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
    console.readLine();

    System.out.println("[Server] shutting down...");
    server.stop();
    serverThread.join();
    System.out.println("[Server] down, exiting");
}
项目:trpc    文件:DemoServer.java   
/**
 * Builds a threaded-selector Thrift server for the {@code Hello} service on
 * the given port, counts down {@code latch}, and then calls {@code serve()}.
 * <p>
 * Any exception is caught and its stack trace printed. If the failure happens
 * before the server has been built, the latch is never counted down.
 *
 * @param latch counted down once, immediately before {@code serve()} is called
 * @param port  port for the non-blocking server socket
 */
public void start(CountDownLatch latch, int port) {
    try {
        TNonblockingServerSocket listenSocket = new TNonblockingServerSocket(port);

        // Multiplexed processor so that several services can be published.
        TMultiplexedProcessor multiplexer = new TMultiplexedProcessor();
        multiplexer.registerProcessor(ClassNameUtils.getClassName(Hello.class), new Hello.Processor<>(new HelloServer()));

        TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket);
        // Asynchronous I/O needs TFramedTransport, which buffers reads frame by frame.
        serverArgs.transportFactory(new TFramedTransport.Factory());
        // Dense binary wire protocol.
        serverArgs.protocolFactory(new TBinaryProtocol.Factory());
        serverArgs.processor(multiplexer);

        TServer server = new TThreadedSelectorServer(serverArgs);
        System.out.println("Starting the hello server...");
        latch.countDown();
        server.serve();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
项目:springside-engine    文件:StandardNIOServer.java   
/**
 * Creates the calculator handler and processor, then serves them from a
 * threaded-selector Thrift server on port 9090.
 * <p>
 * Every failure, transport-related or otherwise, is reported by printing its
 * stack trace; nothing is rethrown.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    try {
        handler = new CalculatorHandler();
        processor = new Calculator.Processor(handler);

        TNonblockingServerTransport listenSocket = new TNonblockingServerSocket(9090);
        TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket);
        TServer server = new TThreadedSelectorServer(serverArgs.processor(processor));

        System.out.println("Starting the  server...");
        server.serve();
    } catch (TTransportException e) {
        // The socket could not be set up.
        e.printStackTrace();
    } catch (Exception x) {
        x.printStackTrace();
    }
}
项目:hbase    文件:ThriftServer.java   
/**
 * Builds a {@link TThreadedSelectorServer} bound to the given address.
 *
 * @param protocolFactory   protocol factory passed to the server args
 * @param processor         processor passed to the server args
 * @param transportFactory  transport factory passed to the server args
 * @param workerThreads     worker thread count; applied to the args only when positive,
 *                          but always forwarded to {@code createExecutor}
 * @param selectorThreads   selector thread count; applied to the args only when positive
 * @param maxCallQueueSize  forwarded to {@code createExecutor}
 * @param inetSocketAddress address the non-blocking server socket is created on
 * @param metrics           forwarded to {@code createExecutor}
 * @return the configured server; {@code serve()} is not called here
 * @throws TTransportException if the server socket cannot be created
 */
private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory,
    TProcessor processor, TTransportFactory transportFactory,
    int workerThreads, int selectorThreads, int maxCallQueueSize,
    InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
    throws TTransportException {
  TNonblockingServerTransport listenSocket = new TNonblockingServerSocket(inetSocketAddress);
  log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());

  TThreadedSelectorServer.Args selectorArgs = new TThreadedSelectorServer.Args(listenSocket);
  // Non-positive counts leave whatever the Args object already holds untouched.
  if (workerThreads > 0) {
    selectorArgs.workerThreads(workerThreads);
  }
  if (selectorThreads > 0) {
    selectorArgs.selectorThreads(selectorThreads);
  }

  selectorArgs
      .executorService(createExecutor(workerThreads, maxCallQueueSize, metrics))
      .processor(processor)
      .transportFactory(transportFactory)
      .protocolFactory(protocolFactory);
  return new TThreadedSelectorServer(selectorArgs);
}
项目:osgi-bundle-frontapi    文件:ThriftUtils.java   
/**
 * Creates a framed-transport, binary-protocol {@link TThreadedSelectorServer}.
 * <p>
 * The worker pool is sized to the number of available processors and the
 * selector pool to half of that, each with a minimum of 2.
 *
 * @param processorFactory       processor factory passed to the server args
 * @param port                   port for the non-blocking server socket
 * @param clientTimeoutMillisecs client timeout passed to the server socket
 * @param maxFrameSize           maximum frame size passed to the framed transport factory
 * @param maxReadBufferSize      value assigned to the args' {@code maxReadBufferBytes}
 * @return the configured server; {@code serve()} is not called here
 * @throws TTransportException if the server socket cannot be created
 */
public static TServer createThreadedSelectorServer(TProcessorFactory processorFactory,
        int port, int clientTimeoutMillisecs, int maxFrameSize, long maxReadBufferSize)
        throws TTransportException {
    final int cpuCount = Runtime.getRuntime().availableProcessors();
    final int workerCount = Math.max(2, cpuCount);
    final int selectorCount = Math.max(2, cpuCount / 2);

    TNonblockingServerTransport listenSocket =
            new TNonblockingServerSocket(port, clientTimeoutMillisecs);

    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket);
    serverArgs.processorFactory(processorFactory);
    serverArgs.protocolFactory(new TBinaryProtocol.Factory());
    serverArgs.transportFactory(new TFramedTransport.Factory(maxFrameSize));
    serverArgs.workerThreads(workerCount);
    serverArgs.acceptPolicy(AcceptPolicy.FAIR_ACCEPT);
    serverArgs.acceptQueueSizePerThread(10000);
    serverArgs.selectorThreads(selectorCount);
    serverArgs.maxReadBufferBytes = maxReadBufferSize;

    return new TThreadedSelectorServer(serverArgs);
}
项目:lucida    文件:CalendarDaemon.java   
/**
 * Entry point for the CA daemon: reads the listening port from the
 * {@code CA_PORT} entry of {@code ../config.properties} and serves the
 * asynchronous Lucida processor on it.
 *
 * @param args command-line arguments (not read)
 * @throws TTransportException if the non-blocking server socket cannot be created
 * @throws IOException if the configuration file cannot be opened or read
 * @throws InterruptedException declared for callers; not thrown by the visible code
 * @throws NumberFormatException if {@code CA_PORT} is missing or not an integer
 */
public static void main(String [] args)
        throws TTransportException, IOException, InterruptedException {
    Properties port_cfg = new Properties();
    // try-with-resources closes the config file even when load() fails.
    try (InputStream input = new FileInputStream("../config.properties")) {
        port_cfg.load(input);
    }
    String port_str = port_cfg.getProperty("CA_PORT");
    if (port_str == null) {
        // Same exception type Integer.valueOf(null) would raise, with a usable message.
        throw new NumberFormatException("CA_PORT is not set in ../config.properties");
    }
    // Properties.load keeps trailing whitespace in values; trim it before parsing.
    port_str = port_str.trim();
    int port = Integer.parseInt(port_str);
    TProcessor proc = new LucidaService.AsyncProcessor(
            new CAServiceHandler.AsyncCAServiceHandler());
    TNonblockingServerTransport transport = new TNonblockingServerSocket(port);
    TThreadedSelectorServer.Args arguments = new TThreadedSelectorServer.Args(transport)
            .processor(proc)
            .protocolFactory(new TBinaryProtocol.Factory())
            .transportFactory(new TFramedTransport.Factory());
    final TThreadedSelectorServer server = new TThreadedSelectorServer(arguments);
    System.out.println("CA at port " + port_str);
    server.serve();
}
项目:lucida    文件:QADaemon.java   
/**
 * Entry point for question-answer: reads the listening port from the
 * {@code QA_PORT} entry of {@code ../../config.properties} and serves the
 * asynchronous Lucida processor on it.
 *
 * @param args command-line arguments (not read; the port comes from the
 *             configuration file)
 * @throws TTransportException if the non-blocking server socket cannot be created
 * @throws IOException if the configuration file cannot be opened or read
 * @throws InterruptedException declared for callers; not thrown by the visible code
 * @throws NumberFormatException if {@code QA_PORT} is missing or not an integer
 */
public static void main(String [] args)
        throws TTransportException, IOException, InterruptedException {
    Properties port_cfg = new Properties();
    // try-with-resources closes the config file even when load() fails.
    try (InputStream input = new FileInputStream("../../config.properties")) {
        port_cfg.load(input);
    }
    String port_str = port_cfg.getProperty("QA_PORT");
    if (port_str == null) {
        // Same exception type Integer.valueOf(null) would raise, with a usable message.
        throw new NumberFormatException("QA_PORT is not set in ../../config.properties");
    }
    // Properties.load keeps trailing whitespace in values; trim it before parsing.
    port_str = port_str.trim();
    int port = Integer.parseInt(port_str);
    TProcessor proc = new LucidaService.AsyncProcessor(
            new QAServiceHandler.AsyncQAServiceHandler());
    TNonblockingServerTransport transport = new TNonblockingServerSocket(port);
    TThreadedSelectorServer.Args arguments = new TThreadedSelectorServer.Args(transport)
            .processor(proc)
            .protocolFactory(new TBinaryProtocol.Factory())
            .transportFactory(new TFramedTransport.Factory());
    final TThreadedSelectorServer server = new TThreadedSelectorServer(arguments);
    System.out.println("QA at port " + port_str);
    server.serve();
}
项目:plow    文件:ThriftServer.java   
/**
 * Opens the non-blocking server socket on {@code port}, builds the
 * threaded-selector server around it and starts the {@code thread} field.
 * <p>
 * One selector thread is configured for every 16 worker threads, plus one.
 *
 * @throws RuntimeException wrapping the {@link TTransportException} if the
 *         socket cannot be created
 */
@PostConstruct
public void start() {
    logger.info("Starting thift server " + name + " on port " + port);

    try {
        transport = new TNonblockingServerSocket(port);

        TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(transport);
        serverArgs.processor(processor);
        serverArgs.workerThreads(threads);
        serverArgs.selectorThreads((threads / 16) + 1);
        serverArgs.protocolFactory(protocolFactory);
        serverArgs.transportFactory(new TFramedTransport.Factory());
        server = new TThreadedSelectorServer(serverArgs);

        // `thread` is set up elsewhere; presumably it runs server.serve() -- confirm.
        thread.start();
    } catch (TTransportException e) {
        throw new RuntimeException("Unable to start thrift server " + e, e);
    }
}
项目:pinpoint    文件:AsyncEchoTestServer.java   
/**
 * Creates an echo test server backed by a {@link TThreadedSelectorServer}
 * that uses the asynchronous processor.
 *
 * @param environment supplies the port and the protocol factory, and is handed
 *                    to the clients created by the returned server
 * @return a test server wrapper whose client factories target this server
 * @throws TTransportException if the non-blocking server socket cannot be created
 */
public static AsyncEchoTestServer<TThreadedSelectorServer> threadedSelectorServer(
        final TestEnvironment environment) throws TTransportException {
    TNonblockingServerSocket listenSocket = new TNonblockingServerSocket(environment.getPort());

    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket);
    serverArgs.processor(getAsyncProcessor());
    // Input and output sides both use the environment's protocol factory.
    serverArgs.inputProtocolFactory(environment.getProtocolFactory());
    serverArgs.outputProtocolFactory(environment.getProtocolFactory());

    TThreadedSelectorServer selectorServer = new TThreadedSelectorServer(serverArgs);
    return new AsyncEchoTestServer<TThreadedSelectorServer>(selectorServer, environment) {
        @Override
        public SyncEchoTestClient getSynchronousClient() throws TTransportException {
            return new SyncEchoTestClient.ClientForNonblockingServer(environment);
        }

        @Override
        public AsyncEchoTestClient getAsynchronousClient() throws IOException {
            return new AsyncEchoTestClient.Client(environment);
        }
    };
}
项目:pinpoint    文件:SyncEchoTestServer.java   
/**
 * Creates an echo test server backed by a {@link TThreadedSelectorServer}
 * that uses the synchronous processor.
 *
 * @param environment supplies the port and the protocol factory, and is handed
 *                    to the clients created by the returned server
 * @return a test server wrapper whose client factories target this server
 * @throws TTransportException if the non-blocking server socket cannot be created
 */
public static SyncEchoTestServer<TThreadedSelectorServer> threadedSelectorServer(
        final TestEnvironment environment) throws TTransportException {
    TNonblockingServerSocket listenSocket = new TNonblockingServerSocket(environment.getPort());

    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket);
    serverArgs.processor(getProcessor());
    // Input and output sides both use the environment's protocol factory.
    serverArgs.inputProtocolFactory(environment.getProtocolFactory());
    serverArgs.outputProtocolFactory(environment.getProtocolFactory());

    TThreadedSelectorServer selectorServer = new TThreadedSelectorServer(serverArgs);
    return new SyncEchoTestServer<TThreadedSelectorServer>(selectorServer, environment) {
        @Override
        public SyncEchoTestClient getSynchronousClient() throws TTransportException {
            return new SyncEchoTestClient.ClientForNonblockingServer(environment);
        }

        @Override
        public AsyncEchoTestClient getAsynchronousClient() throws IOException {
            return new AsyncEchoTestClient.Client(environment);
        }
    };
}
项目:tachyon-rdma    文件:TachyonMaster.java   
/**
 * Initialises the master info, then creates the web UI server, the master
 * service handler and the threaded-selector Thrift server, and finally marks
 * the master as started. In Zookeeper mode the edit-log processor is stopped
 * first.
 *
 * @throws IOException declared for callers; presumably raised by the
 *         initialisation calls -- confirm
 * @throws TTransportException if the non-blocking server socket cannot be created
 */
private void setup() throws IOException, TTransportException {
  if (mZookeeperMode) {
    mEditLogProcessor.stop();
  }
  mMasterInfo.init();

  InetSocketAddress webAddress = new InetSocketAddress(mMasterAddress.getHostName(), mWebPort);
  mWebServer = new UIWebServer("Tachyon Master Server", webAddress, mMasterInfo);

  mMasterServiceHandler = new MasterServiceHandler(mMasterInfo);
  MasterService.Processor<MasterServiceHandler> serviceProcessor =
      new MasterService.Processor<MasterServiceHandler>(mMasterServiceHandler);

  mServerTNonblockingServerSocket = new TNonblockingServerSocket(mMasterAddress);
  TThreadedSelectorServer.Args serverArgs =
      new TThreadedSelectorServer.Args(mServerTNonblockingServerSocket);
  serverArgs.processor(serviceProcessor);
  serverArgs.selectorThreads(mSelectorThreads);
  serverArgs.acceptQueueSizePerThread(mAcceptQueueSizePerThread);
  serverArgs.workerThreads(mWorkerThreads);
  mMasterServiceServer = new TThreadedSelectorServer(serverArgs);

  mIsStarted = true;
}
项目:athena    文件:Bmv2ControlPlaneThriftServer.java   
/**
 * Creates a new server listening on the given port.
 *
 * @param port            a listening port
 * @param processor       a processor
 * @param executorService an executor service
 * @throws TTransportException if the non-blocking server socket cannot be created
 */
public Bmv2ControlPlaneThriftServer(int port, TProcessor processor, ExecutorService executorService)
        throws TTransportException {
    super(buildSelectorArgs(port, processor, executorService));
}

/**
 * Assembles the server arguments handed to the superclass constructor.
 *
 * @param port            port for the non-blocking server socket
 * @param processor       processor passed to the server args
 * @param executorService executor service passed to the server args
 * @return the populated argument object
 * @throws TTransportException if the non-blocking server socket cannot be created
 */
private static TThreadedSelectorServer.Args buildSelectorArgs(int port, TProcessor processor,
                                                              ExecutorService executorService)
        throws TTransportException {
    TThreadedSelectorServer.Args serverArgs =
            new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port));
    serverArgs.workerThreads(MAX_WORKER_THREADS);
    serverArgs.selectorThreads(MAX_SELECTOR_THREADS);
    serverArgs.acceptQueueSizePerThread(ACCEPT_QUEUE_LEN);
    serverArgs.executorService(executorService);
    serverArgs.processor(processor);
    return serverArgs;
}
项目:athena    文件:Bmv2ControlPlaneThriftServer.java   
/**
 * Blocks until the accept thread and every selector thread have terminated.
 *
 * @throws InterruptedException if interrupted while waiting on any of the threads
 */
@Override
protected void joinThreads() throws InterruptedException {
    // Only the I/O threads are awaited here: first the acceptor, then each selector.
    acceptThread.join();
    for (TThreadedSelectorServer.SelectorThread selector : selectorThreads) {
        selector.join();
    }
}
项目:trpc    文件:ThriftServerPublisher.java   
/**
 * Builds and starts the Thrift server that publishes every configured service.
 * <p>
 * Each service is registered on a {@link TMultiplexedProcessor} under its
 * service name with a trailing {@code $Processor} suffix removed. A
 * {@code TNettyServer} is created when Netty arguments are configured;
 * otherwise a {@link TThreadedSelectorServer} with framed transport and the
 * binary protocol is used. Any exception is logged and not propagated.
 */
public void init() {
    // Suffix removed from service names before registration.
    final String processorSuffix = "$Processor";
    try {
        TMultiplexedProcessor processor = new TMultiplexedProcessor();
        for (ServiceArgs service : serverArgs.getServices()) {
            String className = service.getService();
            if (className.endsWith(processorSuffix)) {
                // Cut exactly the trailing suffix. indexOf() would cut at the first
                // occurrence, which is wrong if the marker also appears earlier in the name.
                className = className.substring(0, className.length() - processorSuffix.length());
            }
            processor.registerProcessor(className, service.getProcessor());
        }
        if (serverArgs.getNettyServerArgs() != null) {
            this.server = new TNettyServer(serverArgs.getNettyServerArgs().ip(serverArgs.getHost()).port(serverArgs.getPort()));
        } else {
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(new InetSocketAddress(serverArgs.getHost(), serverArgs.getPort()));
            // Asynchronous I/O needs TFramedTransport, which buffers reads frame by frame.
            TTransportFactory transportFactory = new TFramedTransport.Factory();
            // Dense binary wire protocol.
            TProtocolFactory proFactory = new TBinaryProtocol.Factory();
            // Multi-threaded selector server.
            this.server = new TThreadedSelectorServer(new
                    TThreadedSelectorServer.Args(serverTransport)
                    .transportFactory(transportFactory)
                    .protocolFactory(proFactory)
                    .processor(processor)
            );
        }
        log.info("Starting the Thrift key...");
        this.server.setServerEventHandler(new TrpcRegistryEventHandler(serverArgs));
        this.server.serve();
        if (this.serverArgs.getNettyServerArgs() != null) {
            // Only the Netty server is waited on after serve() returns.
            ((TNettyServer) this.server).waitForClose();
        }
    } catch (Exception e) {
        log.error("publish thrift key error", e);
    }
}
项目:ikasoa    文件:NonblockingThriftServerImpl.java   
/**
 * Initialises the Thrift server as a {@link TThreadedSelectorServer}.
 *
 * @param serverTransport
 *            server transport; it is cast to {@code TNonblockingServerSocket},
 *            so any other type fails with a {@link ClassCastException}
 */
@Override
protected void initServer(TServerTransport serverTransport) {
    ThriftServerConfiguration config = getThriftServerConfiguration();

    // Multi-threaded half-sync/half-async server model.
    TNonblockingServerSocket listenSocket = (TNonblockingServerSocket) serverTransport;
    TThreadedSelectorServer.Args selectorArgs = new TThreadedSelectorServer.Args(listenSocket);
    selectorArgs.transportFactory(config.getTransportFactory());
    selectorArgs.protocolFactory(config.getProtocolFactory());
    if (config.getExecutorService() != null) {
        selectorArgs.executorService(config.getExecutorService());
    }

    // The configured aspect sees the args before the processor is attached.
    TThreadedSelectorServer.Args aspectArgs =
            config.getServerArgsAspect().TThreadedSelectorServerArgsAspect(selectorArgs);
    server = new TThreadedSelectorServer(aspectArgs.processor(getProcessor()));

    if (config.getServerEventHandler() != null) {
        server.setServerEventHandler(config.getServerEventHandler());
    }
}
项目:ensemble    文件:ThriftServer.java   
/**
 * Entry point for ensemble: calls {@code connectToCMD()}, then serves the
 * asynchronous Lucida processor on port 9090 using framed transport and the
 * binary protocol.
 *
 * @param controller controller handed to the asynchronous Thrift handler
 * @throws TTransportException if the non-blocking server socket cannot be created
 * @throws IOException declared for callers; presumably raised by
 *         {@code connectToCMD()} -- confirm
 * @throws InterruptedException declared for callers; source not visible here
 */
public static void setup(Controller controller)
        throws TTransportException, IOException, InterruptedException {
    connectToCMD();

    TProcessor asyncProcessor = new LucidaService.AsyncProcessor(
            new ThriftHandler.AsyncThriftHandler(controller));
    TNonblockingServerTransport listenSocket = new TNonblockingServerSocket(9090);

    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket);
    serverArgs.processor(asyncProcessor);
    serverArgs.protocolFactory(new TBinaryProtocol.Factory());
    serverArgs.transportFactory(new TFramedTransport.Factory());

    final TThreadedSelectorServer selectorServer = new TThreadedSelectorServer(serverArgs);
    System.out.println("ensemble at port 9090");
    selectorServer.serve();
}
项目:galen-api-ports    文件:GalenApiServer.java   
/**
 * Serves the given processor from a threaded-selector Thrift server on
 * {@code serverPort}. Any exception is caught and its stack trace printed.
 *
 * @param processor  processor passed to the server args
 * @param serverPort port for the non-blocking server socket
 */
public static void runService(GalenApiRemoteService.Processor processor, int serverPort) {
    try {
        TNonblockingServerTransport listenSocket = new TNonblockingServerSocket(serverPort);
        TThreadedSelectorServer.Args serverArgs =
                new TThreadedSelectorServer.Args(listenSocket).processor(processor);
        TServer selectorServer = new TThreadedSelectorServer(serverArgs);
        selectorServer.serve();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
项目:diqube    文件:ServerImplementation.java   
/**
 * Creates the Thrift server, registers a JVM shutdown hook that tears
 * everything down in order, logs the listening port and calls
 * {@code server.serve()}. Returns without doing anything when
 * {@code createServerArgs()} yields {@code null}.
 */
public void serve() {
  TThreadedSelectorServer.Args selectorArgs = createServerArgs();
  if (selectorArgs == null) {
    return;
  }

  server = new ThriftServer(selectorArgs, "server-selector-%d", servingListeners);

  // Best-effort cleanup when the VM shuts down: listeners, beans, Thrift server, queries.
  Runnable shutdownSequence = new Runnable() {
    @Override
    public void run() {
      logger.info("Shutting down server...");
      if (gracefulShutdownListeners != null) {
        logger.debug("Executing graceful shutdown listeners...");
        new ShutdownUtil(applicationContext).callShutdownListeners();
      }
      logger.debug("Stopping all local beans...");
      applicationContext.close();
      logger.debug("Stopping thrift server...");
      server.stop();
      logger.debug("Shutting down everything of remaining queries...");
      executorManager.shutdownEverythingOfAllQueries();
      logger.info("Shutdown complete.");
    }
  };
  Runtime.getRuntime().addShutdownHook(new Thread(shutdownSequence, "shutdown-thread"));

  // An empty bind string means no explicit bind address is reported.
  if (!"".equals(bind)) {
    logger.info("Listening for incoming requests on port {} (bound to {})...", port, bind);
  } else {
    logger.info("Listening for incoming requests on port {}...", port);
  }
  server.serve();
}
项目:spring-thrift-service-manager    文件:ServiceThreadPoolWrapper.java   
/**
 * Builds a framed-transport, binary-protocol {@link TThreadedSelectorServer}
 * on {@code remotePort} with the configured selector and worker thread counts.
 *
 * @param processor processor passed to the server args
 * @return the configured server; {@code serve()} is not called here
 * @throws TTransportException if the non-blocking server socket cannot be created
 */
@Override
protected TServer getServer(TProcessor processor) throws TTransportException {
    LOGGER.debug("Setting Server with {} selector threads and {} worker threads", numSelectorThreads, numWorkerThreads);

    TNonblockingServerSocket listenSocket = new TNonblockingServerSocket(remotePort);

    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket)
            .transportFactory(new TFramedTransport.Factory())
            .protocolFactory(new TBinaryProtocol.Factory())
            .processor(processor)
            .selectorThreads(numSelectorThreads)
            .workerThreads(numWorkerThreads);

    return new TThreadedSelectorServer(serverArgs);
}
项目:ikasoa    文件:ServerArgsAspect.java   
/**
 * Returns the given threaded-selector server arguments unchanged.
 * <p>
 * NOTE(review): presumably an extension point that subclasses override to
 * adjust the arguments before the server is built -- confirm.
 *
 * @param args the server arguments
 * @return the same {@code args} instance
 */
public TThreadedSelectorServer.Args TThreadedSelectorServerArgsAspect(TThreadedSelectorServer.Args args) {
    return args;
}
项目:pinpoint    文件:ThriftThreadedSelectorServerAsyncIT.java   
/**
 * Creates the echo server under test by delegating to
 * {@code AsyncEchoTestServerFactory.threadedSelectorServer}.
 *
 * @param environment test environment forwarded to the factory
 * @return the echo test server produced by the factory
 * @throws TTransportException if the factory call throws it
 */
@Override
protected EchoTestServer<TThreadedSelectorServer> createEchoServer(TestEnvironment environment)
        throws TTransportException {
    return AsyncEchoTestServerFactory.threadedSelectorServer(environment);
}
项目:pinpoint    文件:ThriftThreadedSelectorServerIT.java   
/**
 * Creates the echo server under test by delegating to
 * {@code SyncEchoTestServerFactory.threadedSelectorServer}.
 *
 * @param environment test environment forwarded to the factory
 * @return the echo test server produced by the factory
 * @throws TTransportException if the factory call throws it
 */
@Override
protected EchoTestServer<TThreadedSelectorServer> createEchoServer(TestEnvironment environment)
        throws TTransportException {
    return SyncEchoTestServerFactory.threadedSelectorServer(environment);
}
项目:ddth-commons    文件:ThriftUtils.java   
/**
 * Helper method to create a new framed-transport, threaded-selector
 * {@link TServer}.
 *
 * <p>
 * A {@code numSelectorThreads} below 1 is replaced by 2, and a
 * {@code numWorkerThreads} below 1 is replaced by 8.
 * </p>
 *
 * @param processorFactory       processor factory passed to the server args
 * @param protocolFactory        protocol factory passed to the server args
 * @param port                   port for the non-blocking server socket
 * @param numSelectorThreads     number of selector threads; values below 1 become 2
 * @param numWorkerThreads       number of worker threads; values below 1 become 8
 * @param clientTimeoutMillisecs client timeout passed to the server socket
 * @param maxFrameSize           maximum frame size passed to the framed transport factory
 * @param maxReadBufferSize      value assigned to the args' {@code maxReadBufferBytes}
 * @return the configured server; {@code serve()} is not called here
 * @throws TTransportException if the server socket cannot be created
 */
public static TServer createThreadedSelectorServer(TProcessorFactory processorFactory,
        TProtocolFactory protocolFactory, int port, int numSelectorThreads,
        int numWorkerThreads, int clientTimeoutMillisecs, int maxFrameSize,
        long maxReadBufferSize) throws TTransportException {
    final int selectorCount = numSelectorThreads < 1 ? 2 : numSelectorThreads;
    final int workerCount = numWorkerThreads < 1 ? 8 : numWorkerThreads;

    TNonblockingServerTransport listenSocket =
            new TNonblockingServerSocket(port, clientTimeoutMillisecs);
    TTransportFactory framedFactory = new TFramedTransport.Factory(maxFrameSize);

    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket);
    serverArgs.processorFactory(processorFactory);
    serverArgs.protocolFactory(protocolFactory);
    serverArgs.transportFactory(framedFactory);
    serverArgs.workerThreads(workerCount);
    serverArgs.acceptPolicy(AcceptPolicy.FAIR_ACCEPT);
    serverArgs.acceptQueueSizePerThread(10000);
    serverArgs.selectorThreads(selectorCount);
    serverArgs.maxReadBufferBytes = maxReadBufferSize;

    return new TThreadedSelectorServer(serverArgs);
}
项目:queue-server    文件:ThriftServerUtils.java   
/**
 * Creates a {@link TThreadedSelectorServer} server.
 *
 * <p>
 * Similar to {@link THsHaServer}, but it uses two thread pools: one for
 * network I/O (e.g. accepting client connections) and one for handling
 * messages, as {@link THsHaServer} does.
 * </p>
 *
 * <p>
 * Every numeric tuning argument that is {@code <= 0} is replaced by the
 * matching {@code DEFAULT_*} constant of this class. The values quoted below
 * are the ones the original documentation stated; the constants themselves
 * are defined elsewhere.
 * </p>
 *
 * @param processorFactory
 *            processor factory passed to the server args
 * @param protocolFactory
 *            protocol factory passed to the server args
 * @param port
 *            port number on which the Thrift server will listen
 * @param clientTimeoutMillisecs
 *            client timeout passed to the server socket; {@code <= 0}
 *            selects {@code DEFAULT_CLIENT_TIMEOUT_MS}
 * @param maxFrameSize
 *            max size (in bytes) of a transport frame; {@code <= 0} selects
 *            {@code DEFAULT_MAX_FRAMESIZE} (documented as 1Mb)
 * @param maxReadBufferSize
 *            max size (in bytes) of the read buffer; {@code <= 0} selects
 *            {@code DEFAULT_TOTAL_MAX_READ_BUFFERSIZE} (documented as 16Mb)
 * @param numSelectorThreads
 *            number of selector threads; {@code <= 0} selects
 *            {@code DEFAULT_NUM_SELECTOR_THREADS} (documented as {@code 2})
 * @param numWorkerThreads
 *            number of worker threads; {@code <= 0} selects
 *            {@code DEFAULT_NUM_WORKER_THREADS} (documented as
 *            {@code Math.max(4, Runtime.getRuntime().availableProcessors())})
 * @return the configured server; {@code serve()} is not called here
 * @throws TTransportException
 *             if the server socket cannot be created
 */
public static TThreadedSelectorServer createThreadedSelectorServer(
        TProcessorFactory processorFactory, TProtocolFactory protocolFactory, int port,
        int clientTimeoutMillisecs, int maxFrameSize, long maxReadBufferSize,
        int numSelectorThreads, int numWorkerThreads) throws TTransportException {
    final int timeoutMs =
            clientTimeoutMillisecs <= 0 ? DEFAULT_CLIENT_TIMEOUT_MS : clientTimeoutMillisecs;
    final int frameSize = maxFrameSize <= 0 ? DEFAULT_MAX_FRAMESIZE : maxFrameSize;
    final long readBufferSize =
            maxReadBufferSize <= 0 ? DEFAULT_TOTAL_MAX_READ_BUFFERSIZE : maxReadBufferSize;
    final int selectorCount =
            numSelectorThreads <= 0 ? DEFAULT_NUM_SELECTOR_THREADS : numSelectorThreads;
    final int workerCount =
            numWorkerThreads <= 0 ? DEFAULT_NUM_WORKER_THREADS : numWorkerThreads;

    TNonblockingServerTransport listenSocket = new TNonblockingServerSocket(port, timeoutMs);
    TTransportFactory framedFactory = new TFramedTransport.Factory(frameSize);

    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket);
    serverArgs.processorFactory(processorFactory);
    serverArgs.protocolFactory(protocolFactory);
    serverArgs.transportFactory(framedFactory);
    serverArgs.workerThreads(workerCount);
    serverArgs.acceptPolicy(AcceptPolicy.FAIR_ACCEPT);
    serverArgs.acceptQueueSizePerThread(100000);
    serverArgs.selectorThreads(selectorCount);
    serverArgs.maxReadBufferBytes = readBufferSize;

    return new TThreadedSelectorServer(serverArgs);
}
项目:id-server    文件:ThriftServerUtils.java   
/**
 * Creates a {@link TThreadedSelectorServer} server.
 *
 * <p>
 * Similar to {@link THsHaServer}, but it uses two thread pools: one for
 * network I/O (e.g. accepting client connections) and one for handling
 * messages, as {@link THsHaServer} does.
 * </p>
 *
 * <p>
 * Every numeric tuning argument that is {@code <= 0} is replaced by the
 * matching {@code DEFAULT_*} constant of this class. The values quoted below
 * are the ones the original documentation stated; the constants themselves
 * are defined elsewhere.
 * </p>
 *
 * @param processorFactory
 *            processor factory passed to the server args
 * @param protocolFactory
 *            protocol factory passed to the server args
 * @param port
 *            port number on which the Thrift server will listen
 * @param clientTimeoutMillisecs
 *            client timeout passed to the server socket; {@code <= 0}
 *            selects {@code DEFAULT_CLIENT_TIMEOUT_MS}
 * @param maxFrameSize
 *            max size (in bytes) of a transport frame; {@code <= 0} selects
 *            {@code DEFAULT_MAX_FRAMESIZE} (documented as 1Mb)
 * @param maxReadBufferSize
 *            max size (in bytes) of the read buffer; {@code <= 0} selects
 *            {@code DEFAULT_TOTAL_MAX_READ_BUFFERSIZE} (documented as 16Mb)
 * @param numSelectorThreads
 *            number of selector threads; {@code <= 0} selects
 *            {@code DEFAULT_NUM_SELECTOR_THREADS} (documented as {@code 2})
 * @param numWorkerThreads
 *            number of worker threads; {@code <= 0} selects
 *            {@code DEFAULT_NUM_WORKER_THREADS} (documented as
 *            {@code Math.max(2, Runtime.getRuntime().availableProcessors())})
 * @return the configured server; {@code serve()} is not called here
 * @throws TTransportException
 *             if the server socket cannot be created
 */
public static TThreadedSelectorServer createThreadedSelectorServer(
        TProcessorFactory processorFactory, TProtocolFactory protocolFactory, int port,
        int clientTimeoutMillisecs, int maxFrameSize, long maxReadBufferSize,
        int numSelectorThreads, int numWorkerThreads) throws TTransportException {
    // Resolve each non-positive tuning value to its class-level default.
    final int effectiveTimeoutMs =
            clientTimeoutMillisecs > 0 ? clientTimeoutMillisecs : DEFAULT_CLIENT_TIMEOUT_MS;
    final int effectiveFrameSize = maxFrameSize > 0 ? maxFrameSize : DEFAULT_MAX_FRAMESIZE;
    final long effectiveReadBuffer =
            maxReadBufferSize > 0 ? maxReadBufferSize : DEFAULT_TOTAL_MAX_READ_BUFFERSIZE;
    final int effectiveSelectors =
            numSelectorThreads > 0 ? numSelectorThreads : DEFAULT_NUM_SELECTOR_THREADS;
    final int effectiveWorkers =
            numWorkerThreads > 0 ? numWorkerThreads : DEFAULT_NUM_WORKER_THREADS;

    TNonblockingServerTransport listenSocket =
            new TNonblockingServerSocket(port, effectiveTimeoutMs);
    TTransportFactory framedFactory = new TFramedTransport.Factory(effectiveFrameSize);

    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(listenSocket)
            .processorFactory(processorFactory)
            .protocolFactory(protocolFactory)
            .transportFactory(framedFactory)
            .workerThreads(effectiveWorkers)
            .acceptPolicy(AcceptPolicy.FAIR_ACCEPT)
            .acceptQueueSizePerThread(100000)
            .selectorThreads(effectiveSelectors);
    serverArgs.maxReadBufferBytes = effectiveReadBuffer;

    return new TThreadedSelectorServer(serverArgs);
}