• 生活小妙招免费各类生活中的小问题知识以及音乐简谱等,是你了解世界未知知识的好地方。

如何优雅的使用和理解线程池?你怎么看?

十万个为什么 空空 2024-4-09 00:06:23 5次浏览

关于问题如何优雅的使用和理解线程池?你怎么看?一共有 2 位热心网友为你解答:

【1】、来自网友【华为云开发者联盟】的最佳回答:

提示

请带着这些问题继续后文,会很大程度上帮助你更好的理解相关知识点。@pdai

  • 为什么要有线程池?
  • Java 是实现和管理线程池有哪些方式? 请简单举例如何使用。
  • 为什么很多公司不允许使用 Executors 去创建线程池? 那么推荐怎么使用呢?
  • ThreadPoolExecutor 有哪些核心的配置参数? 请简要说明
  • ThreadPoolExecutor 可以创建哪是哪三种线程池呢?
  • 当队列满了并且 worker 的数量达到 maxSize 的时候,会怎么样?
  • 说说 ThreadPoolExecutor 有哪些 RejectedExecutionHandler 策略? 默认是什么策略?
  • 简要说下线程池的任务执行机制? execute –> addWorker –>runworker (getTask)
  • 线程池中任务是如何提交的?
  • 线程池中任务是如何关闭的?
  • 在配置线程池的时候需要考虑哪些配置因素?
  • 如何监控线程池的状态?

为什么要有线程池

线程池能够对线程进行统一分配,调优和监控:

  • 降低资源消耗(线程无限制地创建,然后使用完毕后销毁)
  • 提高响应速度(无须创建线程)
  • 提高线程的可管理性

ThreadPoolExecutor 例子

Java 是如何实现和管理线程池的?

从 JDK 5 开始,把工作单元与执行机制分离开来,工作单元包括 Runnable 和 Callable,而执行机制由 Executor 框架提供。

  • WorkerThread

SimpleThreadPool

程序中我们创建了固定大小为五个工作线程的线程池。然后分配给线程池十个工作,因为线程池大小为五,它将启动五个工作线程先处理五个工作,其他的工作则处于等待状态,一旦有工作完成,空闲下来工作线程就会捡取等待队列里的其他工作进行执行。

这里是以上程序的输出。

输出表明线程池中至始至终只有五个名为 “pool-1-thread-1” 到 “pool-1-thread-5” 的五个线程,这五个线程不随着工作的完成而消亡,会一直存在,并负责执行分配给线程池的任务,直到线程池消亡。

Executors 类提供了使用了 ThreadPoolExecutor 的简单的 ExecutorService 实现,但是 ThreadPoolExecutor 提供的功能远不止于此。我们可以在创建 ThreadPoolExecutor 实例时指定活动线程的数量,我们也可以限制线程池的大小并且创建我们自己的 RejectedExecutionHandler 实现来处理不能适应工作队列的工作。

这里是我们自定义的 RejectedExecutionHandler 接口的实现。

  • RejectedExecutionHandlerImpl.java

ThreadPoolExecutor 提供了一些方法,我们可以使用这些方法来查询 executor 的当前状态,线程池大小,活动线程数量以及任务数量。因此我是用来一个监控线程在特定的时间间隔内打印 executor 信息。

  • MyMonitorThread.java

这里是使用 ThreadPoolExecutor 的线程池实现例子。

  • WorkerPool.java

注意在初始化 ThreadPoolExecutor 时,我们保持初始池大小为 2,最大池大小为 4 而工作队列大小为 2。因此如果已经有四个正在执行的任务而此时分配来更多任务的话,工作队列将仅仅保留他们(新任务)中的两个,其他的将会被 RejectedExecutionHandlerImpl 处理。

上面程序的输出可以证实以上观点。

注意 executor 的活动任务、完成任务以及所有完成任务,这些数量上的变化。我们可以调用 shutdown() 方法来结束所有提交的任务并终止线程池。

ThreadPoolExecutor 使用详解

其实 java 线程池的实现原理很简单,说白了就是一个线程集合 workerSet 和一个阻塞队列 workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入 workQueue 中。workerSet 中的线程会不断的从 workQueue 中获取线程然后执行。当 workQueue 中没有任务的时候,worker 就会阻塞,直到队列中有任务了就取出来继续执行。

Execute 原理

当一个任务提交至线程池之后:

  1. 线程池首先当前运行的线程数量是否少于 corePoolSize。如果是,则创建一个新的工作线程来执行任务。如果都在执行任务,则进入 2.
  2. 判断 BlockingQueue 是否已经满了,倘若还没有满,则将线程放入 BlockingQueue。否则进入 3.
  3. 如果创建一个新的工作线程将使当前运行的线程数量超过 maximumPoolSize,则交给 RejectedExecutionHandler 来处理任务。

当 ThreadPoolExecutor 创建新线程时,通过 CAS 来更新线程池的状态 ctl.

参数

  • corePoolSize 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize, 即使有其他空闲线程能够执行新来的任务, 也会继续创建线程;如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
  • workQueue 用来保存等待被执行的任务的阻塞队列. 在 JDK 中提供了如下阻塞队列: 具体可以参考 JUC 集合: BlockQueue 详解
    • ArrayBlockingQueue: 基于数组结构的有界阻塞队列,按 FIFO 排序任务;
    • LinkedBlockingQueue: 基于链表结构的阻塞队列,按 FIFO 排序任务,吞吐量通常要高于 ArrayBlockingQueue;
    • SynchronousQueue: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue;
    • PriorityBlockingQueue: 具有优先级的无界阻塞队列;

LinkedBlockingQueue 比 ArrayBlockingQueue 在插入删除节点性能方面更优,但是二者在 put(), take()任务的时均需要加锁,SynchronousQueue 使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是 Transfer.transfer().

  • maximumPoolSize 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize;当阻塞队列是无界队列, 则 maximumPoolSize 则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入 workQueue.
  • keepAliveTime 线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于 corePoolSize 时才有用, 超过这个时间的空闲线程将被终止;
  • unit keepAliveTime 的单位
  • threadFactory 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为 DefaultThreadFactory
  • handler 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:
    • AbortPolicy: 直接抛出异常,默认策略;
    • CallerRunsPolicy: 用调用者所在的线程来执行任务;
    • DiscardOldestPolicy: 丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy: 直接丢弃任务;

当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

三种类型

newFixedThreadPool

线程池的线程数量达 corePoolSize 后,即使线程池没有可执行任务时,也不会释放线程。

FixedThreadPool 的工作队列为无界队列 LinkedBlockingQueue(队列容量为 Integer.MAX_VALUE), 这会导致以下问题:

  • 线程池里的线程数量不超过 corePoolSize,这导致了 maximumPoolSize 和 keepAliveTime 将会是个无用参数
  • 由于使用了无界队列, 所以 FixedThreadPool 永远不会拒绝, 即饱和策略失效

newSingleThreadExecutor

初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行.

由于使用了无界队列, 所以 SingleThreadPool 永远不会拒绝, 即饱和策略失效

newCachedThreadPool

线程池的线程数可达到 Integer.MAX_VALUE,即 2147483647,内部使用 SynchronousQueue 作为阻塞队列; 和 newFixedThreadPool 创建的线程池不同,newCachedThreadPool 在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销; 执行过程与前两种稍微不同:

  • 主线程调用 SynchronousQueue 的 offer()方法放入 task, 倘若此时线程池中有空闲的线程尝试读取 SynchronousQueue 的 task, 即调用了 SynchronousQueue 的 poll(), 那么主线程将该 task 交给空闲线程. 否则执行(2)
  • 当线程池为空或者没有空闲的线程, 则创建新的线程执行任务.
  • 执行完任务的线程倘若在 60s 内仍空闲, 则会被终止. 因此长时间空闲的 CachedThreadPool 不会持有任何线程资源.

关闭线程池

遍历线程池中的所有线程,然后逐个调用线程的 interrupt 方法来中断线程.

关闭方式 – shutdown

将线程池里的线程状态设置成 SHUTDOWN 状态, 然后中断所有没有正在执行任务的线程.

关闭方式 – shutdownNow

将线程池里的线程状态设置成 STOP 状态, 然后停止所有正在执行或暂停任务的线程. 只要调用这两个关闭方法中的任意一个, isShutDown() 返回 true. 当所有任务都成功关闭了, isTerminated()返回 true.

ThreadPoolExecutor 源码详解

几个关键属性

内部状态

其中 AtomicInteger 变量 ctl 的功能非常强大: 利用低 29 位表示线程池中线程数,通过高 3 位表示线程池的运行状态:

  • RUNNING: -1 << COUNT_BITS,即高 3 位为 111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  • SHUTDOWN: 0 << COUNT_BITS,即高 3 位为 000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  • STOP : 1 << COUNT_BITS,即高 3 位为 001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  • TIDYING : 2 << COUNT_BITS,即高 3 位为 010, 所有的任务都已经终止;
  • TERMINATED: 3 << COUNT_BITS,即高 3 位为 011, terminated()方法已经执行完成

任务的执行

execute –> addWorker –>runworker (getTask)

线程池的工作线程通过 Woker 类实现,在 ReentrantLock 锁的保证下,把 Woker 实例插入到 HashSet 后,并启动 Woker 中的线程。 从 Woker 类的构造方法实现可以发现: 线程工厂在创建线程 thread 时,将 Woker 实例本身 this 作为参数传入,当执行 start 方法启动线程 thread 时,本质是执行了 Worker 的 runWorker 方法。 firstTask 执行完成之后,通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源;

execute()方法

ThreadPoolExecutor.execute(task)实现了 Executor.execute(task)

  • 为什么需要 double check 线程池的状态?

在多线程环境下,线程池的状态时刻在变化,而 ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将 command 加入 workque 是线程池之前的状态。倘若没有 double check,万一线程池处于非 running 状态(在多线程环境下很有可能发生),那么 command 永远不会执行。

addWorker 方法

从方法 execute 的实现可以看出: addWorker 主要负责创建新的线程并执行任务 线程池创建新线程执行任务时,需要 获取全局锁:

Worker 类的 runworker 方法

  • 继承了 AQS 类,可以方便的实现工作线程的中止操作;
  • 实现了 Runnable 接口,可以将自身作为一个任务在工作线程中执行;
  • 当前提交的任务 firstTask 作为参数传入 Worker 的构造方法;

一些属性还有构造方法:

runWorker 方法是线程池的核心:

  • 线程启动之后,通过 unlock 方法释放锁,设置 AQS 的 state 为 0,表示运行可中断;
  • Worker 执行 firstTask 或从 workQueue 中获取任务:进行加锁操作,保证 thread 不被其他线程中断(除非线程池被中断)检查线程池状态,倘若线程池处于中断状态,当前线程将中断。执行 beforeExecute 执行任务的 run 方法执行 afterExecute 方法解锁操作

通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源;

getTask 方法

下面来看一下 getTask()方法,这里面涉及到 keepAliveTime 的使用,从这个方法我们可以看出线程池是怎么让超过 corePoolSize 的那部分 worker 销毁的。

注意这里一段代码是 keepAliveTime 起作用的关键:

allowCoreThreadTimeOut 为 false,线程即使空闲也不会被销毁;倘若为 ture,在 keepAliveTime 内仍空闲则会被销毁。

如果线程允许空闲等待而不被销毁 timed == false,workQueue.take 任务: 如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take 方法返回任务,并执行;

如果线程不允许无休止空闲 timed == true, workQueue.poll 任务: 如果在 keepAliveTime 时间内,阻塞队列还是没有任务,则返回 null;

任务的提交

  1. submit 任务,等待线程池 execute
  2. 执行 FutureTask 类的 get 方法时,会把主线程封装成 WaitNode 节点并保存在 waiters 链表中, 并阻塞等待运行结果;
  3. FutureTask 任务执行完成后,通过 UNSAFE 设置 waiters 相应的 waitNode 为 null,并通过 LockSupport 类 unpark 方法唤醒主线程;

在实际业务场景中,Future 和 Callable 基本是成对出现的,Callable 负责产生结果,Future 负责获取结果。

  1. Callable 接口类似于 Runnable,只是 Runnable 没有返回值。
  2. Callable 任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即 Future 可以拿到异步执行任务各种结果;
  3. Future.get 方法会导致主线程阻塞,直到 Callable 任务执行完成;

submit 方法

AbstractExecutorService.submit()实现了 ExecutorService.submit() 可以获取执行完的返回值, 而 ThreadPoolExecutor 是 AbstractExecutorService.submit()的子类,所以 submit 方法也是 ThreadPoolExecutor`的方法。

通过 submit 方法提交的 Callable 任务会被封装成了一个 FutureTask 对象。通过 Executor.execute 方法提交 FutureTask 到线程池中等待被执行,最终执行的是 FutureTask 的 run 方法;

FutureTask 对象

public class FutureTask<V> implements RunnableFuture<V> 可以将 FutureTask 提交至线程池中等待被执行(通过 FutureTask 的 run 方法来执行)

  • 内部状态

内部状态的修改通过 sun.misc.Unsafe 修改

  • get 方法

内部通过 awaitDone 方法对主线程进行阻塞,具体实现如下:

如果主线程被中断,则抛出中断异常;

  1. 判断 FutureTask 当前的 state,如果大于 COMPLETING,说明任务已经执行完成,则直接返回;
  2. 如果当前 state 等于 COMPLETING,说明任务已经执行完,这时主线程只需通过 yield 方法让出 cpu 资源,等待 state 变成 NORMAL;
  3. 通过 WaitNode 类封装当前线程,并通过 UNSAFE 添加到 waiters 链表;
  4. 最终通过 LockSupport 的 park 或 parkNanos 挂起线程;

run 方法

FutureTask.run 方法是在线程池中被执行的,而非主线程

  1. 通过执行 Callable 任务的 call 方法;
  2. 如果 call 执行成功,则通过 set 方法保存结果;
  3. 如果 call 执行有异常,则通过 setException 保存异常;

任务的关闭

shutdown 方法会将线程池的状态设置为 SHUTDOWN,线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完

shutdownNow 做的比较绝,它先将线程池状态设置为 STOP,然后拒绝所有提交的任务。最后中断左右正在运行中的 worker,然后清空任务队列。

更深入理解

为什么线程池不允许使用 Executors 去创建? 推荐方式是什么?

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 各个方法的弊端:

  • newFixedThreadPool 和 newSingleThreadExecutor:   主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
  • newCachedThreadPool 和 newScheduledThreadPool:   主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。

推荐方式 1

首先引入:commons-lang3 包

推荐方式 2

首先引入:com.google.guava 包

推荐方式 3

spring 配置线程池方式:自定义线程工厂 bean 需要实现 ThreadFactory,可参考该接口的其它默认实现类,使用方式直接注入 bean 调用 execute(Runnable task)方法即可

配置线程池需要考虑因素

从任务的优先级,任务的执行时间长短,任务的性质(CPU 密集/ IO 密集),任务的依赖关系这四个角度来分析。并且近可能地使用有界的工作队列。

性质不同的任务可用使用不同规模的线程池分开处理:

  • CPU 密集型: 尽可能少的线程,Ncpu+1
  • IO 密集型: 尽可能多的线程, Ncpu*2,比如数据库连接池
  • 混合型: CPU 密集型的任务与 IO 密集型任务的执行时间差别较小,拆分为两个线程池;否则没有必要拆分。

监控线程池的状态

可以使用 ThreadPoolExecutor 以下方法:

  • getTaskCount() Returns the approximate total number of tasks that have ever been scheduled for execution.
  • getCompletedTaskCount() Returns the approximate total number of tasks that have completed execution. 返回结果少于 getTaskCount()。
  • getLargestPoolSize() Returns the largest number of threads that have ever simultaneously been in the pool. 返回结果小于等于 maximumPoolSize
  • getPoolSize() Returns the current number of threads in the pool.
  • getActiveCount() Returns the approximate number of threads that are actively executing tasks.

参考文章

由于问答代码块插入受限,部分代码未完全展示,若有需要可阅读原文:

【2】、来自网友【Java 编程技术】的最佳回答:

要把 java 线程池理解好并且用好,需要把并发编程的基础知识掌握好,并且把线程池的所有 API 的官方文档仔细阅读研究一遍。这里把优雅的使用线程池的要点总结如下:

1. 弄明白你要用线程池做什么。例如你的目的是把同步 API 改造为异步,还是想要并发请求多个外部服务,还是减少线程的创建和销毁以处理用户请求等。

2. 根据你的实际项目需求,配置好线程池的参数,具体包括 corePoolSize, maximumPoolSize, 阻塞队列, keepAliveTime,线程工厂 和 饱和策略(或者说是 RejectedExecutionHandler)。

3. 根据你的应用特点部署线程池。有的后台服务应用适合在启动的时候一次性创建好线程池,在应用的执行过程不再修改线程池。有的时候,应用适合临时创建一个线程池并且把任务提交进去,用完之后立即销毁。

4. 当你决定不再使用线程池之后,应该调用 shutdown()以优雅的关闭线程池。shutdown 可以保证之前已经提交到线程池中的任务不会被丢弃,保证了数据安全。

5. 当调用了 shutdown 之后,线程池此时已经可能在执行任务,只是关闭了提交任务的入口。如果需要等待线程池完全终止,需要调用 awaitTerminate 以等待线程池把队列中的任务全部处理完成并且清理完成,然后才返回。awaitTerminate 成功返回了,线程池算是真的清理干净了。

总结一下,线程池按照创建(构造方法)、提交任务(execute)、清理(shutdown),等待清理结束(awaitTerminate)的顺序调用 API,这样使用线程池才算优雅。要想用好 java 线程池,最大化优化程序的性能可以参考我公众号里的分析文章。

喜欢 (0)