JAVA-JUC并发编程
JUC:java.util.concurrent
环境准备
1.maven项目
2.准备一个lombok
org.projectlombok lombok 1.18.22
3.项目的Project Structure
的Project
的SDK
改为1.8
,Language level
改为8
;Modules
的Sources
的Language level
改为8
4.Settings->Java Compiler
的Target bytecode version
改为8
并发与并行
- 并发编程的本质:充分使用CPU资源
- 并发:多个线程操作同一个资源,针对CPU单核的,通过快速交替,来模拟多条线程
- 并行:CPU多核,多个线程可以同时执行,线程池
package com.cx;public class Test1 { public static void main(String[] args) { //获取cpu核数 //CPU密集型,IO密集型 System.out.println(Runtime.getRuntime().availableProcessors()); }}
synchronized
package com.cx;/** * 线程应该是一个单独的资源类,没有其他附属操作 * 1.属性,方法 */public class Test2 { public static void main(String[] args){ Ticket ticket = new Ticket(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { ticket.sell(); Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { ticket.sell(); } },"B").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { ticket.sell(); } },"C").start(); }}//资源类class Ticket{ //属性 private int num=29; //方法 //synchronized相当于队列 public synchronized void sell(){ if (num>0){ System.out.println(Thread.currentThread().getName()+"买了第"+num--+"张票,还剩下"+num+"张票."); } }}
生产者-消费者(synchronized实现)
package com.cx;public class Test4 { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); }}//等待-业务-通知class Data{ private int num=0; public synchronized void increment() throws InterruptedException { if (num!=0){ //等待 this.wait(); } num++; System.out.println(Thread.currentThread().getName()+"+"+num); //通知其他线程,此线程+1完成 this.notifyAll(); } public synchronized void decrement() throws InterruptedException { if (num==0){ //等待 this.wait(); } num--; System.out.println(Thread.currentThread().getName()+"-"+num); //通知其他线程,此线程-1完成 this.notifyAll(); }}
当new两个加,两个减的线程就会出问题,这是因为if导致的,这种情况叫做虚假唤醒,从官方文档中可以看出,等待应该是发生在循环中的,所以我们将if换成while就不会出错了
package com.cx;public class Test4 { public static void main(String[] args) { Data data = new Data(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); }}//等待-业务-通知class Data{ private int num=0; public synchronized void increment() throws InterruptedException { while (num!=0){ //等待 this.wait(); } num++; System.out.println(Thread.currentThread().getName()+"+"+num); //通知其他线程,此线程+1完成 this.notifyAll(); } public synchronized void decrement() throws InterruptedException { while (num==0){ //等待 this.wait(); } num--; System.out.println(Thread.currentThread().getName()+"-"+num); //通知其他线程,此线程-1完成 this.notifyAll(); }}
Lock
- 公平锁:先来后到,例如3h,3m,这样会等待3h先执行完后,才会执行3m的
- 非公平锁(默认):不遵循先来后到
package com.cx;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * 线程应该是一个单独的资源类,没有其他附属操作 * 1.属性,方法 */public class Test3 { public static void main(String[] args){ Ticket2 ticket = new Ticket2(); new Thread(()->{for (int i = 0; i < 10; i++) ticket.sell();},"A").start(); new Thread(()->{for (int i = 0; i < 10; i++) ticket.sell();},"B").start(); new Thread(()->{for (int i = 0; i < 10; i++) ticket.sell();},"C").start(); }}//资源类class Ticket2{ //属性 private int num=29; Lock lock=new ReentrantLock(); //方法 //synchronized相当于队列 public void sell(){ //加锁 lock.lock(); try { //业务代码 if (num>0){ System.out.println(Thread.currentThread().getName()+"买了第"+num--+"张票,还剩下"+num+"张票."); } } catch (Exception e) { e.printStackTrace(); } finally { //解锁 lock.unlock(); } }}
生产者-消费者(Lock实现)
Condition:精准的通知和唤醒线程
package com.cx;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class Test5 { public static void main(String[] args) { Data2 data = new Data2(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.p1(); } catch (InterruptedException e) { e.printStackTrace(); } } },"p1").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.p2(); } catch (InterruptedException e) { e.printStackTrace(); } } },"p2").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { try { data.p3(); } catch (InterruptedException e) { e.printStackTrace(); } } },"p3").start(); }}//等待-业务-通知class Data2{ private int num=1; Lock lock=new ReentrantLock(); Condition condition1=lock.newCondition(); Condition condition2=lock.newCondition(); Condition condition3=lock.newCondition(); public void p1() throws InterruptedException { lock.lock(); try { while (num!=1){ //等待 condition1.await(); } num=2; System.out.println(Thread.currentThread().getName()+"+"+num); //通知2线程,此线程完成 condition2.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void p2() throws InterruptedException { lock.lock(); try { while (num!=2){ //等待 condition2.await(); } num=3; System.out.println(Thread.currentThread().getName()+"-"+num); //通知3线程,此线程完成 condition3.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void p3() throws InterruptedException { lock.lock(); try { while (num!=3){ //等待 condition3.await(); } num=1; System.out.println(Thread.currentThread().getName()+"-"+num); //通知1线程 condition1.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }}
八锁问题
package com.cx;import java.util.concurrent.TimeUnit;public class Test6 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{ /** * 1.此时由于还没有进入当方法中,也就是还没锁住对象,所以此时先执行B,在执行A try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } */ phone.info(); },"A").start(); new Thread(()->{ phone.call(); },"B").start(); new Thread(()->{ phone.p1(); },"C").start(); }}class Phone{ //锁的对象是方法的调用者,谁先拿到谁先执行 public synchronized void info(){ System.out.println("发消息之前"); /** 2.此时由于已经进入方法中,锁住对象,所以在执行A的过程中,无论A占用多长时间,B也要等下去 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } */ /** * 3.此时先执行输出”发消息之前“,然后此处需要等待,但是普通方法不会等待 * 所以会输出”普通方法“,最后会执行之后的过程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } */ System.out.println("发消息"); } public synchronized void call(){ System.out.println("打电话"); } /** * 3.普通方法不受锁的影响:当被锁住的方法执行过程中遇到等待,此时普通 * 方法不必等待被锁住的方法完全执行完才会执行 */ public void p1(){ System.out.println("普通方法"); }}
package com.cx;import java.util.concurrent.TimeUnit;public class Test7 { public static void main(String[] args) { Phone2 phone = new Phone2(); Phone2 phone2 = new Phone2(); new Thread(()->{ phone.info(); },"A").start(); //4.由于是两个对象,此时跟两把锁互补感染,此时下面延迟的时间 //跟 发消息 方法中的延迟差,决定了是先发消息还是先打电话 try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ phone2.call(); },"B").start(); }}class Phone2{ //锁的对象是方法的调用者,谁先拿到谁先执行 public synchronized void info(){ try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发消息"); } public synchronized void call(){ System.out.println("打电话"); } public void p1(){ System.out.println("普通方法"); }}
package com.cx;import java.util.concurrent.TimeUnit;public class Test8{ public static void main(String[] args) { Phone3 phone = new Phone3(); Phone3 phone2 = new Phone3(); new Thread(()->{ phone.info(); },"A").start(); //4.由于是两个对象,但是锁的同一个类,此时,还是需要等待A执行完后才能执行B try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ phone2.call(); },"B").start(); }}class Phone3{ //锁的对象是方法的调用者,谁先拿到谁先执行 //static 静态 //类一加载就有,锁的是class public static synchronized void info(){ try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发消息"); } public static synchronized void call(){ System.out.println("打电话"); }}
package com.cx;import java.util.concurrent.TimeUnit;public class Test9{ public static void main(String[] args) { Phone4 phone = new Phone4(); new Thread(()->{ phone.info(); },"A").start(); //4.由于一个是锁的类模板,一个是锁的对象,此时,执行结果跟延时差有关 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ phone.call(); },"B").start(); }}class Phone4{ //锁的对象是方法的调用者,谁先拿到谁先执行 //static 静态 //类一加载就有,锁的是class public static synchronized void info(){ try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发消息"); } //锁的是 调用者 public synchronized void call(){ System.out.println("打电话"); }}
package com.cx;import java.util.concurrent.TimeUnit;public class Test9{ public static void main(String[] args) { Phone4 phone = new Phone4(); Phone4 phone2 = new Phone4(); new Thread(()->{ phone.info(); },"A").start(); //4.对象不同,锁也不同,此时,执行结果跟延时差有关 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ phone2.call(); },"B").start(); }}class Phone4{ //锁的对象是方法的调用者,谁先拿到谁先执行 //static 静态 //类一加载就有,锁的是class public static synchronized void info(){ try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发消息"); } //锁的是 调用者 public synchronized void call(){ System.out.println("打电话"); }}
多线程下的List、Set、Map
CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentHashMap
package com.cx;import java.util.*;import java.util.concurrent.CopyOnWriteArrayList;public class Test10 { public static void main(String[] args) { Test10 test10 = new Test10(); //会出现并发修改异常 //test10.test(); //Vector是线程安全的,不会出错 //test10.test2(); //也不会出错 //test10.test3(); //底层是用的lock /** * 是写入时复制,是计算机程序涉及领域的一种优化策略 * 多个线程的时候,list读写的时候,国定写入,比Vector效果好,因为底层是lock */ test10.test4(); } public void test(){ //java.util.ConcurrentModificationException,会出现并发修改异常 ArrayList<Object> objects = new ArrayList<>(); for (int i = 0; i < 10; i++) { new Thread(()->{ objects.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(objects); }).start(); } } public void test2(){ //vector底层用的是sychronized List<Object> objects = new Vector<>(); for (int i = 0; i < 10; i++) { new Thread(()->{ objects.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(objects); }).start(); } } public void test3(){ List<Object> objects = Collections.synchronizedList(new ArrayList<>()); for (int i = 0; i < 10; i++) { new Thread(()->{ objects.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(objects); }).start(); } } public void test4(){ List<Object> objects = new CopyOnWriteArrayList<>(); for (int i = 0; i < 10; i++) { new Thread(()->{ objects.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(objects); }).start(); } }}
并发控制常用类
CountDownLatch
原理:每次有线程调用countDown(),数量就会减1,当计数器变为0后, countDownLatch.await()就会被唤醒,才会执行后面的操作
package com.cx;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class Test12 { public static void main(String[] args) throws InterruptedException { //CountDownLatch(数字), CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 0; i < 7; i++) { new Thread(()->{ try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"success"); countDownLatch.countDown();//数量减1 },String.valueOf(i)).start(); } countDownLatch.await();//等待计数器归零后,才会向下执行 System.out.println("end"); }}
CycliBarrier
package com.cx;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class Test13 { public static void main(String[] args) { //等待集齐第一批6个线程,就会输出一个success,等待再次集齐6个线程,就再会输出一个success //依次类推 CyclicBarrier cyclicBarrier = new CyclicBarrier(6,()->{ System.out.println("success"); }); for (int i = 0; i <11; i++) { final int temp=i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+temp); try { cyclicBarrier.await();//等待集齐第一批6个线程 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } System.out.println("end");//只会执行一次,并且不会受cyclicBarrier集齐六个线程才会执行 }}
Semaphore
作用:多个共享资源互斥的使用!并发限流,控制最大线程数
package com.cx;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;public class Test14 { public static void main(String[] args) { //线程数量,Semaphore用来限流 Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 6; i++) { new Thread(()->{ try { //acquire()将此车位锁定,如果该车位没有被释放,其他车不能进来 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"进入停车位"); //模拟等待 TimeUnit.SECONDS.sleep(2); //离开车位 System.out.println(Thread.currentThread().getName()+"离开停车位"); } catch (InterruptedException e) { e.printStackTrace(); }finally { //release()将此车位释放,其他车可以进来了 semaphore.release(); } },String.valueOf(i)).start(); } }}
ReadWriteLock
package com.cx;import java.util.HashMap;import java.util.Map;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;public class Test15 { public static void main(String[] args) { MyCache myCache = new MyCache(); //写入 for (int i = 0; i < 5; i++) { final int temp=i; new Thread(()->{ myCache.put(temp+"",temp+""); try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } //读取 for (int i = 0; i < 5; i++) { final int temp=i; new Thread(()->{ myCache.get(temp+"",temp+""); },String.valueOf(i)).start(); } }}class MyCache{ private volatile Map<String,Object> map=new HashMap<>(); //读写锁:更加细粒的控制 private ReadWriteLock readWriteLock=new ReentrantReadWriteLock(); //存,写 public void put(String key,Object value){ try { readWriteLock.writeLock().lock(); System.out.println(Thread.currentThread().getName()+"写入"+key); map.put(key,value); System.out.println(Thread.currentThread().getName()+"写入ok"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } //取,读 public void get(String key,Object value){ try { readWriteLock.readLock().lock(); System.out.println(Thread.currentThread().getName()+"读取"+key); Object o = map.get(key); System.out.println(Thread.currentThread().getName()+"读取ok"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } }}
队列
阻塞队列BlockingQueue
BlockingQueue四组API
package com.cx;import java.sql.Time;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.TimeUnit;public class Test16 { public static void main(String[] args) throws InterruptedException { //会抛出异常 //test1(); //不会抛出异常 //test2(); //阻塞一直等待 //test3(); //超时等待 test4(); } public static void test1(){ //队列的大小 ArrayBlockingQueue<Object> objects = new ArrayBlockingQueue<>(2); //增加完成后,会返回一个boolean值 System.out.println(objects.add("c")); System.out.println(objects.add("x")); //java.lang.IllegalStateException: Queue full 抛出异常 //System.out.println(objects.add("w")); System.out.println(objects.element());//判断对首是谁 System.out.println("---------"); System.out.println(objects.remove()); System.out.println(objects.remove()); //java.util.NoSuchElementException 抛出异常 System.out.println(objects.remove()); } public static void test2(){ //队列的大小 ArrayBlockingQueue<Object> objects = new ArrayBlockingQueue<>(2); //增加完成后,会返回一个boolean值 System.out.println(objects.offer("c")); System.out.println(objects.offer("x")); //不会抛出异常,会输出false System.out.println(objects.offer("w")); System.out.println(objects.peek());//判断对首是谁 System.out.println("---------"); System.out.println(objects.poll()); System.out.println(objects.poll()); //不会抛出异常,会输出null System.out.println(objects.poll()); } public static void test3() throws InterruptedException { //队列的大小 ArrayBlockingQueue<Object> objects = new ArrayBlockingQueue<>(2); //增加完成后,会返回一个boolean值 objects.put("c"); objects.put("x"); //objects.put("w");//会一直阻塞 System.out.println(objects.take()); System.out.println(objects.take()); //System.out.println(objects.take());//会一直阻塞 } public static void test4() throws InterruptedException { //队列的大小 ArrayBlockingQueue<Object> objects = new ArrayBlockingQueue<>(2); //增加完成后,会返回一个boolean值 System.out.println(objects.offer("c")); System.out.println(objects.offer("x")); //不会抛出异常,会输出false //超过两秒就会退出 System.out.println(objects.offer("w",2,TimeUnit.SECONDS)); System.out.println(objects.peek());//判断对首是谁 System.out.println("---------"); System.out.println(objects.poll()); System.out.println(objects.poll()); //不会抛出异常,会输出null //超过两秒就会退出 System.out.println(objects.poll(2,TimeUnit.SECONDS)); }}
同步队列SynchronousQueue
进去一个元素,必须等待取出来之后,才能再往里面放一个元素!
package com.cx;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.TimeUnit;/** * 同步队列不储存元素,put了一个元素,必须从里面先take取一个,否则不能再向里面put */public class Test17 { public static void main(String[] args) { //同步队列 SynchronousQueue<Object> objects = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"放入put1"); objects.put("put1"); System.out.println(Thread.currentThread().getName()+"放入put2"); objects.put("put2"); System.out.println(Thread.currentThread().getName()+"放入put3"); objects.put("put3"); } catch (InterruptedException e) { e.printStackTrace(); } },"C").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"获取"+objects.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"获取"+objects.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"获取"+objects.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"X").start(); }}
池化技术及线程池使用
池化技术:事先准备好一些资源,需要即直接从里面拿来使用,使用过后就返还里面去
线程池好处:
- 降低资源消耗
- 提高响应速度
- 方便管理
即:线程复用,可以控制最大并发数,管理线程
线程池:三大方法、7大参数,4种拒绝策略
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源消耗尽的风险。
Executors返回的线程池对象弊端如下:
1).FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
2).CachedThreadPool和ScheduledThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM
三大方法
本质:ThreadPoolExecutor()
package com.cx;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * Executors里的三大方法 */public class Test18 { public static void main(String[] args) { //test(); //test2(); //test3(); } public static void test(){ //单个线程 ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 3; i++) { executorService.execute(()->{ System.out.println(Thread.currentThread().getName()+"ok"); }); } } public static void test2(){ //创建一个固定的线程池的大小 ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 3; i++) { executorService.execute(()->{ System.out.println(Thread.currentThread().getName()+"ok"); }); } } public static void test3(){ //可伸缩的,遇强则强,遇弱则弱 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { executorService.execute(()->{ System.out.println(Thread.currentThread().getName()+"ok"); }); } }}
7大参数及自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2,//核心线程池大小 5,//最大线程池大小 3,//5-2,当不是核心线程的线程等待3秒钟还没有被使用,就会被释放 TimeUnit.SECONDS,//超时的单位 new ArrayBlockingQueue<>(3),//阻塞队列的大小 Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不会改变 new ThreadPoolExecutor.AbortPolicy()//拒绝策略,有四种 );
四种拒绝策略
- new ThreadPoolExecutor.AbortPolicy()
会抛出异常,并且不会执行超出线程池最大承载的任务
public class Test19 { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2,//核心线程池大小 5,//最大线程池大小 3,//5-2,当不是核心线程的线程等待3秒钟还没有被使用,就会被释放 TimeUnit.SECONDS,//超时的单位 new ArrayBlockingQueue<>(3),//阻塞队列的大小 Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不会改变 new ThreadPoolExecutor.AbortPolicy()//拒绝策略,有四种 );//最大承载:线程池最大大小+阻塞队列大小 for (int i = 0; i < 10; i++) { threadPoolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()+"ok"); }); } }}
- new ThreadPoolExecutor.CallerRunsPolicy()
不会抛出异常,超出最大承载的任务,都会进入主线程main中执行
package com.cx;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Test19 { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2,//核心线程池大小 5,//最大线程池大小 3,//5-2,当不是核心线程的线程等待3秒钟还没有被使用,就会被释放 TimeUnit.SECONDS,//超时的单位 new ArrayBlockingQueue<>(3),//阻塞队列的大小 Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不会改变 new ThreadPoolExecutor.CallerRunsPolicy()//拒绝策略,有四种 ); //最大承载:线程池最大大小+阻塞队列大小 for (int i = 1; i <= 10; i++) { threadPoolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()+"ok"); }); } }}
- new ThreadPoolExecutor.DiscardPolicy()
不会抛出异常,会直接丢掉超出最大承载的任务
package com.cx;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Test19 { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2,//核心线程池大小 5,//最大线程池大小 3,//5-2,当不是核心线程的线程等待3秒钟还没有被使用,就会被释放 TimeUnit.SECONDS,//超时的单位 new ArrayBlockingQueue<>(3),//阻塞队列的大小 Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不会改变 new ThreadPoolExecutor.DiscardPolicy()//拒绝策略,有四种 ); //最大承载:线程池最大大小+阻塞队列大小 for (int i = 1; i <= 10; i++) { threadPoolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()+"ok"); }); } }}
- new ThreadPoolExecutor.DiscardOldestPolicy()
最大承载满了之后,后续进来的线程会和第一个进来的线程竞争,如果竞争过了就会执行此线程,将第一个线程抛弃,否则,则此线程被抛弃
package com.cx;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Test19 { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2,//核心线程池大小 5,//最大线程池大小 3,//5-2,当不是核心线程的线程等待3秒钟还没有被使用,就会被释放 TimeUnit.SECONDS,//超时的单位 new ArrayBlockingQueue<>(3),//阻塞队列的大小 Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不会改变 new ThreadPoolExecutor.DiscardOldestPolicy()//拒绝策略,有四种 ); //最大承载:线程池最大大小+阻塞队列大小 for (int i = 1; i <= 10; i++) { threadPoolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()+"ok"); }); } }}
CPU密集型和IO密集型
Runtime.getRuntime().availableProcessors()
获取CPU核数
- CPU密集型:计算一般是几核,一般最大线程数就设置为几
- IO密集型:程序中消耗IO很大的线程有几个,一般最大线程数设置为2倍
四大函数式接口
只有一个方法的接口
函数型接口Function
Function 函数型接口,有一个参数,一个输出
public interface Function {
R apply(T t);
}
例1:
package com.cx;import java.util.function.Function;public class Test20 { public static void main(String[] args) { Function function=new Function<String,String>() { @Override public String apply(String o) { return o; } }; System.out.println(function.apply("cx")); Function function1=(str)->{return str;}; System.out.println(function1.apply(2)); }}
断定型接口Predicate
有一个输入参数,返回值只能是布尔值
public interface Predicate {
boolean test(T t);
}
package com.cx;import java.util.function.Predicate;public class Test21 { public static void main(String[] args) { Predicate<String > objectPredicate = new Predicate<String>() { @Override public boolean test(String s) { return s.isEmpty(); } }; System.out.println(objectPredicate.test("")); Predicate<String > objectPredicate1=(str)->{ return str.isEmpty(); }; System.out.println(objectPredicate1.test("cx")); }}
消费型接口Consumer
有一个输入参数,没有返回值
public interface Consumer {
void accept(T t);
}
package com.cx;import java.util.function.Consumer;public class Test22 { public static void main(String[] args) { Consumer<String> objectConsumer = new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }; objectConsumer.accept("get"); Consumer<String> objectConsumer2=(str)->{ System.out.println(str); }; objectConsumer2.accept("get2"); }}
供给型接口Supplier
没有输入参数,有一个返回值
public interface Supplier {
T get();
}
package com.cx;import java.util.function.Supplier;public class Test23 { public static void main(String[] args) { Supplier<String> objectSupplier = new Supplier<String>() { @Override public String get() { return "put"; } }; System.out.println(objectSupplier.get()); Supplier<String> objectSupplier2=()->{ return "put2"; }; System.out.println(objectSupplier2.get()); }}
Stream流式计算
package com.cx;import java.util.ArrayList;import java.util.Locale;import java.util.Random;/** * ID必须是偶数 * 年龄必须大于4 *用户名大写 * 用户名字母倒序 * 输出一个 */public class Test24 { public static void main(String[] args) { ArrayList<User> users = new ArrayList<>(); Random random = new Random(); for (int i = 0; i < 5; i++) { int i1 = random.nextInt(10); User user = new User(i, String.valueOf((char)(i+i1+97)),i+i1); users.add(user); } for (User user : users) { System.out.println(user); } System.out.println("-----------"); users.stream() .filter(u->{return u.getId()%2==0;}) .filter(u->{return u.getAge()>4;}) .map(u->{return u.getName().toUpperCase();}) .sorted((u1,u2)->{return ((String) u2).compareTo((String)u1);}) .limit(1) .forEach(System.out::println); }}
ForkJoin详解
特点 :工作窃取,里面维护的是双端队列,将大任务化为小任务来执行
package com.cx;import java.util.concurrent.RecursiveTask;/** * 求从任意连个值之间的累计和 */public class ForkJoinTest extends RecursiveTask<Long> { private Long start; private Long end; //临界值 private Long temp; //返回值 private Long sum=0L; public ForkJoinTest(Long start, Long end,Long temp) { this.start = start; this.end = end; this.temp=temp; } //计算方法 @Override protected Long compute() { if ((end-start)<temp){ for (Long i = start; i < end; i++) { sum+=i; } return sum; }else { long middle=(start+end)/2;//取中间值 ForkJoinTest task1 = new ForkJoinTest(start, middle,temp); task1.fork();//拆分任务,把任务压入线程队列 ForkJoinTest task2 = new ForkJoinTest(middle, end,temp); task2.fork();//拆分任务,把任务压入线程队列 return task1.join()+task2.join();//返回两个任务的结果和 } }}
package com.cx;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.stream.LongStream;public class Test25 { public static void main(String[] args) throws ExecutionException, InterruptedException { long start=0L; long end=10_0000_0000L; long temp=5_00_0000L; //test1(start,end);//3107s //test2(start,end,temp);//5429 test3(start,end);//125 } public static void test1(long start,long end){ Long sum=0L; long l = System.currentTimeMillis(); for (long i = start; i < end; i++) { sum+=i; } long l2 = System.currentTimeMillis(); System.out.println("sum="+sum+"所有时间是:"+(l2-l)); } public static void test2(long start,long end,long temp) throws ExecutionException, InterruptedException { Long sum=0L; long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinTest(start,end,temp); ForkJoinTask<Long> submit = forkJoinPool.submit(task);//提交任务 sum=submit.get();//获取最终结果 long l2 = System.currentTimeMillis(); System.out.println("sum="+sum+"所有时间是:"+(l2-l)); } //并行流 public static void test3(long start,long end) throws ExecutionException, InterruptedException { long l = System.currentTimeMillis(); //range是(),rangeClosed是(] long sum = LongStream.range(start, end).parallel().reduce(0, Long::sum); long l2 = System.currentTimeMillis(); System.out.println("sum="+sum+"所有时间是:"+(l2-l)); }}
异步回调
- Future设计初衷:对将来的某个事件的结果进行建模
package com.cx;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;public class Test26 { public static void main(String[] args) { //test1(); test2(); } /** * 没有返回值的异步回调 */ public static void test1(){ CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(()->{ try { TimeUnit.SECONDS.sleep(4); System.out.println(Thread.currentThread().getName()+":runAsync void"); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("正常执行");//不需要等待上面的程序执行结果 try { Object o=voidCompletableFuture.get();//获取阻塞结果 System.out.println(o); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("正常执行2");//需要等待异步执行结果的获取 } /** * 有返回值的异步回调 */ public static void test2(){ try { CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{ int i=10/0; //此时的返回值,就是whenComplete,里面的SUCCESS参数 return Thread.currentThread().getName()+":SUCCESS"; }); String s = objectCompletableFuture.whenComplete((SUCCESS, ERROR) -> { }).exceptionally((e) -> { //此时的返回值,就是whenComplete,里面的ERROR参数 return "exception:" + e.getMessage();//返回异常信息 }).get(); System.out.println(s); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }}
理解JMM
JMM:java内存模型,是一种概念,或者约定。
JMM即为JAVA 内存模型(java memory model)。因为在不同的硬件⽣产商和不同的操作系统下,内存的访问逻辑有⼀定的差异,结果就是当你的代码在某个系统环境下运⾏良好,并且线程安全,但是换了个系统就出现各种问题。Java内存模型,就是为了屏蔽系统和硬件的差异,让⼀套代码在不同平台下能到达相同的访问结果。
JMM规定了内存主要划分为主内存和⼯作内存两种。此处的主内存和⼯作内存跟JVM内存划分(堆、栈、⽅法区)是在不同的层次上进⾏的,如果⾮要对应起来,主内存对应的是Java堆中的对象实例部分,⼯作内存对应的是栈中的部分区域,从更底层的来说,主内存对应的是硬件的物理内存,⼯作内存对应的是寄存器和⾼速缓存。
JVM在设计时候考虑到,如果JAVA线程每次读取和写⼊变量都直接操作主内存,对性能影响⽐较⼤,所以每条线程拥有各⾃的⼯作内
存,⼯作内存中的变量是主内存中的⼀份拷贝,线程对变量的读取和写⼊,直接在⼯作内存中操作,⽽不能直接去操作主内存中的变量。但
是这样就会出现⼀个问题,当⼀个线程修改了⾃⼰⼯作内存中变量,对其他线程是不可见的,会导致线程不安全的问题。因为JMM制定了⼀
套标准来保证开发者在编写多线程程序的时候,能够控制什么时候内存会被同步给其他线程
内存交互操作
内存交互操作有8种,虚拟机实现必须保证每⼀个操作都是原⼦的,不可在分的
- lock (锁定):作⽤于主内存的变量,把⼀个变量标识为线程独占状态
- unlock (解锁):作⽤于主内存的变量,它把⼀个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read (读取):作⽤于主内存变量,它把⼀个变量的值从主内存传输到线程的⼯作内存中,以便随后的load动作使⽤
- load (载⼊):作⽤于⼯作内存的变量,它把read操作从主存中变量放⼊⼯作内存中
- use (使⽤):作⽤于⼯作内存中的变量,它把⼯作内存中的变量传输给执⾏引擎,每当虚拟机遇到⼀个需要使⽤到变量的值,就会使⽤到这个指令
- assign (赋值):作⽤于⼯作内存中的变量,它把⼀个从执⾏引擎中接受到的值放⼊⼯作内存的变量副本中
- store (存储):作⽤于主内存中的变量,它把⼀个从⼯作内存中⼀个变量的值传送到主内存中,以便后续的write使⽤
- write (写⼊):作⽤于主内存中的变量,它把store操作从⼯作内存中得到的变量的值放⼊主内存的变量中
JMM对这⼋种指令的使⽤,制定了如下规则:
- 不允许read和load、store和write操作之⼀单独出现。即使⽤了read必须load,使⽤了store必须write
- 不允许线程丢弃他最近的assign操作,即⼯作变量的数据改变了之后,必须告知主存
- 不允许⼀个线程将没有assign的数据从⼯作内存同步回主内存
- ⼀个新的变量必须在主内存中诞⽣,不允许⼯作内存直接使⽤⼀个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
- ⼀个变量同⼀时间只有⼀个线程能对其进⾏lock。多次lock后,必须执⾏相同次数的unlock才能解锁
- 如果对⼀个变量进⾏lock操作,会清空所有⼯作内存中此变量的值,在执⾏引擎使⽤这个变量前,必须重新load或assign操作初始化变量的值
- 如果⼀个变量没有被lock,就不能对其进⾏unlock操作。也不能unlock⼀个被其他线程锁住的变量
- 对⼀个变量进⾏unlock操作之前,必须把此变量同步回主内存
JMM对这⼋种操作规则和对就能确定哪⾥操作是线程安全,哪些操作是线程不安全的了。但是这些规则实在复杂,很难在实践中直接分析。所以⼀般我们也不会通过上述规则进⾏分析。更多的时候,使⽤java的happen-before规则来进⾏分析。
例如下面代码,循环不会停止:
package com.cx;import java.util.concurrent.TimeUnit;public class Test27 { private static int num=0; public static void main(String[] args) { new Thread(()->{//线程对主线程内存的变化不可见,即不知道其已经发生了变化 while (num==0){ //System.out.println("start"); } }).start(); try { TimeUnit.SECONDS.sleep(1); num=1;//main线程执行 } catch (InterruptedException e) { e.printStackTrace(); } }}
模型特征
Volatile
- volatile:是java虚拟机提供轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
保证可见性
package com.cx;import java.util.concurrent.TimeUnit;public class Test27 {//volatile 保证了可见性 private volatile static int num=0; public static void main(String[] args) { new Thread(()->{ while (num==0){ //System.out.println("start"); } }).start(); try { TimeUnit.SECONDS.sleep(1); num=1;//main线程执行 System.out.println(num); } catch (InterruptedException e) { e.printStackTrace(); } }}
不保证原子性
package com.cx;import java.util.concurrent.TimeUnit;public class Test28 { /** * 加了volatile,不能保证原子性 *就比如A线程将NUM变为100后,从工作内存中放入主内存中,此时NUM=100 *但是此时线程B将NUM变为20后,从工作内存中放入主内存中,此时NUM=20 * 这样连个线程执行完后,理论上是加到2000000,实际达不到 * */ private volatile static int num=0; public static void main(String[] args) { new Thread(()->{ for (int i=0;i<1000000;i++){ add(); } },"A").start(); new Thread(()->{ for (int i=0;i<1000000;i++){ add(); } },"B").start(); while (Thread.activeCount()>2){ Thread.yield(); } System.out.println("num:"+num); } public static void add(){ num++; }}
通过原子类保证原子性
- java.util.concurrent.atomic
这些类的底层都直接喝操作系统挂钩!在内存中修改值
package com.cx;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;public class Test28 { /** * 加了volatile,不能保证原子性 *就比如A线程将NUM变为100后,从工作内存中放入主内存中,此时NUM=100 *但是此时线程B将NUM变为20后,从工作内存中放入主内存中,此时NUM=20 * 这样连个线程执行完后,理论上是加到2000000,实际达不到, * 想要达到保证原子性,不用lock和synchronized, * 那么可以使用 todo 原子类 * */ private volatile static AtomicInteger num=new AtomicInteger(); public static void main(String[] args) { new Thread(()->{ for (int i=0;i<1000000;i++){ add(); } },"A").start(); new Thread(()->{ for (int i=0;i<1000000;i++){ add(); } },"B").start(); while (Thread.activeCount()>2){//main gc基本线程 Thread.yield(); } System.out.println("num:"+num); } public static void add(){ num.getAndIncrement();//相当于 num+1,底层使用的是CAS }}
禁止指令重排
指令重排:程序的执行顺序,可以被改变
** 内存屏障**作用:
- 保证特定的操作的执行顺序!
- 可以保证某些变量的内存可见
深入理解CAS
CAS即compare and swap的缩写,比较并且交换,CAS是cpu的并发原语
- 处理器(包括 Intel 和 Sparc 处理器)使用的最通用的方法是实现名为 比较并转换或 CAS 的原语。(在 Intel 处理器中,比较并交换通过指令的 cmpxchg 系列实现。PowerPC 处理器有一对名为“加载并保留”和“条件存储”的指令,它们实现相同的目地;MIPS 与 PowerPC 处理器相似,除了第一个指令称为“加载链接”。)
- CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前值。)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置的值即可。”
- 通常将 CAS 用于同步的方式是从地址 V 读取值 A,执行多步计算来获得新值 B,然后使用 CAS 将 V 的值从 A 改为 B。如果 V 处的值尚未同时更改,则 CAS 操作成功。
- 类似于 CAS 的指令允许算法执行读-修改-写操作,而无需害怕其他线程同时修改变量,因为如果其他线程修改变量,那么 CAS 会检测它(并失败),算法可以对该操作重新计算。清单 3 说明了 CAS 操作的行为(而不是性能特征),但是 CAS 的价值是它可以在硬件中实现,并且是极轻量级的(在大多数处理器中)。
package com.cx;import java.util.concurrent.atomic.AtomicInteger;public class Test30 { public static void main(String[] args) { //给一个初始值0 AtomicInteger atomicInteger = new AtomicInteger(0); //todo 有两个参数, (int expect,int update) //可以理解为,如果值为我们期望的值,那么就更新为更新的值 //compareAndSet等同于CAS boolean b = atomicInteger.compareAndSet(0, 1); //打印为true:1 System.out.println(b+":"+atomicInteger.get()); //此时再次使用同样的方法发现 boolean c = atomicInteger.compareAndSet(0, 1); //打印为false:1 System.out.println(c+":"+atomicInteger.get()); }}
可以看到我们使用使用的原子类AtomicInteger 有类似CAS的方法,进一步进入AtomicInteger类:
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; /** * Creates a new AtomicInteger with the given initial value. * * @param initialValue the initial value */ public AtomicInteger(int initialValue) { value = initialValue; } /** * Creates a new AtomicInteger with initial value {@code 0}. */ public AtomicInteger() { } ......
此时出现了Unsafe这个类,我们可以通过百度:
Unsafe
- 在JDK1.5之后,Java程序中才可以使用CAS操作.
该操作由sun.misc.Unsafe类里面的compareAndSwapInt()和compareAndSwapLong()等几个方法包装提供,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令,没有方法调用的过程,或者可以认为是无条件内联进去了。- 由于Unsafe类不是提供给用户提供调用的类(Unsafe.getUnsafe()的代码中限制了只有启动类加载器(Bootstrap ClassLoader)加载的Class才能访问它),因此,如果不采用反射手段,我们只能通过其他的Java API来间接使用它,如J.U.C包里面的整数原子类,其中的compareAndSet()和 getAndIncrement()等方法都使用了 Unsafe 类的 CAS 操作。
回到我们之前的问题,通过原子类的atomicInteger.getAndIncrement()类似于num++,那么底层是:
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
unsafe.getAndAddInt:
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
getIntVolatile获取当前对象var1地址中值给到var5,compareAndSwapInt判断var1里的值var2是否与var5相等,如果相等,那么就var5+1,此时修改成功,那么this.compareAndSwapInt(var1, var2, var5, var5 + var4))就返回true,会天哦出循环,否则就一直循环(自旋锁)
CAS
比较当前工作内存中的值,如果这个值是期望的,那么则执执行操作!如果不是就一直循环!
- 缺点:
1.循环会耗时
2.一次性只能保证一个共享变量的原子性
3.ABA问题
ABA问题
我们期望的值被人动过了,但是动过了之后,又复原了!
package com.cx;import java.util.concurrent.atomic.AtomicInteger;public class Test30 { public static void main(String[] args) { //========================捣乱的线程======================== AtomicInteger atomicInteger = new AtomicInteger(0); boolean b = atomicInteger.compareAndSet(0, 1); System.out.println(b+":"+atomicInteger.get()); boolean c = atomicInteger.compareAndSet(1, 0); System.out.println(c+":"+atomicInteger.get()); //========================正常的线程======================== boolean d = atomicInteger.compareAndSet(0, 1); System.out.println(d+":"+atomicInteger.get()); }}
原子引用解决ABA问题
原子引用:带版本号的原子操作
package com.cx;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicStampedReference;public class Test31 { //注意,如果泛型是一个包装类,注意对象的引用 //AtomicStampedReference(1,2) 初始值是1,版本号是2 static AtomicStampedReference<Integer> atomicStampedReference=new AtomicStampedReference<>(1,2); public static void main(String[] args) { int stamp1 = atomicStampedReference.getStamp();//获取初始版本号 Integer reference2 = atomicStampedReference.getReference();//获取初始期望值 System.out.println(Thread.currentThread().getName()+",初始期望值是:"+atomicStampedReference.getReference()+",初始版本号是"+stamp1); //我是捣乱的线程,我现在不知道期望值和版本号是多少 new Thread(()->{ //捣乱的线程不知道期望值是多少,版本号是多少,所以 int stamp = atomicStampedReference.getStamp();//获取版本号 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } boolean b1 = atomicStampedReference.compareAndSet(1, 3, atomicStampedReference.getStamp(), atomicStampedReference.getStamp()+1); //查看是否更新成功,新值是多少 System.out.println(Thread.currentThread().getName()+":修改是否成功:"+b1+",现在期望值是:"+atomicStampedReference.getReference()+",现在版本号是:"+atomicStampedReference.getStamp()); boolean b2 = atomicStampedReference.compareAndSet(atomicStampedReference.getReference(), 1, atomicStampedReference.getStamp(), stamp); System.out.println("我又改回去啦!"+b2); },"A").start(); //我是正常的线程,正常的代码要写成死的 new Thread(()->{ int stamp = atomicStampedReference.getStamp();//获取版本号 Integer reference = atomicStampedReference.getReference();//获取期望值 //期望是1,改为2,并且将版本号+1 boolean b1 = atomicStampedReference.compareAndSet(1, 2, stamp, stamp + 1); //查看是否更新成功,新值是多少 System.out.println(Thread.currentThread().getName()+":修改是否成功:"+b1+",现在期望值是:"+atomicStampedReference.getReference()+",现在版本号是:"+atomicStampedReference.getStamp()); },"B").start(); }}
注意:所有的相同类型的包装类对象之间值的比较,全部使用equals方法比较
对于Integer var=? 在-128~127之间的赋值,Integer对象是在缓存产生,会复用已有对象,这个区间内的Integer值可以直接使用==进行判断,但是这个区间之外的所有数据,都会在堆上产生,并不会复用已有对象,这是一个大坑,推荐使用equals方法进行判断。
可重入锁
递归锁,;类似与房间里的房门,想进房门,就必须先进大门
synchronized 实现
package com.cx;import java.util.concurrent.TimeUnit;//synchronized 实现public class Test32 { public static void main(String[] args) { Phones phone = new Phones(); new Thread(()->{ phone.emill(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } },"A").start(); new Thread(()->{ phone.emill(); },"B").start(); }}class Phones{ public synchronized void emill(){ System.out.println(Thread.currentThread().getName()+"sms"); call(); } public synchronized void call(){ System.out.println(Thread.currentThread().getName()+"call"); }}
lock实现
package com.cx;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class Test32 { public static void main(String[] args) { Phones phone = new Phones(); new Thread(()->{ phone.emill(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } },"A").start(); new Thread(()->{ phone.call(); },"B").start(); }}class Phones{ Lock lock=new ReentrantLock(); public void emill(){ try { lock.lock();//锁必须配对,否则就会死在里面 lock.lock(); System.out.println(Thread.currentThread().getName()+"sms"); call(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); lock.unlock(); } } public void call(){ try { lock.lock(); System.out.println(Thread.currentThread().getName()+"emill"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }}
自旋锁
package com.cx;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicReference;public class Singlock { public static void main(String[] args) { SpiglockT lock = new SpiglockT(); new Thread(()->{ lock.myLock(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.myUnLock(); } },"A").start(); new Thread(()->{ lock.myLock(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.myUnLock(); } },"B").start(); }}//底层使用CASclass SpiglockT{ //默认是个空线程 atomicReference=null AtomicReference<Thread> atomicReference=new AtomicReference<>(); //加锁 public void myLock(){ Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"--> myLock"); //自旋锁 //如果线程是null,就把atomicReference更新为thread 那么就退出循环 while (!atomicReference.compareAndSet(null,thread)){ } } //解锁 public void myUnLock(){ Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"--> myUnLock"); //如果线程是thread,就把atomicReference更新为null atomicReference.compareAndSet(thread,null); }}
死锁排查
package com.cx;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class DiedLock { public static void main(String[] args) { TestT testT = new TestT("A","B"); TestT testT2 = new TestT("B","A"); new Thread(()->{ testT.run1(); },"A").start(); new Thread(()->{ testT2.run1(); },"B").start(); }}class TestT{ private String t1; private String t2; public TestT(String t1,String t2) { this.t1=t1; this.t2=t2; } Lock lock=new ReentrantLock(); public void run1(){ //synchronized (this) 锁住的是对象 synchronized (t1){ System.out.println(Thread.currentThread().getName()+"我拿到了"+t1+"我再去拿"+t2); synchronized (t2){ System.out.println(Thread.currentThread().getName()+"我拿到了"+t2+"我再去拿"+t1); } } }}
排查死锁
- 定位进程号:
jps -l
(小写的L)- 查看死锁问题:
jstack 进程号
=========================================================
java-JUC并发编程 推荐狂神说
端木振平资料集合分享