线程池之Executor框架


线程池之Executor框架

Java的线程既是工作单元,也是执行机制。从JDK5开始,把工作机单元和执行机制分离开来。 工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

1. Executor框架简介

1.1 Executor框架的两级调度模型

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程。

在底层,操作系统内核将这些线程映射到硬件处理器上。

1.2 Executor框架的结构

Executor框架主要由3部分组成:

  • 任务 。包括被执行任务需要实现的接口:Runnable接口或者Callable接口。
  • 任务的执行 。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
  • 异步计算的结果 。包括Future和实现Future的FutureTask类。

Executor框架的成员及其关系可以用一下的关系图表示:

Executor框架的使用示意图:

使用步骤:

  • 主线程首先创建实现Runnable或Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)Executors.callable(Runnable task, Object result))。
  • 创建Executor接口的实现类ThreadPoolExecutor类或者ScheduledThreadPoolExecutor类的对象,然后调用其execute()方法或者submit()方法把工作任务添加到线程中,如果有返回值则返回Future对象。其中Callable对象有返回值,因此使用submit()方法;而Runnable可以使用execute()方法,此外还可以使用submit()方法,只要使用callable(Runnable task)或者callable(Runnable task, Object result)方法把Runnable对象包装起来就可以,使用callable(Runnable task)方法返回的null,使用callable(Runnable task, Object result)方法返回result。
  • 主线程可以执行Future对象的get()方法获取返回值,也可以调用cancle()方法取消当前线程的执行。

1.3 Executor框架的使用案例

import java.util.concurrent.*;

    public class ExecutorDemo {
        // 创建ThreadPooljExecutor实现类
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                10,
                100,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5),
        );

        public static void main(String[] args) {
            // 采用submit()方法提交Callable对象并返回Future对象
            Future<String> future = executor.submit(new callableDemo());
            try {
                // get()方法获取返回值
                System.out.println(future.get());
            } catch (InterruptedException | ExecutionException e) {
                // 处理异常
                e.printStackTrace();
            } finally {
                // 关闭线程池
                executor.shutdown();
            }
        }
    }

    /**
     * 创建Callable接口的实现类
     */
    class callableDemo implements Callable<String> {
        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            String s = "return string";
            return s;
        }
    }

2. Executor框架成员

2.1 ThreadPoolExecutor

直接创建ThreadPoolExecutor的实例对象,见https://www.cnblogs.com/chiaki/p/13536624.html

ThreadPoolExecutor通常使用工厂类Executors创建,可以创建3种类型的ThreadPoolExecutor,即FixedThreadPool、SingleThreadExecutor以及CachedThreadPool。

  • FixedThreadPool适用于为了满足资源管理的需求,而需要限制当先线程数量的应用场景,适用于负载比较重的服务器。
public static ExecutorService es = Executors.newFixedThreadPool(int threadNums);
    public static ExecutorService es = Executors.newFixedThreadPool(int threadNums, ThreadFactory threadFactory);
public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

把线程池最大线程数量maxmumPoolSize和核心线程池的数量corePoolSize设置为threadNums,将参数keepAliveTime设置为0L。使用无界队列LinkedBlockingQueue作为阻塞队列,因此当任务不能立刻执行时,都会添加到阻塞队列中,而且maximumPoolSize,keepAliveTime都是无效的。

  • SingleThreadExecutor: 适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动地应用场景。**
public static ExecutorService es = Executors.newSingleThreadExecutor();
    public static ExecutorService es = Executors.newSingleThreadExecutor(ThreadFactory threadFactory);
public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

因为阻塞队列使用的是LinkedBlockingQueue,因此和FixedThreadPool一样,参数maximumPoolSize以及keepAliveTime都是无效的。corePoolSize为1,因此 最多只能创建一个线程

  • CachedThreadPool大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
public static ExecutorService es = Executors.newCachedThreadPool();
    public static ExecutorService es = Executors.newCachedThreadPool(ThreadFactory threadFactory);
public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

CachedThreadPool使用SynchronizedQueue作为阻塞队列,SynchronizedQueue是不存储元素的阻塞队列,实现 “一对一的交付” ,也就是说,每次向队列中put一个任务必须等有线程来take这个任务,否则就会一直阻塞该任务,如果一个线程要take一个任务就要一直阻塞知道有任务被put进阻塞队列。

因为CachedThreadPool的maximumPoolSize为Integer.MUX_VALUE,因此 CachedThreadPool是无界的线程池,也就是说可以一直不断的创建线程,这样可能会使CPU和内存资源耗尽 。corePoolSize为0,因此在 CachedThreadPool中直接通过阻塞队列来进行任务的提交

2.2 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor类继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。 主要用于在给定的延迟后执行任务或者定期执行任务。

ScheduledThreadPoolExecutor通常使用Executors工厂类来创建,可创建2种类型的ScheduledThreadPoolExecutor,即ScheduledThreadPoolExecutorSingleThreadScheduledExecutor

  • ScheduledThreadPoolExecutor:适用于若干个(固定)线程延时或者定期执行任务,同时为了满足资源管理的需求而需要限制后台线程数量的场景。

    public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums);

    public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums, ThreadFactory threadFactory);

  • SingleThreadScheduledExecutor:适用于需要单个线程延时或者定期的执行任务,同时需要保证各个任务顺序执行的应用场景。

    public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums);

    public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums, ThreadFactory threadFactory);

ScheduledThreadPoolExecutor的实现:

ScheduledThreadPoolExecutor的实现主要是通过 把任务封装为ScheduledFutureTask来实现 。通过调用scheduledAtFixedTime()方法或者scheduledWithFixedDelay()方法向阻塞队列添加一个实现了RunnableScheduledFutureTask接口的ScheduledFutureTask类对象。ScheduledFutureTask主要包括3个成员变量:

// 序列号,用于保存任务添加到阻塞队列的顺序
    private final long sequenceNumber;
    // 用于保存该任务将要被执行的具体时间
    private long time;
    // 周期,用于保存任务直线的间隔周期
    private final long period;

ScheduledTreadPoolExecutor的阻塞队列是用无界队列DelayQueue实现的,可以实现元素延时delayTime后才能获取元素,在ScheduledThreadPoolExecutor中,DelayQueue内部封装了一个PriorityQueue,来对任务进行排序,首先对time排序,time小的在前,如果time一样,则sequence小的在前,也就是说如果time一样,那么先被提交的任务先执行。

因为DelayQueue是一个无界的队列,因此线程池的maximumPoolSize是无效的。ScheduledThreadPoolExecutor的工作流程大致如下:

2.3 Future接口/FutureTask实现类

Future接口和实现Future接口的FutureTask实现类,代表异步计算的结果。

2.3.1 FutureTask的使用

FutureTask除了实现Future接口外还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以条用线程直接执行(FutureTask.run())。根据FutureTask.run()方法被执行的时机,FutureTask可处于以下3种状态:

  • 未启动:创建了一个FutureTask对象但没有执行FutureTask.run();
  • 已启动:FutureTask.run()方法被执行的过程中;
  • 已完成:FutureTask.run()正常执行结束,或者FutureTask被取消(FutureTask.cancel()),或者执行FutureTask.run()时抛出异常而异常结束;

状态迁移示意图:

FutureTask的get和cancle执行示意图:

2.3.2 FutureTask的实现

FutureTask是一个基于AQS同步队列实现的一个自定义同步组件 ,通过对同步状态state的竞争实现acquire或者release操作。

FutureTask的内部类Sync实现了AQS接口,通过 对tryAcquire等抽象方法的重写和模板方法的调用来实现内部类Sync的tryAcquireShared等方法 ,然后聚合Sync的方法来实现FutureTask的get和cancel等方法。

FutureTask的设计示意图:

FutureTask的get方法最终会调用AQS.acquireSharedInterruptibly(int arg)方法:

  • 调用AQS.acquireSharedInterruptibly(int arg)方法会首先调用tryAcquireShared()方法判断acquire操作是否可以成功,可以成功的条件是state为执行完成状态RAN或者已取消状态CANCELLED,且runner不为null;
  • 如果成功则get()方法立即返回,如果失败则到线程等待队列执行release操作;
  • 当其他线程执行release操作唤醒当前线程后(比如FutureTask.run()FutureTask.cancle(...)),当前线程再次执行tryAcquireShared()将返回正值1,当前线程离开现场等待队列并唤醒它的后继线程(级联唤醒);
  • 最后返回计算的结果或抛出异常。

2.3.3 FutureTask的使用场景
  • 当一个线程需要等待另一个线程把某个任务执行完以后它才能继续执行时;
  • 有若干线程执行若干任务,每个任务最多只能被执行一次;
  • 当多个线程师徒执行同一个任务,但只能允许一个线程执行此任务,其它线程需要等这个任务被执行完毕以后才能继续执行时。

2.4 Runnable和Callable接口

用于实现线程要执行的工作单元。

2.5 Executors工厂类

提供了常见配置线程池的方法,因为ThreadPoolExecutor的参数众多且意义重大,为了避免配置出错,才有了Executors工厂类。

3. 为什么不建议使用Executors创建线程池?

FixedThreadPoolSingleThreadExecutor :允许请求的队列长度为Integer.MAX_VALUE(无界的阻塞队列),可能堆积大量的请求,从而导致OOM。

CachedThreadPoolScheduledThreadPool :允许创建的线程数量为Integer.MAX_VALUE(无界的阻塞队列),可能会创建大量线程,从而导致OOM。


原文链接:https://www.cnblogs.com/chiaki/p/13538829.html