八、线程池的使用

8.1 在任务与执行策略之间的隐性耦合

Executor框架可以将任务的提交和任务执行策略解耦。但有些任务需要明确指定执行策略:

  • 依赖性任务。大多数行为正确的任务都是独立的:它们不依赖于其他任务的执行时序、执行结果或其他效果。当在线程池中执行独立的任务时,可以随意地改变线程池的大小和配置,这些修改只会对执行性能产生影响。然而,如果提交给线程池的任务需要依赖其他的任务,那么就隐含地给执行策略带来了约束,此时必须小心地维持这些执行策略以避免产生活跃性问题
  • 使用线程封闭机制的任务
  • 对响应时间敏感的任务。
  • 使用ThreadLocal的任务

线程饥饿死锁

在线程池中,如果任务依赖于其他任务,那么可能产生死锁。在单线程的Executor中,如果一个任务将另一个任务提交到同一个Executor,并且等待这个被提交任务的结果,那么通常会引发死锁。第二个任务停留在工作队列中,并等待第一个任务完成,而第一个任务又无法完成,因为它在等待第二个任务的完成。在更大的线程池中,如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,那么会发生同样的问题。这种现象被称为线程饥饿死锁(Thread Starvation Deadlock),只要线程池中的任务需要无限期地等待一些必须由池中其他任务才能提供的资源或条件,例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。

8.2 设置线程池大小

要想正确地设置线程池的大小,必须分析计算环境、资源预算和任务的特性。在部署的系统中有多少个CPU ?多大的内存﹖任务是计算密集型、IO密集型还是二者皆可﹖它们是否需要像JDBC连接这样的稀缺资源﹖如果需要执行不同类别的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。
对于计算密集型的任务,在拥有N个处理器的系统上,当线程池的大小为Ncpu+1时,通常能实现最优的利用率。对于包含IO操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。要正确地设置线程池的大小,你必须估算出任务的等待时间与计算时间的比值。这种估算不需要很精确,并且可以通过一些分析或监控工具来获得。你还可以通过另一种方法来调节线程池的大小:在某个基准负载下,分别设置不同大小的线程池来运行应用程序,并观察CPU利用率的水平。

8.3 配置ThreadPoolExecutor

1
2
3
4
5
6
7
8
public ThreadPoolExecutor (int corePoolsize,
int maxirnumPoolsize,
long keepAliverime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue ,
ThreadFactory threadFactory ,
RejectedExecutionHandler handler) { ... }

线程的创建与销毁

线程池的基本大小(Core Pool Size)、最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建与销毁。基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。

newFixedThreadPool 工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。newCachedThreadPool 工厂方法将线程池的最大大小设置为Integer.MAX_VALUE,而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。其他形式的线程池可以通过显式的ThreadPoolExecutor 构造函数来构造。

管理队列任务

在有限的线程池中会限制可并发执行的任务数量。

请求的平均到达速率很稳定,也仍然会出现请求突增的情况。尽管队列有助于缓解任务的突增问题,但如果任务持续高速地到来,那么最终还是会抑制请求的到达率以避免耗尽内存。甚至在耗尽内存之前,响应性能也将随着任务队列的增长而变得越来越糟。

ThreadPoolExecutor 允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方法有3种:无界队列、有界队列和同步移交(Synchronous Handoff)。队列的选择与其他的配置参数有关,例如线程池的大小等。
newFixedThreadPool和newSingleThreadExecutor在默认情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。

一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(有许多饱和策略[SaturationPolicy]可以解决这个问题。)在使用有界的工作队列时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但付出的代价是可能会限制吞吐量。

对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取该任务。只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue才有实际价值。在newCachedThreadPool 工厂方法中就使用了SynchronousQueue。

饱和策略

当有界队列被填满后,饱和策略就开始发挥作用。

“中止(Abort)”策略是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。当新提交的任务无法保存到队列中等待执行时,“抛弃( Discard)”策略会悄悄抛弃该任务。“抛弃最旧的(Discard-Oldest)”策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。
“调用者运行(Caller-Runs)”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。

当工作队列被填满后,没有预定义的饱和策略来阻塞execute。然而,通过使用Semaphore(信号量)来限制任务的到达率,就可以实现这个功能。

线程工程

每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。

1
2
3
4
5
6
7
8
9
10
11
//自定义的线程工厂。它创建了一个新的MyAppThread实例,并将一个特定于线程池的名字传递给MyAppThread的构造函数,从而可以在线程转储和错误日志信息中区分来自不同线程池的线程。在应用程序的其他地方也可以使用MyAppThread,以便所有线程都能使用它的调试功能。

public class MyThreadFactory implements ThreadFactory {
private final string poolName ;
public MyThreadFactory (string poolName) {
this.poolName = poolName ;
}
public Thread newThread ( Runnable runnable) {
return new MyAppThread (runnable, poolName) ;
}
}

8.4 扩展ThreadPoolExecutor

ThreadPoolExecutor是可扩展的,它提供了几个可以在子类化中改写的方法: beforeExecute、afterExecute和 terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。
在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute。)如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。
在线程池完成关闭操作时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。terminated可以用来释放Executor 在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集finalize统计信息等操作。

示例:给线程池添加统计信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//TimingThreadPool中给出了一个自定义的线程池,它通过beforeExecute、afterExecute和 terminated等方法来添加日志记录和统计信息收集。为了测量任务的运行时间,beforeExecute必须记录开始时间并把它保存到一个afterExecute可以访问的地方。因为这些方法将在执行任务的线程中调用,因此 beforeExecute可以把值保存到一个ThreadLocal变量中,然后由afterExecute来读取。在TimingThreadPool中使用了两个AtomicLong变量,分别用于记录已处理的任务数和总的处理时间,并通过terminated来输出包含平均任务时间的日志消息。
public class TimingThreadPool extends ThreadPoolExecutor {
private final ThreadLocal<Long> startTime= new ThreadLocal<Long> ( );
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong() ;
private final AtomicEong totalTime = new AtomicLong ( ) ;

protected void beforeExecute (Thread t, Runnable r){
super.beforeExecute(t, r) ;
log.fine (string.format ( "Thread *s: start %s",t, r)) ;
startTime.set (system.nanoTime ( ) ) ;
}

protected void afterExecute (Runnable r, Throwable t) {
try {
long endTime = system.nanoTime ( ) ;
long taskTime = endTime - startTime.get ( ) ;
numTasks.incrementAndGet() ;
totalTime.addAndGet (taskTime);
log.fine (string.format ( "Thread is: end %s, time=%dns " ,
t, r, taskTime) ) ;
} finally {
super.afterExecute (r, t);
}
}
protected void terminated ( ) {
try {
log.info(string.format ( "Terminated: avg time=%dns" ,
totalTime.get() l numTasks.get ()));
}finally {
super.terminated ( ) ;
}
}
}

8.5 递归算法的并行化

如果循环的迭代操作都是独立的,并且不需要等待所有的迭代操作都完成在继续进行那么可以使用Executor将串行循环转换为并行循环。

1
2
3
4
5
6
7
8
9
10
11
12
//将串行执行转换为并行执行
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e) ;
}
//调用processInParallel 比调用processSequentially能更快地返回,因为processInParaliel 会在所有下载任务都进入了Executor的队列后就立即返回,而不会等待这些任务全部完成。如果需要提交一个任务集并等待它们完成,那么可以使用ExecutorService.invokeAll,并且在所有任务都执行完成后调用CompletionService来获取结果
void processInParallel(Executor exec,List<Element> elements){
for (final Element e : elements)
exec.execute (new Runnable) {
public void run () { process(e) ; }
}) ;
)

在一些递归设计中同样可以采用循环并行化的方法。在递归算法中通常都会存在串行循环,而且这些循环可以进行并行化。一种简单的情况是:在每个迭代操作中都不需要来自于后续递归迭代的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//sequentialRecursive用深度优先算法遍历一棵树,在每个节点上执行计算并将结果放入一个集合。
public<T> void sequentialRecursive(List<Node<T>> nodes,Collection<T> resuits){
for (Node<T> n : nodes) {
results.add (n.compute()) ;
sequentialRecursive(n.getChildren ( ) , results) ;
}
}
//修改后的parallelRecursive同样执行深度优先遍历,但它并不是在访问节点时进行计算,而是为每个节点提交一个任务来完成计算。
public<T> void parallelRecursive (final Executor exec,
List<Node<T>> nodes,
final collection<T> results){
for (final Node<T> n : nodes) {
exec.execute (new Runnable(){
public void run () {
results.add (n .compute() ) ;
});
parallelRecursive (exec, n.getchildren( ), results);
}
}

当parallelRecursive返回时,树中的各个节点都已经访问过了(但是遍历过程仍然是串行的,只有compute调用才是并行执行的),并且每个节点的计算任务也已经放入Executor的工作队列。parallelRecursive的调用者可以通过以下方式等待所有的结果:创建一个特定于遍历过Executor,并使用shutdown和awaitTermination方法。

1
2
3
4
5
6
7
8
9
public<T> collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
Executorservice exec = Executors.newCachedThreadPool ();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>( ) ;
parallelRecursive (exec, nodes, resultQueue) ;
exec.shutdown ( ) ;
exec.awaitTermination (Long.MAx_VALUB,TimeUnit.sECONDS);
return resultQueue ;
}

示例:谜题框架

解决一些谜,这些谜题需要找出一系列的操作从初始状态转换到目标状态

我们将“谜题”定义为:包含了一个初始位置,一个目标位置,以及用于判断是否是有效移动的规则集。规则集包含两部分:计算从指定位置开始的所有合法移动,以及每次移动的结果位置。

1
2
3
4
5
6
7
//谜题的抽象类,其中的类型参数Р和M表示位置类和移动类。根据这个接口,我们可以写一个简单的串行求解程序,该程序将在谜题空间(PuzzleSpace)中查找,直到找到一个解答或者找遍了整个空间都没有发现答案。
public interface Puzzle<P,M>{
P initialPosition ( ) ;
boolean isGoal (P position) ;
set<M> legalMoves(P position) ;
move(P position,M move) ;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//Node代表通过一系列的移动到达的一个位置,其中保存了到达该位置的移动以及前一个Node。只要沿着Node链接逐步回溯,就可以重新构建出到达当前位置的移动序列。
@Immutable
static clase Node<P,M> {
final P pos ;
final Mmove;
final Node<P,M> prev ;

Node(P pos, M move,Node<r,M> prev) {...}
List<M> asMovelist () {
List<M>solution = new LinkedList<M>( ) ;
for (Node<P,M> n = this; n.move != null; n = n.prev)
solution.add(o, n.move) ;
return solution;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//串行解决方案,在谜题空间执行一次深度优先搜索
public class sequentialPuzzlesolver<P,>i
private final Puzzle<P,M> puzzle;
private final set<P> seen = new Hashset<P> () ;
public sequentialPuzzlesolver (Puzzle<P,M> puzzle){
this.puzzle = puzzle ;
}
public List<M> solve() {
pos = puzzle.initialPosition ( ) ;
return search (new Node<P,M>(pos,null, null));
}
private List<M> search (Node<P,M> node) {
if ( !seen.contains (node.pos) ) {
seen.add (node.pos);
if(puzzle.isGoal (node.pos))
return node. asMoveList ( );
for (M move : puzzle.legalMoves(node.pos)) {
P pos = puzzle.move (node. pos,move) ;
Node<P,M> child e new Node<P,M>(pos,move,node);
List<M> result = search (child) ;
if (result != nul1)
return result;
}
}
return null;
}
static class Node<P,M> {}

通过修改解决方案以利用并发性,可以以并行方式来计算下一步移动以及目标条件,因为计算某次移动的过程在很大程度上与计算其他移动的过程是相互独立的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ConcurrentPuzzlesolver<P,M> {
private final Puzzle<P,M>puzzle;
private final Executorservice exec;
private final ConcurrentMap<P,Boolean> seen;
final ValueLatch<Node<P,M>> solution= new ValueLatch<Node<p,M>>() ;
...
public List<M> solve( ) throws InterruptedException {
try {
P p= puzzle.initialPosition ( ) ;
exec.execute (newTask(p, null, null)) ;/阻塞直到找到解答
Node<P,M> solnNode = solution.getvalue () ;
return (solnNode == null) ? null : solnNode .asMoveList();
} finally {
exec. shutdown ( ) ;
}
}
protected Runnable newTask(P p, M m,Node<p ,M> n){
return new solverTask (p, m, n) ;
}
class solverTask extends Node<PM> implements Runnable {
public void run () {
if ( solution.isset() || seen.putIfAbsent (pos, true) != null)
return;//已经找到了解答或者已经遍历了这个位置
if (puzzle.isGoal (pos))
solution.setvalue (this);
else
for (M m : puzzle.legalMoves (pos))
exec.execute (
newTask (puzzle.move (pos, m) , m, this)) ;
}
}
}