第十三章、显示锁

13.1 Lock 和ReentrantLock

lock提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作。
为什么要创建一种与内置锁如此相似的新加锁机制?在大多数情况下,内置锁都能很好地工作,但在功能上存在一些局限性,例如,无法中断一个正在等待获取锁的线程,或者无法在请求获取一个锁时无限地等待下去。内置锁必须在获取该锁的代码块中释放,这就简化了编码工作,并且与异常处理操作实现了很好的交互,但却无法实现非阻塞结构的加锁规则。这些都是使用synchronized的原因,但在某些情况下,一种更灵活的加锁机制通常能提供更好的活跃性或性能。

1
2
3
4
5
6
7
8
9
Lock lock = new ReentrantLock () ;
...
lock.lock ( ) ;
try {
//更新对象状态
//捕获异常,并在必要时恢复不变性条件
}finally {
lock.unlock ( ) ;

轮询锁与定时锁

由tryLock实现

如果不能获得所有需要的锁,那么可以使用可定时的或可轮询的锁获取方式,从而使你重新获得控制权,它会释放已经获得的销.然后重新尝试获取所有锁。

在实现具有时间限制的操作时,定锁同样非常有用。当在带有时同限的操作中调用了一个阻塞方法时,它能根据剩余时间来提供一个时限。如果操作不能在指定的时间内给出结果,那么就会使程序提前结束。当使用内置锁时,在开始请求锁后,这个操作将无法取消,因此内置锁很难实现带有时间限制的操作。

可中断的锁获取操作

正如定时的锁获取操作能在带有时间限制的操作中使用独占锁,可中断的锁获取操作同样能在可取消的操作中使用加锁。lockInterruptibly方法能够在获得锁的同时保持对中断的响应,并且由于它包含在Lock中,因此无须创建其他类型的不可中断阻塞机制。
可中断的锁获取操作的标准结构比普通的锁获取操作略微复杂一些,因为需要两个try块。(如果在可中断的锁获取操作中抛出了InterruptedException,那么可以使用标准的try.finally加锁模式。)

13.3 公平性

在ReentrantLock的构造函数中提供了两种公平性选择:创建一个非公平的锁(默认)或者一个公平的锁。在公平的锁上,线程将按照它们发出请求的顺序来获得锁,但在非公平的锁上,则允许“插队”:当一个线程请求非公平的锁时,如果在发出请求的同时该锁的状态变为可用,那么这个线程将跳过队列中所有的等待线程并获得这个锁。

我们为什么不希望所有的锁都是公平的?毕竟,公平是一种好的行为,而不公平则是一种不好的行为,对不对?当执行加锁操作时,公平性将由于在挂起线程和恢复线程时存在的开销而极大地降低性能。在大多数情况下,非公平锁的性能要高于公平锁的性能。

13.5 锁的选择

在一些内置锁无法满足需求的情况下,ReentrantLock 可以作为一种高级工具。当需要一些高级功能时才应该使用ReentrantLock,这些功能包括:可定时的、可轮询的与可中断为锁荻取操作,公平队列,以及非块结构的锁。否则,.还是应该优先使用synchronized。

13.5 读写锁

ReentrantLock实现了一种标准的互斥锁:每次最多只有一个线程能持有ReentrantLock。但对于维护数据的完整性来说,互斥通常是一种过于强硬的加锁规则,因此也就不必要地限制了并发性。互斥是一种保守的加锁策略,虽然可以避免“写/写”冲突和“写/读”冲突,但同样也避免了“读/读”冲突。在许多情况下,数据结构上的操作都是“读操作”—–虽然它们也是可变的并且在某些情况下被修改,但其中大多数访问操作都是读操作。此时,如果能够放宽加锁需求,允许多个执行读操作的线程同时访问数据结构,那么将提升程序的性能。只要每个线程都能确保读取到最新的数据,并且在读取数据时不会有其他的线程修改数据,那么就不会发生问题。在这种情况下就可以使用读/写锁:一个资源可以被多个读操作访问,或者被一个写操作访问,但两者不能同时进行。
ReadWriteLock 中暴露了两个Lock对象,其中一个用于读操作,而另一个用于写操作。要读取由ReadWriteLock保护的数据,必须首先获得读取锁,当需要修改ReadWriteLock保护的数据时,必须首先获得写入锁。尽管这两个锁看上去是彼此独立的,但读取锁和写入锁只是读―写锁对象的不同视图。

读―写锁是一种性能优化措施,在一些特定的情况下能实现更高的并发性。在实际情况中,对于在多处理器系统上被频繁读取的数据结构,读-写锁能够提高性能。而在其他情况下,读一写锁的性能比独占锁的性能要略差一些,这是因为它们的复杂性更高。
在读取锁和写入锁之间的交互可以采用多种实现方式。ReadWriteLock 中的一些可选实现包括:

  • 释放优先。当一个写入操作释放写入锁时,并且队列中同时存在读线程和写线程,那么应该优先选择读线程,写线程,还是最先发出请求的线程?

  • 读线程插队。如果锁是由读线程持有,但有写线程正在等待,那么新到达的读线程能否立即获得访问权,还是应该在写线程后面等待?如果允许读线程插队到写线程之前,那么将提高并发性,但却可能造成写线程发生饥饿问题。

  • 重入性。读取锁和写入锁是否是可重入的?

  • 降级。如果一个线程持有写人锁,那么它能否在不释放该锁的情况下获得读取锁?这可能会使得写入锁被“降级”为读取锁,同时不允许其他写线程修改被保护的资源。

  • 升级。读取锁能否优先于其他正在等待的读线程和写线程而升级为一个写入锁?在大多数的读一写锁实现中并不支持升级,因为如果没有显式的升级操作,那么很容易造成死锁。(如果两个读线程试图同时升级为写入锁,那么二者都不会释放读取锁。)

十四、构建自定义同步工具

14.1 状态依赖性的管理

在生产者一消费者的设计中经常会使用像ArrayBlockingQueue这样的有界缓存。在有界缓存提供的put和 take操作中都包含有一个前提条件﹔不能从空缓存中获取元素,也不能将元素放入已满的缓存中。当前提条件未满足时,依赖状态的操作可以抛出一个异常或返回一个错误状态(使其成为调用者的一个问题),也可以保持阻塞直到对象进入正确的状态。
接下来介绍有界缓存的几种实现,其中将采用不同的方法来处理前提条件失败的问题。在每种实现中都扩展了BaseBoundedBuffer,在这个类中实现了一个基于数组的循环缓存,其中各个缓存状态变量(buf、head、tail和count)均由缓存的内置锁来保护。它还提供了同步的doPut和doTake方法,并在子类中通过这些方法来实现put和take操作,底层的状态将对子类隐藏。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Threadsafe
public abstract class BaseBoundedBuffere<V> {
@GuardedBy ( "this") private final v[] buf ;
@GuardedBy ( " this" )private int tail;
@GuardedBy ( "this" ) private int head ;
@GuardedBy ( "this")private int count ;

protected BaseBoundedBuffer (int capacity){
this.buf = (V[]) new Object [capacity] ;
}

protected synchronized final void doPut (V v) {
buf [tail] = v;
if (++tail == buf.length)
tail = 0 ;
++count ;
}

protected synchronized final V doTake ( ){
V v = buf [head] ;
buf [head] = null;
if(++head == buf.length)
head = 0;
--count ;
return v ;
}
public synchronized final boolean isFull(){
return count == buf.length;
}
public synchronized fina1 boolean isEmpty() {
return count == 0;
}
}

示例:将前提条件的失败传递给调用者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//简单有界缓存。put和take都进行了同步以确保对缓存状态的独占访问,线程检查后运行
@Threadsafe
public class GrumpyBoundedBuffer<v> extends BaseBoundedBuffer<v> {
public GrumpyBoundedBuffer(int size) { super(size); }
public synchronized void put (V v) throws BufferFul1Exception {
if(isFull()))
throw new BufferFu1lException() ;doput (v) ;
}
public aynchronized V take ( ) throws BufferEmptyException {
if(isEmpty( ))
throw new BufferEmptyException () ;
return doTake ( ) ;
}
}

尽管这种方法实现起来很简单,但使用起来却并非如此。异常应该用于发生异常条件的情况中。“缓存已满”并不是有界缓存的一个异常条件。在实现缓存时得到的简化(使调用者管理状态依赖性)并不能抵消在使用时存在的复杂性,因为现在调用者必须做好捕获异常的准备,并且在每次缓存操作时都需要重试。程序清单14-4给出了对take的调用—并不是很漂亮,尤其是当程序中有许多地方都调用put和 take方法时。

示例:通过轮询与休眠来实现简单的阻塞

SleepyBoundedBuffer尝试通过put和 take方法来实现一种简单的“轮询与休眠”重试机制,从而使调用者无须在每次调用时都实现重试逻辑。如果缓存为空,那么take将休眠并直到另一个线程在缓存中放入一些数据:如果缓存是满的,那么put将休眠并直到另一个线程从缓存中移除一些数据,以便有空间容纳新的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Threadsafe
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<v> {
public SleepyBoundedBuffer(int size) { super(size); }
public void put (V v) throws InterruptedException {
while (true){
synchronized (this) {
if (!isFull()){
doPut (v);
return;
}
}
Thread.sleep (SLEEP_GRANULARITY);
}
}
public V take ( ) throws InterruptedException {
while (true){
synchronized (this) {
if ( lisEmpty ())
return doTake ( ) ;
}
Thread.sleep ( SLEEP_GRANUI.ARITY);
}
}
}

SleepyBoundedBuffer的实现远比之前的实现复杂。缓存代码必须在持有缓存锁的时候才能测试相应的状态条件,因为表示状态条件的变量是由缓存锁保护的。如果测试失败,那么当前执行的线程将首先释放锁并休眠一段时间,从而使其他线程能够访问缓存。当线程醒来时,它将重新请求锁并再次尝试执行操作,因而线程将反复地在休眠以及测试状态条件等过程之间进行切换,直到可以执行操作为止。
从调用者的角度看,这种方法能很好地运行,如果某个操作可以执行,那么就立即执行,否则就阻塞,调用者无须处理失败和重试。要选择合适的休眠时间间隔,就需要在响应性与CPU使用率之间进行权衡。休眠的间隔越小,响应性就越高,但消耗的CPU资源也越高。

休眠间隔对响应性的影响:在缓存中出现可用空间的时刻与线程醒来并再次检查的时刻之间可能存在延迟。线程刚进入休眠,条件立即为真,存在不必要的休眠时间。

条件队列

“条件队列”这个名字来源于:它使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真。传统队列的元素是一个个数据, 而与之不同的是,条件队列中的元素是一个个正在等待相关条件的线程。

正如每个Java对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且Object中的wait、notify和notifyAll方法就构成了内部条件队列的API。对象的内置锁与其内部条件队列是相互关联的,要调用对象X中条件队列的任何一个方法,必须持有对象X上的锁。这是因为“等待由状态构成的条件”与“维护状态一致性”这两种机制必须被紧密地绑定在一起:只有能对状态进行检查时,才能在某个条件上等待,并且只有能修改状态时,才能从条件等待中释放另一个线程。
Object.wait 会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。当被挂起的线程醒来时,它将在返回之前重新获取锁。从直观上来理解,调用wait意味着“我要去休息了,但当发生特定的事情时唤醒我”,而调用通知方法就意味着“特定的事情发生了”。·

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//BoundedBuffer中使用了wait和notifyAll来实现一个有界缓存。这比使用“休眠”的有界缓存更简单,并且更高效(当缓存状态没有发生变化时,线程醒来的次数将更少),响应性也更高(当发生特定状态变化时将立即醒来)。
@Threadsafe
public class BoundedBuffer<v> extends BaseBoundedBuffer<v> {
//条件谓词: not -full ( !isFul1())
//条件谓词: not -empty ( !isEmpty ( ))
public BoundedBuffer(int size){ super (size) ; }
//阻塞并直到:not-full
public synchronized void put (V v) throws InterruptedBxception {
while (isFull())
wait();
doPut(v);
notifyAll();
}
//阻塞并直到: not-empty
public synchronized v take ( ) throws InterruptedException {
while ( isEmpty (0) )
wait ();
V v = doTake();
notifyA11 ();
return v;
}
}

要注意﹔与使用“休眠”的有界缓存相比,条件队列并没有改变原来的语义。它只是在多个方面进行了优化:CPU效率、上下文切换开销和响应性等。如果某个功能无法通过“轮询和休眠”来实现,那么使用条件队列也无法实现,但条件队列使得在表达和管理状态依赖性时更加简单和高效

14.2 使用条件队列

条件队列使构建高效以及高可响应性的状态依赖类变得更容易,但同时也很容易被不正确地使用。虽然许多规则都能确保正确地使用条件队列,但在编译器或系统平台上却并没有强制要求遵循这些规则。(这也是为什么要尽量基于LinkeBlockingQueue、Latch、Semaphore和FutureTask等类来构造程序的原因之一,如果能避免使用条件队列,那么实现起来将容易许多。)

条件谓词

条件谓词是使某个操作成为状态依赖操作的前提条件。在有界缓存中,只有当缓存不为空时,take方法才能执行,否则必须等待。对take方法来说,它的条件谓词就是“缓存不为空”,take方法在执行之前必须首先测试该条件谓词。同样,put方法的条件谓词是“缓存不满”。条件谓词是由类中各个状态变量构成的表达式。BaseBoundedBuffer在测试“缓存不为空”时将把count与0进行比较,在测试“缓存不满”时将把count与缓存的大小进行比较。

过早唤醒

当执行控制重新进入调用wait的代码时,它已经重新获取了与条件队列相关联的锁。现在条件谓词是不是已经变为真了或许。在发出通知的线程调用notifyAll时,条件谓词可能已经变成真,但在重新获取锁时将再次变为假。在线程被唤醒到wait重新获取锁的这段时间里,可能有其他线程已经获取了这个锁,并修改了对象的状志。或者,条件谓词从调用wait起根本就没有变成真。你并不知道另一个线程为什么调用notify或notifyAll,也许是因为与同一条件队列相关的另一个条件谓词变成了真。“一个条件队列与多个条件谓词相关”是一种很常见的情况——在BoundedBuffer中使用的条件队列与“非满”和“非空”两个条件谓词相关。
基于所有这些原因,每当线程从wait中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真,那么就继续等待(或者失败)。由于线程在条件谓词不为真的情况下也可以反复地醒来,因此必须在一个循环中调用wait,并在每次迭代中都测试条件谓词。

1
2
3
4
5
6
7
8
void stateDependentMethod( ) throws InterruptedException{}
//必须通过一个锁来保护条件谓词
synchronized ( lock) {
while ( !conditionPredicate ( ))
lock . wait () ;.
//现在对象处于合适的状态
}
}

丢失的信号

丢失的信号是指:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。现在,线程将等待一个已经发过的事件。这就好比在启动了烤面包机后出去拿报纸,当你还在屋外时烤面包机的铃声响了,但你没有听到,因此还会坐在厨房的桌子前等着烤面包机的铃声。你可能会等待很长的时间。通知并不像你涂在面包上的果酱,它没有“黏附性”。如果线程A通知了一个条件队列,而线程B随后在这个条件队列上等待,那么线程B将不会立即醒来,而是需要另一个通知来唤醒它。像上述程序清单中警示之类的编码错误(例如,没有在调用wait之前检测条件谓词)就会导致信号的丢失。

通知

在有界缓存中,如果缓存为空,那么在调用take时将阻塞。在缓存变为非空时,为了使take解除阻塞,必须确保在每条使缓存变为非空的代码路径中都发出一个通知。在BoundedBuffer中,只有一条代码路径,即在put方法之后。因此,put在成功地将一个元素添加到缓存后,将调用notifyAll。同样,take在移除一个元素后也将调用notifyAll,向任何正在等待“不为满”条件的线程发出通知:缓存已经不满了。

在BoundedBuffer的put和take方法中采用的通知机制是保守的:每当将一个对象放入缓存或者从缓存中移走一个对象时,就执行一次通知。我们可以对其进行优化:首先,仅当缓存从空变为非空,或者从满转为非满时,才需要释放一个线程。并且,仅当put或take影响到这些状态转换时,才发出通知。这也被称为“条件通知(Conditional Notification)。虽然“条件通知”可以提升性能,但却很难正确地实现(而且还会使子类的实现变得复杂),因此在使用时应该谨慎。

1
2
3
4
5
6
7
8
9
public synchronized void put(V v) throws InterruptedException {
while (isFull())
wait() ;
boolean wasEmpty = isEmpty();
doPut (v) ;
if (wasEmpty)
notifyAll();
}

示例:阀门类

ThreadGate可以打开和关闭阀门,并提供一个await方法,该方法能一直阻塞直到阀门被打开。在open方法中使用了notifyAll,这是因为这个类的语义不满足单次通知的“单进单出”测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Threadsafe
public class ThreadGate {
//条件谓词:opened-since (n) (isopen [l generation>n)
@GuardedBy ( "this" ) private boolean isopen;
@GuardedBy ( "this" ) private int generation ;
public synchronized void close () {
isopen = false ;
}
public synchronized void open() {
++generation ;
isopen = true;
notifyAll();
}
//阻塞并直到: opened-since (generation on entry)
public synchronized void await () throws InterruptedException {
int arrivalGeneration = generation ;
while ( !isopen && arrivalGeneration == generation)
wait () ;
}
}

在await 中使用的条件谓词比测试isOpen复杂得多。这种条件谓词是必需的,因为如果当阀门打开时有N个线程正在等待它,那么这些线程都应该被允许执行。然而,如果阀门在打开后又非常快速地关闭了,并且await方法只检查isOpen,那么所有线程都可能无法释放﹔当所有线程收到通知时,将重新请求锁并退出wait,而此时的阀门可能已经再次关闭了。因此,在ThreadGate中使用了一个更复杂的条件谓词:每次阀门关闭时,.递增一个“Generation”计数器,如果阀门现在是打开的,或者阀门自从该线程到达后就一直是打开的,那么线程就可以通过await。
由于ThreadGate 只支持等待打开阀门,因此它只在open 中执行通知。要想既支持“等待打开”又支持“等待关闭”,那么ThreadGate必须在open和close中都进行通知。这很好地说明了为什么在维护状态依赖的类时是非常困难的—当增加一个新的状态依赖操作时,可能需要对多条修改对象的代码路径进行改动,才能正确地执行通知。

14.3 显示的Condition对象

当内置锁过于灵活时,可以使用显示锁。Lock是一种广义的内置锁,Condition也是一种广义的内置条件队列。

使用显示的Lock和Condition,一个Condition和一个Lock关联在一起,就像一个条件队列和一个内置锁相关联一样。要创建一个Condition,可以在相关联的Lock 上调用Lock.newCondition方法。正如Lock 比内置加锁提供了更为丰富的功能,Condition同样比内置条件队列提供了更丰富的功能:在每个锁上可存在多个等待、条件等待可以是可中断的或不可中断的、基于时限的等待,以及公平的或非公平的队列操作

与内置条件队列不同的是,对于每个Lock,可以有任意数量的Condition对象。Condition对象继承了相关的Lock对象的公平性,对于公平的锁,线程会依照FIFO顺序从Condition.await中释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//给出了有界缓存的另一种实现,即使用两个Condition,分别为notFull和notEmpty,用于表示“非满”与“非空”两个条件谓词。当缓存为空时,take将阻塞并等待notEmpty,此时put向notEmpty发送信号,可以解除任何在take中阻塞的线程。
@Threadsafe
public class ConditionBoundedBuffer<T>{
protected final Lock lock = new ReentrantLock () ;
//条件谓词:notFull (count < items . length)
private final condition notFull= lock.newCondition ( ) ;
//条件谓词:notEmpty (count > 0 )
private final condition notEmpty= lock.newCondition( ) ;
@GuardedBy ( " lock")
private final T[] items = (T[]) new 0bject [BUFFER_SIZB];
@GuardedBy ( " lock") private int tail, head,count ;
//阻塞并直到: notFull
public void put (T x) throws InterruptedException {
lock.lock ( ) ;
try {
while (count -= items.length)
notFull.await ();
items [tail] = x ;
if (++tail == items.length)
tail = 0 ;
++count ;
notEmpty.signal ( );}
finally {
lock .unlock ( ) ;
}
}
//阻塞并直到:notEmpty
public T take ( ) throws IrterruptedException {
lock .lock ( ) ;
try {
while (count == 0)
notEmpty.await ( );
T x = items [head] ;
items [head] = null;
if ( ++head == items. length)
head = 0 ;
--count ;
notFull.signal ();
return x;
finally {
lock .unlock ( ) ;
}
}

14.4 Synchronizer

在ReentrantLock和 Semaphore这两个接口之间存在许多共同点。这两个类都可以用做一个“阀门”,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lock 或acquire时成功返回),也可以等待(在调用lock或acquire时阻塞),还可以取消(在调用tryLock或tryAcquire时返回“假”,表示在指定的时间内锁是不可用的或者无法获得许可)。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作

事实上,它们在实现时都使用了一个共同的基类,即AbstractQueuedSynchronizer(AQS),这个类也是其他许多同步类的基类。AQS是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。不仅ReentrantLock和Semaphore是基于AQS构建的,还包括CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask。

14.5 AbstractQueuedSynchronizer

大部分开发者都不会直接使用AQS。在基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,“获取”操作的含义就很直观,即获取的是锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。在使用CountDownLatch时,“获取”操作意味着“等待并直到闭锁到达结束状态”,而在使用FutureTask时,则意味着“等待并直到任务已经完成”。“释放”并不是一个可阻塞的操作,当执行“释放”操作时,所有在请求时被阻塞的线程都会开始执行。
如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState,setState以及compareAndSetState等protected类型方法来进行操作。这个整数可以用于表示任意状态。例如,ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态(尚未开始、正在运行、已完成以及已取消)。在同步器类中还可以自行管理一些额外的状态变量,例如,ReentrantLock 保存了锁的当前所有者的信息,这样就能区分某个获取操作是重入的还是竞争的。
AQS中的获取操作与释放操作的形式。根据同步器的不同,获取操作可以是一种独占操作(例如ReentrantLock),也可以是一个非独占操作(例如Semaphore利CountDownLatch)。一个获取操作包括两部分。首先,同步器判断当前状态是否允许获得操作如果是,则允许线程执行,否则获取操作将阻塞或失败。这种判断是由同步器的语义决定的例如,对于锁来说,如果它没有被某个线程持有,那么就能被成功地获取,而对于闭锁来说如果它处于结束状态,那么也能被成功地获取。

其次,就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否也获取该同步器造成影响。例如,当获取一个锁后,锁的状态将从“未被持有”变成“已被持有”,而从Semaphore中获取一个许可后,将把剩余许可的数量减1。然而,当一个线程获取闭锁时,并不会影响其他线程能否获取它,因此获取闭锁的操作不会改变闭锁的状态。

14.6 java.util.concurrent同步器中的AQS

ReetntrantLock

ReentrantLock 只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively。ReentrantLock将同步状态用于保存锁获取操作的次数,并且还维护一个owner变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量。在tryRelease 中检查owner域,从而确保当前线程在执行unlock操作之前已经获取了锁:在tryAcquire中将使用这个域来区分获取操作是重入的还是竞争的。

当一个线程尝试获取锁时,tryAcquire将首先检查锁的状态。如果锁未被持有,那么它将尝试更新锁的状态以表示锁已经被持有。由于状态可能在检查后被立即修改,因此tryAcquire使用compareAndSetState来原子地更新状态,表示这个锁已经被占有,并确保状态在最后一次检查以后就没有被修改过。如果锁状态表明它已经被持有,并且如果当前线程是锁的拥有者,那么获取计数会递增,如果当前线程不是锁的拥有者,那么获取操作将失败。
ReentrantLock还利用了AQS对多个条件变量和多个等待线程集的内置支持。Lock.newCondition将返回一个新的ConditionObject实例,这是AQS的一个内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected boolean tryAcquire(int ignored) {
final Thread current = Thread.currentThread ( ) ;
int c = getstate ( ) ;
if (c == 0){
if(compareAndsetstate(0,1)) {
owner = current ;
return true ;
}
}else if (current == owner) {
Setstate (c+1) ;
return true ;
}
return false ;
}

Semaphore 与CountDownLatch

Semaphore将AQS的同步状态用于保存当前可用许可的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//tryAcquireShared方法首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示获取操作失败。如果还有剩余的许可,那么tryAcquireShared会通过compareAndSetState以原子方式来降低许可的计数。如果这个操作成功(这意味着许可的计数自从上一次读取后就没有被修改过),那么将返回一个值表示获取操作成功。在返回值中还包含了表示其他共享获取操作能否成功的信息,如果成功,那么其他等待的线程同样会解除阻塞。
protected int tryAcquireshared (int acquires){
while(true){
int available = getstate () ;
int remaining = available - acquires ;
if (remaining < 0
|| compareAndsetstate (available, remaining))
return remaining ;
}
}
protected boolean tryReleaseshared(int releases) {
while (true){
int p = getstate ( ) ;
if (compareAndsetstate(p, p + releases))
return true;
}
}
//当没有足够的许可,或者当tryAcquireShared可以通过原子方式来更新许可的计数以响应获取操作时,while循环将终止。虽然对compareAndSetState的调用可能由于与另一个线程发生竞争而失败,并使其重新尝试,但在经过了一定次数的重试操作以后,在这两个结束条件中有一个会变为真。同样,tryReleaseShared将增加许可计数,这可能会解除等待中线程的阻塞状态,并且不断地重试直到更新操作成功。tryReleaseShared的返回值表示在这次释放操作中解除了其他线程的阻塞。

CountDownLatch使用AQS的方式与Semaphore很相似:在同步状态中保存的是当前的计数值。countDown方法调用release,从而导致计数值递减,并且当计数值为零时,解除所有等待线程的阻塞。await 调用acquire,当计数器为零时,acquire将立即返回,否则将阻塞。

Future Task

初看上去,FutureTask甚至不像一个同步器,但Future.get的语义非常类似于闭锁的语义——如果发生了某个事件(由FutureTask表示的任务执行完成或被取消),那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件发生。
在FutureTask 中,AQS同步状态被用来保存任务的状态,例如,正在运行、已完成或已取消。FutureTask 还维护一些额外的状态变量,用来保存计算结果或者抛出的异常。此外,它还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运行状态),因而如果任务取消,该线程就会中断。

ReentrantReadWriteLock

ReadWriteLock 接口表示存在两个锁:一个读取锁和一个写入锁,但在基于AQS实现的ReentrantReadWriteLock 中,单个AQS子类将同时管理读取加锁和写入加锁。ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。
AQS在内部维护一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在ReentrantReadWriteLock中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。

十五、原子变量与非阻塞同步机制

15.1 锁定劣势

早锁的挂起和恢复过程中存在很大开销,并且通常存在长时间中断。与锁相比,volatile变量是一种更轻量级的同步操作,在使用这些变量时不会发生上下文切换或线程调度。然而,volatile变量同样存在一些局限:虽然它们提供了相似的可见性保证,但不能用于构建原子的复合操作。因此,当一个变量依赖其他的变量时,或者当变量的新值依赖于旧值时,就不能使用volatile变量。这些都限制了volatile变量的使用,因此它们不能用来实现一些常见的工具,例如计数器或互斥体(mutex)。
例如,虽然自增操作(++i)看起来像一个原子操作,但事实上它包含了3个独立的操作——获取变量的当前值,将这个值加1,然后再写入新值。为了确保更新操作不被丢失,整个的读一改–写操作必须是原子的。到目前为止,我们实现这种原子操作的唯一方式就是使用锁定方式。

15.2 硬件对并发的支持

对于细粒度的操作,还有另外一种更高效的方法,也是一种乐观的方法,通过这种方法可以在不发生干扰的情况下完成更新操作。这种方法需要借助冲突检查机制来判断在更新过程中是否存在来自其他线程的干扰,如果存在,这个操作将失败,并且可以重试(也可以不重试)。

现在几乎所有的处理器都包含了某种形式的原子读-改-写指令。

比较并交换

在CAS包含了3个操作数—需要读写的内存位置V、进行比较的值A和拟写入的新值B。当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。无论位置V的值是否等于A,都将返回V原有的值。(这种变化形式被称为比较并设置,无论操作是否成功都会返回。)CAS的含义是:“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”。CAS是一项乐观的技术,它希望能成功地执行更新操作,并且如果有另一个线程在最近一次检查后更新了该变量,那么CAS能检测到这个错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//SimulatedCAS 说明了CAS语义
@Threadsafe
public class simulatedcAs {
@GuardedPy ( "this" ) private int value;
public synchronized int get() { return value; }

public synchronized int compareAndSwap(int expectedvalue,
int newvalue) {
int oldvalue = value;
if (oldvalue == expectedvalue)value = newvalue;
return oldva1ue;
}
public synchronized boolean compareAndSet(int expectedvalue,
int newValue){
return (expectedvalue
== compareAndswap (expectedvalue,newvalue) ) ;
}
}

当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其他线程都将失败。然而,失败的线程并不会被挂起(这与获取锁的情况不同:当获取锁失败时,线程将被挂起),而是被告知在这次竞争中失败,并可以再次尝试。由于一个线程在竞争CAS时失败不会阻塞,因此它可以决定是否重新尝试,或者执行一些恢复操作,也或者不执行任何操作。
CAS的典型使用模式是:首先从V中读取值A,并根据A计算新值B,然后再通过CAS以原子方式将V中的值由A变成B。由于CAS能检测到来自其他线程的干扰,因此即使不使用锁也能够实现原子的读一改–写操作序列。

非阻塞计数器

CasCounter使用CAS实现了一个线程安全的计数器。递增操作采用了标准形式一一读取旧的值,根据它计算出新值(加1),并使用CAS来设置这个新值。如果CAS失败,那么该操作将立即重试。通常,反复地重试是一种合理的策略,但在一些竞争很激烈的情况下,更好的方式是在重试之前首先等待一段时间或者回退,从而避免造成活锁问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Threadsafe
public class cascounter {
private simulatedcAs value ;
public int getvalue () {
return value.get ( ) ;
}
public int increment () {
int v;
do {
v= value.get ( );
}
while (v != value.compareAndswap(v, v +1));
return v + 1;
}
}

初看起来,基于CAS的计数器似乎比基于锁的计数器在性能上更差一些,因为它需要执行更多的操作和更复杂的控制流,并且还依赖看似复杂的CAS操作。但实际上,当竞争程度不高时,基于CAS的计数器在性能上远远超过了基于锁的计数器,而在没有竞争时甚至更高。如果要快速获取无竞争的锁,那么至少需要一次CAS操作再加上与其他锁相关的操作,因此基于锁的计数器即使在最好的情况下也会比基于CAS的计数器在一般情况下能执行更多的操作。由于CAS在大多数情况下都能成功执行(假设竞争程度不高),因此硬件能够正确地预测while循环中的分支,从而把复杂控制逻辑的开销降至最低。
虽然Java语言的锁定语法比较简洁,但JVM和操作在管理锁时需要完成的工作却并不简单。在实现锁定时需要遍历JVM中一条非常复杂的代码路径,并可能导致操作系统级的锁定线程挂起以及上下文切换等操作。在最好的情况下,在锁定时至少需要一次CAS,因此虽然在使用锁时没有用到CAS,但实际上也无法节约任何执行开销。另一方面,在程序内部执行CAS时不需要执行JVM代码、系统调用或线程调度操作。在应用级上看起来越长的代码路径,如果加上JVM和操作系统中的代码调用,那么事实上却变得更短。CAS的主要缺点是,它将使调用者处理竞争问题(通过重试、回退、放弃),而在锁中能自动处理竞争问题(线程在获得锁之前将一直阻塞)。

JVM对 CAS的支持

那么,Java代码如何确保处理器执行CAS操作?在Java 5.0之前,如果不编写明确的代码,那么就无法执行CAS。在Java 5.0中引入了底层的支持,在int、long和对象的引用等类型上都公开了CAS操作,并且.JVM把它们编译为底层硬件提供的最有效方法。在原子变量类(例如java.util.concurrent.atomic中的AtomicXxx)中使用了这些底层的JVM支持为数字类型和引用类型提供一种高效的CAS操作,而在java.util.concurrent中的大多数类在实现时则直接或间接地使用了这些原子变量类。

15.3 原子变量类

原子变量类相当于一种泛化的volatile变量,能够支持原子的和有条件的读–改–写操作。AtomicInteger表示一个int类型的值,并提供了get和set方法,这些Volatile类型的int变量在读取和写入上有着相同的内存语义。它还提供了一个原子的compareAndSet方法.(如果该方法成功执行,那么将实现与读取/写入一个volatile变量相同的内存效果),以及原子的添加、递增和递减等方法。AtomicInteger 表面上非常像一个扩展的Counter类,但在发生竞争的情况下能提供更高的可伸缩性,因为它直接利用了硬件对并发的支持。

共有12个原子变量类,可分为4组:标量类(Scalar)、更新器类、数组类以及复合变量类。最常用的原子变量就是标量类:AtomicInteger、AtomicLong、AtomicBoolean 以及AtomicReference。所有这些类都支持CAS,此外,AtornicInteger和 AtomicLong还支持算术运算。(要想模拟其他基本类型的原子变量,可以将short或byte等类型与int类型进行转换,以及使用floatToIntBits或doubleToLongBits来转换浮点数。)

原子数组类(只支持Integer、Long和Reference版本)中的元素可以实现原子更新。原子数组类为数组的元素提供了volatile类型的访问语义,这是普通数组所不具备的特性——volatile类型的数组仅在数组引用上具有volatile语义,而在其元素上则没有。

尽管原子的标量类扩展了Number类,但并没有扩展一些基本类型的包装类,例如Integer或Long。事实上,它们也不能进行扩展:基本类型的包装类是不可修改的,而原子变量类是可修改的。在原子变量类中同样没有重新定义hashCode或equals方法,每个实例都是不同的。与其他可变对象相同,它们也不宜用做基于散列的容器中的键值。

原子变量是一种更好的volatile

可以将OneValueCache中的技术与原子引用结合起来,并且通过对指向不可变对象(其中保存了下界和上界)的引用进行原子更新以避免竟态条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//使用了AtomicReference和 IntPair来保存状态,并通过使用compare-AndSet,使它在更新上界或下界时能避免NumberRange的竞态条件。
public class CasNumberRange {
@Immutable
private static class IntPair {
final int lower;//不变性条件: lower <=upper
final int upper ;
...
}
//对对象进行原子操作
private final AtomicReference<IntPair> values = new AtomicReference<IntPair> (new IntPair(0,0) ) ;

public int getLower () { return values.get ( ).lower; }
public int getUpper () { return values.get ( ).upper; }
public void setLower(int i){
while (true) {
IntPair oldv = values.get ( ) ;
if (i > oldv.upper)·
throw new IllegalArgumentException (
" can't set lower to " + i +" > upper" ) ;
IntPair newv = new IntPair (i, oldv. upper) ;
if (values.compareAndset(oldv, newv) )
return;
}
}
//对setupper采用类似的方法
)

十六、Java内存模型

16.1 什么是内存模型,为什么需要它

JMM规定了JVM必须遵循一组最小保证,这组保证规定了对变量的写入操作在何时将对于其他线程可见。JMM在设计时就在可预测性和程序的易于开发性之间进行了权衡,从而在各种主流的处理器体系架构上能实现高性能的JVM。

重排序

JMM还使得不同线程看到的操作执行顺序是不同的,从而导致在缺乏同步的情况下,要推断操作的执行顺序将变得更加复杂。各种使操作延迟或者看似乱序执行的不同原因,都可以归为重排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
}//在没有正确同步的情况下,即使要推断最简单的并发程序的行为也很困难。很容易想象PossibleReordering是如何输出(1,0)或(0,1)或(1,1)的:线程A可以在线程B开始之前就执行完成,线程B也可以在线程A开始之前执行完成,或者二者的操作交替执行。但奇怪的是,PossibleReordering还可以输出(0,0)。由于每个线程中的各个操作之间不存在数据流依赖性,因此这些操作可以乱序执行。(即使这些操作按照顺序执行,但在将缓存刷新到主内存的不同时序中也可能出现这种情况,从线程B的角度看,线程A中的赋值操作可能以相反的次序执行。) 
//x = b -> b = 1 -> y = a -> a = 1
public class PossibleReordering {
static int x = 0, y =0 ;
static int a = 0, b = 0 ;
public static void main (string []args) throws InterruptedException {
Thread one = new Thread(new Runnable (){
public void run () {
a = 1;
x = b;
}
}) ;
Thread other new Thread (new Runnable ( ){
public void run () {
b = 1;
y = a;
}
}) ;
one.start () ; other.start ( ) ;
one .join() ;other.join ( ) ;
system.out.println ( "("+ x +","+ y + ")") ;
}
}

Java内存模型简介

Java内存模型是通过各种操作来定义的,包括对变量的读/写操作,监视器的加锁和释放操作,以及线程的启动和合并操作。JMM为程序中所有的操作定义了一个偏序关系,称之为Happens-Before。要想保证执行操作B的线程看到操作A的结果(无论A和B是否在同一个线程中执行),那么在A和B之间必须满足Happens-Before 关系。如果两个操作之间缺乏Happens-Before 关系,那么JVM可以对它们任意地重排序。
当一个变量被多个线程读取并且至少被一个线程写入时,如果在读操作和写操作之间没有依照Happens-Before来排序,那么就会产生数据竞争问题。在正确同步的程序中不存在数据竞争,并会表现出串行一致性,这意味着程序中的所有操作都会按照一种固定的和全局的顺序执行。