集合类不安全 List 不安全
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class ListTest { public static void main (String[] args) { List<String> list = new CopyOnWriteArrayList<>(); for (int i = 1 ; i <= 10 ; i++) { new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0 ,5 )); System.out.println(list); },String.valueOf(i)).start(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
lock 利用Arrays.copyof()来复制一份新数组,再进行设置
Set 不安全 1 2 3 Set<String> set = new CopyOnWriteArraySet<>();
hashSet 底层是什么?
1 2 3 4 5 6 7 8 public HashSet () { map = new HashMap<>(); } public boolean add (E e) { return map.put(e, PRESENT)==null ; } private static final Object PRESENT = new Object();
Map 不安全 hashMap
加载因子 0.75
位运算 16
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class MapTest { public static void main (String[] args) { Map<String, String> map = new ConcurrentHashMap<>(); for (int i = 1 ; i <=30 ; i++) { new Thread(()->{ map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring( 0 ,5 )); System.out.println(map); },String.valueOf(i)).start(); } } }
ConcurrentHashMap 阻塞队列 FIFO 写入 :队列慢,就阻塞, 取:队列空,就必须等待生产
)
BlockingQueue
阻塞队列:多线程并发处理,线程池!
学会使用队列
方式
抛出异常
有返回值,不抛出异常
阻塞 等待
超时等待
添加
add
offer()
put()
offer(,,)
移除
remove
poll()
take()
poll(,)
检测队首元素
element
peek
添加、移除
SynchronousQueue 同步队列
没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class SynchronousQueueDemo { public static void main (String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+" put 1" ); blockingQueue.put("1" ); System.out.println(Thread.currentThread().getName()+" put 2" ); blockingQueue.put("2" ); System.out.println(Thread.currentThread().getName()+" put 3" ); blockingQueue.put("3" ); } catch (InterruptedException e) { e.printStackTrace(); } },"T1" ).start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName()+"=>" +blockingQueue.take()); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName()+"=>" +blockingQueue.take()); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName()+"=>" +blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"T2" ).start(); } }
线程池 线程池:三大方法、7大参数、4种拒绝策略
池化技术 程序的运行,本质:占用系统的资源! 优化资源的使用!=>池化技术
线程池、连接池、内存池、对象池///….. 创建、销毁。十分浪费资源
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我。
线程池的好处
1、降低资源的消耗
2、提高响应的速度
3、方便管理。
线程复用、可以控制最大并发数、管理线程
线程池:三大方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Demo01 { public static void main (String[] args) { ExecutorService threadPool = Executors.newSingleThreadExecutor(); try { for (int i = 0 ; i < 100 ; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" ok" ); }); } } catch (Exception e) { e.printStackTrace(); }finally { threadPool.shutdown(); } } }
7大参数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor (ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
corePoolSize: 线程池的核心线程数(常驻线程数),也就是线程池的最小线程数,这部分线程不会被回收.
maximumPoolSize: 线程池最大线程数,线程池中允许同时执行的最大线程数量
keepAliveTime: 当线程池中的线程数量超过corePoolSize,但此时没有任务执行, 那么空闲的线程会保持keepAliveTime才会被回收,corePoolSize的线程不会被回收。
unit: KeepAliveTime的时间单位
workQueue: 当线程池中的线程达到了corePoolSize的线程数量, 并仍然有新任务,那么新任务就会被放入workQueue。
threadFactory: 创建工作线程的工厂,也就是如何创建线程的,一般采用默认的
handler: 拒绝策略,当线程池中的工作线程达到了最大数量, 并且阻塞队列也已经满了,那么拒绝策略会决定如何处理新的任务。ThreadPoolExecutor 提供了四种策略:
AbortPolicy(是线程池的默认拒绝策略): 如果使用此拒绝策略,那么将对新的任务抛出RejectedExecutionException异常,来拒绝任务。
DiscardPolicy: 如果使用此策略,那么会拒绝执行新的任务,但不会抛出异常。
DiscardOldestPolicy: 如果使用此策略,那么不会拒绝新的任务,但会抛弃阻塞队列中等待最久的那个线程。
CallerRunsPolicy: 如果使用此策略,不会拒绝新的任务,但会让调用者执行线程。 也就是说哪个线程发出的任务,哪个线程执行。
四种拒绝策略 1 2 3 4 new ThreadPoolExecutor.AbortPolicy() new ThreadPoolExecutor.CallerRunsPolicy() new ThreadPoolExecutor.DiscardPolicy() new ThreadPoolExecutor.DiscardOldestPolicy()
阿里巴巴开发者手册 不建议开发者使用Executors创建线程池
newFixedThreadPool和newSingleThreadPoolExecutor都是创建固定线程的线程池, 尽管它们的线程数是固定的,但是它们的阻塞队列的长度却是Integer.MAX_VALUE的,所以, 队列的任务很可能过多,导致OOM。
newCacheThreadPool和newScheduledThreadPool创建出来的线程池的线程数量却是Integer.MAX_VALUE的, 如果任务数量过多,也很可能发生OOM。
手动创建一个线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class ThreadPoolTest { public static void main (String[] args) { ExecutorService threadPool = new ThreadPoolExecutor( 2 , 5 , 3 , TimeUnit.SECONDS, new LinkedBlockingDeque<>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); try { for (int i = 1 ; i <= 9 ; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" ok" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
拓展 1 2 3 4 5 最大线程到底该如何定义 1、CPU 密集型,几核,就是几,可以保持CPu的效率最高! Runtime.getRuntime().availableProcessors(), 2、IO 密集型 > 判断你程序中十分耗IO的线程, 程序 15个大型任务 io十分占用资源!
四大函数式接口(必需掌握) lambda表达式、链式编程、函数式接口、Stream流式计算
函数式接口: 只有一个方法的接口 1 2 3 4 5 6 7 8 9 @FunctionalInterface public interface Runnable { public abstract void run () ; }
四大函数式接口
Consumer、Function 、Predicate、Supplier
Function
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) { Function<String,String> function = str->{return str;}; System.out.println(function.apply("asd" )); }
Predicate 断定型接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) { Predicate<String> predicate = (str)->{ return str.isEmpty(); }; System.out.println(predicate.test("" )); }
Consumer 消费者接口
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) { Consumer<String> consumer = str ->{ System.out.println(str); }; consumer.accept("aaaa" ); }
Supplier 供给型接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) { Supplier supplier = ()->{ System.out.println("get" ); return 1024 ; }; System.out.println(supplier.get()); }
Stream流式计算 大数据:存储 + 计算
集合、MySQL 本质就是存储东西的;
计算都应该交给流来操作!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class Test { public static void main (String[] args) { User u1 = new User(1 ,"a" ,21 ); User u2 = new User(2 ,"b" ,22 ); User u3 = new User(3 ,"c" ,23 ); User u4 = new User(4 ,"d" ,24 ); User u5 = new User(6 ,"e" ,25 ); List<User> list = Arrays.asList(u1, u2, u3, u4, u5); list.stream() .filter(u->{return u.getId()%2 ==0 ;}) .filter(u->{return u.getAge()>23 ;}) .map(u-> {return u.getName().toUpperCase();}) .sorted( (uu1,uu2)->{return uu2.compareTo(uu1);}) .limit(1 ) .forEach(System.out::println); } }
ForkJoin ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。大数据量!
大数据:Map Reduce (把大任务拆分为小任务)
ForkJoin 特点:工作窃取 这个里面维护的都是双端队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public class ForkJoinDemo extends RecursiveTask <Long > { private Long start; private Long end; private Long temp = 10000L ; public ForkJoinDemo (Long start, Long end) { this .start = start; this .end = end; } @Override protected Long compute () { if ( (end-start)<temp){ Long sum = 0L ; for (Long i=start;i<end;i++){ sum += i; } return sum; }else { long middle = (start + end) / 2 ; ForkJoinDemo task1 = new ForkJoinDemo(start, middle); task1.fork(); ForkJoinDemo task2 = new ForkJoinDemo(middle+1 , end); task2.fork(); return task1.join() + task2.join(); } } } public class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { test3(); } public static void test1 () { Long sum = 0L ; long start = System.currentTimeMillis(); for (Long i = 1L ; i <= 10_0000_0000 ; i++) { sum += i; } long end = System.currentTimeMillis(); System.out.println("sum=" +sum+" 时间:" +(end-start)); } public static void test2 () throws ExecutionException, InterruptedException { Long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L , 10_0000_0000L ); ForkJoinTask<Long> submit = forkJoinPool.submit(task); Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum=" +sum+" 时间:" +(end-start)); } public static void test3 () { long start = System.currentTimeMillis(); long sum = LongStream.rangeClosed(0L , 10_0000_0000L ).parallel().reduce(0 , Long::sum); long end = System.currentTimeMillis(); System.out.println("sum=" +sum+"时间:" +(end-start)); } }
Future 设计的初衷: 对将来的某个事件的结果进行建模
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class Demo1 { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync( ()->{ System.out.println(Thread.currentThread().getName()+"supplyAsync=>supply" ); return 1024 ; }); System.out.println(completableFuture.whenComplete((t, u) -> { System.out.println("t:" + t); System.out.println("u:" + u); }).exceptionally((e) -> { System.out.println(e.getMessage()); return 233 ; }).get()); } }
单例模式 饿汉式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Hungry { private byte [] data1 =new byte [1024 *1024 ]; private byte [] data2 =new byte [1024 *1024 ]; private byte [] data3 =new byte [1024 *1024 ]; private byte [] data4 =new byte [1024 *1024 ]; private Hungry () { } private final static Hungry HUNGRY = new Hungry(); public static Hungry getInstance () { return HUNGRY; } }
懒汉式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class LazyMan { private static boolean liuqi = false ; private LazyMan () { synchronized (LazyMan.class){ if (liuqi==false ){ liuqi=true ; }else { throw new RuntimeException("不要使用反射破坏异常" ); } } System.out.println(Thread.currentThread().getName()+"ok" ); } private volatile static LazyMan lazyMan; public static LazyMan getInstance () { if (lazyMan==null ){ synchronized (LazyMan.class){ if (lazyMan == null ) { lazyMan=new LazyMan(); } } } return lazyMan; } public static void main (String[] args) throws Exception { Field liuqi = LazyMan.class.getDeclaredField("liuqi" ); liuqi.setAccessible(true ); Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null ); declaredConstructor.setAccessible(true ); LazyMan instance = declaredConstructor.newInstance(); liuqi.set(instance,false ); LazyMan instance2 = declaredConstructor.newInstance(); System.out.println(instance); System.out.println(instance2); } }
静态内部类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Holder { private Holder () { } public static Holder getInstance () { return InnerClass.HOLDER; } public static class InnerClass { private static final Holder HOLDER = new Holder(); } }
枚举 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public enum EnumSingle { INSTANCE; private EnumSingle () { } public EnumSingle getInstance () { return INSTANCE; } } class Test { public static void main (String[] args) throws Exception { EnumSingle instance1 = EnumSingle.INSTANCE; Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null ); declaredConstructor.setAccessible(true ); EnumSingle instance2 = declaredConstructor.newInstance(String.class,int .class); System.out.println(instance1); System.out.println(instance2); } }
原子引用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class CASDemo { static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1 ,1 ); public static void main (String[] args) { new Thread(()->{ int stamp = atomicStampedReference.getStamp(); System.out.println("a1=>" +stamp); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } Lock lock = new ReentrantLock(true ); atomicStampedReference.compareAndSet(1 , 2 , atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1 ); System.out.println("a2=>" +atomicStampedReference.getStamp()); System.out.println(atomicStampedReference.compareAndSet(2 , 1 , atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1 )); System.out.println("a3=>" +atomicStampedReference.getStamp()); },"a" ).start(); new Thread(()->{ int stamp = atomicStampedReference.getStamp(); System.out.println("b1=>" +stamp); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(atomicStampedReference.compareAndSet(1 , 6 , stamp, stamp + 1 )); System.out.println("b2=>" +atomicStampedReference.getStamp()); },"b" ).start(); } }
注意: Integer 使用了对象缓存机制,默认范围是 -128 ~ 127 ,推荐使用静态工厂方法 valueOf 获取对象实例,而不是 new,因为 valueOf 使用缓存,而 new 一定会创建新的对象分配新的内存空间;
锁的理解 1.公平锁,非公平锁 公平锁: 非常公平, 不能够插队,必须先来后到!
非公平锁:非常不公平,可以插队 (默认都是非公平)
1 2 3 4 5 6 public ReentrantLock () { sync = new NonfairSync(); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
2.可重入锁 递归锁
synchronize 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class Demo01 { public static void main (String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.sms(); },"A" ).start(); new Thread(()->{ phone.sms(); },"B" ).start(); } } class Phone { public synchronized void sms () { System.out.println(Thread.currentThread().getName() + "sms" ); call(); } public synchronized void call () { System.out.println(Thread.currentThread().getName() + "call" ); } }
3.自旋锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class SpinlockDemo { AtomicReference<Thread> atomicReference = new AtomicReference<>(); public void myLock () { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"==>mylock" ); while (!atomicReference.compareAndSet(null ,thread)){ } } public void myUnLock () { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"==>mylock" ); atomicReference.compareAndSet(thread,null ); } }
4.死锁 1、使用 jps -l 定位进程号
2、使用 jstack 进程号 找到死锁问题
排查问题
1.日志
2.堆栈信息