线程池相关八股
1. 线程池的执行流程
- 创建的线程池通过submit()或者execute()方法提交任务,其中execute()方法为void类型,参数为Runnable类型,不需要返回值;submit方法为Future类型,有返回值和类型,其参数submit(Callable
task) 或者 submit(Runnable task, T result) 或者 submit(Runnable task)
- 线程池首先创建核心线程来执行任务
- 如果核心线程都在执行任务,并且有新任务需要执行,则放入阻塞队列中
- 如果阻塞队列满了,则开启其他非核心线程进行任务,前提不超过设置的最大线程数
- 若线程数量已经到达最大线程数,且阻塞队列已经满了,则执行拒绝策略。
2. 线程池的核心参数
通过查看源码可知,ThreadPoolExecutor的构造方法一共四个,但是都包括了核心的五大参数:核心线程数、最大线程数、非核心线程的存活时间、时间单位、阻塞队列
还有两个参数分别是线程工厂和拒绝策略
3. 线程池的拒绝策略
线程池主要有四大拒绝策略:
ThreadPoolExecutor.DiscardPolicy
:直接丢弃策略ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列中最早的任务ThreadPoolExecutor.AbortPolicy
:抛出异常RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy
: 调用者执行
那么如何选择对应的淘汰策略呢?
-
线程池默认拒绝策略是抛出异常策略,适用于希望立即得知线程池无法处理更多任务的场景,例如高可靠性系统,要求开发者显式地处理任务拒绝情况。
具体场景:在银行交易系统中,每个交易请求都需要保证被处理。如果线程池无法接受更多的交易任务,这可能意味着系统过载或出现问题。在这种情况下,立即抛出异常可以让开发者快速捕捉到问题,并采取相应措施(如扩展线程池、报警等),以确保交易请求不会丢失。
-
丢弃策略则允许有任务丢失的情况,适用于对任务执行要求不高的场景。
具体场景: 在某些网络监控系统中,监控数据可能是每秒产生的高频率数据,这些数据可能仅在短时间内有用。如果系统过载,且未来数据会覆盖旧数据,可以选择
DiscardPolicy
,直接丢弃无法处理的任务。这种情况下丢失部分数据是可以接受的,因为新的数据会迅速覆盖旧的数据。 -
调用者运行,适用于对任务执行有严格要求的场景,例如不允许丢失或失败的场景,对性能要求不高,或者适用于希望降低任务提交速率的场景。通过让提交任务的线程执行任务,可以减缓任务提交的速度,防止线程池过载。
应用场景: 在一个高并发的日志记录系统中,日志数据的产生速度可能超过系统的处理能力。如果使用
CallerRunsPolicy
,当线程池满了之后,提交日志的线程将自己执行记录任务。这种方式能够有效降低日志生成的速度,避免线程池过载,并确保日志不会丢失。 -
丢弃最旧,适用于需要保证最新任务优先处理的场景,特别是当系统对新任务有更高优先级要求时。
应用场景: 在一个实时数据处理系统中,最新的数据通常比旧的数据更重要。比如股票行情系统,最新的行情数据最为关键。如果线程池任务满了,可以使用
DiscardOldestPolicy
丢弃最老的未处理数据,从而腾出空间处理最新的任务,确保系统能够尽快处理新产生的数据。
关于自定义拒绝策略:
需要实现RejectedExecutionHandler
接口以及其唯一的方法rejectedExecution
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
其中r为待执行任务,executor为线程池。
例如可以自定义当队列满时,调用BlockingQueue.put来实现生产者的阻塞。
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
// should not be interrupted
}
}
}
};
4. 线程池的阻塞队列
ArrayBlockingQueue,有界队列,底层基于数组实现
LinkedBlockingQueue,无界队列,基于链表
DelayQueue,延迟队列
PriorityBlockingQueue,无界队列,有优先级
SynchronousQueue,无元素存储的阻塞队列
回复