Lock reentrantlock 使用reentrantlock可以完成sync的功能
CAS锁
需要注意的是,必须要必须要必须要手动释放锁 (重要的事情说三遍)
使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class T02_ReentrantLock2 { Lock lock = new ReentrantLock(); void m1 () { try { lock.lock(); for (int i = 0 ; i < 10 ; i++) { TimeUnit.SECONDS.sleep(1 ); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } void m2 () { try { lock.lock(); System.out.println("m2 ..." ); } finally { lock.unlock(); } } public static void main (String[] args) { T02_ReentrantLock2 rl = new T02_ReentrantLock2(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }
使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
可以根据tryLock的返回值来判定是否锁定
也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中
1 2 3 4 5 6 7 8 9 10 11 12 void m2 () { boolean locked = false ; try { locked = lock.tryLock(5 , TimeUnit.SECONDS); System.out.println("m2 ..." + locked); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (locked) lock.unlock(); } }
使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,
在一个线程等待锁的过程中,可以被打断
打断了之前加的所
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Thread t2 = new Thread(()->{ try { lock.lockInterruptibly(); System.out.println("t2 start" ); TimeUnit.SECONDS.sleep(5 ); System.out.println("t2 end" ); } catch (InterruptedException e) { System.out.println("interrupted!" ); } finally { lock.unlock(); } }); t2.start();
1 2 ReentrantLock还可以指定为公平锁 private static ReentrantLock lock=new ReentrantLock(true );
生产者消费者模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 class Aircondtion { private int number = 0 ; public synchronized void increment () throws Exception { while (number != 0 ){ this .wait(); } number++; System.out.println(Thread.currentThread().getName()+"\t" +number); this .notifyAll(); } public synchronized void decrement () throws Exception { while (number == 0 ){ this .wait(); } number--; System.out.println(Thread.currentThread().getName()+"\t" +number); this .notifyAll(); } } class Aircondtion2 { private int number = 0 ; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment () throws Exception { lock.lock(); try { while (number != 0 ){ condition.await(); } number++; System.out.println(Thread.currentThread().getName()+"\t" +number); condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public synchronized void decrement () throws Exception { lock.lock(); try { while (number == 0 ){ condition.await(); } number--; System.out.println(Thread.currentThread().getName()+"\t" +number); condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } } public class ProdConsumerDemo04 { public static void main (String[] args) throws Exception { Aircondtion aircondtion = new Aircondtion(); new Thread(() -> { for (int i = 1 ;i<=10 ;i++){ try { aircondtion.increment(); } catch (Exception e) { e.printStackTrace(); } } },"A" ).start(); new Thread(() -> { for (int i = 1 ;i<=10 ;i++){ try { aircondtion.decrement(); } catch (Exception e) { e.printStackTrace(); } } },"B" ).start(); new Thread(() -> { for (int i = 1 ;i<=10 ;i++){ try { aircondtion.increment(); } catch (Exception e) { e.printStackTrace(); } } },"C" ).start(); new Thread(() -> { for (int i = 1 ;i<=10 ;i++){ try { aircondtion.decrement(); } catch (Exception e) { e.printStackTrace(); } } },"D" ).start(); } }
synchronized与Lock的区别 两者区别: 1.首先synchronized是java内置关键字,在jvm层面,Lock是个java类; 2.synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁; 3.synchronized会自动释放锁(a 线程执行完同步代码会释放锁 ;b 线程执行过程中发生异常会释放锁),Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁; 4.用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去,而Lock锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了; 5.synchronized的锁可重入、不可中断、非公平 ,而Lock锁可重入、可判断、可公平 (两者皆可) 6.Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码少量的同步问题。
Condition 精准的通知和唤醒线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 class ShareData { private int number = 1 ; private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); public void print5 () { lock.lock(); try { while (number!=1 ){ condition1.await(); }s for (int i = 1 ;i<=5 ;i++){ System.out.println(Thread.currentThread().getName()+"\t" +i); } number = 2 ; condition2.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void print10 () { lock.lock(); try { while (number!=2 ){ condition2.await(); } for (int i = 1 ;i<=10 ;i++){ System.out.println(Thread.currentThread().getName()+"\t" +i); } number = 3 ; condition3.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void print15 () { lock.lock(); try { while (number!=3 ){ condition3.await(); } for (int i = 1 ;i<=15 ;i++){ System.out.println(Thread.currentThread().getName()+"\t" +i); } number = 1 ; condition1.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } } public class ConditionDemo { public static void main (String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 1 ;i<10 ;i++){ shareData.print5(); } },"A" ).start(); new Thread(() -> { for (int i = 1 ;i<10 ;i++){ shareData.print10(); } },"B" ).start(); new Thread(() -> { for (int i = 1 ;i<10 ;i++){ shareData.print15(); } },"C" ).start(); } }
Callable 和runable
1、可以有返回值
2、可以抛出异常
3、方法不同,run()/ call()
FutureTask是一个Runnable的实现类
他的构造方法可以和Callable有关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class MyThread implements Runnable { @Override public void run () { } } class MyThread2 implements Callable <Integer > { @Override public Integer call () throws Exception { System.out.println("*****come in call method " ); return 1024 ; } } public class CallableDemo { public static void main (String[] args) { FutureTask futureTask = new FutureTask(new MyThread2()); new Thread(futureTask,"A" ).start(); Integer o = (Integer) futureTask.get(); System.out.println(o); } }
有缓存
会阻塞
AQS(AbstractQueuedSynchronizer) 1 2 3 4 AQS是Doug Lea大师为JDK编写的一套基于API层面的抽象队列同步器. AbstractQueuedSynchronizer,抽象队列同步器. Lock,CountDownLatch等等这些并发工具都是基于AQS来实现的。 由此可以看出Doug Lea大师的功力已经臻至化境
AQS概述 AQS的核心思想是如果被请求的资源空闲,那么就将当前请求资源的线程设置为有效的工作线程; 如果请求的资源被其他线程所占有, 那么就使用CLH线程(FIFO)阻塞队列来提供阻塞线程并唤线程分配资源的机制。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系。
在CLH队列中,每个请求资源的线程都会被封装成队列中的一个节点。
在AQS内部有一个int类型的state表示线程同步状态, 当线程lock获取到锁后,该state计数就加1,unlock就减1, 这就是为什么解锁次数要对应加锁次数的原因。
AQS主要实现技术为:CLH队列(Craig,Landin and Hagersten), 自旋CAS,park(阻塞线程)以及unparkSuccessor(唤醒阻塞队列中的后继线程)。
用大白话来说,AQS就是基于CLH队列,用volatile修饰共享变量state,线程通过CAS去改变状态符,成功则获取锁成功,失败则进入等待队列,等待被唤醒。
AQS的两种共享资源的访问方式
AQS定义了两种共享资源方式.
独占式(Exclusive): 同一时间只有一个线程可以访问共享资源,也就是独占锁。 如:Synchronized,ReentrantLock。对于独占式锁的实现,在AQS中对应tryAcquire获取锁和tryRelease释放锁。
共享式(Share): 同一时间允许多个线程同时访问共享资源,也就是共享锁。 CountDownLatch,Semaphore,ReentrantReadWriteLock的ReadLock都是共享锁。对于共享式锁的实现,在AQS中对应tryAcquireShare获取锁和tryReleaseShare释放锁。
lock,tryLock和lockInterruptibly区别 PS: AQS中的锁计数指的是 state 变量。
lock: 如果线程获取到了锁或线程已经拥有了锁就更改锁计数, 否则线程就加入阻塞队列并一直CAS自旋获取。
tryLock: 线程尝试获取锁,如果线程获取到了锁或线程已经拥有了锁就更改锁计数,否则返回false。
lockInterruptibly: 如果线程在获取锁之前被设置了中断状态,那么当线程获取锁时就会响应中断状态, 抛出InterruptedException异常。如果获取不到就加入阻塞队列并自旋获取,并且阻塞自旋期间还会响应中断, 也就是说在阻塞自旋期间可能抛出InterruptedException异常。所以lockInterruptibly优先响应中断,而不是优先获取锁。 如果线程获取到了锁或线程已经拥有了锁才更改锁计数。
CountDownLatch 倒计时 不一定在一个线程只减一下,可以多下
CountDownLatch允许count个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
CountDownLatch是共享锁的一种实现,它默认构造AQS的state为count。 当线程使用countDown方法时,其实使用了tryReleaseShared方法以CAS的操作来减少state, 直至state为0就代表所有的线程都调用了countDown方法。 假如某线程A调用await方法时,如果state不为0,就代表还有线程未执行countDown方法, 那么就把线程A放入阻塞队列Park,并自旋CAS判断state == 0。 直至最后一个线程调用了countDown,使得state == 0, 于是阻塞的线程判断成功,并被唤醒,就继续往下执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private static void usingCountDownLatch () { Thread[] threads = new Thread[100 ]; CountDownLatch latch = new CountDownLatch(threads.length); for (int i=0 ; i<threads.length; i++) { threads[i] = new Thread(()->{ int result = 0 ; for (int j=0 ; j<10000 ; j++) result += j; latch.countDown(); }); } for (int i = 0 ; i < threads.length; i++) { threads[i].start(); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end latch" ); }
CyclicBarrier 栅栏 达到一定条件栅栏开放
条件 动作 限流
CycliBarrier的功能与CountDownLatch相似,但是CountDownLatch的实现是基于AQS的, 而CycliBarrier是基于ReentrantLock(ReentrantLock也属于AQS同步器)和Condition的。
CountDownLatch虽然可以令多个线程阻塞在同一代码处,但只能await一次就不能使用了。 而CycliBarrier有Generation代的概念,一个代,就代表CycliBarrier的一个循环,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) { CyclicBarrier barrier = new CyclicBarrier(20 , () -> System.out.println("满人" )); for (int i=0 ; i<100 ; i++) { new Thread(()->{ try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } }
Phaser 让线程阶段性的同步执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 static class MarriagePhaser extends Phaser { @Override protected boolean onAdvance (int phase, int registeredParties) { switch (phase) { case 0 : System.out.println("所有人到齐了!" + registeredParties); System.out.println(); return false ; case 1 : System.out.println("所有人吃完了!" + registeredParties); System.out.println(); return false ; case 2 : System.out.println("所有人离开了!" + registeredParties); System.out.println(); return false ; case 3 : System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties); return true ; default : return true ; } } } phaser.bulkRegister(7 ); phaser.arriveAndDeregister(); phaser.arriveAndAwaitAdvance();
ReadWriteLock 读写锁 :共享锁、排他锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class T10_TestReadWriteLock { static Lock lock = new ReentrantLock(); private static int value; static ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); static Lock readLock = readWriteLock.readLock(); static Lock writeLock = readWriteLock.writeLock(); public static void read (Lock lock) { try { lock.lock(); Thread.sleep(1000 ); System.out.println("read over!" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void write (Lock lock, int v) { try { lock.lock(); Thread.sleep(1000 ); value = v; System.out.println("write over!" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main (String[] args) { Runnable readR = ()-> read(readLock); Runnable writeR = ()->write(writeLock, new Random().nextInt()); for (int i=0 ; i<18 ; i++) new Thread(readR).start(); for (int i=0 ; i<2 ; i++) new Thread(writeR).start(); } }
Semaphore 信号量 公平锁
Semaphore允许一次性最多(不是同时)permits个线程执行任务。
Semaphore与CountDownLatch一样,也是共享锁的一种实现。它默认构造AQS的state为permits。 当执行任务的线程数量超出permits,那么多余的线程将会被放入阻塞队列Park,并自旋判断state是否大于0。 只有当state大于0的时候,阻塞的线程才有机会继续执行,此时先前执行任务的线程继续执行release方法, release方法使得state的变量会加1,那么自旋的线程便会判断成功。
如此,每次只有不超过permits个的线程能自旋成功,便限制了执行任务线程的数量。 所以这也是我为什么说它可能不是permits个线程同时执行, 因为只要state>0,线程就有机会执行.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class T11_TestSemaphore { public static void main (String[] args) { Semaphore s = new Semaphore(2 , true ); new Thread(()->{ try { s.acquire(); System.out.println("T1 running..." ); Thread.sleep(200 ); System.out.println("T1 running..." ); } catch (InterruptedException e) { e.printStackTrace(); } finally { s.release(); } }).start(); new Thread(()->{ try { s.acquire(); System.out.println("T2 running..." ); Thread.sleep(200 ); System.out.println("T2 running..." ); s.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }