/**
 * Builds a framed-transport {@link TThreadedSelectorServer} that uses the
 * binary protocol and derives its thread counts from the number of
 * available processors.
 *
 * <p>
 * The server is configured with
 * {@code Math.max(2, availableProcessors)} worker threads and
 * {@code Math.max(2, availableProcessors / 2)} selector threads, the
 * {@code FAIR_ACCEPT} accept policy and an accept queue of 10000 entries
 * per selector thread. This method only constructs the server; it does not
 * start it.
 * </p>
 *
 * @param processorFactory
 *            factory handed to the server arguments
 * @param port
 *            port number passed to the non-blocking server socket
 * @param clientTimeoutMillisecs
 *            client timeout passed to the non-blocking server socket
 * @param maxFrameSize
 *            frame size limit passed to the framed transport factory
 * @param maxReadBufferSize
 *            value stored in the arguments' {@code maxReadBufferBytes}
 * @return the newly constructed server
 * @throws TTransportException
 *             if the underlying server transport cannot be set up
 */
public static TServer createThreadedSelectorServer(TProcessorFactory processorFactory, int port,
        int clientTimeoutMillisecs, int maxFrameSize, long maxReadBufferSize)
        throws TTransportException {
    final int workerCount = Math.max(2, Runtime.getRuntime().availableProcessors());
    final int selectorCount = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);

    final TNonblockingServerTransport serverSocket =
            new TNonblockingServerSocket(port, clientTimeoutMillisecs);

    final TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverSocket);
    // The fluent setters return the same Args instance, so they can be
    // applied one statement at a time.
    serverArgs.processorFactory(processorFactory);
    serverArgs.protocolFactory(new TBinaryProtocol.Factory());
    serverArgs.transportFactory(new TFramedTransport.Factory(maxFrameSize));
    serverArgs.workerThreads(workerCount);
    serverArgs.acceptPolicy(AcceptPolicy.FAIR_ACCEPT);
    serverArgs.acceptQueueSizePerThread(10000);
    serverArgs.selectorThreads(selectorCount);
    serverArgs.maxReadBufferBytes = maxReadBufferSize;

    return new TThreadedSelectorServer(serverArgs);
}
/**
 * Creates a framed-transport, threaded-selector {@link TServer} using the
 * supplied protocol factory and explicit thread counts.
 *
 * <p>
 * A {@code numSelectorThreads} value below 1 is replaced by 2, and a
 * {@code numWorkerThreads} value below 1 is replaced by 8. The server uses
 * the {@code FAIR_ACCEPT} accept policy with an accept queue of 10000
 * entries per selector thread. This method only constructs the server; it
 * does not start it.
 * </p>
 *
 * @param processorFactory
 *            factory handed to the server arguments
 * @param protocolFactory
 *            protocol factory handed to the server arguments
 * @param port
 *            port number passed to the non-blocking server socket
 * @param numSelectorThreads
 *            number of selector threads; values {@code < 1} fall back to 2
 * @param numWorkerThreads
 *            number of worker threads; values {@code < 1} fall back to 8
 * @param clientTimeoutMillisecs
 *            client timeout passed to the non-blocking server socket
 * @param maxFrameSize
 *            frame size limit passed to the framed transport factory
 * @param maxReadBufferSize
 *            value stored in the arguments' {@code maxReadBufferBytes}
 * @return the newly constructed server
 * @throws TTransportException
 *             if the underlying server transport cannot be set up
 */
public static TServer createThreadedSelectorServer(TProcessorFactory processorFactory,
        TProtocolFactory protocolFactory, int port, int numSelectorThreads,
        int numWorkerThreads, int clientTimeoutMillisecs, int maxFrameSize,
        long maxReadBufferSize) throws TTransportException {
    final int selectorCount = (numSelectorThreads >= 1) ? numSelectorThreads : 2;
    final int workerCount = (numWorkerThreads >= 1) ? numWorkerThreads : 8;

    final TNonblockingServerTransport serverSocket =
            new TNonblockingServerSocket(port, clientTimeoutMillisecs);

    final TThreadedSelectorServer.Args serverArgs =
            new TThreadedSelectorServer.Args(serverSocket)
                    .processorFactory(processorFactory)
                    .protocolFactory(protocolFactory)
                    .transportFactory(new TFramedTransport.Factory(maxFrameSize))
                    .workerThreads(workerCount)
                    .acceptPolicy(AcceptPolicy.FAIR_ACCEPT)
                    .acceptQueueSizePerThread(10000)
                    .selectorThreads(selectorCount);
    serverArgs.maxReadBufferBytes = maxReadBufferSize;

    return new TThreadedSelectorServer(serverArgs);
}
/**
 * Creates a framed-transport {@link TThreadedSelectorServer}.
 *
 * <p>
 * Every numeric tuning argument accepts a non-positive value as a request
 * to use the corresponding class-level default. The server uses the
 * {@code FAIR_ACCEPT} accept policy with an accept queue of 100000 entries
 * per selector thread. This method only constructs the server; it does not
 * start it.
 * </p>
 *
 * @param processorFactory
 *            factory handed to the server arguments
 * @param protocolFactory
 *            protocol factory handed to the server arguments
 * @param port
 *            port number passed to the non-blocking server socket
 * @param clientTimeoutMillisecs
 *            client timeout passed to the server socket; a value
 *            {@code <= 0} selects {@code DEFAULT_CLIENT_TIMEOUT_MS}
 * @param maxFrameSize
 *            frame size limit passed to the framed transport factory; a
 *            value {@code <= 0} selects {@code DEFAULT_MAX_FRAMESIZE}
 * @param maxReadBufferSize
 *            value stored in the arguments' {@code maxReadBufferBytes}; a
 *            value {@code <= 0} selects
 *            {@code DEFAULT_TOTAL_MAX_READ_BUFFERSIZE}
 * @param numSelectorThreads
 *            number of selector threads; a value {@code <= 0} selects
 *            {@code DEFAULT_NUM_SELECTOR_THREADS}
 * @param numWorkerThreads
 *            number of worker threads; a value {@code <= 0} selects
 *            {@code DEFAULT_NUM_WORKER_THREADS}
 * @return the newly constructed server
 * @throws TTransportException
 *             if the underlying server transport cannot be set up
 */
public static TThreadedSelectorServer createThreadedSelectorServer(
        TProcessorFactory processorFactory, TProtocolFactory protocolFactory, int port,
        int clientTimeoutMillisecs, int maxFrameSize, long maxReadBufferSize,
        int numSelectorThreads, int numWorkerThreads) throws TTransportException {
    // Resolve each setting to either the caller's value or the class default.
    final int timeoutMs = (clientTimeoutMillisecs > 0)
            ? clientTimeoutMillisecs : DEFAULT_CLIENT_TIMEOUT_MS;
    final int frameLimit = (maxFrameSize > 0) ? maxFrameSize : DEFAULT_MAX_FRAMESIZE;
    final long readBufferLimit = (maxReadBufferSize > 0)
            ? maxReadBufferSize : DEFAULT_TOTAL_MAX_READ_BUFFERSIZE;
    final int selectorCount = (numSelectorThreads > 0)
            ? numSelectorThreads : DEFAULT_NUM_SELECTOR_THREADS;
    final int workerCount = (numWorkerThreads > 0)
            ? numWorkerThreads : DEFAULT_NUM_WORKER_THREADS;

    final TNonblockingServerTransport serverSocket =
            new TNonblockingServerSocket(port, timeoutMs);

    final TThreadedSelectorServer.Args serverArgs =
            new TThreadedSelectorServer.Args(serverSocket)
                    .processorFactory(processorFactory)
                    .protocolFactory(protocolFactory)
                    .transportFactory(new TFramedTransport.Factory(frameLimit))
                    .workerThreads(workerCount)
                    .acceptPolicy(AcceptPolicy.FAIR_ACCEPT)
                    .acceptQueueSizePerThread(100000)
                    .selectorThreads(selectorCount);
    serverArgs.maxReadBufferBytes = readBufferLimit;

    return new TThreadedSelectorServer(serverArgs);
}
/**
 * Constructs a {@link TThreadedSelectorServer} that uses framed transport.
 *
 * <p>
 * Any of the numeric settings may be given as a non-positive value, in
 * which case the matching class-level default constant is used instead.
 * The accept policy is {@code FAIR_ACCEPT} and each selector thread gets an
 * accept queue of 100000 entries. The server is returned without being
 * started.
 * </p>
 *
 * @param processorFactory
 *            factory handed to the server arguments
 * @param protocolFactory
 *            protocol factory handed to the server arguments
 * @param port
 *            port number passed to the non-blocking server socket
 * @param clientTimeoutMillisecs
 *            client timeout passed to the server socket; {@code <= 0}
 *            means {@code DEFAULT_CLIENT_TIMEOUT_MS}
 * @param maxFrameSize
 *            frame size limit for the framed transport factory;
 *            {@code <= 0} means {@code DEFAULT_MAX_FRAMESIZE}
 * @param maxReadBufferSize
 *            value stored in the arguments' {@code maxReadBufferBytes};
 *            {@code <= 0} means {@code DEFAULT_TOTAL_MAX_READ_BUFFERSIZE}
 * @param numSelectorThreads
 *            number of selector threads; {@code <= 0} means
 *            {@code DEFAULT_NUM_SELECTOR_THREADS}
 * @param numWorkerThreads
 *            number of worker threads; {@code <= 0} means
 *            {@code DEFAULT_NUM_WORKER_THREADS}
 * @return the newly constructed server
 * @throws TTransportException
 *             if the underlying server transport cannot be set up
 */
public static TThreadedSelectorServer createThreadedSelectorServer(
        TProcessorFactory processorFactory, TProtocolFactory protocolFactory, int port,
        int clientTimeoutMillisecs, int maxFrameSize, long maxReadBufferSize,
        int numSelectorThreads, int numWorkerThreads) throws TTransportException {
    // Work on local copies so the incoming parameters are left untouched.
    int timeoutMs = clientTimeoutMillisecs;
    int frameLimit = maxFrameSize;
    long readBufferLimit = maxReadBufferSize;
    int selectorCount = numSelectorThreads;
    int workerCount = numWorkerThreads;

    if (timeoutMs <= 0) {
        timeoutMs = DEFAULT_CLIENT_TIMEOUT_MS;
    }
    if (frameLimit <= 0) {
        frameLimit = DEFAULT_MAX_FRAMESIZE;
    }
    if (readBufferLimit <= 0) {
        readBufferLimit = DEFAULT_TOTAL_MAX_READ_BUFFERSIZE;
    }
    if (selectorCount <= 0) {
        selectorCount = DEFAULT_NUM_SELECTOR_THREADS;
    }
    if (workerCount <= 0) {
        workerCount = DEFAULT_NUM_WORKER_THREADS;
    }

    TNonblockingServerTransport serverSocket = new TNonblockingServerSocket(port, timeoutMs);
    TTransportFactory framedFactory = new TFramedTransport.Factory(frameLimit);

    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverSocket)
            .processorFactory(processorFactory)
            .protocolFactory(protocolFactory)
            .transportFactory(framedFactory)
            .workerThreads(workerCount)
            .acceptPolicy(AcceptPolicy.FAIR_ACCEPT)
            .acceptQueueSizePerThread(100000)
            .selectorThreads(selectorCount);
    serverArgs.maxReadBufferBytes = readBufferLimit;

    return new TThreadedSelectorServer(serverArgs);
}