Cursor blinking

Java 线程池

Java 基础|Java线程|字数 2,992|阅读时长≈ 8 分钟
Java 线程池

前言

Java 中的线程池是一种线程管理机制,它可以在需要执行任务时重用线程,减少线程创建和销毁的开销,提高程序的性能。Java 提供了 java.util.concurrent 包来支持线程池的实现。

线程池的构造

ThreadPoolExecutor

ThreadPoolExecutor 是 Java 中用于创建和管理线程池的主要类。您可以使用它来自定义线程池的行为,如核心线程数、最大线程数、线程存活时间、工作队列等。

Code
// ThreadPoolExecutor 构造函数定义public ThreadPoolExecutor(int corePoolSize, // 线程池核心线程数                              int maximumPoolSize, // 线程池最大线程数                              long keepAliveTime,// 当线程池中线程数大于核心线程数时,多余的空闲线程存活的最长时间                              TimeUnit unit,                               BlockingQueue<Runnable> workQueue,// 任务队列,用来储存等待执行任务的队列                              ThreadFactory threadFactory,// 线程工厂,用来创建线程,一般默认即可                              RejectedExecutionHandler handler // 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务                              ) {         if (corePoolSize < 0 ||            maximumPoolSize <= 0 ||            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }

根据构造函数所传参数,线程池有以下行为:

  • 任务数 < corePoolSize + workQueue.size,使用核心线程执行任务,其它任务在阻塞队列等待
  • corePoolSize + workQueue.size < 任务数 < maxPoolSize + workQueue.size,(任务数 - workQueue.size)个线程执行任务,其他任务在阻塞队列等待
  • 任务数 > maxPoolSize + workQueue.size,触发拒绝策略
Code
public class ExecutorsUnitTest {    private int queueCapacity = 20;    BlockingQueue blockingQueue = new ArrayBlockingQueue(queueCapacity);     class MyRunnable implements Runnable {        private String taskName;         public MyRunnable(String taskName) {            this.taskName = taskName;        }         @Override        public void run() {            LogExtKt.println(this, "start " + taskName);            try {                Thread.sleep(5000);            } catch (InterruptedException e) {                e.printStackTrace();            }            LogExtKt.println(this, "end " + taskName);        }    }     @Test    public void test_threadPoolExecutor() {        // 当任务数 < corePoolSize+ queueCapacity        // 使用核心线程执行任务,其他任务在阻塞队列等待        int corePoolSize = 5;        int maxPoolSize = 10;        long keepAliveTime = 2;        ThreadPoolExecutor executor = new ThreadPoolExecutor(                corePoolSize,                maxPoolSize,                keepAliveTime,                TimeUnit.SECONDS,                blockingQueue,                new ThreadPoolExecutor.CallerRunsPolicy());        for (int i = 0; i < 10; i++) {            Runnable temp = new MyRunnable("task " + i);            executor.execute(temp);        }        executor.shutdown();        while (!executor.isTerminated()) {        }        LogExtKt.println(this, "执行结束");    }     @Test    public void test_threadPoolExecutor1() {        // 当 corePoolSize+ queueCapacity  < 任务数 <  maxPoolSize+queueCapacity        // 创建 6 个线程执行任务,其他任务在阻塞队列等待        int corePoolSize = 5;        int maxPoolSize = 10;        long keepAliveTime = 2;        ThreadPoolExecutor executor = new ThreadPoolExecutor(                corePoolSize,                maxPoolSize,                keepAliveTime,                TimeUnit.SECONDS,                blockingQueue,                new ThreadPoolExecutor.CallerRunsPolicy());        for (int i = 0; i < 26; i++) {            Runnable temp = new MyRunnable("task " + i);            executor.execute(temp);        }        executor.shutdown();        while (!executor.isTerminated()) {        }        LogExtKt.println(this, "执行结束");    }     @Test    public void test_threadPoolExecutor2() {        //  任务数 > maxPoolSize + queueCapacity,        int corePoolSize = 5;        int maxPoolSize = 10;        long keepAliveTime = 2;        ThreadPoolExecutor executor = new ThreadPoolExecutor(                corePoolSize,                maxPoolSize,                keepAliveTime,                TimeUnit.SECONDS,                blockingQueue,                new ThreadPoolExecutor.AbortPolicy());        for (int i = 0; i < 35; i++) {            Runnable temp = new MyRunnable("task " + i);            executor.execute(temp);        }        executor.shutdown();        while (!executor.isTerminated()) {        }        LogExtKt.println(this, "执行结束");    }} 

RejectedExecutionHandler

当 任务数 > maxPoolSize + workQueue.size 时触发拒绝策略,以下是关于 RejectedExecutionHandler 接口常用实现类的展示:

类名描述
AbortPolicy默认的拒绝策略,直接抛出 RejectedExecutionException 异常。
CallerRunsPolicy在调用者线程中直接执行被拒绝的任务。
DiscardPolicy直接丢弃被拒绝的任务,不做任何处理。
DiscardOldestPolicy丢弃队列中等待时间最长的任务,然后尝试重新提交当前任务。
Custom 实现您可以实现自定义的 RejectedExecutionHandler 接口来定义自己的拒绝策略。

这些拒绝策略可以根据您的需求来选择,以便更好地处理线程池中被拒绝的任务。

BlockingQueue

以下是关于常用的阻塞队列类型及其特性的展示:

类名描述
ArrayBlockingQueue基于数组的有界阻塞队列,按照先进先出的顺序存储元素。需要指定容量大小。
LinkedBlockingQueue基于链表的有界或无界阻塞队列,按照先进先出的顺序存储元素。如果不指定容量,则为无界。
PriorityBlockingQueue无界阻塞队列,根据元素的自然顺序或者通过构造函数提供的 Comparator 进行优先级排序。
DelayQueue无界阻塞队列,用于存放实现了 Delayed 接口的元素,元素只有在延迟期满时才能被取出。
SynchronousQueue不存储元素的阻塞队列,生产者必须等待消费者取走元素,反之亦然。
LinkedTransferQueue无界阻塞队列,支持生产者等待消费者接收元素。
LinkedBlockingDeque双端阻塞队列,可以在两端插入和移除元素。

这些阻塞队列提供了不同的特性和适用场景,您可以根据需求选择最适合的阻塞队列类型。

线程池使用

线程池创建

  • 通过 ThreadPoolExecutor 构造函数来创建(推荐的方式)
截屏2024-04-03 15.11.22.png
截屏2024-04-03 15.11.22.png
  • 通过 Executor 框架的工具类 Executors 来创建

以下是 Executors 类创建线程池的常用方法的展示:

方法描述注意
newFixedThreadPool(int nThreads)创建固定大小的线程池,包含固定数量的线程。使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
newSingleThreadExecutor()创建单线程的线程池,保证任务按顺序执行。使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
newCachedThreadPool()创建可根据需要创建新线程的线程池。使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。
newScheduledThreadPool(int corePoolSize)创建支持定时以及周期性任务执行的线程池。使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
newWorkStealingPool()创建使用工作窃取算法的线程池。

线程池状态

在Java中,线程池有几种状态,主要包括以下几种:

  • RUNNING(运行状态):线程池处于运行状态,可以接受新任务,并处理阻塞队列中的任务。
  • SHUTDOWN(关闭状态):不再接受新任务,但会继续处理阻塞队列中的任务,直到队列为空。
  • STOP(停止状态):不再接受新任务,不处理阻塞队列中的任务,会尝试中断正在执行的任务。
  • TERMINATED(终止状态):线程池完全终止,不再处理任何任务。

这些状态可以通过线程池的方法来进行转换,

  • shutdown() 方法将线程池状态从 RUNNING 转换为 SHUTDOWN。

线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

  • shutdownNow() 方法将线程池状态从 RUNNING 转换为 STOP。

线程池的状态立刻变成 STOP 状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。 它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,这种方法的作用有限,如果线程中没有 sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前线程的。所以,shutdownNow() 并不代表线程池立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

  • awaitTermination() 方法用于等待线程池进入 TERMINATED 状态。

awaitTermination 阻塞,直到所有任务在关闭请求后完成执行,或者发生超时,或者当前线程被中断,以先发生者为准。如果线程池已经关闭,则直接返回 true;如果线程池未关闭,该方法会根据 Timeout + TimeUnit 的延时等待线程结束,并根据到期后的线程池状态返回 true 或者 false,该方法不会关闭线程池,只负责延时以及检测状态。

提交任务

任务提交可以通过 ThreadPoolExecutor 的 execute() 方法或 submit() 方法来实现。

  • execute()

execute() 方法用于提交不需要返回结果的任务,参数为 Runnable 对象。

Code
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); executor.execute(new Runnable() {    @Override    public void run() {        // 任务逻辑    }}); 
  • submit()

submit() 方法用于提交需要返回结果的任务,参数可以是 Runnable 或 Callable 对象,返回一个 Future 对象,通过 Future 对象可以获取任务的执行结果或取消任务。

Code
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); Future<?> future = executor.submit(new Callable<Void>() {    @Override    public Void call() {        // 任务逻辑        return null;    }}); 

在提交任务后,线程池会根据配置的核心线程数、最大线程数等参数来执行任务。如果线程池中的线程数小于核心线程数,会创建新线程来执行任务;如果线程池中的线程数达到核心线程数,会将任务放入工作队列中等待执行;如果工作队列已满,会根据最大线程数来创建新线程执行任务;如果线程数达到最大线程数且工作队列已满,会根据拒绝策略来处理无法执行的任务。

异常处理

在 Java 中,线程池的异常处理通常涉及到两个方面:一是如何处理线程池中任务的异常,二是如何处理线程池本身可能出现的异常。下面分别介绍这两个方面的处理方法:

任务异常处理

默认情况下,线程池内所有任务执行都被 try catch 包裹。

  • 使用Future对象获取任务执行结果并处理异常

通过 Future 对象的 get() 方法可以获取任务执行的结果,同时可以捕获任务执行过程中抛出的异常:

Code
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); Future<?> future = executor.submit(new Callable<Void>() {    @Override    public Void call() throws Exception {        // 任务逻辑,可能会抛出异常        return null;    }}); try {    future.get(); // 获取任务执行结果} catch (ExecutionException e) {    Throwable cause = e.getCause();    // 处理任务执行过程中抛出的异常} 
  • 使用 UncaughtExceptionHandler 处理线程池中线程的未捕获异常

通过设置线程池的 UncaughtExceptionHandler,可以捕获线程池中线程抛出的未捕获异常:

Code
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());executor.setThreadFactory(new ThreadFactory() {    public Thread newThread(Runnable r) {        Thread t = new Thread(r);        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {            public void uncaughtException(Thread t, Throwable e) {                // 处理线程抛出的未捕获异常            }        });        return t;    }}); 

线程池异常处理

  • 使用 ThreadPoolExecutor 的 afterExecute() 方法处理任务执行过程中的异常

可以通过重写 ThreadPoolExecutor 的 afterExecute() 方法,在任务执行完成后处理任务执行过程中的异常:

Code
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {    protected void afterExecute(Runnable r, Throwable t) {        super.afterExecute(r, t);        if (t != null) {            // 处理任务执行过程中的异常        }    }};
  • 使用RejectedExecutionHandler处理无法执行的任务

当线程池无法执行任务时,可以通过设置RejectedExecutionHandler来处理无法执行的任务:

Code
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {        // 处理无法执行的任务    }});

通过以上方法,可以有效地处理线程池中任务的异常和线程池本身可能出现的异常,保证线程池的稳定运行。

为什么使用线程池

线程的开销主要有哪些,线程池使用池化对象的方式怎么在性能和内存之间寻求平衡的?

在Java中,线程调度的成本和线程创建的成本是需要考虑的重要因素,特别是在使用线程池等多线程编程时。下面分别介绍线程调度的成本和线程创建的成本:

线程调度的成本

《深入理解 Java 虚拟机》所述,Java 线程调度成本主要来自于用户态与核心态之间状态切换和 “响应中断、保护和恢复执行现场”成本。

线程调度是指操作系统在多个线程之间进行切换执行的过程。线程调度的成本主要包括以下几个方面:

  • 上下文切换成本:当操作系统在多个线程之间进行切换时,需要保存和恢复线程的上下文信息,包括栈信息等。这个过程会带来一定的性能开销。
  • 竞争和锁开销:多个线程之间可能会竞争共享资源,导致锁的获取和释放,以及相关的同步开销。
  • 缓存失效:线程切换可能导致缓存失效,从而影响程序的性能。

线程创建的成本

线程创建是指在程序中动态创建线程的过程。线程创建的成本主要包括以下方面:

  • 内存开销:每个线程都需要一定的内存空间来存储线程的上下文信息、栈空间等。
  • 系统调用开销:线程的创建需要操作系统进行相应的系统调用,包括分配资源、初始化线程等。
  • 初始化开销:线程的初始化过程可能包括一些额外的操作,比如初始化线程的状态、设置优先级等。

在实际编程中,需要权衡线程调度的成本和线程创建的成本,避免创建过多线程导致系统资源消耗过大,同时尽量减少线程间的频繁切换,以提高程序的性能和效率。使用线程池可以有效管理线程的创建和复用,减少线程创建的开销,同时可以控制并发度,避免过多线程导致的性能下降。

参考文档

《深入理解 JVM 虚拟机》

Featrue

  • 异常处理的示例