六、任务执行

6.1 在线程中执行任务

大多数服务器应用程序都提供了一种自然的任务边界选择方式:以独立的客户请求为边界。Web服务器、邮件服务器、文件服务器、EJB容器以及数据库服务器等,这些服务器都能通过网络接受远程客户的连接请求。将独立的请求作为任务边界,既可以实现任务的独立性,又可以实现合理的任务规模。

串行的执行任务

在单个线程中串行执行各项任务。

1
2
3
4
5
6
7
8
9
class singleThreadwebServer {
public static void main (string[] args) throws IOBxception {
serversocket socket = new serverSocket (8o ) ;
while (true){
socket connection = socket .accept ( ) ;
handleRequest(connection) ;
}
}
}

它每次只能处理一个请求。在单线程的服务器中,阻塞不仅会推迟当前请求的完成时间,而且还将彻底阻止等待中的请求被处理。如果请求阻塞的时间过长,用户将认为服务器是不可用的,因为服务器看似失去了响应。同时,服务器的资源利用率非常低,因为当单线程在等待I/O操作完成时;CPU将处于空闲状态。
在服务器应用程序中,串行处理机制通常都无法提供高吞吐率或快速响应性。也有一些例外,例如,当任务数量很少且执行时间很长时,或者当服务器只为单个用户提供服务,并且该客户每次只发出一个请求时——但大多数服务器应用程序并不是按照这种方式来工作的。

显示地为任务创建线程

通过为每个请求创建一个线程来提供服务,实现更高的相应性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class ThreadPerTaskwebserver {
public static void main (string[] args) throws IOException {
serversocket socket - new serversocket (8o) ;
while (true) {
finalSocket connection = socket.accept ( ) ;
Runnable task = new Runnable(){
public void run (){
handleRequest(connection);
}
};
new Thread(task).start();
}
}
}

ThreadPerTaskWebServer在结构上类似于前面的单线程版本——主线程仍然不断地交替执行“接受外部连接”与“分发请求”等操作。区别在于,对于每个连接,主循环都将创建一个新线程来处理请求,而不是在主循环中进行处理。由此可得出3个主要结论:

  • 任务处理过程从主线程中分离出来,使得主循环能够更快地重新等待下一个到来的连接。这使得程序在完成前面的请求之前可以接受新的请求,从而提高响应性。
  • 任务可以并行处理,从而能同时服务多个请求。
  • 任务处理代码必须是线程安全的,因为当有多个任务时会并发地调用这段代码。

无限制创建线程的不足

当创建大量线程时:

  • 线程生命周期开销非常高。线程创建和销毁不是没有代价的。

  • 资源消耗。活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程数量多于可月处理器的数量,那么有些线程将闲置。如果你已经拥有足够多的线程使所有CPU保持忙碌状态,那么再创建更多的线程反而会降低性能。

  • 稳定性。在可创建线程的数量上存在一个限制。这个限制值将随着平台的不同而不同,并且受多个因素制约,包括JVM的启动参数、Thread构造函数中请求的栈大小,以及底层操竹系统对线程的限制等。如果破坏了这些限制,那么很可能抛出OutOfMemoryError异常。

    在一定的范围内,增加线程可以提高系统的吞吐率,但如果超出了这个范围,再创建更多的线程只会降低程序的执行速度,“为每个任务分配一个线程”这种方法的问题在于,它没有限制可创建线程的数量,只限制了远程用户提交HTTP请求的速率。

6.2 Executor框架

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。juc提供了一种灵活线程池实现作为Executor框架的一部分。

1
2
3
public interface Executor {
void execute ( Runnable command) ;
}

Executor是一个接口。为异步任务执行框架提供了基础。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。
Executor基于生产者–消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者。

示例:基于Executor的Web服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
classraskExecutionwebserver {
private static final int NTHREADS = 100;

private static final Executor exec = executors.newFixedThreadPool(NTHREADS);

public static void main (string [] args) throws IOException {
serversocket socket = new serversocket (80) ;
while (true) {
final socket connection = socket.accept ( );
Runnable task = new Runnable () {
public void run () {
handleRequest (connection) ;
}
};
exec.execute(task) ;
}
}
}

通过使用Executor,将请求处理任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的Executor实现,就可以改变服务器的行为。改变Executor实现或配置所带来的影响要远远小于改变任务提交方式带来的影响。通常,Executor的配置是一次性的,因此在部署阶段可以完成,而提交任务的代码却会不断地扩散到整个程序中,增加了修改的难度。
我们可以很容易地将TaskExecutionWebServer修改为类似ThreadPerTaskWebServer的行为,只需使用一个为每个请求都创建新线程的Executor。

1
2
3
4
5
public class ThreadPerTaskExecutor implements Executor {
public void execute (Runnable r){
new Thread(r).start () ;
};
}

执行策略

通过将任务的提交与执行解耦开来,从而无须太大的困难就可以为某种类型的任务指定和修改执行策略。在执行策略中定义了任务执行的“What、Where、When、How”等方面,包括:

  • 什么( What)线程中执行任务?
  • 任务按照什么( What)顺序执行(FIFO、LIFO、优先级)?
  • 有多少个(How Many)任务能并发执行?
  • 在队列中有多少个(How Many)任务在等待执行?
  • 如果系统由于过载而需要拒绝一个任务,那么应该选择哪一个(Which)任务?另外,如何(How)通知应用程序有任务被拒绝?
  • 在执行一个任务之前或之后,应该进行哪些(What)动作?
    各种执行策略都是一种资源管理工具,最佳策略取决于可用的计算资源以及对服务质量的需求。

每当看到下面这种形式的代码时:new Thread(runnable) .start()虑使用Executor来代替Thread。

线程池

管理一组同构工作线程的资源池,线程池与工作队列相关,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。
“在线程池中执行任务”比“为每个任务分配一个线程”优势更多。通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。另一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。

同构Executors静态工厂方法来创建一个线程池:

newFixedThreadPool。创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)。
newCachedThreadPool。创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
newSingleThreadExecutor。是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行(例如FIFO、LIFO、优先级)。
newScheduledThreadPool。创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务
newFixedThreadPool和newCachedThreadPool这两个工厂方法返回通用的ThreadPoolExecutor实例,这些实例可以直接用来构造专门用途的executor。

为每个任务分配一个线程变成基于线程池的策略,由于服务器不会创建数千个线程来争夺有限的CPU和内存资源,因此服务器的性能将平缓地降低。通过使用Executor,可以实现各种调优、管理、监视、记录日志、错误报告和其他功能

Executor的生命周期

Executor的实现通常会创建线程来执行任务。但JVM只有在所有(非守护)线程全部终止后才会退出。因此,如果无法正确地关闭Executor,那么JVM将无法结束。

Executor以异步方式来执行任务,提交任务的状态不是立即可见的。Executor是为应用程序提供服务的,因而它们也是可关闭的(无论采用平缓的方式还是粗暴的方式),并将在关闭操作中受影响的任务的状态反馈给应用程序。
为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。

1
2
3
4
5
6
7
8
public. interface ExecutorService extends Executor {
void shutdown () ;
List<Runnable> shutdownNow () ;
boolean isShutdown () ;
boolean isTerminated () ;
boolean awaitTermination (long timeout,TimeUnit unit) throws InterruptedException;
//……其他用于任务提交的便利方法
}

ExecutorService的生命周期有3种状态:运行、关闭和已终止。ExecutorService在初始创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成—―包括那些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
在ExecutorService关闭后提交的任务将由“拒绝执行处理器(Rejected Execution Handler)”来处理,它会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException。等所有任务都完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常在调用awaitTermination之后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果

延迟任务与周期任务

Timer类负责管理延迟任务(“在100ms后执行该任务”)以及周期任务(“每10ms执行一次该任务”)。然而,Timer存在一些缺陷,因此应该考虑使用ScheduledThreadPoolExecutor 来代替它。可以通过构造函数或newScheduledThreadPool 工厂方法来创建该类的对象。
Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时精确性。
Timer的另一个问题是,如果TimerTask抛出了一个未检查的异常,那么Timer将表现出糟糕的行为。Timer线程并不捕获异常,因此当TimerTask 抛出未检查的异常时将终止定时线程。这种情况下,Timer也不会恢复线程的执行,而是会错误地认为整个Timer都被取消了。因此,已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不能被调度。这个问题称之为“线程泄漏“。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class OutOfTime {
//你认为程序6s结束,但是1s就结束了
public static void main (string[] args) throws Exception {
Timer timer = new Timer () ;
timer.schedule(new ThrowTask ( ) , 1) ;
SECONDs.sleep (1);
timer.schedule (new ThrowTask ( ) , 1) ;
SECONDs. sleep (5) ;
}
static class ThrowTask extends TimerTask {
public void run ( ) { throw new RuntimeException( ) ; }
}
}

在Java 5.0或更高的JDK中,将很少使用Timer。
如果要构建自己的调度服务,那么可以使用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能。DelayQueue管理着一组Delayed对象。每个Delayed对象都有一个相应的延迟时间:在DelayQueue中,只有某个元素逾期后,才能从DelayQueue 中执行take操作。从DelayQueue中返回的对象将根据它们的延迟时间进行排序。

6.3 找出可利用的并行性

如果要使用Executor,必须将任务表示为一个Runnable.大多数服务器应用程序存在一个边界:单个用户请求,在单个用户请求中仍存在并行性,如数据库服务器。

示例:串行的页面渲染器

最简单的方法就是对HTML文档进行串行处理。当遇到文本标签时,将其绘制到图像缓存中。当遇到图像引用时,先通过网络获取它,然后再将其绘制到图像缓存中。这很容易实现,程序只需将输入中的每个元素处理一次(甚至不需要缓存文档),但这种方法可能会令用户感到烦恼,他们必须等待很长时间,直到显示所有的文本。
另一种串行执行方法更好一些,它先绘制文本元素,同时为图像预留出矩形的占位空间,在处理完了第一遍文本后,程序再开始下载图像,并将它们绘制到相应的占位空间中。

1
2
3
4
5
6
7
8
9
10
11
12
//串行渲染页面
public class singleThreadRenderer {
void renderPage (charsequence source){
renderText ( source) ;
List<ImageData> imageData = new ArrayList<ImageData>();
for (rmageInfo imageInfo : scanForImageInfo (source))
imageData.add (imageInfo.downloadImage ( )) ;
for ( ImageData data : imageData)
renderImage (data) ;
}
}

携带结果的任务Callable与Future

Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。
许多任务实际上都是存在延迟的计算——执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,Callable是一种更好的抽象:它认为主入口点(即call)将返回一个值,并可能抛出一个异常。
Runnable和Callable描述的都是抽象的计算任务。这些任务通常是有范围的,即都有一个明确的起始点,并且最终会结束。Executor执行的任务有4个生命周期阶段﹔创建、提交、开始和完成。由于有些任务可能要执行很长的时间,因此通常希望能够取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。在 Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。.当某个任务完成后,它就永远停留在“完成”状态上。
get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。

1
2
3
4
5
6
7
8
9
10
11
12
public interface callable<V> {
V call ( ) throws Exception ;
}
public interface Future<V> {
boolean cancel (boolean mayInterruptIfRunning) ;
boolean isCancelled ( );
boolean isDone ( ) ;
V get ( ) throws InterruptedException, ExecutionException, cancellationException;
V get (long timeout,TimeUnit unit)
throws InterruptedException,ExecutionException,
cancellationException,TimeoutException ;
}

可以通过许多种方法创建一个Future来描述任务。ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future月来获得任务的执行结果或者取消任务。还可以显式地为某个指定的Runnable或Callable实例化一个FutureTask。

示例:使用Future 实现页面渲染器

首先将渲染过程分解为两个任务,一个是渲染所的文本,另一个是下载所有的图像。(因为其中一个任务是CPU密集型,而另一个任务是I/O密集型,因此这种方法即使在单CPU 系统上也能提升性能。)
Callable和Future有助于表示这些协同任务之间的交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class FutureRenderer {	
private final ExecutorService executor = ... ;

void renderPage (charsequence source) {
final List<ImageInfo> imageInfos = scanForImageInfo(source);
//创建了一个Callable来下载所有的图像,并将其提交到一个ExecutorService。
Callable<List<ImageData>> task =new Callable<List<ImageData>> ( ){
public List<ImageData> call() {
List<ImageData> result = new ArrayList<ImageData> ( ) ;
for ( ImageInfo imageInfo : imageInfos)
result.add (imageInfo. downloadImage ( ) );
return result;
}
};
//返回一个描述任务执行情况的Future。当主任务需要图像时,它会等待Future.get的调用结果。如果幸运的话,当开始请求时所有图像就已经下载完成了,即使没有,至少图像的下载任务也已经提前开始了。
Future<List<ImageData>> future = executor.submit (task);
renderText (source) ;
try {
List<ImageData> imageData =future.get ();
for ( ImageData data : imageData)
renderImage(data) ;
} catch(InterruptedException e) {
//重新设置线程的中断状态
Thread.currentThread () .interrupt ( );//由于不需要结果,因此取消任务
future.cancel (true) ;
}catch (ExecutionException e) {
throw launderThrowable(e.getCause ());
}
}
}

get方法拥有“状态依赖”的内在特性,因而调用者不需要知道任务的状态,此外在任务提交和获得结果中包含的安全发布属性也确保了这个方法是线程安全的。Future.get 的异常处理代码将处理两个可能的问题:任务遇到一个Exception,或者调用get的线程在获得结果之前被中断
FutureRenderer使得渲染文本任务与下载图像数据的任务并发地执行。当所有图像下载完后,会显示到页面上。这将提升用户体验,不仅使用户更快地看到结果,还有效利用了并行性,但我们还可以做得更好。用户不必等到所有的图像都下载完成,而希望看到每当下载完一幅图像时就立即显示出来。

在异构任务并行化中查找的局限

我们尝试并行地执行两个不同类型的任务——下载图像与渲染页面。然而,通过对异构任务进行并行化来获得重大的性能提升是很困难的。
两个人可以很好地分担洗碗的工作:其中一个人负责清洗,而另一个人负责烘干。然而,要将不同类型的任务平均分配给每个工人却并不容易。当人数增加时,如何确保他们能帮忙而不是妨碍其他人工作,或者在重新分配工作时,并不是容易的事情。
当在多个工人之间分配异构的任务时,还有一个问题就是各个任务的大小可能完全不同。如果将两个任务A和B分配给两个工人,但A的执行时间是B的10倍,那么整个过程也只能加速9%。最后,当在多个工人之间分解任务时,还需要一定的任务协调开销:为了使任务分解能提高性能,这种开销不能高于并行性实现的提升。
FutureRenderer使用了两个任务,其中一个负责渲染文本,另一个负责下载图像。如果渲染文本的速度远远高于下载图像的速度(可能性很大),那么程序的最终性能与串行执行时的性能差别不大,而代码却变得更复杂了。当使用两个线程时,至多能将速度提高一倍。因此,虽然做了许多工作来并发执行异构任务以提高并发度,但从中获得的并发性却是十分有限的。
只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的.工作负载分配到多个任务中带来的真正性能提升。

CompletionServiceExecutor与 BlockingQueue

如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout 指定为0从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务(CompletionService)。
CompletionService将Executor和 BlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的 take和poll等方法来获得已完成的结果,而这些结果会在完成时将被封装为Future。

示例:使用CompletionService实现页面渲染器

可以通过CompletionService从两个方面来提高页面渲染器的性能:缩短总运行时间以及提高响应性。为每一幅图像的下载都创建一个独立任务,并在线程池中执行它们,从而将串行的下载过程转换为并行的过程:这将减少下载所有图像的总时间。此外,通过从CompletionService中获取结果以及使每张图片在下载完成后立刻显示出来,能使用户获得一个更加动态和更高响应性的用户界面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Renderer {
private final ExecutorService executor;
Renderer (Executorservice executor) { this.executor = executor; }

void renderPage (Charsequence source){
List<ImageInfo> info = scanForImageInfo (source) ;
Completionservice<ImageData> completionservice = new Executorcompletionservice<ImageData> (executor);
//为每个图片都开启一个线程
for (final ImageInfo imageInfo : info)
completionservice.submit (new callable<ImageData> ( ) {
public ImageData call (){
return imageinfo.downloadImage ( ) ;
}
}) ;
renderText (source) ;
try {
//陆续加载
for (int t = 0, n = info.size( ) ; t < n;t++){
Future<ImageData> f = completionservice.take ( );
ImageData imageData = f.get ( ) ;
renderImage (imageData) ;
}
} catch (InterruptedException e){
Thread.currentThread( ).interrupt ();
} catch (ExecutionException e){
throw launderThrowable(e.getCause ());
}
}
}

为任务设置时限

在有限时间内执行任务的主要困难在于,要确保得到答案的时间不会超过限定的时间,或者在限定的时间内无法获得答案。在支持时间限制的Future.get中支持这种需求:当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么将抛出 TimeoutException。
在使用限时任务时需要注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由任务本身来管理它的限定时间,并且在超时后中止执行或取消任务。此时可再次使用Future,如果一个限时的get方法抛出了TimeoutException,那么可以通过Future来取消任务。如果编写的任务是可取消的,那么可以提前中止它,以免消耗过多的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Page renderPagewithAd ( ) throws InterruptedException {
long endNanos = system.nanoTime ( ) +TIME_BUDGET;
//它将获取广告的任务提交给一个Executor
Future<Ad> f = exec.submit (new FetchAdTask ( )) ;
//在等待广告的同时显示页面
Page page = renderPageBody ( );
Ad ad;
try {
//只等待指定的时间长度
long timeLeft = endNanos - system.nanoTime ( ) ;
ad = f.get (timeLeft,NANOSECONDS) ;
//如果get超时,那么将取消广告获取任务,并转而使用默认的广告信
} catch (ExecutionException e){
ad = DEFAULT_AD;
}catch (TimeoutException e){
ad = DEFAULT_AD;
f.cancel (true) ;
}
page.setAd (ad);
return page;
}

示例:旅行预订门户网站

考虑这样一个旅行预定门户网站:用户输入旅行的日期和其他要求,门户网站获取并显示来自多条航线、旅店或汽车租赁公司的报价。在获取不同公司报价的过程中,可能会调用Web服务、访问数据库、执行一个EDI事务或其他机制。在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示在指定时间内收到的信息。对于没有及时响应的服务提供者,页面可以忽略它们,或者显示–个提示信息,例如“Did not hear from Air Java in time。”
从一个公司获得报价的过程与从其他公司获得报价的过程无关,因此可以将获取报价的过程当成一个任务,从而使获得报价的过程能并发执行。创建n个任务,将其提交到一个线程池,保留n个 Future,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但还有一个更简单的方法—-—invokeAll。
支持限时的invokeAll,将多个任务提交到一个ExecutorService并获得结果。InvokeAll方法的参数为一组任务,并返回一组Future。这两个集合有着相同的结构。invokeAll 按照任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而使调用者能将各个Future与其表示的Callable 关联起来。当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时,invokeAll将返回。当超过指定时限后,任何还未完成的任务都会取消。当invokeAll返回后,每个任务要么正常地完成,要么被取消,而客户端代码可以调用get 或isCancelled来判断究竟是何种情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private class QuoteTask implements Callable<TravelQuote> {
private final Travelcompany company ;
private final TravelInfo travelInfo;
...
public TravelQuote call () throws Exception {
return company.solicitQuote (travelInfo) ;
}
public List<TravelQuote> getRankedTravelQuotes (
TravelInfo travelInfo,Set<Travelcompany> companies,
Comparator<TravelQuote> ranking,long time,TimeUnit unit)
throws InterruptedException {
//去那几个公司网站查找
List<QuoteTask> tasks = new ArrayList<QuoteTask>( );
for (Travelcompany company : companies)
tasks.add(new QuoteTask(company, travelInfo) ) ;
//Future列表
List<Future<TravelQuote>> futures =exec.invokeAll(tasks, time, unit);

List<TravelQuote> quotes =new ArrayList<TravelQuote>(tasks.size ()) ;

Iterator<QuoteTask> taskIter = tasks.iterator () ;
//遍历每一个Future,执行相应的搜索任务
for (Future<TravelQuote> f : futures) {
QuoteTask task = taskIter.next ( ) ;
try {
//将每一个搜索结果添加到最后的列表中
quotes.add(f.get ()) ;
}catch (ExecutionException e){
quotes.add (task.getFailureQuote (e.getCause ())) ;
}catch (cancellationException e) {
quotes.add (task.getTimeoutQuote(e) ) ;
}
}
Collections.sort (quotes, ranking) ;
return quotes ;
}

七、取消与关闭

要使任务和线程能安全、快速、可靠地停止下来,并不是一件容易的事。Java没有提供任何机制来安全地终止线程。但它提供了中断(Interruption),这是一种协作机制,能够使-一-个线程终止另一个线程的当前工作。
这种协作式的方法是必要的,我们很少希望某个任务、线程或服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态。相反,在编写任务和服务时可以使用-一种协作的方式:当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。

7.1 任务取消

如果外部代码能在某个操作正常完成之前将其置入“完成”状态,那么这个操作就可以称为可取消的(Cancellable)。取消某个操作的原因很多:

  • 用户请求取消。用户点击图形界面程序中的“取消”按钮,或者通过管理接口来发出取消请求,例如JMX (Java Management Extensions)。
  • 有时间限制的操作
  • 应用程序事件。例如,应用程序对某个问题空间进行分解并搜索,从而使不同的任务可以搜索问题空间中的不同区域。当其中一个任务找到了解决方案时,所有其他仍在搜索的任务都取消
  • 错误
  • 关闭。当一个程序或服务关闭时,必须对正在处理和等待处理的工作执行某种操作。在平缓的关闭过程中,当前正在执行的任务将继续执行直到完成,而在立即关闭过程中,当前的任务则可能取消。

其中一种协作机制能设置某个“已请求取消(Cancellation Requested)”标志,而任务将定期地查看该标志。如果设置了这个标志,那么任务将提前结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Threadsafe
public class PrimeGenerator implements Runnable {
@GuardedEy ( "this" )
private final Eist<BigInteger> primes = new ArrayList<BigInteger> ();
private volatile boolean cancelled;
public void run ( ) {
BigInteger p = BigInteger.ONE;
while (!cancelled){
p = p.nextProbablePrime ();
synchronized (this) {
primes.add(p) ;
}
}
}
//设置cancelled标志
public void cancel ( ) { cancelled = true;}
public synchronized List<BigInteger> get ( ) {
return new ArrayList<BigInteger>(primes) ;
}
}

即让素数生成器运行1秒钟后取消。素数生成器通常并不会刚好在运行1秒钟后停止,因为在请求取消的时刻和run方法中循环执行下一次检查之间可能存在延迟。cancel方法由finally块调用,从而确保即使在调用sleep时被中断也能取消素数生成器的执行。如果cancel没有被调用,那么搜索素数的线程将永远运行下去,不断消耗CPU的时钟周期,并使得JVM不能正常退出。

1
2
3
4
5
6
7
8
9
10
List<BigInteger> asecondofPrimes() throws InterruptedException
PrimeGenerator generator = new PrimeGenerator ();
new Thread(generator).start( ) ;
try {
SECONDS.sleep(1) ;
}finally {
generator.cancel();
}
return generator.get() ;
}

一个可取消的任务必须拥有取消策略,其他代码如何(How)请求取消该任务,任务在可时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作。
PrimeGenerator使用了一种简单的取消策略:客户代码通过调用cancel来请求取消,primeGenerator在每次搜索素数前首先检查是否存在取消请求,如果存在则退出。

中断

PrimeGenerator中的取消机制最终会使得搜索素数的任务退出,但在退出过程中需要花费一定的时间。然而,如果使用这种方法的任务调用了一个阻塞方法,例如BlockingQueue.put,那么可能会产生一个更严重的问题——任务可能永远不会检查取消标志,因此永远不会结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
生产者线程生成素数,并将它们放入一个阻塞队列。如果生产者的速度超过了消费者的处理速度,队列将被填满,put方法也会阻塞。当生产者在put方法中阻塞时,如果消费者希望取消生产者任务,那么将发生什么情况﹖它可以调用cancel方法来设置cancelled标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的put方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以put方法将一直保持阻塞状态)。*/
class BrokenPrimeProducer extends Thread {
private final B1ockingQueue<BigInteger> queue ;
private volatile boolean cance1led = false ;
BrokenPrimeProducer (B1ockingQueue<BigInteger> queue){
this.queue = queue ;
}
public void run () {
try {
BigInteger p = BigInteger.ONE;
while ( !cancelled)
queue.put (p = p.nextProbablePrime()) ;
} catch (InterruptedException consumed){ }
}
public void cancel() { cancelled = true;}
}
void ConsumePrimes () throws InterruptedException {
BlockingQueue<BigInteger> primes = ...;
BrokenPrimeProducer producer = new BrokenPrimeProducer (primes);
producer.start ( ) ;
try {
while (needMorePrimes ())
consume (primes.take ());
}finally {
producer.cancel ();
}
}

线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程。每个线程都有一个boolean的中断标志,当中断线程时,这个线程的中断状态将被设置为true。在Thread中包含了中断线程以及查询线程中断状态的方法,interrupt方法能中断目标线程,而isInterrupted方法能返回目标线程的中断状态。静态的interrupted方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。

对中断操作的正确理解是:它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己。(这些时刻也被称为取消点)。有些方法,例如wait、sleep和join等,将严格地处理这种请求,当它们收到中断请求或者在开始执行时发现某个已被设置好的中断状态时,将抛出一个异常。设计良好的方法可以完全忽略这种请求,只要它们能使调用代码对中断请求进行某种处理。设计糟糕的方法可能会屏蔽中断请求,从而导致调用栈中的其他代码无法对中断请求作出响应。
在使用静态的interrupted时应该小心,因为它会清除当前线程的中断状态。如果在调用interrupted时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出InterruptedException,或者通过再次调用interrupt来恢复中断状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//使用中断而不是boolean标志来取消
class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue){
this.queue = queue ;
}
public void run ( ) {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread( ).isInterrupted())
queue.put (p = p.nextProbablePrime()) ;
}catch(InterruptedException consumed){
/*允许线程退出*/
}
}
public void cancel ( ) { interrupt ( ) ; }
}

中断策略

中断策略规定线程如何解释某个中断请求,当发现中断请求时,应做哪些工作,哪些工作单元对于中断来说是原子操作,以及多快速度来相应中断。

相应中断

当调用可中断的阻塞函数时,例如Thread.sleep或BlockingQueue.put等,有两种实用策略可用于处理InterruptedException;

  • 传递异常(可能在执行某个特定于任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。
  • 恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//对于一些不支持取消但仍然可以调用可中断阻塞的方法,他们必须在循环中调用这些方法,并在中断后重新尝试,如果过早的设置中断状态,就会引起无线循环因为大多数可中断的阻塞方法都会在入口检查中断
public Task getNextTask (BlockingQueue<Taskgt ; queue) {
boolean interrupted = false ;
try {
while (true) {
try {
return queue.take () ;
}catch (InterruptedException e) {
interrupted = true ;
//重新尝试
}
}
} finally {
if (interrupted)
Thread.currentThread ( ) .interrupt () ;
}
}

示例:计时运行

许多问题永远无法解决,例如枚举所有的素数。

在执行任务时的另一个方面是,你希望知道在任务执行过程中是否会抛出异常如果PrimeGenerator在指定时限内抛出了一个未检查的异常,那么这个异常可能会被忽略,因为素数生成器在另一个独立的线程中运行,而这个线程并不会显式地处理异常。
在程序清单7-8中给出了在指定时间内运行一个任意的Runnable的示例。它在调用线程中运行任务,并安排了一个取消任务,在运行指定的时间间隔后中断它。这解决了从任务中抛出未检查异常的问题,因为该异常会被timedRun的调用者捕获。

1
2
3
4
5
6
7
8
9
//在外部线程中安排中断
private static final scheduledBxecutorservice cancelExec = ...;
public static void timedRun (Runnable r,long timeout,TimeUnit unit) {
final Thread taskThread = Thread.currentThread ( );
cancelExec.schedule (new Runnable() {
public void run () { taskThread.interrupt ();}
}, timeout, unit) ;
r.run () ;
}

通过Future来实现取消

ExecutorService.submit将返回一个Future来描述任务。Future拥有一个cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning,表示取消操作是否成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void timedRun (Runnable r,
long timeout, TimeUnit unit)throws InterruptedException {
//将任务提交给ExecutorService
Future<?> task = taskExec.submit(r) ;
try {
//通过定时任务Future.get来获得结果
task.get (timeout, unit);
}catch (TimeoutException e) {
//接下来任务将被取消
} catch(ExecutionException e){
//如果在任务中抛出了异常,那么重新抛出该异常
throw launderThrowable(e.getCause ( ));
}finally {
//如果任务已经结束,那么执行取消操作也不会带来任何影响
task.cancel (true);/如果任务正在运行,那么将被中断
}
}

当Future.get 抛出 InterruptedExceptionTimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务。

处理不可中断的阻塞

对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止线程,但要求我们知道线程阻塞的原因:

  • Java.io包中的同步Socket l/O。在服务器应用程序中,最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然InputStream和OutputStream中的read和 write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException。

  • Java.io包中的同步IO。

  • Selector的异步IO。如果一个线程在调用Selector.select方法(在java.nio.channels 中)时阻塞了,那么调用close或wakeup方法会使线程抛出 ClosedSelectorException 并提前返回。

  • 获取某个锁。如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,因为线程认为它肯定会获得锁,所以将不会理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ReaderThread管理了一个套接字连接,它采用同步方式从该套接字中读取数据,并将接收到的数据传递给processBuffer。为了结束某个用户的连接或者关闭服务器,ReaderThread改写了interrupt方法,使其既能处理标准的中断,也能关闭底层的套接字。因此,无论ReaderThread线程是在read方法中阻塞还是在某个可中断的阻塞方法中阻塞,都可以被中断并停止执行当前的工作。
public class ReaderThread extends Thread {
private final socket socket ;
private final Inputstream in;

public ReaderThread(Socket socket) throws IOException {
this.socket = socket ;
this.in = socket.getInputstream ( );
}
public void interrupt (){
try {
socket.close( ) ;
}catch (IOException ignored){}
finally {
super.interrupt ( );
}
public void run (){
try {
byte[]buf = new byte [BUFSZ];
while (true){
int count = in.read ( buf) ;
if (count < 0 )
break ;
else if (count > o)
processBuffer (buf,count) ;
}
}catch (IOException e) { /*允许线程退出*/}
}
}

采用newTaskFor来封装非标准的取消

当把一个 Callable提交给ExecutorService时,submit方法会返回一个Future,我们可以通过这个Future来取消任务。newTaskFor是一个工厂方法,它将创建Future来代表任务。newTaskFor还能返回一个RunnableFuture接口,该接口扩展了Future和 Runnable (并由FutureTask实现)。
通过定制表示任务的Future可以改变Future.cancel 的行为。例如,定制的取消代码可以实现日志记录或者收集取消操作的统计信息,以及取消一些不响应中断的操作。通过改写interrupt方法,ReaderThread可以取消基于套接字的线程。同样,通过改写任务的Future.cancel方法也可以实现类似的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//定义了一个CancellableTask接口,该接口扩展了Callable,并增加了一个cancel方法和一个newTask 工厂方法来构造RunnableFuture。
public interface CancellableTask<T> extends callable<T> {
void cancel ( ) ;
RunnableFuture<T> newTask ( );
}
//CancellingExecutor扩展了ThreadPoolExecutor,并通过改写newTaskFor使得CancellableTask可以创建自己的Future。
@Threadsafe
public class CancellingExecutor extends ThreadPoolExecutor {
...
protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable){
if (callable instanceof cancellableTask)
return ( (cancellableTask<T>) callable) .newTask ();
else
return super.newTaskFor (callable) ;
}
}
//SocketUsingTask实现了CancellableTask,并定义了Future.cancel来关闭套接字和调用super.cancel。如果SocketUsingTask通过其自己的 Future来取消,那么底层的套接字将被关闭并且线程将被中断。因此它提高了任务对取消操作的响应性∶不仅能够在调用可中断方法的同时确保响应取消操作,而且还能调用可阻调的套接字IO方法。
public abstract class SocketUsingTask<T>
implements CancellableTask<T> {
@GuardedBy ( "this" ) private Socket socket ;

protected synchronized void setSocket (Socket s) { socket = s;}

public synchronized void cancel ( ){
try {
if (socket != null)
socket.close ( ) ;
} catch (IOException ignored){ }
}
public RunnableFuture<T> newTask (){
return new FutureTask<T>(this) {
public boolean cancel (boolean mayInterruptIfRunning){
try {
SocketUsingTask.this.cancel( ) ;
}finally {
return super.cancel( mayInterruptIfRunning) ;
}
}
};
}
}

7.2 停止基于线程的服务

应用程序通常会创建拥有多个线程的服务,例如线程池,并且这些服务的生命周期通常比创建它们的方法的生命周期更长。如果应用程序准备退出,那么这些服务所拥有的线程也需要结束。由于无法通过抢占式的方法来停止线程,因此它们需要自行结束。
正确的封装原则是:除非拥有某个线程,否则不能对该线程进行操控。例如,中断线程或者修改线程的优先级等。在线程API中,并没有对线程所有权给出正式的定义:线程由Thread对象表示,并且像其他对象一样可以被自由共享。然而,线程有一个相应的所有者,,即创建该线程的类。因此线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池。
与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程。相反,服务应该提供生命周期方法(Lifecycle Method)来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。在ExecutorScevice 中提供了shutdown和 shutdownNow等方法。同样,在其他拥有线程的服务中也应该提供类似的关闭机制。

示例:日志服务

将调用log方法将日志放入某个队列中,交给其他线程来处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//LogWriter中给出了一个简单的日志服务示例,其中日志操作在单独的日志线程中执行。产生日志消息的线程并不会将消息直接写入输出流,而是由LogWrite通过BlockingQueue将消息提交给日志线程,并由日志线程写入。这是一种多生产者单消费者(Multiple-Producer,Single-Consumer)的设计方式:每个调用log 的操作都相当于一个生产者,而后台的日志线程则相当于消费者。如果消费者的处理速度低于生产者的生成速度,那么BlockingQueue将阻塞生产者,直到日志线程有能力处理新的日志消息。
public class LogWriter {
private final BlockingQueue<string> queue;
private final LoggerThread logger;
public Logwriter (Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start () { logger.start (); }

public void log(string msg) throws InterruptedException {
queue.put (msg) ;
}
private class LoggerThread extends Thread {
private final Printwriter writer;
...
public void run ( ) {
try {
while (true)
writer.println (queue.take ( ) ) ;
}catch ( InterruptedException ignored){

} finally {
writer.close ( ) ;
}
}
}
}

需要一种终止日志线程的方法,避免使JVM无法正常关闭。

要停止日志线程是很容易的,因为它会反复调用take,而take能响应中断。如果将日志线程修改为当捕获到InterruptedException时退出,那么只需中断日志线程就能停止服务。
然而,如果只是使日志线程退出,那么还不是一种完备的关闭机制。这种直接关闭的做法会丢失那些正在等待被写入到日志的信息,不仅如此,其他线程将在调用log时被阻塞,因为日志消息队列是满的,因此这些线程将无法解除阻塞状态。当取消一个生产者-消费者操作时,需要同时取消生产者和消费者。在中断日志线程时会处理消费者,但在这个示例中,由于生者并
另一种关闭LogWriter的方法是:设置某个“已请求关。在收到关闭请求后,消费者会把队列中的所有消息写入日志,解除所有在调用log时阻塞的生产者。然而,在这个方法中存在着竞态条件问题,使得该方并不可靠。log 的实现是一种“先判断再运行”的代码序列:生产者发现该服务还没有关闭,此在关闭服务后仍然会将日志消息放入队列,这同样会使得生产者可能在调用log时阻塞并无法解除阻塞状态。可以通过一些技巧来降低这种情况的发生概率

为LogWriter提供可靠关闭操作的方法是解决竞态条件问题,因而要使日志消息的提交操作成为原子操作。然而,我们不希望在消息加入队列时去持有一个锁,因为put方法本身就可以阻塞。我们采用的方法是:通过原子方式来检查关闭请求,并且有条件地递增一个计数器来“保持”提交消息的权利。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Logservice {
private final BlockingQueue<String> queue ;
private final LoggerThread loggerThread;
private final Printwriter writer ;
//是否关闭
@GuardedBy ( "this" ) private boolean isShutdown ;
//计数器
@GuardedBy ( "this" ) private int reservations ;

public void start () { loggerThread.start ( ) ; }

public void stop (){
synchronized (this) { isShutdown = true; }
loggerThread.interrupt();
}

public void log(string msg) throws InterruptedBxception {
synchronized (this) {
if(isShutdown)
throw new IllegalstateException (...) ;
++reservations ;
}
gueue.put (msg);
}
private class LoggerThread extends Thread {
public void run () {
try {
while (true){
try {
//原子方式来检查关闭请求
synchronized (Logservice.this){
if (isshutdown && reservations ==o)
break ;
}
String msg = queue.take () ;
//记录一个就
synchronized (LogService.this) { --reservations;} writer.println (msg) ;
}catch (InterruptedException e){/*retry */ }
}
}finally {
writer.close ( ) ;
}
}
}
}

关闭ExecutorService

ExecutorService提供了两种关闭方法:使用shutdown正常关闭,以及使用shutdownNow强行关闭。在进行强行关闭时,shutdownNow首先关闭当前正在执行的任务,然后返回所有尚未启动的任务清单。
这两种关闭方式的差别在于各自的安全性和响应性:强行关闭的速度更快,但风险也更大,因为任务很可能在执行到一半时被结束﹔而正常关闭虽然速度慢,但却更安全,因为ExecutorService 会一直等到队列中的所有任务都执行完成后才关闭。在其他拥有线程的服务中也应该考虑提供类似的关闭方式以供选择。
简单的程序可以直接在main 函数中启动和关闭全局的ExecutorService。而在复杂程序中,通常会将ExecutorService封装在某个更高级别的服务中,并且该服务能提供其自己的生命周期方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//它将管理线程的工作委托给一个 ExecutorService,而不是由其自行管理。通过封装ExecutorService,可以将所有权链(Ownership Chain)从应用程序扩展到服务以及线程,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。
public class LogService{
private final ExecutorService exec = new SingleThreadExecutor( ) ;:..
public void start () { }
public void stop () throws InterruptedException {
try {
exec.shutdown ();
exec.awaitTermination (TIMEOUr,UNIT);
}finally {
writer.close () ;
}
}
public void log (string msg) {
try {
exec.execute (new writeTask (msg) ) ;
}catch (RejectedExecutionException ignored){ }
}
}

毒丸对象

另-种关闭生产者一消费者服务的方式就是使用“毒丸(Poison Pill)”对象:“毒丸”是指一个放在队列上的对象,其含义是:“当得到这个对象时,立即停止。”在FIFO(先进先出)队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作。

给出一个单生产者-单消费者的桌面搜索示例(来自程序清单5-8),在这个示例中使用了“毒丸”对象来关闭服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//单生产者-单消费者的桌面搜索
public class Indexingservice {
private static final File POISON = new File ("") ;
private final IndexerThread consumer = new IndexerThread( ) ;
private final CrawlerThread producer = new CrawlerThread ( ) ;
private final BlockingQueue<File> queue;
private final FileFilter fileFilter ;
private final File root ;
//生产者线程
public class CrawlerThread extends Thread {
public void run () {
try {
crawl (root);
} catch (InterruptedException e){ /*发生异常*/}
finally {
while ( true) {
try {
//重点
queue.put (POISON);
break ;
} catch ( InterruptedException e1) {/*重新尝试*/ }
}
}
}

private void crawl(File root) throws InterruptedException {
}
}

//消费者线程
public class IndexerThread extends Thread {
public void run ( ) {
try {
while (true){
File file = queue.take ();
//重点
if(fle == POISON)
break ;
else
indexFile(file) ;
}
} catch (InterruptedException consumed){ }
}
}

public void start () {
producer.start () ;
consumer.start () ;
}

public void stop () { producer.interrupt( ) ; }

public void awaitTermination ( ) throws InterruptedException {
consumer.join ();
}
}

只有在生产者和消费者的数量都已知的情况下,才可以使用“毒丸”对象。

示例:只执行一次的服务

如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的Executor来简化服务的生命周期管理,其中该Executor 的生命周期是由这个方法来控制的。(在这种情况下,invokeAll和invokeAny等方法通常会起较大的作用。)

shutdownNow的局限性

当通过shutdownNow来强行关闭ExecutorService时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。
要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,而且还需要知道当Executor 关闭时哪些任务正在执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//通过封装ExecutorService并使得execute(类似地还有submit,在这里没有给出)记录哪些任务是在关闭后取消的,TrackingExecutor可以找出哪些任务已经开始但还没有正常完成。在Executor结束后,getCancelledTasks返回被取消的任务清单。要使这项技术能发挥作用,任务在返回时必须维持线程的中断状态,在所有设计良好的任务中都会实现这个功能。
public class TrackingExecutor extends AbstractExecutorService {
private final Executorservice exec ;
private final set<Runnable> tasksCancelledAtshutdown =
Collections.synchronizedset (new HashSet<Runnable> () ) ;
public List<Runnable> getCancelledTasks ( ) {
if ( !exec.isTerminated ( ))
throw new IllegalstateException (...) ;
return new ArrayList<Runnable> (tasksCancelledAtshutdown) ;
}
public void execute (final Runnable runnable) {
exec.execute (new Runnable () {
public void run ( ) {
try {
runnable.run ( ) ;
} finally {
if ( isshutdown ( )
&& Thread.currentThread ( ).isInterrupted ())
tasksCancelledAtshutdown.add (runnable) ;
}
}
}) ;
}
//将Executorservice的其他方法委托给exec
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//WebCrawler中给出了TrackingExecutor的用法。网页爬虫程序的工作通常是无穷尽的,因此当爬虫程序必须关闭时,我们通常希望保存它的状态,以便稍后重新房动。CrawlTask提供了一个getPage方法,该方法能找出正在处理的页面。当爬虫程序关闭时无论是还没有开始的任务,还是那些被取消的任务,都将记录它们的URL,因此当爬虫程序重新启动时,就可以将这些URL的页面抓取任务加入到任务队列中。
public abstract class webCrawler {
private volatile TrackingExecutor exec ;
@GuardedBy ( "this" )
private final set<URL> urlsToCrawl = new Hashset<URL> ( ) ;
...
public synchronized void start ( ) {
exec = new TrackingExecutor (Executors.newCachedThreadPool ());
//为每一个url提交任务
for (URL url : urlsToCrawl) submitcrawlTask (url) ;
urlsToCrawl.clear () ;
}
//停止
public synchronized void stop() throws InterruptedException {
try {
saveUncrawled ( exec. shutdownNow () ;
if ( exec.awaitTermination (TIMEOUT,UNIT))
saveUncrawled (exec.getcancelledTasks ( ));
} finally {
exec = null;
}
}
protected abstract List<URL>processPage (URL url) ;
//保存没有爬取的页面
private void saveUncrawled (List<Runnable> uncrawled){
for (Runnable task : uncrawled)
urlsToCrawl.add (((crawlTask) task).getPage()) ;
}
//开始爬取
private void submitcrawlTask (URL u){
exec.execute (new crawlTask(u));
}

private class CrawlTask implements Runnable {
private final URL url;
...
public void run () {
for (URL link : processPage (url)){
if (Thread.currentThread().isInterrupted ())
return ;
submitCrawlTask (link) ;
}
}
public URLgetPage () { return url ; }
}
}

在TrackingExecutor中存在一个不可避免的竞态条件,从而产生“误报”问题:一些被认为已取消的任务实际上已经执行完成。这个问题的原因在于,在任务执行最后一条指令以及线程池将任务记录为“结束”的两个时刻之间,线程池可能被关闭。如果任务是幂等的(Idempotent,即将任务执行两次与执行一次会得到相同的结果),那么这不会存在问题,在网页爬虫程序中就是这种情况。否则,在应用程序中必须考虑这种风险,并对“误报”问题做好准备。

7.3 处理非正常的线程终止

如果并发程序中的某个线程发生故障,在控制台中可能会输出栈追踪信息,但没有人会观察控制台。此外,当线程发生故障时,应用程序可能看起来仍然在工作,所以这个失败很可能会被忽略。幸运的是,我们有可以监测并防止在程序中“遗漏”线程的方法。
导致线程提前死亡的最主要原因就是RuntimeException。由于这些异常表示出现了某种编程错误或者其他不可修复的错误,因此它们通常不会被捕获。它们不会在调用栈中逐层传递,而是默认地在控制台中输出栈追踪信息,并终止线程。
线程非正常退出的后果可能是良性的,也可能是恶性的,这要取决于线程在应用程序中的作用。

1
2
3
4
5
6
7
8
9
10
11
12
//在线程池内部构建一个工作者线程。如果任务抛出了一个未检查异常,那么它将使线程终结,但会首先通知框架该线程已经终结。然后,框架可能会用新的线程来代替这个工作线程,也可能不会,因为线程池正在关闭,或者当前已有足够多的线程能满足需要。ThreadPoolExecutor和 Swing都通过这项技术来确保行为糟糕的任务不会影响到后续任务的执行。当编写一个向线程池提交任务的工作者线程类时,或者调用不可信的外部代码时(例如动态加载的插件),使用这些方法中的某一种可以避免某个编写得糟糕的任务或插件不会影响调用它的整个线程。
public void run {
Throwable thrown = null ;
try {
while ( !isInterrupted ())
runTask (getTaskFromworkQueue () ) ;
} catch (Throwable e){
thrown = e ;
}finally {
threadExited (this, thrown) ;
}
}