JUC基础知识点(二)
6、多线程锁
synchronized实现同步的基础:Java中的每一个对象都可以作为锁
具体表现为以下3种形式:
-
对于普通同步方法,锁是当前实例对象
-
对于静态同步方法,锁是当前类的Class对象
-
对于同步方法块,锁是Synchronized括号里配置的对象
公平锁和非公平锁
//无参默认为非公平锁,false也是非公平锁private final ReentrantLock lock = new ReentrantLock();
-
非公平锁:线程饿死,效率高
-
公平锁:阳光普照,效率相对较低
可重入锁(递归锁)
synchronized(隐式)和Lock(显式)都是可重入锁
进入了第一道大门,里面的门可以无障碍进入
public class SyncLockDemo{ public static void main(String[] args){ /**Object o = new Object; new Thread(() -> { synchronized(o){ System.out.println(Thread.currentThread().getName()+"外层"); synchronized(o){ System.out.println(Thread.currentThread().getName()+"中层"); synchronized(o){ System.out.println(Thread.currentThread().getName()+"内层"); } } } },"t1").start();*/ //Lock演示可重入锁 Lock lock = new ReentrantLock(); new Thread(() -> { try{ lock.lock(); System.out.println(Thread.currentThread().getName()+"外层"); try{ lock.lock(); System.out.println(Thread.currentThread().getName()+"内层层"); }finally{ lock.unlock(); } }finally{ lock.unlock(); } }); }}
死锁
什么是死锁:两个或者两个以上进程在执行过程中,因为抢夺资源而造成一种互相等待的现象,如果没有外力干涉,他们无法再执行下去
产生死锁的原因:
-
系统资源不足
-
进程运行推进的顺序不合适
-
资源分配不当
public class DeadLock{ static Object a = new Object(); static Object b = new Object(); public static void main(String[] args){ new Thread(() -> { synchronized(a){ System.out.println(Thread.currentThread().getName()+"持有锁a,试图获取锁b"); TimeUnit.SECONDS.sleep(1);//需要try catch synchronized(b){ System.out.println(Thread.currentThread().getName()+"获取锁b"); } } },"A").start(); new Thread(() -> { synchronized(b){ System.out.println(Thread.currentThread().getName()+"持有锁b,试图获取锁a"); TimeUnit.SECONDS.sleep(1);//需要try catch synchronized(a){ System.out.println(Thread.currentThread().getName()+"获取锁a"); } } },"B").start(); }}
验证是否是死锁
-
jps 类似于 linux ps -ef
-
jstack jvm自带堆栈跟踪工具
7、Callable锁
目前我们学习了有两种创建线程的方法-一种是通过创建Thread类,另一种是通过使用Runnable创建线程。但是,Runnable缺少的一项功能是,当线程终止时(即run()完成时),我们无法使线程返回结果。为了支持此功能,Java中提供了Callable接口
Callable接口的特点如下(重点)
-
为了实现Runnable(),需要实现不返回任何内容的run()方法,而对于Callable,需要实现在完成时返回结果的call()方法
-
call()方法可以引发异常,而run不能
-
为实现Callable而必须重写call()方法
//比较两个接口class MyThread1 implements Runnable{ @Override public void run(){ }}class MyThread2 implements Callable{ @Override public Integer call() throws Exception { System.out.println(Thread.currentThread().getName()+"come in callable"); return 200; }}public class Demo1{ public static void main(String[] args){ //Runnable接口创建线程 new Thread(new MyThread1(), "AA").start(); //Callable,这样会报错 //new Thread(new MyThread2(), "BB").start(); //找一个类,既和Runnable有关系,又和Callable有关系 //Runnable接口有实现类FutureTask //FutureTask构造可以传递Callable FutureTask futureTask1 = new FutureTask(new MyThread()); //lambda表达式 FutureTask futureTask2 = new FutureTask(() -> { System.out.println(Thread.currentThread().getName()+"come in callable"); return 1024; }); //创建一个线程 new Thread(futureTask2, "lucy").start(); new Thread(futureTask1, "mary").start(); // while(!futureTask2.isDone()){// Syetem.out.println("wait...");// } //调用FutureTask的get方法 System.out.println(futureTask2.get()); System.out.println(Thread.currentThread().getName()+"come over"); //Future原理 未来任务 /**1.老师上课口渴了,去买水不合适,讲课线程继续,单开启线程找班长帮忙买水,把水买回来直接get 2.4个同学 1 1+2+...5 2 10+11+...50 3 60+61+61 4 100+200 第二个同学的计算量比较大 FutureTask单开启线程给2同学计算,先汇总1 3 4,再等2同学计算完成,统一汇总 */ }}
8、JUC强大的辅助类
8.1 减少计数CountDownLatch
CountDownLatch类可以设置一个计数器,然后通过countDown方法来进行-1的操作,使用await()方法等待计数器不大于0,然后继续执行await()方法之后的语句
-
CountDownLatch主要有两个方法,当一个或多个线程调用await()方法时,这些线程不会阻塞
-
其他线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
-
当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行
public class CountDownLatchDemo{ //6个同学陆续离开教室之后,班长锁门 public static void main(String[] args) throws InterruptedException{ //创建CountDownLatch对象,设置初始值 CountDownLatch countDownLatch = new CountDownLatch(6); for(int i = 1; i { System.out.println(Thread.currentThread().getName()+"号同学离开了教室"); //计数 -1 countDownLatch.countDown(); },String.valueOf(i)).start(); } //等待 countDownLatch.countDown(); System.out.println(Thread.currentThread().getName()+"班长锁门走人了"); }}
8.2 循环栅栏CyclicBarrier
CyclicBarrier看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier的构造方法第一个参数是目标障碍数,每次执行cyclicBarrier.await()之后的语句。可以将CyclicBarrier理解为+1操作
场景:集合七颗龙珠召唤神龙
public class CyclicBarrierDemo{ //创建固定值 public static final int NUMBER = 7; public static void main(String[] args){ //创建CyclicBarrier CyclicBarrier cyclicBarrier = new CyclicBarrier((NUMBER) -> { System.out.println("集齐七颗龙珠就可以召唤神龙"); }); //集齐七颗龙珠过程 for(int i = 1; i { try{ System.out.println(Thread.currentThread().getName()+"星龙被收集到了"); //等待 cyclicBarrier.await(); }catch(Exception e){ e.printStackTrace(); } },String.valueOf(i)).start(); } }}
8.3 信号量Semaphore
public class SemaphoreDemo{ public static void main(String[] args){ //创建Semaphore,设置许可数量 Semaphore semaphore = new Semaphore(3); //模拟六辆汽车 for(int i = 1; i { try{ //抢占 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"抢到了车位"); //设置随机停车时间 TimeUnit.SECONDS.sleep(new Random().nextInt(5)); System.out.println(Thread.currentThread().getName()+"离开了车位"); }catch(InterruptedException e){ e.printStackTrace(); }finally{//释放 semaphore.release(); } },String.valueOf(i)).start(); } }}
9、ReentrantReadWriteLock读写锁
悲观锁、乐观锁、表锁、行锁、读锁、写锁
class MyCache{//资源类 //创建map集合 加volatile是因为map是不断变化的内容 private volatile Map map = new HashMap(); //创建读写锁对象 private ReadWriteLock rwLock = new ReentrantReadWriteLock(); //放数据 public void put(String key, Object value){ //添加写锁 rwLock.writeLock().lock(); try{ System.out.println(Thread.currentThread().getName()+"正在写操作"+key); //暂停一会 TimeUnit.MICROSECONDS.sleep(300); //放数据 map.put(key, value); System.out.println(Thread.currentThread().getName()+"写完了"+key); }catch(InterruptedException e){ e.printStackTrace(); }finally{//释放写锁 rwLock.writeLock().unlock(); } } //取数据 public Object get(String key){ //添加读锁 rwLock.readLock().lock(); Object res = null; try{ System.out.println(Thread.currentThread().getName()+"正在读取操作"); TimeUnit.MICROSECONDS.sleep(300); res = map.get(key); System.out.println(Thread.currentThread().getName()+"取完了"+key); }catch(InterruptedException e){ e.printStackTrace(); }finally{//释放读锁 rwLock.readLock().unlock(); } return res; }}public class ReadWriteLockDemo{ public static void main(String[] args){ MyCache myCache = new Cache(); //创建线程释放数据 for(int i = 1; i { myCache.put(num+"", num+""); },String.valueOf(i)).start(); } //创建线程取数据 for(inty i = 1; i { myCache.get(num+""); },String.valueOf(i)).start(); } }}
读写锁:一个资源可以被多个读线程访问,或者可以被一个写线程访问,但是不能同时存在读写线程,读写互斥,读读共享
读写锁的演变
-
无锁:多个线程抢夺资源 乱
-
添加锁:使用synchronized和ReentrantLock,都是独占的,每次只能来一个操作。读读 1 不能共享 读写 1 写写 1
-
读写锁:ReentrantReadWriteLock,读读可以共享,提升性能,同时多人进行读操作 写写 1
-
缺点:造成锁饥饿,一直读,没有写操作,比如坐地铁一个人下一堆人上,又把你挤上去了
-
读时候,不能写,只有读完成之后才可以写,写操作可以读
-
读写锁的降级
锁降级:将写锁降级为读锁,读锁不能升级为写锁
增加修改删除 -- 写 查询 -- 读
JDK8说明:获取写锁 --> 获取读锁 --> 释放写锁 --> 释放读锁
//演示读写锁降级public class Demo1{ public static void main(String[] args){ //可重入读写锁对象 ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();//读锁 ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();//写锁 //锁降级 writeLock.lock();//1. 获取写锁 System.out.println("atguigu"); readLock.lock();//2. 获取读锁 System.out.println("---read"); writeLock.unlock();//3. 释放写锁 readLock.unlock();//4. 释放读锁 }}
10、BlockingQueue阻塞队列
10.1 阻塞队列概述
阻塞队列,顾名思义,首先它是一个队列,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出
当队列是空的,从队列中获取元素的操作将会被阻塞;当队列是满的,从队列中添加元素的操作将会被阻塞 在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起。
为什么需要BlockingQueue? 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了,在concurrent包发布之前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这些会给我们的程序带来不小的复杂度
10.2 阻塞队列的架构
-
父接口:Collection、Iterable、Queue
-
子接口:BlockingDeque
-
实现类:ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue
10.3 阻塞队列分类
-
ArrayBlockingQueue(常用):由数组结构组成的有界阻塞队列
-
LinkedBlockingQueue(常用):由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
-
DelayQueue:使用优先级队列实现的延迟无界阻塞队列
-
PriorityBlockingQueue:支持优先级排列的无界阻塞队列
-
SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
-
LinkedTransferQueue:由链表组成的无界阻塞队列
-
LinkedBlockingDeque:由链表组成的双向阻塞队列
10.4 阻塞队列核心方法
-
抛出异常那组:add(e)、remove()、element()
-
特殊值:offer(e)、poll()、peek()
-
阻塞:put(e)、take()、不可用
-
超时:offer(e, time, unit)、poll(time, unit)、不可用
public class BlockingQueueDemo{ public static void main(String[] args){ //创建阻塞队列 BlockingQueue blockingQueue = new ArrayBlockingQueue(3); //第一组 System.out.println(blockingQueue.add("a"));//true System.out.println(blockingQueue.add("b"));//true System.out.println(blockingQueue.add("c"));//true //System.out.println(blockingQueue.element());//a //System.out.println(blockingQueue.add("w"));//异常 System.out.println(blockingQueue.remove());//a System.out.println(blockingQueue.remove());//b System.out.println(blockingQueue.remove());//c //System.out.println(blockingQueue.remove());//异常 //第二组 System.out.println(blockingQueue.offer("a"));//true System.out.println(blockingQueue.offer("b"));//true System.out.println(blockingQueue.offer("c"));//true System.out.println(blockingQueue.offer("www"));//false System.out.println(blockingQueue.poll());//a System.out.println(blockingQueue.poll());//b System.out.println(blockingQueue.poll());//c System.out.println(blockingQueue.poll());//null //第三组 blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); //blockingQueue.put("w");//一直阻塞 不结束 System.out.println(blockingQueue.take());//a System.out.println(blockingQueue.take());//b System.out.println(blockingQueue.take());//c System.out.println(blockingQueue.take());//阻塞 //第四组 System.out.println(blockingQueue.offer("a"));//true System.out.println(blockingQueue.offer("b"));//true System.out.println(blockingQueue.offer("c"));//true System.out.println(blockingQueue.offer("w",3L, TimeUnit.SECONDS));//false }}
11、ThreadPool线程池
11.1 线程池概述
线程池(Thread Pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能保证内核态的充分利用,还能防止过分调度。
线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等待,等其他线程执行完毕,再从队列中取出任务来执行。
11.2 线程池架构
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor、Executors、ExecutorService、ThreadPoolExecutor这几个类
11.3 线程池使用方式
Executors.newFixedThreadPool(int):一池N线程
Executors.newSingleThreadExecutor():一个任务一个任务执行,一池一线程
Executors.newCachedThreadPool():线程池根据需求创建线程,可扩容,遇强则强
public class ThreadPoolDemo1 { //演示线程池三种常用分类 public static void main(String[] args){ //一池五线程 //EutorService threadPool1 = Executors.newFixedThreadPool(5);//5个窗口 //一池一线程 //EutorService threadPool2 = Executors.newSingleThreadExecutor();//一个窗口 //一池可扩容线程 ExecutorService threadPool3 = Executors.newCachedThreadPool(); //10个顾客请求 try{ for(int i = 1; i { System.out.println(Thread.currentThread().getName()+"办理业务"); }); } }catch(Exception e){ e.printStackTrace(); }finally{//关闭 threadPool3hutdown(); }}
11.4 线程池底层原理
都是用的ThreadPoolExecutor来创建的
11.5 线程池的七个参数
-
int corePoolSize 核心线程数量
-
int maximumPoolSize 最大线程数量
-
long keepAliveTime
-
TimeUnit unit 空闲线程存活时间
-
BlockingQueue workQueue 阻塞队列
-
ThreadFactory threadFactory 线程工厂
-
RejectedExecutionHandler handler 拒绝策略
11.6 线程池底层工作流程
JDK内置的拒绝策略
-
AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
-
CallerRunsPolicy:"调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量(谁让你来的找谁去)
-
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中再次提交当前任务
-
DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略
11.7 自定义线程池——实际常用
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式 这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
//自定义线程池创建public class ThreadPoolDemo2{ public static void main(String[] args){ ExecutorService threadPool1 = new ThreadPoolExecutor(2, 5, 2L, TimeUnit.SECONDS, new ArrayBlockingQueue(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); //10个顾客请求 try{ for(int i = 1; i { System.out.println(Thread.currentThread().getName()+"办理业务"); }); } }catch(Exception e){ e.printStackTrace(); }finally{//关闭 threadPool1hutdown(); } }}
12、Fork/Join分支合并框架
Fork/join框架简介:Fork/Join它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并
class MyTask extends RecursiveTask { //拆分差值不能超过10,计算10以内运算 private static final Integer VALUE = 10; private int begin;//拆分开始值 private int end;//拆分结束值 private int res;//返回结果 //创建有参数构造 public MyTask(int begin, int end){ this.begin = begin; this.end = end; } //拆分和合并过程 @Override protected Integer computer(){ //判断相加的两个数值是否大于10 if((end - begin)<=VALUE){ //相加操作 for(int i = begin; i <= end; i++){ res = res + i; } }else{//进一步拆分 //获取中间值 int midddle = (begin+end) / 2; //拆分左边 MyTask task01 = new MyTask(begin, middle); //拆分右边 MyTask task02 = new MyTask(middle+1, end); //调用方法拆分 task01.fork(); task02.fork(); //合并结果 res = task01.join() + task02.join(); } return res; }}public class ForkJoinDemo{ public static void main(String[] args){ //创建MyTask对象 MyTask myTask = new MyTask(0, 100); //创建分支合并池对象 ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask forkJoinTask = forkJoinPool.submit(myTask); //获取最终合并之后结果 Integer res = forkJoinTask.get(); System.out.println(res); //关闭池对象 forkJoinPool.shutdown(); }}
13、CompletableFuture异步回调
同步: 就相当于起床,要先穿衣服,再穿鞋,再洗漱;是按一定顺序的,你做一件事的时候就不能做另一件事。
异步: 就相当于你吃饭和看电视这两件事情是可以一起进行的,能够节约时间,提高效率。
public class CompletableFutureDemo{ public static void main(String[] args) throws Exception{ //异步调用 没有返回值 CompletableFuture completableFuture1 = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()+"completableFuture1"); }); completableFuture1.get(); //异步调用 有返回值 CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()+"completableFuture2"); //模拟异常 int i = 1/0; reurn 1024; }); completableFuture2.whenComplete((t, u) -> { System.out.println("----t="+t);//返回值 System.out.println("----u="+u);//异常 }).get(); }}