第 六章:为什么我们需要 Executor 框架?


为什么我们需要 Executor 框架?

当我们创建一个简单的多线程应用程序时,我们创建 Runnable对象并使用 Runnable 构造 Thread 对象,我们需要创建、执行和管理线程。我们可能很难做到这一点。Executor Framework为您完成。它负责创建、执行和管理线程,不仅如此,它还提高了应用程序的性能。

当您遵循task每个线程策略时,您为每个任务创建一个新线程,然后如果系统高度过载,您将出现内存不足错误并且您的系统将失败。如果使用ThreadPoolExecutor,则不会为新任务创建线程。一旦线程完成一项任务,您将任务分配给有限数量的线程,它将被赋予另一项任务。

Executor框架的核心接口是Executor。它有一个称为“执行”的方法。

public interface Executor {
void execute(Runnable command);
}

还有另一个名为ExecutorService的接口扩展了 Executor 接口。它可以称为 Executor,它提供可以控制终止的方法和可以产生 Future 以跟踪一个或多个异步任务的进度的方法。它具有提交、关闭、shutdownNow 等方法。

ThreadPoolExecutor是线程池的实际实现。它扩展了实现ExecutorService接口的 AbstractThreadPoolExecutor。您可以从 Executor 类的工厂方法创建 ThreadPoolExecutor。推荐一种获取ThreadPoolExecutor.

Executors 类中有4 factory methods可以用来获取 ThreadPoolExecutor 的实例。我们正在使用 Executors 的 newFixedThreadPool 来获取 ThreadPoolExecutor 的实例。

例子:

ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);

以下是 Executors 类中存在的四个工厂方法。

newFixedThreadPool:此方法返回最大大小(假设 n 个线程)固定的线程池执行程序。如果所有 n 个线程都忙于执行任务并且提交了其他任务,那么它们将必须在队列中,直到线程可用。

newCachedThreadPool:此方法返回一个无界线程池。它没有最大大小,但如果它的任务数量较少,那么它将拆除未使用的线程。如果线程已经 1 分钟未使用(keepAliveTime),那么它将把它拆掉。

newSingleThreadedExecutor:此方法返回一个保证使用单线程的执行器。

newScheduledThreadPool:此方法返回一个固定大小的线程池,可以安排命令在给定延迟后运行,或定期执行。

让我们创建一个基本示例,ThreadPoolExecutor我们将使用newFixedThreadPool创建一个ThreadPoolExecutor. 让我们创建一个 Task FetchDataFromFile。这里Task将读取不同的文件并处理它们。

package org.arpit.java2blog.bean;

public class FetchDataFromFile implements Runnable{

private final String fileName;

public FetchDataFromFile(String fileName) {
  super();
  this.fileName = fileName;
}

@Override
public void run() {
  try {
   System.out.println("Fetching data from "+fileName+" by "+Thread.currentThread().getName());
   Thread.sleep(5000); // Reading file
   System.out.println("Read file successfully: "+fileName+" by "+Thread.currentThread().getName());
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
}
public String getFileName() {
  return fileName;
}
}

让我们创建ThreadPoolExecutor它将消耗上述任务并处理它。

package org.arpit.java2blog;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ThreadPoolExecutorMain {
public static void main(String args[]) {
  // Getting instance of ThreadPoolExecutor using  Executors.newFixedThreadPool factory method
  ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
  for (int i = 0; i < 10; i++) {
   FetchDataFromFile fdff = new FetchDataFromFile("File " + i);
   System.out.println("A new file has been added to read : " + fdff.getFileName());
   // Submitting task to executor
   threadPoolExecutor.execute(fdff);
  }
  threadPoolExecutor.shutdown();
}
}

当你运行上面的程序时,你会得到下面的输出:

A new file has been added to read : File 0
A new file has been added to read : File 1
A new file has been added to read : File 2
Fetching data from File 0 by pool-1-thread-1
Fetching data from File 1 by pool-1-thread-2
A new file has been added to read : File 3
Fetching data from File 2 by pool-1-thread-3
A new file has been added to read : File 4
Fetching data from File 3 by pool-1-thread-4
A new file has been added to read : File 5
Fetching data from File 4 by pool-1-thread-5
A new file has been added to read : File 6
A new file has been added to read : File 7
A new file has been added to read : File 8
A new file has been added to read : File 9
Read file successfully: File 1 by pool-1-thread-2
Read file successfully: File 3 by pool-1-thread-4
Fetching data from File 5 by pool-1-thread-4
Read file successfully: File 4 by pool-1-thread-5
Read file successfully: File 2 by pool-1-thread-3
Read file successfully: File 0 by pool-1-thread-1
Fetching data from File 8 by pool-1-thread-3
Fetching data from File 7 by pool-1-thread-5
Fetching data from File 6 by pool-1-thread-2
Fetching data from File 9 by pool-1-thread-1
Read file successfully: File 5 by pool-1-thread-4
Read file successfully: File 7 by pool-1-thread-5
Read file successfully: File 6 by pool-1-thread-2
Read file successfully: File 8 by pool-1-thread-3
Read file successfully: File 9 by pool-1-thread-1

我们使用了新的newFixedThreadPool,所以当我们提交 10 个任务时,将创建5 个新线程5 tasks并执行。其他 5 个任务将在等待队列中等待。一旦线程完成任何任务,该线程将选择另一个任务并执行它。

使用 ThreadPoolExecutor 的构造函数

如果要自定义创建 ThreadPoolExecutor,也可以使用它的构造函数。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) ;

corePoolSize:corePoolSize 是要保留在池中的线程数,即使它们处于空闲状态 MaximumPoolSize:池中允许的最大线程数 keepAliveTime:当您的可用线程数超过corePoolSize时,keepAliveTime 是该线程在终止前等待任务的时间. unit:时间单位是keepAliveTime, workQueue:workQueue是BlockingQueue,它在执行之前保存任务。 threadFactory:用于创建新线程的工厂。 handler:``RejectedExecutionHandler用于执行被阻塞或队列已满的情况。让我们创建一个RejectedExecutionHandler用于处理被拒绝的任务。

package org.arpit.java2blog.bean;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectTaskHandler implements RejectedExecutionHandler{

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  FetchDataFromFile ffdf=(FetchDataFromFile) r;
  System.out.println("Sorry!! We won't be able to read :"+ffdf.getFileName());  
}
}

让我们更改ThreadPoolExecutorMain.java为下面的代码以使用 ThreadPoolExecutor 构造函数。

package org.arpit.java2blog.bean;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorMain {
public static void main(String args[]) {

  // Wait queue is used to store waiting task
  BlockingQueue queue=new LinkedBlockingQueue(4);

  // Thread factory to create new threads
  ThreadFactory threadFactory=Executors.defaultThreadFactory();

  // Rejection handler in case task get rejected
  RejectTaskHandler rth=new RejectTaskHandler();
  // ThreadPoolExecutor constructor to create its instance
  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 10L, TimeUnit.MILLISECONDS, queue,
    threadFactory,rth
    );
  for (int i = 1; i <= 10; i++) {
   FetchDataFromFile fdff = new FetchDataFromFile("File " + i);
   System.out.println("A new file has been added to read : " + fdff.getFileName());
   // Submitting task to executor
   threadPoolExecutor.execute(fdff);
  }
  threadPoolExecutor.shutdown();
}
}

当你运行上面的程序时,你会得到下面的输出:

A new file has been added to read : File 1
A new file has been added to read : File 2
A new file has been added to read : File 3
A new file has been added to read : File 4
Fetching data from File 1 by pool-1-thread-1
A new file has been added to read : File 5
A new file has been added to read : File 6
A new file has been added to read : File 7
Sorry!! We won’t be able to read :File 7
A new file has been added to read : File 8
Sorry!! We won’t be able to read :File 8
A new file has been added to read : File 9
Sorry!! We won’t be able to read :File 9
A new file has been added to read : File 10
Sorry!! We won’t be able to read :File 10
Fetching data from File 6 by pool-1-thread-2
Read file successfully: File 1 by pool-1-thread-1
Read file successfully: File 6 by pool-1-thread-2
Fetching data from File 2 by pool-1-thread-1
Fetching data from File 3 by pool-1-thread-2
Read file successfully: File 3 by pool-1-thread-2
Read file successfully: File 2 by pool-1-thread-1
Fetching data from File 4 by pool-1-thread-2
Fetching data from File 5 by pool-1-thread-1

如果您在此处注意到文件 7、文件 8、文件 9 和文件 10 被拒绝。让我们了解他们被拒绝的原因。ThreadPoolExecutor 的Constructor中的最大池大小为 2,所以当我们向线程池提交 10 个任务时,创建了 2 个线程并开始处理 2 个任务,4 个任务排队LinkedBlockingQueue,所以一旦 LinkedBlockingQueue 满了,其余任务就会被拒绝。

如何决定线程池的大小

您不应该硬编码线程池的大小。它应该由配置提供或从 计算 Runtime.availableProcessors()

螺纹尺寸不宜过大或过小。如果您选择的线程池大小太大,则会导致系统过载,无法正常工作。如果选择的线程池大小太小,会影响吞吐量和性能。

这就是 Java ThreadPoolExecutor 示例的全部内容。


原文链接:https://codingdict.com/