Java 线程池
前言
Java 中的线程池是一种线程管理机制,它可以在需要执行任务时重用线程,减少线程创建和销毁的开销,提高程序的性能。Java 提供了 java.util.concurrent 包来支持线程池的实现。
线程池的构造
ThreadPoolExecutor
ThreadPoolExecutor 是 Java 中用于创建和管理线程池的主要类。您可以使用它来自定义线程池的行为,如核心线程数、最大线程数、线程存活时间、工作队列等。
// ThreadPoolExecutor 构造函数定义public ThreadPoolExecutor(int corePoolSize, // 线程池核心线程数 int maximumPoolSize, // 线程池最大线程数 long keepAliveTime,// 当线程池中线程数大于核心线程数时,多余的空闲线程存活的最长时间 TimeUnit unit, BlockingQueue<Runnable> workQueue,// 任务队列,用来储存等待执行任务的队列 ThreadFactory threadFactory,// 线程工厂,用来创建线程,一般默认即可 RejectedExecutionHandler handler // 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }根据构造函数所传参数,线程池有以下行为:
- 任务数 < corePoolSize + workQueue.size,使用核心线程执行任务,其它任务在阻塞队列等待
- corePoolSize + workQueue.size < 任务数 < maxPoolSize + workQueue.size,(任务数 - workQueue.size)个线程执行任务,其他任务在阻塞队列等待
- 任务数 > maxPoolSize + workQueue.size,触发拒绝策略
public class ExecutorsUnitTest { private int queueCapacity = 20; BlockingQueue blockingQueue = new ArrayBlockingQueue(queueCapacity); class MyRunnable implements Runnable { private String taskName; public MyRunnable(String taskName) { this.taskName = taskName; } @Override public void run() { LogExtKt.println(this, "start " + taskName); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } LogExtKt.println(this, "end " + taskName); } } @Test public void test_threadPoolExecutor() { // 当任务数 < corePoolSize+ queueCapacity // 使用核心线程执行任务,其他任务在阻塞队列等待 int corePoolSize = 5; int maxPoolSize = 10; long keepAliveTime = 2; ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, blockingQueue, new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) { Runnable temp = new MyRunnable("task " + i); executor.execute(temp); } executor.shutdown(); while (!executor.isTerminated()) { } LogExtKt.println(this, "执行结束"); } @Test public void test_threadPoolExecutor1() { // 当 corePoolSize+ queueCapacity < 任务数 < maxPoolSize+queueCapacity // 创建 6 个线程执行任务,其他任务在阻塞队列等待 int corePoolSize = 5; int maxPoolSize = 10; long keepAliveTime = 2; ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, blockingQueue, new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 26; i++) { Runnable temp = new MyRunnable("task " + i); executor.execute(temp); } executor.shutdown(); while (!executor.isTerminated()) { } LogExtKt.println(this, "执行结束"); } @Test public void test_threadPoolExecutor2() { // 任务数 > maxPoolSize + queueCapacity, int corePoolSize = 5; int maxPoolSize = 10; long keepAliveTime = 2; ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, blockingQueue, new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 35; i++) { Runnable temp = new MyRunnable("task " + i); executor.execute(temp); } executor.shutdown(); while (!executor.isTerminated()) { } LogExtKt.println(this, "执行结束"); }} RejectedExecutionHandler
当 任务数 > maxPoolSize + workQueue.size 时触发拒绝策略,以下是关于 RejectedExecutionHandler 接口常用实现类的展示:
| 类名 | 描述 |
|---|---|
| AbortPolicy | 默认的拒绝策略,直接抛出 RejectedExecutionException 异常。 |
| CallerRunsPolicy | 在调用者线程中直接执行被拒绝的任务。 |
| DiscardPolicy | 直接丢弃被拒绝的任务,不做任何处理。 |
| DiscardOldestPolicy | 丢弃队列中等待时间最长的任务,然后尝试重新提交当前任务。 |
| Custom 实现 | 您可以实现自定义的 RejectedExecutionHandler 接口来定义自己的拒绝策略。 |
这些拒绝策略可以根据您的需求来选择,以便更好地处理线程池中被拒绝的任务。
BlockingQueue
以下是关于常用的阻塞队列类型及其特性的展示:
| 类名 | 描述 |
|---|---|
| ArrayBlockingQueue | 基于数组的有界阻塞队列,按照先进先出的顺序存储元素。需要指定容量大小。 |
| LinkedBlockingQueue | 基于链表的有界或无界阻塞队列,按照先进先出的顺序存储元素。如果不指定容量,则为无界。 |
| PriorityBlockingQueue | 无界阻塞队列,根据元素的自然顺序或者通过构造函数提供的 Comparator 进行优先级排序。 |
| DelayQueue | 无界阻塞队列,用于存放实现了 Delayed 接口的元素,元素只有在延迟期满时才能被取出。 |
| SynchronousQueue | 不存储元素的阻塞队列,生产者必须等待消费者取走元素,反之亦然。 |
| LinkedTransferQueue | 无界阻塞队列,支持生产者等待消费者接收元素。 |
| LinkedBlockingDeque | 双端阻塞队列,可以在两端插入和移除元素。 |
这些阻塞队列提供了不同的特性和适用场景,您可以根据需求选择最适合的阻塞队列类型。
线程池使用
线程池创建
- 通过 ThreadPoolExecutor 构造函数来创建(推荐的方式)

- 通过 Executor 框架的工具类 Executors 来创建
以下是 Executors 类创建线程池的常用方法的展示:
| 方法 | 描述 | 注意 |
|---|---|---|
| newFixedThreadPool(int nThreads) | 创建固定大小的线程池,包含固定数量的线程。 | 使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。 |
| newSingleThreadExecutor() | 创建单线程的线程池,保证任务按顺序执行。 | 使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。 |
| newCachedThreadPool() | 创建可根据需要创建新线程的线程池。 | 使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。 |
| newScheduledThreadPool(int corePoolSize) | 创建支持定时以及周期性任务执行的线程池。 | 使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。 |
| newWorkStealingPool() | 创建使用工作窃取算法的线程池。 |
线程池状态
在Java中,线程池有几种状态,主要包括以下几种:
- RUNNING(运行状态):线程池处于运行状态,可以接受新任务,并处理阻塞队列中的任务。
- SHUTDOWN(关闭状态):不再接受新任务,但会继续处理阻塞队列中的任务,直到队列为空。
- STOP(停止状态):不再接受新任务,不处理阻塞队列中的任务,会尝试中断正在执行的任务。
- TERMINATED(终止状态):线程池完全终止,不再处理任何任务。
这些状态可以通过线程池的方法来进行转换,
- shutdown() 方法将线程池状态从 RUNNING 转换为 SHUTDOWN。
线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
- shutdownNow() 方法将线程池状态从 RUNNING 转换为 STOP。
线程池的状态立刻变成 STOP 状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。 它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,这种方法的作用有限,如果线程中没有 sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前线程的。所以,shutdownNow() 并不代表线程池立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
- awaitTermination() 方法用于等待线程池进入 TERMINATED 状态。
awaitTermination 阻塞,直到所有任务在关闭请求后完成执行,或者发生超时,或者当前线程被中断,以先发生者为准。如果线程池已经关闭,则直接返回 true;如果线程池未关闭,该方法会根据 Timeout + TimeUnit 的延时等待线程结束,并根据到期后的线程池状态返回 true 或者 false,该方法不会关闭线程池,只负责延时以及检测状态。
提交任务
任务提交可以通过 ThreadPoolExecutor 的 execute() 方法或 submit() 方法来实现。
- execute()
execute() 方法用于提交不需要返回结果的任务,参数为 Runnable 对象。
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); executor.execute(new Runnable() { @Override public void run() { // 任务逻辑 }}); - submit()
submit() 方法用于提交需要返回结果的任务,参数可以是 Runnable 或 Callable 对象,返回一个 Future 对象,通过 Future 对象可以获取任务的执行结果或取消任务。
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); Future<?> future = executor.submit(new Callable<Void>() { @Override public Void call() { // 任务逻辑 return null; }}); 在提交任务后,线程池会根据配置的核心线程数、最大线程数等参数来执行任务。如果线程池中的线程数小于核心线程数,会创建新线程来执行任务;如果线程池中的线程数达到核心线程数,会将任务放入工作队列中等待执行;如果工作队列已满,会根据最大线程数来创建新线程执行任务;如果线程数达到最大线程数且工作队列已满,会根据拒绝策略来处理无法执行的任务。
异常处理
在 Java 中,线程池的异常处理通常涉及到两个方面:一是如何处理线程池中任务的异常,二是如何处理线程池本身可能出现的异常。下面分别介绍这两个方面的处理方法:
任务异常处理
默认情况下,线程池内所有任务执行都被 try catch 包裹。
- 使用Future对象获取任务执行结果并处理异常
通过 Future 对象的 get() 方法可以获取任务执行的结果,同时可以捕获任务执行过程中抛出的异常:
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); Future<?> future = executor.submit(new Callable<Void>() { @Override public Void call() throws Exception { // 任务逻辑,可能会抛出异常 return null; }}); try { future.get(); // 获取任务执行结果} catch (ExecutionException e) { Throwable cause = e.getCause(); // 处理任务执行过程中抛出的异常} - 使用 UncaughtExceptionHandler 处理线程池中线程的未捕获异常
通过设置线程池的 UncaughtExceptionHandler,可以捕获线程池中线程抛出的未捕获异常:
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());executor.setThreadFactory(new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { // 处理线程抛出的未捕获异常 } }); return t; }}); 线程池异常处理
- 使用 ThreadPoolExecutor 的 afterExecute() 方法处理任务执行过程中的异常
可以通过重写 ThreadPoolExecutor 的 afterExecute() 方法,在任务执行完成后处理任务执行过程中的异常:
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) { protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t != null) { // 处理任务执行过程中的异常 } }};- 使用RejectedExecutionHandler处理无法执行的任务
当线程池无法执行任务时,可以通过设置RejectedExecutionHandler来处理无法执行的任务:
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 处理无法执行的任务 }});通过以上方法,可以有效地处理线程池中任务的异常和线程池本身可能出现的异常,保证线程池的稳定运行。
为什么使用线程池
线程的开销主要有哪些,线程池使用池化对象的方式怎么在性能和内存之间寻求平衡的?
在Java中,线程调度的成本和线程创建的成本是需要考虑的重要因素,特别是在使用线程池等多线程编程时。下面分别介绍线程调度的成本和线程创建的成本:
线程调度的成本
《深入理解 Java 虚拟机》所述,Java 线程调度成本主要来自于用户态与核心态之间状态切换和 “响应中断、保护和恢复执行现场”成本。
线程调度是指操作系统在多个线程之间进行切换执行的过程。线程调度的成本主要包括以下几个方面:
- 上下文切换成本:当操作系统在多个线程之间进行切换时,需要保存和恢复线程的上下文信息,包括栈信息等。这个过程会带来一定的性能开销。
- 竞争和锁开销:多个线程之间可能会竞争共享资源,导致锁的获取和释放,以及相关的同步开销。
- 缓存失效:线程切换可能导致缓存失效,从而影响程序的性能。
线程创建的成本
线程创建是指在程序中动态创建线程的过程。线程创建的成本主要包括以下方面:
- 内存开销:每个线程都需要一定的内存空间来存储线程的上下文信息、栈空间等。
- 系统调用开销:线程的创建需要操作系统进行相应的系统调用,包括分配资源、初始化线程等。
- 初始化开销:线程的初始化过程可能包括一些额外的操作,比如初始化线程的状态、设置优先级等。
在实际编程中,需要权衡线程调度的成本和线程创建的成本,避免创建过多线程导致系统资源消耗过大,同时尽量减少线程间的频繁切换,以提高程序的性能和效率。使用线程池可以有效管理线程的创建和复用,减少线程创建的开销,同时可以控制并发度,避免过多线程导致的性能下降。
参考文档
《深入理解 JVM 虚拟机》
Featrue
- 异常处理的示例