Java 类java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy 实例源码

项目:openjdk-jdk10    文件:ThreadPoolExecutorTest.java   
/**
 * Calls each of the simple {@code RejectedExecutionHandler} policies
 * directly (no real rejection takes place) and checks what happens to the
 * task and that the pool is left as it was.
 */
public void testStandardRejectedExecutionHandlers() {
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 1, 1, SECONDS,
                               new ArrayBlockingQueue<Runnable>(1));
    // Holds the thread that executed the task; null until the task has run.
    final AtomicReference<Thread> runner = new AtomicReference<>();
    final Runnable task = () -> runner.set(Thread.currentThread());

    // AbortPolicy: throws and leaves the task unexecuted.
    try {
        new AbortPolicy().rejectedExecution(task, pool);
        shouldThrow();
    } catch (RejectedExecutionException success) {}
    assertNull(runner.get());

    // DiscardPolicy: silently drops the task.
    new DiscardPolicy().rejectedExecution(task, pool);
    assertNull(runner.get());

    // CallerRunsPolicy: the task runs on the calling (this) thread.
    new CallerRunsPolicy().rejectedExecution(task, pool);
    assertSame(Thread.currentThread(), runner.get());

    // None of the direct handler calls may have changed the pool's state.
    assertTrue(pool.getRejectedExecutionHandler() instanceof AbortPolicy);
    assertEquals(0, pool.getTaskCount());
    assertTrue(pool.getQueue().isEmpty());
}
项目:RekognitionS3Batch    文件:Processor.java   
/**
 * Builds a processor from {@code config}: creates the Rekognition and SQS
 * clients, registers the enabled result processors and sets up the worker
 * pool.
 *
 * @param config supplies the credentials profile, endpoint override, queue
 *               name, enabled processors, concurrency and image limit
 */
public Processor(ProcessorConfig config) {
    ProfileCredentialsProvider credentials = new ProfileCredentialsProvider(config.profile());
    // Resolve the credentials once up front; the returned value is not needed here.
    credentials.getCredentials();

    if (config.disableCerts()) {
        System.setProperty("com.amazonaws.sdk.disableCertChecking", "true");
    }

    // Rekognition client, optionally redirected to a custom endpoint
    rek = new AmazonRekognitionClient(credentials);
    if (config.endpointOverride()) {
        rek.setEndpoint(config.endpoint());
    }
    minConfidence = Integer.parseInt(config.confidence());

    // SQS queue on which jobs are looked up
    sqs = new AmazonSQSClient(credentials);
    queueUrl = sqs.createQueue(config.queue()).getQueueUrl();

    // Result processors, added in a fixed order when enabled
    if (config.wantCloudSearch()) {
        processors.add(new CloudSearchIndexer(credentials, config.cloudSearch()));
    }
    if (config.wantDynamo()) {
        processors.add(new DynamoWriter(credentials, config.dynamo()));
    }
    if (config.wantTags3()) {
        processors.add(new S3ObjectTagger(credentials, config.tagPrefix()));
    }

    // Worker pool: the bounded queue plus CallerRunsPolicy keeps too many
    // jobs from backing up, because the submitter runs the overflow itself.
    int workerLimit = Integer.parseInt(config.concurrency());
    executor = new ThreadPoolExecutor(
        1, workerLimit,
        30, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(workerLimit * 2, false),
        new CallerRunsPolicy());

    maxImagesToProcess = Long.parseLong(config.max());
}
项目:BJAF3.x    文件:ThroughputPipe.java   
/**
 * Creates a throughput pipe (flow controller).
 * 
 * @param pipeSize
 *            capacity of the pipe; must be greater than 0
 * @param poolSize
 *            size of the thread pool that consumes the flow; must be greater
 *            than 0
 * @param handler
 *            consumer callback implementation; must not be null
 * @param stepOfDispatcher
 *            dispatch pacing parameter in milliseconds. For example, to
 *            consume one request per second the value is 1000.<br>
 *            As a special case, 0 means dispatching is not throttled and the
 *            consumption rate depends only on the pool size and on how fast
 *            the handler runs.
 * @throws IllegalArgumentException
 *             if {@code pipeSize} or {@code poolSize} is not positive, or if
 *             {@code handler} is null
 */
public ThroughputPipe(int pipeSize, int poolSize,
        IConsumeHandler<T> handler, long stepOfDispatcher) {
    // IllegalArgumentException is a RuntimeException, so callers that caught
    // the previously thrown RuntimeException keep working.
    if (pipeSize <= 0 || poolSize <= 0) {
        throw new IllegalArgumentException("size's value must >0!");
    }
    if (null == handler) {
        throw new IllegalArgumentException("handle can't be null!");
    }
    this.queue = new LinkedBlockingQueue<T>(pipeSize);
    this.handler = handler;
    // SynchronousQueue holds no tasks: a submission is either handed to a
    // pool thread or, through CallerRunsPolicy, executed by the submitter.
    tpe = new ThreadPoolExecutor(poolSize, poolSize, livetime,
            TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
            new CallerRunsPolicy());
    this.stepOfDispatcher = stepOfDispatcher;
}
项目:async-event    文件:Workbench.java   
/**
 * Performs the one-time set-up of the workbench: loads the stress strategy
 * and its configuration, creates the worker pool and the completion latch,
 * schedules the periodic statistics task and, when an output file name is
 * configured, opens the result writer. Once initialization has completed,
 * further calls return immediately.
 *
 * @param configFile passed to {@code getStressStrategy} to obtain the strategy
 * @throws java.io.IOException if the result file cannot be opened
 * @throws NullPointerException if the strategy returns no stress configuration
 */
private void initWorkbench(String configFile) throws IOException {
    if (initialized) {
        return;
    }
    this.stressStrategy = getStressStrategy(configFile);
    this.config = stressStrategy.getStressConfig();
    if (this.config == null) {
        throw new NullPointerException("未配置压力测试参数!");
    }
    // Fixed-size pool; when its 2000-slot queue is full the submitting thread
    // runs the task itself (CallerRunsPolicy).
    servicePool = new ThreadPoolExecutor(config.getThreadNum(), config.getThreadNum(), 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new CallerRunsPolicy());
    finish = new CountDownLatch(config.getThreadNum());

    // Single scheduler thread that calls statistic() every stat period (ms).
    scheduledThreadPool = Executors.newScheduledThreadPool(1);
    scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
        public void run() {
            statistic();
        }
    }, config.getStatPeriod(), config.getStatPeriod(), TimeUnit.MILLISECONDS);

    // Treat a missing (null) file name like an empty one. Previously null
    // passed the "".equals check and FileWriter failed with an NPE.
    String outputFileName = config.getOutputFileName();
    if (outputFileName != null && !outputFileName.isEmpty()) {
        resultWriter = new FileWriter(outputFileName);
    }
    this.initialized = true; // initialization complete
}
项目:nexus-public    文件:EventExecutor.java   
/**
 * Switches subscriber processing from direct to asynchronous by installing a
 * thread-pool-backed delegate.
 */
@Override
protected void doStart() throws Exception {
  // A SynchronousQueue gives direct hand-off with no buffering, so once all
  // HOST_THREAD_POOL_SIZE threads are busy the caller thread executes the
  // async subscriber itself (CallerRunsPolicy).
  final ThreadPoolExecutor hostPool = new ThreadPoolExecutor(
      0, HOST_THREAD_POOL_SIZE,
      60L, TimeUnit.SECONDS,
      new SynchronousQueue<>(),
      new NexusThreadFactory("event", "event-manager"),
      new CallerRunsPolicy());

  // From here on events are distributed asynchronously through the pool.
  delegate = NexusExecutorService.forCurrentSubject(hostPool);
}
项目:coco    文件:ThreadPoolHolder.java   
/**
 * Returns the pool registered under {@code poolName}, creating and
 * registering it on first use. The sizing arguments are only consulted when
 * the pool is created; a later call with the same name returns the existing
 * pool unchanged.
 *
 * @param poolName          registry key, also the prefix of the thread names ({@code poolName-N})
 * @param min               requested core size; raised to the number of available processors when null or smaller
 * @param max               requested maximum size; raised to {@code 2 * processors + 2} when null or smaller
 * @param queueSize         capacity of the work queue; 100 when null
 * @param aliveTimeInsecond keep-alive time in seconds; 100 when null
 * @return the pool registered under {@code poolName}
 */
public static ExecutorService getFixedPool(String poolName, Integer min, Integer max, Integer queueSize,
        Long aliveTimeInsecond) {
    if (threadPoolHolder == null) {
        synchronized (ThreadPoolHolder.class) {
            if (threadPoolHolder == null) {
                // Publish the holder last: other threads only test threadPoolHolder,
                // so map and lock must already be assigned when it becomes non-null.
                // NOTE(review): this is only fully safe if threadPoolHolder is
                // declared volatile - confirm at the field declaration.
                ThreadPoolHolder holder = new ThreadPoolHolder();
                map = Maps.newHashMap();
                lock = new ReentrantLock();
                threadPoolHolder = holder;
            }
        }
    }

    ExecutorService service = map.get(poolName);
    if (service == null) {
        lock.lock();
        try {
            // Re-read under the lock: another thread may have registered the pool
            // between the unlocked lookup above and the lock acquisition. Checking
            // the stale local instead would create (and leak) a second pool.
            service = map.get(poolName);
            if (service == null) {
                int processors = Runtime.getRuntime().availableProcessors();
                int maxFloor = processors * 2 + 2;
                int coreSize = (min == null || min < processors) ? processors : min;
                int maxSize = (max == null || max < maxFloor) ? maxFloor : max;
                int capacity = (queueSize == null) ? 100 : queueSize;
                long keepAliveSeconds = (aliveTimeInsecond == null) ? 100L : aliveTimeInsecond;

                ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(poolName + "-%d").build();
                service = new ThreadPoolExecutor(coreSize, maxSize, keepAliveSeconds, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(capacity), threadFactory, new CallerRunsPolicy());
                map.put(poolName, service);
            }
        } finally {
            lock.unlock();
        }
    }
    return service;
}
项目:coco    文件:ThreadPoolHolder.java   
/**
 * Returns the pool registered under {@code poolName}, creating and
 * registering it on first use with a 100-slot work queue and a keep-alive of
 * 100 seconds. The sizing arguments are only consulted when the pool is
 * created; a later call with the same name returns the existing pool
 * unchanged.
 *
 * @param poolName registry key, also the prefix of the thread names ({@code poolName-N})
 * @param min      requested core size; raised to the number of available processors when null or smaller
 * @param max      requested maximum size; raised to {@code 2 * processors + 1} when null or smaller
 * @return the pool registered under {@code poolName}
 */
public static ExecutorService getFixedPool(String poolName, Integer min, Integer max) {
    if (threadPoolHolder == null) {
        synchronized (ThreadPoolHolder.class) {
            if (threadPoolHolder == null) {
                // Publish the holder last: other threads only test threadPoolHolder,
                // so map and lock must already be assigned when it becomes non-null.
                // NOTE(review): this is only fully safe if threadPoolHolder is
                // declared volatile - confirm at the field declaration.
                ThreadPoolHolder holder = new ThreadPoolHolder();
                map = Maps.newHashMap();
                lock = new ReentrantLock();
                threadPoolHolder = holder;
            }
        }
    }
    ExecutorService service = map.get(poolName);
    if (service == null) {
        lock.lock();
        try {
            // Re-read under the lock: another thread may have registered the pool
            // between the unlocked lookup above and the lock acquisition. Checking
            // the stale local instead would create (and leak) a second pool.
            service = map.get(poolName);
            if (service == null) {
                int processors = Runtime.getRuntime().availableProcessors();
                int maxFloor = processors * 2 + 1;
                int coreSize = (min == null || min < processors) ? processors : min;
                int maxSize = (max == null || max < maxFloor) ? maxFloor : max;

                ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(poolName + "-%d").build();
                service = new ThreadPoolExecutor(coreSize, maxSize, 100, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(100), threadFactory, new CallerRunsPolicy());
                map.put(poolName, service);
            }
        } finally {
            lock.unlock();
        }
    }
    return service;
}
项目:BJAF3.x    文件:ServiceServer.java   
/**
 * Creates the server for {@code port}: builds the NIO server bootstrap with
 * named boss and worker thread pools, applies the socket options read from
 * {@code AppProperties}, and creates the task thread pool with a caller-runs
 * rejection policy.
 *
 * @param port stored in the {@code port} field
 */
public ServiceServer(int port) {
    this.port = port;
    this.channelGroup = new DefaultChannelGroup();

    // Boss executor first, worker executor second.
    bootstrap = new ServerBootstrap(
            new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(
                            new NamedThreadFactory("ServiceServer-bossExecutor-", false)),
                    Executors.newCachedThreadPool(
                            new NamedThreadFactory("ServiceServer-workerExecutor-", true))));

    // Options that are always set; both default to "true".
    boolean tcpNoDelay = Boolean.parseBoolean(
            AppProperties.get("rpc_server_tcpNoDelay", "true"));
    bootstrap.setOption("tcpNoDelay", tcpNoDelay);
    boolean reuseAddress = Boolean.parseBoolean(
            AppProperties.get("rpc_server_reuseAddress", "true"));
    bootstrap.setOption("reuseAddress", reuseAddress);

    // Child-channel options are only set when the property is present and non-blank.
    String childNoDelay = AppProperties.get("rpc_server_child_tcpNoDelay");
    if (childNoDelay != null && !childNoDelay.trim().isEmpty()) {
        bootstrap.setOption("child.tcpNoDelay", Boolean.parseBoolean(childNoDelay));
    }
    String childReceiveBuffer = AppProperties.get("rpc_server_child_receiveBufferSize");
    if (childReceiveBuffer != null && !childReceiveBuffer.trim().isEmpty()) {
        bootstrap.setOption("child.receiveBufferSize", Integer.parseInt(childReceiveBuffer));
    }

    // Work pool sizing comes from properties, with built-in defaults.
    int coreSize = AppProperties.getAsInt("rpc_server_workThreadPool_coreSize", 50);
    int maxSize = AppProperties.getAsInt("rpc_server_workThreadPool_MaxSize", 200);
    int keepAliveTime = AppProperties.getAsInt("rpc_server_workThreadPool_keepAliveTime", 60 * 1000 * 5);
    this.taskThreadPool = new TaskThreadPool(coreSize, maxSize, keepAliveTime, true, new CallerRunsPolicy());
}
项目:BJAF3.x    文件:AsynchroCallImp.java   
private AsynchroCallImp() {
    String size = AppProperties.get("command_asynchro_call_queue_size");
    if (size == null || size.length() == 0) {
        size = "10";
    }
    int size2 = Integer.parseInt(size.trim());
    wtp = new TaskThreadPool(size2, size2, 5 * 1000 * 60, true,
            new CallerRunsPolicy());
    cmdQueue = new BlockQueue();
    consumer = new Consumer("AsynchroCallConsumer");
    consumer.start();
}
项目:BJAF3.x    文件:Publisher.java   
private Publisher(int count, Map <String, SubWorker>  docSubWorkerMap) {
    this.pubWorkerAmount = 0;
    this.docQueue = new BlockQueue();
    this.docSubWorkerMap = docSubWorkerMap;
    this.re = new TaskExecutor(new TaskThreadPool(count, count,
            5 * 1000 * 60, false, new CallerRunsPolicy()));
}
项目:BJAF3.x    文件:Subscriber.java   
/**
 * Creates a subscriber with a zero-initialised concurrency counter, a
 * caller-runs worker pool, and a dispatcher wired to both plus the
 * publisher's document queue.
 *
 * @param count             passed as the first two arguments of the worker pool
 * @param publisherDocqueue queue handed to the dispatcher
 * @param docSubWorkerMap   stored as-is and also handed to the dispatcher
 */
private Subscriber(int count, IQueue publisherDocqueue, Map<String, SubWorker> docSubWorkerMap) {
    this.concurrentCounter = new Counter(0);
    this.docSubWorkerMap = docSubWorkerMap;
    this.wkpool = new TaskThreadPool(
            count, count, 5 * 1000 * 60, false, new CallerRunsPolicy());
    this.dp = new Dispatcher(
            this.wkpool,
            this.concurrentCounter,
            publisherDocqueue,
            this.docSubWorkerMap);
}
项目:hawkbit    文件:ExecutorAutoConfiguration.java   
/**
 * Creates the executor used for UI background processes: a pool of 1 to 20
 * threads named {@code ui-executor-pool-N} with a 20-slot queue, wrapped in a
 * {@code DelegatingSecurityContextExecutor}. Rejected tasks run on the
 * submitting thread.
 *
 * @return the executor for UI background processes.
 */
@Bean(name = "uiExecutor")
@ConditionalOnMissingBean(name = "uiExecutor")
public Executor uiExecutor() {
    final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(20);
    // The rejection handler is supplied at construction time instead of
    // through a separate setter call.
    final ThreadPoolExecutor uiPool = new ThreadPoolExecutor(
            1, 20,
            10000, TimeUnit.MILLISECONDS,
            workQueue,
            new ThreadFactoryBuilder().setNameFormat("ui-executor-pool-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    return new DelegatingSecurityContextExecutor(uiPool);
}
项目:entity-essentials    文件:EventBus.java   
public EventBus(Context context)
{
    myContext = context;
    myEventDispatcherFactory = new EventDispatcherFactoryImpl();
    mySubscriptionFactory = new SubscriptionFactoryImpl();
    myBookkeeper = new SubscriptionBookkeeperImpl();
    myWorkerPool = new ThreadPoolExecutor(4, 4, 0L, SECONDS, new ArrayBlockingQueue<Runnable>(256), new EventBusThreadFactory(), new CallerRunsPolicy());
}
项目:asteria-3.0    文件:ServiceQueue.java   
/**
 * Creates and configures a new single-threaded
 * {@link ScheduledExecutorService} whose thread times out after
 * {@code seconds} of idleness. A value of zero means the returned executor
 * never times out; negative values are rejected.
 *
 * @param seconds
 *            the idle timeout in seconds, must not be negative.
 * @return the newly created and configured executor service.
 * @throws IllegalArgumentException
 *             if {@code seconds} is negative.
 */
private static ScheduledExecutorService createServiceExecutor(long seconds) {
    Preconditions.checkArgument(seconds >= 0, "The timeout value must be equal to or greater than 0!");

    ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1);
    service.setRejectedExecutionHandler(new CallerRunsPolicy());
    service.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("ServiceQueueThread").build());

    if (seconds == 0) {
        // Zero disables the timeout: leave the keep-alive settings untouched.
        return service;
    }
    service.setKeepAliveTime(seconds, TimeUnit.SECONDS);
    service.allowCoreThreadTimeOut(true);
    return service;
}
项目:killbill-analytics-plugin    文件:BusinessExecutor.java   
/**
 * Creates a named cached thread pool that grows from 0 up to
 * {@code nbThreads} threads, with a 60 second keep-alive.
 *
 * @param nbThreads upper bound passed to the underlying factory
 * @param name      name passed to the underlying factory
 * @return the new executor service
 */
public static ExecutorService newCachedThreadPool(final int nbThreads, final String name) {
    // The default rejection handler (AbortPolicy) is deliberately not used:
    // tasks must always be executed, so rejected ones run in the caller.
    final CallerRunsPolicy runInCaller = new CallerRunsPolicy();
    return Executors.newCachedThreadPool(0, nbThreads, name, 60L, TimeUnit.SECONDS, runInCaller);
}
项目:logdb    文件:ParallelMergeSorter.java   
public ParallelMergeSorter(Comparator<Item> comparator, int runLength, int memoryRunCount) {
    this.runLength = runLength;
    this.comparator = comparator;
    this.buffer = new LinkedList<Item>();
    this.runIndexer = new AtomicInteger();
    this.executor = new ThreadPoolExecutor(8, 8, 10, TimeUnit.SECONDS, new LimitedQueue<Runnable>(8), new NamedThreadFactory(
            "Sort Worker"), new CallerRunsPolicy());

    this.cacheCount = new AtomicInteger(memoryRunCount);
}
项目:logdb    文件:StreamingResultEncoder.java   
public StreamingResultEncoder(String name, int poolSize) {
    if (poolSize < 1)
        throw new IllegalArgumentException("pool size should be positive");

    this.poolSize = poolSize;
    this.executor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
            poolSize), new NamedThreadFactory(name), new CallerRunsPolicy());

    slog.info("araqne logdb: created encoder thread pool [{}]", poolSize);
}
项目:logdb    文件:StreamingResultEncoder.java   
public StreamingResultEncoder(String name, int poolSize) {
    if (poolSize < 1)
        throw new IllegalArgumentException("pool size should be positive");

    this.poolSize = poolSize;
    this.executor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
            poolSize), new NamedThreadFactory(name), new CallerRunsPolicy());

    slog.debug("araqne logdb: created encoder thread pool [{}]", poolSize);
}
项目:adaptive-executor-java    文件:ExecutorFactory.java   
/**
 * Creates an executor service from the given sizing, queue and rejection
 * settings.
 * NOTE(review): the parameters look like they mirror the arguments of the
 * {@code ThreadPoolExecutor} constructor - confirm against the implementing
 * classes.
 *
 * @param corePoolSize  presumably the core pool size - verify in implementations
 * @param maxPoolThread presumably the maximum pool size - verify in implementations
 * @param keepAliveTime presumably the idle keep-alive time - verify in implementations
 * @param workQueue     queue of {@code Runnable} tasks handed to the executor
 * @param policy        caller-runs policy instance handed to the executor
 * @return the created executor service
 */
ExecutorService create(int corePoolSize, int maxPoolThread, Duration keepAliveTime,
BlockingQueue<Runnable> workQueue, CallerRunsPolicy policy);
项目:invesdwin-util    文件:Executors.java   
/**
 * Creates a fixed pool of {@code nThreads} threads whose work queue also
 * holds at most {@code nThreads} tasks; once the queue is full the submitting
 * thread runs the task itself. The pool is returned wrapped in a
 * {@code WrappedExecutorService}.
 *
 * @param name     used for the thread factory and for the wrapper
 * @param nThreads number of threads and queue capacity
 * @return the wrapped executor
 */
public static WrappedExecutorService newFixedCallerRunsThreadPool(final String name, final int nThreads) {
    // Fully qualified on purpose, exactly as in the original code.
    final java.util.concurrent.ThreadPoolExecutor pool =
            new java.util.concurrent.ThreadPoolExecutor(
                    nThreads, nThreads,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(nThreads),
                    newFastThreadLocalThreadFactory(name),
                    new CallerRunsPolicy());
    return new WrappedExecutorService(pool, name);
}
项目:KeyedThreadPool    文件:KeyedThreadPoolExecutor.java   
/**
 * Convenience constructor that delegates to the three-argument constructor,
 * supplying a new {@code CallerRunsPolicy} as the third argument.
 *
 * @param corePoolSize  forwarded unchanged to the delegated constructor
 * @param threadFactory forwarded unchanged to the delegated constructor
 */
public KeyedThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    this(corePoolSize, threadFactory, new CallerRunsPolicy());
}
项目:staging-client-java    文件:IntegrationUtils.java   
/**
 * Runs the schema selection test cases read from {@code is} against {@code staging}, checking the cases in parallel
 * on a bounded thread pool, and prints failures and a timing summary to standard output.
 * <p>
 * Lines starting with {@code #} are skipped. Every other line must have exactly four comma-separated fields (each is
 * trimmed): the two {@code SchemaLookup} constructor arguments, the {@code SSF25_KEY} input value and the expected schema id.
 * When the expected id is empty the case fails if exactly one schema is found; otherwise the case fails unless exactly
 * one schema is found and its id equals the expected id. A case whose check throws is also counted as failed.
 * <p>
 * The method waits at most 30 seconds for outstanding cases after the input is exhausted.
 * @param staging the staging instance used for the schema lookups
 * @param fileName name shown in the start-up message
 * @param is source of the test case lines, decoded as UTF-8; it is not closed by this method
 * @return the number of processed and failed cases
 * @throws IOException if reading from {@code is} fails
 * @throws InterruptedException if interrupted while waiting for the workers to finish
 * @throws IllegalStateException if a non-comment line does not have exactly four fields
 */
public static IntegrationResult processSchemaSelection(final Staging staging, String fileName, InputStream is) throws IOException, InterruptedException {
    // initialize the threads pool (don't use more than 9 threads)
    int n = Math.min(9, Runtime.getRuntime().availableProcessors() + 1);
    ExecutorService pool = new ThreadPoolExecutor(n, n, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000), new CallerRunsPolicy());

    Stopwatch stopwatch = Stopwatch.create();

    AtomicInteger processedCases = new AtomicInteger(0);
    final AtomicInteger failedCases = new AtomicInteger(0);

    System.out.println("Starting schema selection tests from " + fileName + " [" + n + " threads]");

    LineNumberReader reader = new LineNumberReader(new InputStreamReader(is, "UTF-8"));

    try {
        // loop over each line in the file
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {
            if (line.startsWith("#"))
                continue;

            processedCases.getAndIncrement();

            // split the string; important to keep empty trailing values in the resulting array
            String[] parts = Arrays.stream(line.split(",", -1)).map(String::trim).toArray(String[]::new);

            if (parts.length != 4)
                throw new IllegalStateException("Bad record in schema_selection.txt on line number " + reader.getLineNumber());

            final int lineNum = reader.getLineNumber();
            final String fullLine = line;

            pool.submit(() -> {
                try {
                    SchemaLookup lookup = new SchemaLookup(parts[0], parts[1]);
                    lookup.setInput(CsStagingData.SSF25_KEY, parts[2]);

                    List<StagingSchema> lookups = staging.lookupSchema(lookup);
                    if (parts[3].length() == 0) {
                        if (lookups.size() == 1) {
                            System.out.println("Line #" + lineNum + " [" + fullLine + "] --> The schema selection should not have found any schema but did: " + lookups.get(0).getId());
                            failedCases.getAndIncrement();
                        }
                    }
                    else {
                        if (lookups.size() != 1) {
                            System.out.println("Line #" + lineNum + " [" + fullLine + "] --> The schema selection should have found a schema, " + parts[3] + ", but did not.");
                            failedCases.getAndIncrement();
                        }
                        else if (!Objects.equals(lookups.get(0).getId(), parts[3])) {
                            System.out.println(
                                    "Line #" + lineNum + " [" + fullLine + "] --> The schema selection found schema " + lookups.get(0).getId() + " but it should have found " + parts[3] + ".");
                            failedCases.getAndIncrement();
                        }
                    }
                }
                catch (Throwable t) {
                    // only the first failure's exception message is printed, to avoid flooding the output
                    if (failedCases.get() == 0)
                        System.out.println("Line #" + lineNum + " --> Exception processing schema selection: " + t.getMessage());
                    failedCases.getAndIncrement();
                }

                return null;
            });
        }
    }
    finally {
        // always release the workers, even when a bad record or a read error aborts the loop; the core threads
        // never time out, so a pool that is not shut down would keep its threads alive indefinitely
        pool.shutdown();
    }
    pool.awaitTermination(30, TimeUnit.SECONDS);

    stopwatch.stop();
    String perMs = String.format("%.3f", ((float)stopwatch.elapsed(TimeUnit.MILLISECONDS) / processedCases.get()));
    System.out.print("Completed " + NumberFormat.getNumberInstance(Locale.US).format(processedCases.get()) + " cases in " + stopwatch + " (" + perMs + "ms/case).");
    if (failedCases.get() > 0)
        System.out.println("There were " + NumberFormat.getNumberInstance(Locale.US).format(failedCases.get()) + " failures.");
    else
        System.out.println();

    return new IntegrationResult(processedCases.get(), failedCases.get());
}
项目:killbill-analytics-plugin    文件:BusinessExecutor.java   
/**
 * Creates a named single-thread scheduled executor that uses a caller-runs
 * rejection policy.
 *
 * @param name name passed to the underlying factory
 * @return the new scheduled executor service
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor(final String name) {
    final CallerRunsPolicy runInCaller = new CallerRunsPolicy();
    return Executors.newSingleThreadScheduledExecutor(name, runInCaller);
}
项目:logdb    文件:StreamingResultDecoder.java   
public StreamingResultDecoder(String name, int poolSize) {
    executor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(poolSize),
            new NamedThreadFactory(name), new CallerRunsPolicy());
}
项目:logdb    文件:StreamingResultDecoder.java   
public StreamingResultDecoder(String name, int poolSize) {
    executor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(poolSize),
            new NamedThreadFactory(name), new CallerRunsPolicy());
}
项目:asteria-3.0    文件:GameSyncExecutor.java   
/**
 * Creates and configures the update service for this game sync executor.
 * The returned executor is <b>unconfigurable</b>, meaning its configuration
 * can no longer be modified. No service is created for a thread count of one
 * or less.
 * 
 * @param nThreads
 *            the amount of threads to create this service with.
 * @return the newly created and configured service, or {@code null} when
 *         {@code nThreads} is less than or equal to 1.
 */
private ExecutorService create(int nThreads) {
    if (nThreads <= 1) {
        return null;
    }
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
    // Rejected tasks run on the submitting thread.
    pool.setRejectedExecutionHandler(new CallerRunsPolicy());
    pool.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("GameSyncThread").build());
    // Wrap the pool so callers cannot cast it back and reconfigure it.
    return Executors.unconfigurableExecutorService(pool);
}
项目:components-ness-executors    文件:ThreadPoolConfiguration.java   
/**
 * Supplies the rejection handler for this configuration: a new
 * {@code ThreadPoolExecutor.CallerRunsPolicy} instance on every call.
 *
 * @return a new caller-runs rejection handler
 */
@Override
RejectedExecutionHandler getHandler() {
    return new ThreadPoolExecutor.CallerRunsPolicy();
}