Java的线程既是工作单元,也是执行机制。从JDK5开始,把工作机单元和执行机制分离开来。 工作单元包括Runnable和Callable,而执行机制由Executor框架提供。
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程。
在底层,操作系统内核将这些线程映射到硬件处理器上。
Executor框架主要由3部分组成:
Executor框架的成员及其关系可以用一下的关系图表示:
Executor框架的使用示意图:
使用步骤:
Executors.callable(Runnable task)
Executors.callable(Runnable task, Object result)
import java.util.concurrent.*; public class ExecutorDemo { // 创建ThreadPooljExecutor实现类 private static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), ); public static void main(String[] args) { // 采用submit()方法提交Callable对象并返回Future对象 Future<String> future = executor.submit(new callableDemo()); try { // get()方法获取返回值 System.out.println(future.get()); } catch (InterruptedException | ExecutionException e) { // 处理异常 e.printStackTrace(); } finally { // 关闭线程池 executor.shutdown(); } } } /** * 创建Callable接口的实现类 */ class callableDemo implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(1000); String s = "return string"; return s; } }
直接创建ThreadPoolExecutor的实例对象,见https://www.cnblogs.com/chiaki/p/13536624.html
ThreadPoolExecutor通常使用工厂类Executors创建,可以创建3种类型的ThreadPoolExecutor,即FixedThreadPool、SingleThreadExecutor以及CachedThreadPool。
public static ExecutorService es = Executors.newFixedThreadPool(int threadNums); public static ExecutorService es = Executors.newFixedThreadPool(int threadNums, ThreadFactory threadFactory);
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
把线程池最大线程数量maxmumPoolSize和核心线程池的数量corePoolSize设置为threadNums,将参数keepAliveTime设置为0L。使用无界队列LinkedBlockingQueue作为阻塞队列,因此当任务不能立刻执行时,都会添加到阻塞队列中,而且maximumPoolSize,keepAliveTime都是无效的。
threadNums
0L
LinkedBlockingQueue
public static ExecutorService es = Executors.newSingleThreadExecutor(); public static ExecutorService es = Executors.newSingleThreadExecutor(ThreadFactory threadFactory);
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
因为阻塞队列使用的是LinkedBlockingQueue,因此和FixedThreadPool一样,参数maximumPoolSize以及keepAliveTime都是无效的。corePoolSize为1,因此 最多只能创建一个线程 。
maximumPoolSize
keepAliveTime
1
public static ExecutorService es = Executors.newCachedThreadPool(); public static ExecutorService es = Executors.newCachedThreadPool(ThreadFactory threadFactory);
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
CachedThreadPool使用SynchronizedQueue作为阻塞队列,SynchronizedQueue是不存储元素的阻塞队列,实现 “一对一的交付” ,也就是说,每次向队列中put一个任务必须等有线程来take这个任务,否则就会一直阻塞该任务,如果一个线程要take一个任务就要一直阻塞知道有任务被put进阻塞队列。
SynchronizedQueue
因为CachedThreadPool的maximumPoolSize为Integer.MUX_VALUE,因此 CachedThreadPool是无界的线程池,也就是说可以一直不断的创建线程,这样可能会使CPU和内存资源耗尽 。corePoolSize为0,因此在 CachedThreadPool中直接通过阻塞队列来进行任务的提交 。
Integer.MUX_VALUE
0
ScheduledThreadPoolExecutor类继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。 主要用于在给定的延迟后执行任务或者定期执行任务。
ScheduledThreadPoolExecutor
ThreadPoolExecutor
ScheduledExecutorService
ScheduledThreadPoolExecutor通常使用Executors工厂类来创建,可创建2种类型的ScheduledThreadPoolExecutor,即ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor。
SingleThreadScheduledExecutor
ScheduledThreadPoolExecutor:适用于若干个(固定)线程延时或者定期执行任务,同时为了满足资源管理的需求而需要限制后台线程数量的场景。
public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums);
public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums, ThreadFactory threadFactory);
SingleThreadScheduledExecutor:适用于需要单个线程延时或者定期的执行任务,同时需要保证各个任务顺序执行的应用场景。
public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums);
public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums, ThreadFactory threadFactory);
ScheduledThreadPoolExecutor的实现:
ScheduledThreadPoolExecutor的实现主要是通过 把任务封装为ScheduledFutureTask来实现 。通过调用scheduledAtFixedTime()方法或者scheduledWithFixedDelay()方法向阻塞队列添加一个实现了RunnableScheduledFutureTask接口的ScheduledFutureTask类对象。ScheduledFutureTask主要包括3个成员变量:
// 序列号,用于保存任务添加到阻塞队列的顺序 private final long sequenceNumber; // 用于保存该任务将要被执行的具体时间 private long time; // 周期,用于保存任务直线的间隔周期 private final long period;
ScheduledTreadPoolExecutor的阻塞队列是用无界队列DelayQueue实现的,可以实现元素延时delayTime后才能获取元素,在ScheduledThreadPoolExecutor中,DelayQueue内部封装了一个PriorityQueue,来对任务进行排序,首先对time排序,time小的在前,如果time一样,则sequence小的在前,也就是说如果time一样,那么先被提交的任务先执行。
ScheduledTreadPoolExecutor
DelayQueue
因为DelayQueue是一个无界的队列,因此线程池的maximumPoolSize是无效的。ScheduledThreadPoolExecutor的工作流程大致如下:
Future接口和实现Future接口的FutureTask实现类,代表异步计算的结果。
FutureTask除了实现Future接口外还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以条用线程直接执行(FutureTask.run())。根据FutureTask.run()方法被执行的时机,FutureTask可处于以下3种状态:
FutureTask.run()
FutureTask.cancel()
状态迁移示意图:
FutureTask的get和cancle执行示意图:
FutureTask是一个基于AQS同步队列实现的一个自定义同步组件 ,通过对同步状态state的竞争实现acquire或者release操作。
FutureTask的内部类Sync实现了AQS接口,通过 对tryAcquire等抽象方法的重写和模板方法的调用来实现内部类Sync的tryAcquireShared等方法 ,然后聚合Sync的方法来实现FutureTask的get和cancel等方法。
FutureTask的设计示意图:
FutureTask的get方法最终会调用AQS.acquireSharedInterruptibly(int arg)方法:
AQS.acquireSharedInterruptibly(int arg)
tryAcquireShared()
get()
FutureTask.cancle(...)
用于实现线程要执行的工作单元。
提供了常见配置线程池的方法,因为ThreadPoolExecutor的参数众多且意义重大,为了避免配置出错,才有了Executors工厂类。
FixedThreadPool 和 SingleThreadExecutor :允许请求的队列长度为Integer.MAX_VALUE(无界的阻塞队列),可能堆积大量的请求,从而导致OOM。
CachedThreadPool 和 ScheduledThreadPool :允许创建的线程数量为Integer.MAX_VALUE(无界的阻塞队列),可能会创建大量线程,从而导致OOM。
原文链接:https://www.cnblogs.com/chiaki/p/13538829.html