ExecutorService-10个技巧和窍门


ExecutorService 自Java 5以来,抽象就已经存在。我们在这里谈论2004。提醒一下:Java 5和6不再受支持,Java 7 将不在半年之内。之所以提出这一点,是因为许多Java程序员仍然不完全了解其 ExecutorService 工作原理。有很多地方可以学习,今天,我想分享一些鲜为人知的功能和做法。但是,本文仍然针对中级程序员,没有什么特别高级的。

1.名称池线程 我不能强调这一点。转储正在运行的JVM的线程时或在调试过程中,默认线程池命名方案为 pool-N-thread-M,其中 N 代表池序列号(每次创建新的线程池时,全局 N 计数器都会增加),并且 M 是池中的线程序列号。例如, pool-2-thread-3 意味着在JVM生命周期中创建的第二个池中的第三个线程。请参阅:Executors.defaultThreadFactory()。描述性不强。JDK使正确命名线程变得有些复杂,因为命名策略隐藏在内部ThreadFactory。幸运的是,番石榴为此提供了一个帮助器类:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Orders-%d")
.setDaemon(true)
.build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);

默认情况下,线程池会创建非守护线程,并决定是否适合您。

2.根据上下文切换名称 这是我从Supercharged jstack学到的技巧 :如何以100mph的速度调试服务器。一旦我们记住了线程名,我们就可以在运行时随时更改它们!这是有道理的,因为线程转储显示类和方法名称,而不显示参数和局部变量。通过调整线程名称以保留一些必要的事务标识符,我们可以轻松跟踪哪个消息/记录/查询/等。速度慢或导致死锁。例子:

private void process(String messageId) {
executorService.submit(() -> {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName("Processing-" + messageId);
try {
//real logic here...
} finally {
currentThread.setName(oldName);
}
});
}

内部 try-finally 块当前线程被命名Processing-WHATEVER-MESSAGE-ID-IS。在跟踪通过系统的消息流时,这可能会派上用场。

3.明确且安全的关机 在客户端线程和线程池之间有一个任务队列。当您的应用程序关闭时,您必须注意两件事:排队任务正在发生的事情以及已运行的任务的行为方式(稍后会详细介绍)。令人惊讶的是,许多开发人员没有正确或有意识地关闭线程池。有两种技术:让所有排队的任务执行(shutdown())或放下它们(shutdownNow())-这完全取决于您的用例。例如,如果我们提交了一堆任务,并希望在所有任务完成后立即返回,请使用 shutdown():

private void sendAllEmails(List<String> emails) throws InterruptedException {
emails.forEach(email ->
executorService.submit(() ->
sendEmail(email)));
executorService.shutdown();
final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
log.debug("All e-mails were sent so far? {}", done);
}

在这种情况下,我们发送了一堆电子邮件,每个电子邮件都是线程池中的一个单独任务。提交这些任务后,我们将关闭池,以使其不再接受任何新任务。然后,我们最多等待一分钟,直到所有这些任务完成。但是,如果某些任务仍在处理中,awaitTermination() 将简单地返回 false。此外,待处理的任务将继续处理。我知道赶时髦的人会去:

emails.parallelStream().forEach(this::sendEmail);

称我为老式,但我喜欢控制并行线程的数量。没关系,优美的替代方法shutdown()shutdownNow()

final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());

这次,所有排队的任务都将被丢弃并返回。已经运行的作业被允许继续。

4.小心处理中断 Future 接口鲜为人知的功能 是取消。与其重复自己,不如查看我的较早文章:InterruptedException和中断线程说明

5.监视队列长度并使其有界 大小不正确的线程池可能会导致运行缓慢,不稳定和内存泄漏。如果配置的线程太少,则会建立队列,从而消耗大量内存。另一方面,由于上下文切换过多,线程过多会减慢整个系统的速度,并导致相同的症状。重要的是要查看队列的深度并使其有界,以便过载的线程池只是暂时拒绝新任务:

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);

上面的代码等效于Executors.newFixedThreadPool(n),但是LinkedBlockingQueue 我们使用 ArrayBlockingQueue的固定容量为,而不是默认的无限制 限制 100。这意味着,如果已将100个任务排队(并 n正在执行),则将使用拒绝新任务RejectedExecutionException。另外,由于 queue 现在可以从外部使用,因此我们可以定期调用 size()它并将其放入日志/ JMX /您使用的任何监视机制。

6.记住关于异常处理 以下代码段将产生什么结果?

executorService.submit(() -> {
System.out.println(1 / 0);
});

我被那太多次咬伤:它不会打印 任何东西。没有迹象 java.lang.ArithmeticException: / by zero,什么也没有。线程池只是吞没了这个异常,就好像它从未发生过一样。如果这是一个java.lang.Thread 从头开始创建的好东西,那就UncaughtExceptionHandler 行得通。但是对于线程池,您必须更加小心。如果您要提交 Runnable (没有任何结果,如上所示),则必须 用try-包围整个正文 catch ,至少要记录下来。如果您要提交 Callable<Integer>,请确保始终使用阻塞解除引用, get() 以重新引发异常:

final Future<Integer> division = executorService.submit(() -> 1 / 0);
//below will throw ExecutionException caused by ArithmeticException
division.get();

有趣的是,即使是Spring框架也导致了此错误@Async,请参见: SPR-8995和 SPR-12090

7.监视队列中的等待时间 监视工作队列深度是一方面。但是,在对单个事务/任务进行故障排除时,值得一看的是在提交任务和实际执行之间经过了多少时间。此持续时间最好应接近0(当池中有一些空闲线程时),但是当必须将任务排队时,它将持续增长。此外,如果池中没有固定数量的线程,则运行新任务可能需要生成线程,这也消耗了很短的时间。为了干净地监视此指标,请ExecutorService 使用类似于以下内容的东西包装原件 :

public class WaitTimeMonitoringExecutorService implements ExecutorService {
private final ExecutorService target;
public WaitTimeMonitoringExecutorService(ExecutorService target) {
this.target = target;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
final long startTime = System.currentTimeMillis();
return target.submit(() -> {
final long queueDuration = System.currentTimeMillis() - startTime;
log.debug("Task {} spent {}ms in queue", task, queueDuration);
return task.call();
}
);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return submit(() -> {
task.run();
return result;
});
}
@Override
public Future<?> submit(Runnable task) {
return submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
task.run();
return null;
}
});
}
//...
}

这不是一个完整的实现,但是您可以了解基本思想。从向线程池提交任务的那一刻起,我们立即开始计算时间。我们一接到任务便立即停止并开始执行。不要被 源代码startTime 和 queueDuration源代码的紧密接近所迷惑 。实际上,这两行是在不同的线程中进行评估的,可能相隔数毫秒甚至数秒,例如:

Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue

8.保留客户端堆栈跟踪 最近,反应式编程似乎引起了很多关注。 反应性清单, 反应性流, RxJava (刚刚发布1.0!), Clojure代理, scala.rx ……它们都很好用,但是堆栈跟踪不再是您的朋友,它们最多没有用。以提交给线程池的任务中发生的异常为例:

java.lang.NullPointerException: null
at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]

我们可以很容易地发现MyTask 在NPE的第76行上 扔了NPE。但是我们不知道是谁提交了此任务,因为堆栈跟踪仅显示 Thread 和ThreadPoolExecutor。从技术上讲,我们可以在源代码中导航,以期仅找到一个 MyTask 创建位置。但是如果没有线程(更不用说事件驱动,反应式,演员忍者编程),我们将立即看到完整的画面。如果我们可以保留客户端代码(提交任务的代码)的堆栈跟踪并显示出来(例如在失败的情况下),该怎么办?这个想法并不新鲜,例如 Hazelcast 将异常从所有者节点传播 到客户端代码。这看起来可能是天真的支持,以便在发生故障的情况下保持客户端堆栈的跟踪:

public class ExecutorServiceWithClientTrace implements ExecutorService {
protected final ExecutorService target;
public ExecutorServiceWithClientTrace(ExecutorService target) {
this.target = target;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {
return () -> {
try {
return task.call();
} catch (Exception e) {
log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);
throw e;
}
};
}
private Exception clientTrace() {
return new Exception("Client stack trace");
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return tasks.stream().map(this::submit).collect(toList());
}
//...
}

这次如果发生故障,我们将检索提交任务的地方的完整堆栈跟踪和线程名称。与之前看到的标准异常相比,它具有更大的价值:

Exception java.lang.NullPointerException in task submitted from thrad main here:
java.lang.Exception: Client stack trace
at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]

9.首选CompletableFuture 在Java 8中CompletableFuture ,引入了更强大的功能 。请尽可能使用它。ExecutorService 没有扩展以支持这种增强的抽象,因此您必须自己照顾它。代替:

final Future<BigDecimal> future =
executorService.submit(this::calculate);

做:

final CompletableFuture<BigDecimal> future =
CompletableFuture.supplyAsync(this::calculate, executorService);

10.同步队列 SynchronousQueue有趣BlockingQueue的是, 这并不是真正的排队。它本身并不是一个数据结构 。最好将其解释为容量为0的队列。引用JavaDoc:

每个 insert 操作必须等待remove 另一个线程执行相应的 操作,反之亦然。同步队列没有任何内部容量,甚至没有一个容量。您无法窥视同步队列,因为仅当您尝试删除它时,该元素才存在。您不能插入元素(使用任何方法),除非另一个线程试图将其删除;您无法进行迭代,因为没有要迭代的内容。[...]

同步队列类似于在CSP和Ada使用会合通道。 这与线程池有什么关系?尝试SynchronousQueue 搭配使用 ThreadPoolExecutor

BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);

我们创建了一个具有两个线程和一个线程的线程池

SynchronousQueue 在它前面。因为

SynchronousQueue 本质上是一个容量为0的队列,例如

ExecutorService 仅在有空闲线程可用时才接受新任务。如果所有线程都忙,则新任务将立即被拒绝,并且永远不会等待。当后台处理必须立即开始或被丢弃时,此行为可能是理想的。

就是这样,我希望您至少找到了一个有趣的功能!


原文链接:https://codingdict.com/