> 文档中心 > JAVA-JUC并发编程

JAVA-JUC并发编程


JUC:java.util.concurrent

环境准备

1.maven项目
2.准备一个lombok
org.projectlombok lombok 1.18.22
3.项目的Project StructureProjectSDK改为1.8Language level改为8ModulesSourcesLanguage level改为8
4.Settings->Java CompilerTarget bytecode version改为8

并发与并行

  • 并发编程的本质:充分使用CPU资源
  • 并发:多个线程操作同一个资源,针对CPU单核的,通过快速交替,来模拟多条线程
  • 并行:CPU多核,多个线程可以同时执行,线程池
package com.cx;public class Test1 {    public static void main(String[] args) { //获取cpu核数 //CPU密集型,IO密集型 System.out.println(Runtime.getRuntime().availableProcessors());    }}

synchronized

package com.cx;/** * 线程应该是一个单独的资源类,没有其他附属操作 * 1.属性,方法 */public class Test2 {    public static void main(String[] args){ Ticket ticket = new Ticket(); new Thread(()->{     for (int i = 0; i < 10; i++) {  try {      ticket.sell();      Thread.sleep(10);  } catch (InterruptedException e) {      e.printStackTrace();  }     }     },"A").start(); new Thread(()->{     for (int i = 0; i < 10; i++) {  ticket.sell();     }     },"B").start(); new Thread(()->{     for (int i = 0; i < 10; i++) {  ticket.sell();     }     },"C").start();    }}//资源类class Ticket{    //属性    private int num=29;    //方法    //synchronized相当于队列    public synchronized void sell(){ if (num>0){     System.out.println(Thread.currentThread().getName()+"买了第"+num--+"张票,还剩下"+num+"张票."); }    }}

生产者-消费者(synchronized实现)

package com.cx;public class Test4 {    public static void main(String[] args) { Data data = new Data(); new Thread(()->{     for (int i = 0; i < 10; i++) {  try {      data.increment();  } catch (InterruptedException e) {      e.printStackTrace();  }     } },"A").start(); new Thread(()->{     for (int i = 0; i < 10; i++) {  try {      data.decrement();  } catch (InterruptedException e) {      e.printStackTrace();  }     } },"B").start();    }}//等待-业务-通知class Data{    private int num=0;    public synchronized void increment() throws InterruptedException { if (num!=0){     //等待     this.wait(); } num++; System.out.println(Thread.currentThread().getName()+"+"+num); //通知其他线程,此线程+1完成 this.notifyAll();    }    public synchronized void decrement() throws InterruptedException { if (num==0){     //等待     this.wait(); } num--; System.out.println(Thread.currentThread().getName()+"-"+num); //通知其他线程,此线程-1完成 this.notifyAll();    }}

当new两个加,两个减的线程就会出问题,这是因为if导致的,这种情况叫做虚假唤醒,从官方文档中可以看出,等待应该是发生在循环中的,所以我们将if换成while就不会出错了

package com.cx;public class Test4 {    public static void main(String[] args) { Data data = new Data(); new Thread(()->{     for (int i = 0; i < 10; i++) {  try {      data.increment();  } catch (InterruptedException e) {      e.printStackTrace();  }     } },"A").start(); new Thread(()->{     for (int i = 0; i < 10; i++) {  try {      data.decrement();  } catch (InterruptedException e) {      e.printStackTrace();  }     } },"B").start(); new Thread(()->{     for (int i = 0; i < 10; i++) {  try {      data.decrement();  } catch (InterruptedException e) {      e.printStackTrace();  }     } },"C").start(); new Thread(()->{     for (int i = 0; i < 10; i++) {  try {      data.decrement();  } catch (InterruptedException e) {      e.printStackTrace();  }     } },"D").start();    }}//等待-业务-通知class Data{    private int num=0;    public synchronized void increment() throws InterruptedException { while (num!=0){     //等待     this.wait(); } num++; System.out.println(Thread.currentThread().getName()+"+"+num); //通知其他线程,此线程+1完成 this.notifyAll();    }    public synchronized void decrement() throws InterruptedException { while (num==0){     //等待     this.wait(); } num--; System.out.println(Thread.currentThread().getName()+"-"+num); //通知其他线程,此线程-1完成 this.notifyAll();    }}

Lock

  • 公平锁:先来后到,例如3h,3m,这样会等待3h先执行完后,才会执行3m的
  • 非公平锁(默认):不遵循先来后到
package com.cx;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * 线程应该是一个单独的资源类,没有其他附属操作 * 1.属性,方法 */public class Test3 {    public static void main(String[] args){ Ticket2 ticket = new Ticket2(); new Thread(()->{for (int i = 0; i < 10; i++) ticket.sell();},"A").start(); new Thread(()->{for (int i = 0; i < 10; i++) ticket.sell();},"B").start(); new Thread(()->{for (int i = 0; i < 10; i++) ticket.sell();},"C").start();    }}//资源类class Ticket2{    //属性    private int num=29;    Lock lock=new ReentrantLock();    //方法    //synchronized相当于队列    public void sell(){ //加锁 lock.lock(); try {     //业务代码     if (num>0){  System.out.println(Thread.currentThread().getName()+"买了第"+num--+"张票,还剩下"+num+"张票.");     } } catch (Exception e) {     e.printStackTrace(); } finally {     //解锁     lock.unlock(); }    }}

生产者-消费者(Lock实现)

Condition:精准的通知和唤醒线程

package com.cx;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class Test5 {    public static void main(String[] args) { Data2 data = new Data2(); new Thread(()->{     for (int i = 0; i < 10; i++) {  try {      data.p1();  } catch (InterruptedException e) {      e.printStackTrace();  }     } },"p1").start(); new Thread(()->{     for (int i = 0; i < 10; i++) {  try {      data.p2();  } catch (InterruptedException e) {      e.printStackTrace();  }     } },"p2").start(); new Thread(()->{     for (int i = 0; i < 10; i++) {  try {      data.p3();  } catch (InterruptedException e) {      e.printStackTrace();  }     } },"p3").start();    }}//等待-业务-通知class Data2{    private int num=1;    Lock lock=new ReentrantLock();    Condition condition1=lock.newCondition();    Condition condition2=lock.newCondition();    Condition condition3=lock.newCondition();    public void p1() throws InterruptedException { lock.lock(); try {     while (num!=1){  //等待  condition1.await();     }     num=2;     System.out.println(Thread.currentThread().getName()+"+"+num);     //通知2线程,此线程完成     condition2.signal(); } catch (InterruptedException e) {     e.printStackTrace(); } finally {     lock.unlock(); }    }    public void p2() throws InterruptedException { lock.lock(); try {     while (num!=2){  //等待  condition2.await();     }     num=3;     System.out.println(Thread.currentThread().getName()+"-"+num);     //通知3线程,此线程完成     condition3.signal(); } catch (InterruptedException e) {     e.printStackTrace(); } finally {     lock.unlock(); }    }    public void p3() throws InterruptedException { lock.lock(); try {     while (num!=3){  //等待  condition3.await();     }     num=1;     System.out.println(Thread.currentThread().getName()+"-"+num);     //通知1线程     condition1.signal(); } catch (InterruptedException e) {     e.printStackTrace(); } finally {     lock.unlock(); }    }}

八锁问题

package com.cx;import java.util.concurrent.TimeUnit;public class Test6 {    public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{     /**      * 1.此时由于还没有进入当方法中,也就是还没锁住对象,所以此时先执行B,在执行A     try {  TimeUnit.SECONDS.sleep(1);     } catch (InterruptedException e) {  e.printStackTrace();     }      */     phone.info(); },"A").start(); new Thread(()->{     phone.call(); },"B").start(); new Thread(()->{     phone.p1(); },"C").start();    }}class  Phone{    //锁的对象是方法的调用者,谁先拿到谁先执行    public synchronized void info(){ System.out.println("发消息之前"); /** 2.此时由于已经进入方法中,锁住对象,所以在执行A的过程中,无论A占用多长时间,B也要等下去  try {  TimeUnit.SECONDS.sleep(1);  } catch (InterruptedException e) {  e.printStackTrace();  }  */ /**  * 3.此时先执行输出”发消息之前“,然后此处需要等待,但是普通方法不会等待  * 所以会输出”普通方法“,最后会执行之后的过程 try {     TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {     e.printStackTrace(); }  */ System.out.println("发消息");    }    public synchronized void call(){ System.out.println("打电话");    }    /**     * 3.普通方法不受锁的影响:当被锁住的方法执行过程中遇到等待,此时普通     * 方法不必等待被锁住的方法完全执行完才会执行     */    public void p1(){ System.out.println("普通方法");    }}
package com.cx;import java.util.concurrent.TimeUnit;public class Test7 {    public static void main(String[] args) { Phone2 phone = new Phone2(); Phone2 phone2 = new Phone2(); new Thread(()->{     phone.info(); },"A").start(); //4.由于是两个对象,此时跟两把锁互补感染,此时下面延迟的时间 //跟 发消息 方法中的延迟差,决定了是先发消息还是先打电话 try {     TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) {     e.printStackTrace(); } new Thread(()->{     phone2.call(); },"B").start();    }}class  Phone2{    //锁的对象是方法的调用者,谁先拿到谁先执行    public synchronized void info(){ try {  TimeUnit.SECONDS.sleep(4);  } catch (InterruptedException e) {  e.printStackTrace();  } System.out.println("发消息");    }    public synchronized void call(){ System.out.println("打电话");    }    public void p1(){ System.out.println("普通方法");    }}
package com.cx;import java.util.concurrent.TimeUnit;public class Test8{    public static void main(String[] args) { Phone3 phone = new Phone3(); Phone3 phone2 = new Phone3(); new Thread(()->{     phone.info(); },"A").start(); //4.由于是两个对象,但是锁的同一个类,此时,还是需要等待A执行完后才能执行B try {     TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {     e.printStackTrace(); } new Thread(()->{     phone2.call(); },"B").start();    }}class  Phone3{    //锁的对象是方法的调用者,谁先拿到谁先执行    //static 静态    //类一加载就有,锁的是class    public static synchronized void info(){ try {     TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) {     e.printStackTrace(); } System.out.println("发消息");    }    public static synchronized void call(){ System.out.println("打电话");    }}
package com.cx;import java.util.concurrent.TimeUnit;public class Test9{    public static void main(String[] args) { Phone4 phone = new Phone4(); new Thread(()->{     phone.info(); },"A").start(); //4.由于一个是锁的类模板,一个是锁的对象,此时,执行结果跟延时差有关 try {     TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {     e.printStackTrace(); } new Thread(()->{     phone.call(); },"B").start();    }}class  Phone4{    //锁的对象是方法的调用者,谁先拿到谁先执行    //static 静态    //类一加载就有,锁的是class    public static synchronized void info(){ try {     TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) {     e.printStackTrace(); } System.out.println("发消息");    }    //锁的是 调用者    public synchronized void call(){ System.out.println("打电话");    }}
package com.cx;import java.util.concurrent.TimeUnit;public class Test9{    public static void main(String[] args) { Phone4 phone = new Phone4(); Phone4 phone2 = new Phone4(); new Thread(()->{     phone.info(); },"A").start(); //4.对象不同,锁也不同,此时,执行结果跟延时差有关 try {     TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {     e.printStackTrace(); } new Thread(()->{     phone2.call(); },"B").start();    }}class  Phone4{    //锁的对象是方法的调用者,谁先拿到谁先执行    //static 静态    //类一加载就有,锁的是class    public static synchronized void info(){ try {     TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) {     e.printStackTrace(); } System.out.println("发消息");    }    //锁的是 调用者    public synchronized void call(){ System.out.println("打电话");    }}

多线程下的List、Set、Map

CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentHashMap

package com.cx;import java.util.*;import java.util.concurrent.CopyOnWriteArrayList;public class Test10 {    public static void main(String[] args) { Test10 test10 = new Test10(); //会出现并发修改异常 //test10.test(); //Vector是线程安全的,不会出错 //test10.test2(); //也不会出错 //test10.test3(); //底层是用的lock /**  * 是写入时复制,是计算机程序涉及领域的一种优化策略  * 多个线程的时候,list读写的时候,国定写入,比Vector效果好,因为底层是lock  */ test10.test4();    }    public void test(){ //java.util.ConcurrentModificationException,会出现并发修改异常 ArrayList<Object> objects = new ArrayList<>(); for (int i = 0; i < 10; i++) {     new Thread(()->{  objects.add(UUID.randomUUID().toString().substring(0,5));  System.out.println(objects);     }).start(); }    }    public void test2(){ //vector底层用的是sychronized List<Object> objects = new Vector<>(); for (int i = 0; i < 10; i++) {     new Thread(()->{  objects.add(UUID.randomUUID().toString().substring(0,5));  System.out.println(objects);     }).start(); }    }    public void test3(){ List<Object> objects = Collections.synchronizedList(new ArrayList<>()); for (int i = 0; i < 10; i++) {     new Thread(()->{  objects.add(UUID.randomUUID().toString().substring(0,5));  System.out.println(objects);     }).start(); }    }    public void test4(){ List<Object> objects = new CopyOnWriteArrayList<>(); for (int i = 0; i < 10; i++) {     new Thread(()->{  objects.add(UUID.randomUUID().toString().substring(0,5));  System.out.println(objects);     }).start(); }    }}

并发控制常用类

CountDownLatch

原理:每次有线程调用countDown(),数量就会减1,当计数器变为0后, countDownLatch.await()就会被唤醒,才会执行后面的操作

package com.cx;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class Test12 {    public static void main(String[] args) throws InterruptedException {     //CountDownLatch(数字), CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 0; i < 7; i++) {     new Thread(()->{  try {      TimeUnit.SECONDS.sleep(4);  } catch (InterruptedException e) {      e.printStackTrace();  }  System.out.println(Thread.currentThread().getName()+"success");  countDownLatch.countDown();//数量减1     },String.valueOf(i)).start(); } countDownLatch.await();//等待计数器归零后,才会向下执行 System.out.println("end");    }}

CycliBarrier

package com.cx;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class Test13 {    public static void main(String[] args) { //等待集齐第一批6个线程,就会输出一个success,等待再次集齐6个线程,就再会输出一个success //依次类推 CyclicBarrier cyclicBarrier = new CyclicBarrier(6,()->{     System.out.println("success"); }); for (int i = 0; i <11; i++) {     final int temp=i;     new Thread(()->{  System.out.println(Thread.currentThread().getName()+temp);  try {      cyclicBarrier.await();//等待集齐第一批6个线程  } catch (InterruptedException e) {      e.printStackTrace();  } catch (BrokenBarrierException e) {      e.printStackTrace();  }     }).start(); } System.out.println("end");//只会执行一次,并且不会受cyclicBarrier集齐六个线程才会执行    }}

Semaphore

作用:多个共享资源互斥的使用!并发限流,控制最大线程数

package com.cx;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;public class Test14 {    public static void main(String[] args) { //线程数量,Semaphore用来限流 Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 6; i++) {     new Thread(()->{  try {      //acquire()将此车位锁定,如果该车位没有被释放,其他车不能进来      semaphore.acquire();      System.out.println(Thread.currentThread().getName()+"进入停车位");      //模拟等待      TimeUnit.SECONDS.sleep(2);      //离开车位      System.out.println(Thread.currentThread().getName()+"离开停车位");  } catch (InterruptedException e) {      e.printStackTrace();  }finally {      //release()将此车位释放,其他车可以进来了      semaphore.release();  }     },String.valueOf(i)).start(); }    }}

ReadWriteLock

package com.cx;import java.util.HashMap;import java.util.Map;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;public class Test15 {    public static void main(String[] args) { MyCache myCache = new MyCache(); //写入 for (int i = 0; i < 5; i++) {     final int temp=i;     new Thread(()->{  myCache.put(temp+"",temp+"");  try {      Thread.sleep(1);  } catch (InterruptedException e) {      e.printStackTrace();  }     },String.valueOf(i)).start(); } //读取 for (int i = 0; i < 5; i++) {     final int temp=i;     new Thread(()->{  myCache.get(temp+"",temp+"");     },String.valueOf(i)).start(); }    }}class MyCache{    private volatile Map<String,Object> map=new HashMap<>();    //读写锁:更加细粒的控制    private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();    //存,写    public void put(String key,Object value){ try {     readWriteLock.writeLock().lock();     System.out.println(Thread.currentThread().getName()+"写入"+key);     map.put(key,value);     System.out.println(Thread.currentThread().getName()+"写入ok"); } catch (Exception e) {     e.printStackTrace(); } finally {     readWriteLock.writeLock().unlock(); }    }    //取,读    public void get(String key,Object value){ try {     readWriteLock.readLock().lock();     System.out.println(Thread.currentThread().getName()+"读取"+key);     Object o = map.get(key);     System.out.println(Thread.currentThread().getName()+"读取ok"); } catch (Exception e) {     e.printStackTrace(); } finally {     readWriteLock.readLock().unlock(); }    }}

队列

阻塞队列BlockingQueue

JAVA-JUC并发编程

BlockingQueue四组API

package com.cx;import java.sql.Time;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.TimeUnit;public class Test16 {    public static void main(String[] args) throws InterruptedException { //会抛出异常 //test1(); //不会抛出异常 //test2(); //阻塞一直等待 //test3(); //超时等待 test4();    }    public static void test1(){ //队列的大小 ArrayBlockingQueue<Object> objects = new ArrayBlockingQueue<>(2); //增加完成后,会返回一个boolean值 System.out.println(objects.add("c")); System.out.println(objects.add("x")); //java.lang.IllegalStateException: Queue full  抛出异常 //System.out.println(objects.add("w")); System.out.println(objects.element());//判断对首是谁 System.out.println("---------"); System.out.println(objects.remove()); System.out.println(objects.remove()); //java.util.NoSuchElementException 抛出异常 System.out.println(objects.remove());    }    public static void test2(){ //队列的大小 ArrayBlockingQueue<Object> objects = new ArrayBlockingQueue<>(2); //增加完成后,会返回一个boolean值 System.out.println(objects.offer("c")); System.out.println(objects.offer("x")); //不会抛出异常,会输出false System.out.println(objects.offer("w")); System.out.println(objects.peek());//判断对首是谁 System.out.println("---------"); System.out.println(objects.poll()); System.out.println(objects.poll()); //不会抛出异常,会输出null System.out.println(objects.poll());    }    public static void test3() throws InterruptedException { //队列的大小 ArrayBlockingQueue<Object> objects = new ArrayBlockingQueue<>(2); //增加完成后,会返回一个boolean值 objects.put("c"); objects.put("x"); //objects.put("w");//会一直阻塞 System.out.println(objects.take()); System.out.println(objects.take()); //System.out.println(objects.take());//会一直阻塞    }    public static void test4() throws InterruptedException { //队列的大小 ArrayBlockingQueue<Object> objects = new ArrayBlockingQueue<>(2); //增加完成后,会返回一个boolean值 System.out.println(objects.offer("c")); System.out.println(objects.offer("x")); //不会抛出异常,会输出false //超过两秒就会退出 System.out.println(objects.offer("w",2,TimeUnit.SECONDS)); System.out.println(objects.peek());//判断对首是谁 System.out.println("---------"); System.out.println(objects.poll()); System.out.println(objects.poll()); //不会抛出异常,会输出null //超过两秒就会退出 System.out.println(objects.poll(2,TimeUnit.SECONDS));    }}

同步队列SynchronousQueue

进去一个元素,必须等待取出来之后,才能再往里面放一个元素!

package com.cx;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.TimeUnit;/** * 同步队列不储存元素,put了一个元素,必须从里面先take取一个,否则不能再向里面put */public class Test17 {    public static void main(String[] args) { //同步队列 SynchronousQueue<Object> objects = new SynchronousQueue<>(); new Thread(()->{     try {  System.out.println(Thread.currentThread().getName()+"放入put1");  objects.put("put1");  System.out.println(Thread.currentThread().getName()+"放入put2");  objects.put("put2");  System.out.println(Thread.currentThread().getName()+"放入put3");  objects.put("put3");     } catch (InterruptedException e) {  e.printStackTrace();     } },"C").start(); new Thread(()->{     try {  TimeUnit.SECONDS.sleep(2);  System.out.println(Thread.currentThread().getName()+"获取"+objects.take());  TimeUnit.SECONDS.sleep(2);  System.out.println(Thread.currentThread().getName()+"获取"+objects.take());  TimeUnit.SECONDS.sleep(2);  System.out.println(Thread.currentThread().getName()+"获取"+objects.take());     } catch (InterruptedException e) {  e.printStackTrace();     } },"X").start();    }}

池化技术及线程池使用

池化技术:事先准备好一些资源,需要即直接从里面拿来使用,使用过后就返还里面去

线程池好处

  1. 降低资源消耗
  2. 提高响应速度
  3. 方便管理
    即:线程复用,可以控制最大并发数,管理线程

线程池:三大方法、7大参数,4种拒绝策略

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源消耗尽的风险。
Executors返回的线程池对象弊端如下:
1).FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
2).CachedThreadPool和ScheduledThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM

三大方法

本质:ThreadPoolExecutor()

package com.cx;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * Executors里的三大方法 */public class Test18 {    public static void main(String[] args) { //test(); //test2(); //test3();    }    public static void test(){ //单个线程 ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 3; i++) {     executorService.execute(()->{  System.out.println(Thread.currentThread().getName()+"ok");     }); }    }    public static void test2(){ //创建一个固定的线程池的大小 ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 3; i++) {     executorService.execute(()->{  System.out.println(Thread.currentThread().getName()+"ok");     }); }    }    public static void test3(){ //可伸缩的,遇强则强,遇弱则弱 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) {     executorService.execute(()->{  System.out.println(Thread.currentThread().getName()+"ok");     }); }    }}

7大参数及自定义线程池

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(  2,//核心线程池大小  5,//最大线程池大小  3,//5-2,当不是核心线程的线程等待3秒钟还没有被使用,就会被释放  TimeUnit.SECONDS,//超时的单位  new ArrayBlockingQueue<>(3),//阻塞队列的大小  Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不会改变  new ThreadPoolExecutor.AbortPolicy()//拒绝策略,有四种 );

四种拒绝策略

  • new ThreadPoolExecutor.AbortPolicy()

会抛出异常,并且不会执行超出线程池最大承载的任务

public class Test19 {    public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(  2,//核心线程池大小  5,//最大线程池大小  3,//5-2,当不是核心线程的线程等待3秒钟还没有被使用,就会被释放  TimeUnit.SECONDS,//超时的单位  new ArrayBlockingQueue<>(3),//阻塞队列的大小  Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不会改变  new ThreadPoolExecutor.AbortPolicy()//拒绝策略,有四种 );//最大承载:线程池最大大小+阻塞队列大小 for (int i = 0; i < 10; i++) {     threadPoolExecutor.execute(()->{  System.out.println(Thread.currentThread().getName()+"ok");     }); }    }}
  • new ThreadPoolExecutor.CallerRunsPolicy()

不会抛出异常,超出最大承载的任务,都会进入主线程main中执行

package com.cx;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Test19 {    public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(  2,//核心线程池大小  5,//最大线程池大小  3,//5-2,当不是核心线程的线程等待3秒钟还没有被使用,就会被释放  TimeUnit.SECONDS,//超时的单位  new ArrayBlockingQueue<>(3),//阻塞队列的大小  Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不会改变  new ThreadPoolExecutor.CallerRunsPolicy()//拒绝策略,有四种 ); //最大承载:线程池最大大小+阻塞队列大小 for (int i = 1; i <= 10; i++) {     threadPoolExecutor.execute(()->{  System.out.println(Thread.currentThread().getName()+"ok");     }); }    }}
  • new ThreadPoolExecutor.DiscardPolicy()

不会抛出异常,会直接丢掉超出最大承载的任务

package com.cx;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Test19 {    public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(  2,//核心线程池大小  5,//最大线程池大小  3,//5-2,当不是核心线程的线程等待3秒钟还没有被使用,就会被释放  TimeUnit.SECONDS,//超时的单位  new ArrayBlockingQueue<>(3),//阻塞队列的大小  Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不会改变  new ThreadPoolExecutor.DiscardPolicy()//拒绝策略,有四种 ); //最大承载:线程池最大大小+阻塞队列大小 for (int i = 1; i <= 10; i++) {     threadPoolExecutor.execute(()->{  System.out.println(Thread.currentThread().getName()+"ok");     }); }    }}
  • new ThreadPoolExecutor.DiscardOldestPolicy()

最大承载满了之后,后续进来的线程会和第一个进来的线程竞争,如果竞争过了就会执行此线程,将第一个线程抛弃,否则,则此线程被抛弃

package com.cx;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Test19 {    public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(  2,//核心线程池大小  5,//最大线程池大小  3,//5-2,当不是核心线程的线程等待3秒钟还没有被使用,就会被释放  TimeUnit.SECONDS,//超时的单位  new ArrayBlockingQueue<>(3),//阻塞队列的大小  Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不会改变  new ThreadPoolExecutor.DiscardOldestPolicy()//拒绝策略,有四种 ); //最大承载:线程池最大大小+阻塞队列大小 for (int i = 1; i <= 10; i++) {     threadPoolExecutor.execute(()->{  System.out.println(Thread.currentThread().getName()+"ok");     }); }    }}

CPU密集型和IO密集型

Runtime.getRuntime().availableProcessors()获取CPU核数

  • CPU密集型:计算一般是几核,一般最大线程数就设置为几
  • IO密集型:程序中消耗IO很大的线程有几个,一般最大线程数设置为2倍

四大函数式接口

只有一个方法的接口

函数型接口Function

Function 函数型接口,有一个参数,一个输出
public interface Function {
R apply(T t);
}

例1:

package com.cx;import java.util.function.Function;public class Test20 {    public static void main(String[] args) { Function function=new Function<String,String>() {     @Override     public String apply(String o) {  return o;     } }; System.out.println(function.apply("cx"));  Function function1=(str)->{return str;}; System.out.println(function1.apply(2));    }}

断定型接口Predicate

有一个输入参数,返回值只能是布尔值
public interface Predicate {
boolean test(T t);
}

package com.cx;import java.util.function.Predicate;public class Test21 {    public static void main(String[] args) { Predicate<String > objectPredicate = new Predicate<String>() {     @Override     public boolean test(String s) {  return s.isEmpty();     } }; System.out.println(objectPredicate.test("")); Predicate<String > objectPredicate1=(str)->{     return str.isEmpty(); }; System.out.println(objectPredicate1.test("cx"));    }}

消费型接口Consumer

有一个输入参数,没有返回值
public interface Consumer {
void accept(T t);
}

package com.cx;import java.util.function.Consumer;public class Test22 {    public static void main(String[] args) { Consumer<String> objectConsumer = new Consumer<String>() {     @Override     public void accept(String s) {  System.out.println(s);     } }; objectConsumer.accept("get"); Consumer<String> objectConsumer2=(str)->{     System.out.println(str); }; objectConsumer2.accept("get2");    }}

供给型接口Supplier

没有输入参数,有一个返回值
public interface Supplier {
T get();
}

package com.cx;import java.util.function.Supplier;public class Test23 {    public static void main(String[] args) { Supplier<String> objectSupplier = new Supplier<String>() {     @Override     public String get() {  return "put";     } }; System.out.println(objectSupplier.get()); Supplier<String> objectSupplier2=()->{     return "put2"; }; System.out.println(objectSupplier2.get());    }}

Stream流式计算

package com.cx;import java.util.ArrayList;import java.util.Locale;import java.util.Random;/** * ID必须是偶数 * 年龄必须大于4 *用户名大写 * 用户名字母倒序 * 输出一个 */public class Test24 {    public static void main(String[] args) { ArrayList<User> users = new ArrayList<>(); Random random = new Random(); for (int i = 0; i < 5; i++) {     int i1 = random.nextInt(10);     User user = new User(i, String.valueOf((char)(i+i1+97)),i+i1);     users.add(user); } for (User user : users) {     System.out.println(user); } System.out.println("-----------"); users.stream()  .filter(u->{return u.getId()%2==0;})  .filter(u->{return u.getAge()>4;})  .map(u->{return u.getName().toUpperCase();})  .sorted((u1,u2)->{return ((String) u2).compareTo((String)u1);})  .limit(1)  .forEach(System.out::println);    }}

ForkJoin详解

特点 :工作窃取,里面维护的是双端队列,将大任务化为小任务来执行

package com.cx;import java.util.concurrent.RecursiveTask;/** * 求从任意连个值之间的累计和 */public class ForkJoinTest extends RecursiveTask<Long> {    private Long start;    private Long end;    //临界值    private Long temp;    //返回值    private Long sum=0L;    public ForkJoinTest(Long start, Long end,Long temp) { this.start = start; this.end = end; this.temp=temp;    }    //计算方法    @Override    protected Long compute() { if ((end-start)<temp){     for (Long i = start; i < end; i++) {  sum+=i;     }     return sum; }else {     long middle=(start+end)/2;//取中间值     ForkJoinTest task1 = new ForkJoinTest(start, middle,temp);     task1.fork();//拆分任务,把任务压入线程队列     ForkJoinTest task2 = new ForkJoinTest(middle, end,temp);     task2.fork();//拆分任务,把任务压入线程队列     return task1.join()+task2.join();//返回两个任务的结果和 }    }}
package com.cx;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.stream.LongStream;public class Test25 {    public static void main(String[] args) throws ExecutionException, InterruptedException { long start=0L; long end=10_0000_0000L; long temp=5_00_0000L; //test1(start,end);//3107s //test2(start,end,temp);//5429 test3(start,end);//125    }    public static void test1(long start,long end){ Long sum=0L; long l = System.currentTimeMillis(); for (long i = start; i < end; i++) {     sum+=i; } long l2 = System.currentTimeMillis(); System.out.println("sum="+sum+"所有时间是:"+(l2-l));    }    public static void test2(long start,long end,long temp) throws ExecutionException, InterruptedException { Long sum=0L; long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinTest(start,end,temp); ForkJoinTask<Long> submit = forkJoinPool.submit(task);//提交任务 sum=submit.get();//获取最终结果 long l2 = System.currentTimeMillis(); System.out.println("sum="+sum+"所有时间是:"+(l2-l));    }    //并行流    public static void test3(long start,long end) throws ExecutionException, InterruptedException { long l = System.currentTimeMillis(); //range是(),rangeClosed是(] long sum = LongStream.range(start, end).parallel().reduce(0, Long::sum); long l2 = System.currentTimeMillis(); System.out.println("sum="+sum+"所有时间是:"+(l2-l));    }}

异步回调

  • Future设计初衷:对将来的某个事件的结果进行建模
package com.cx;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;public class Test26 {    public static void main(String[] args) { //test1(); test2();    }    /**     * 没有返回值的异步回调     */    public static void test1(){ CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(()->{     try {  TimeUnit.SECONDS.sleep(4);  System.out.println(Thread.currentThread().getName()+":runAsync void");     } catch (InterruptedException e) {  e.printStackTrace();     } }); System.out.println("正常执行");//不需要等待上面的程序执行结果 try {     Object o=voidCompletableFuture.get();//获取阻塞结果     System.out.println(o); } catch (InterruptedException e) {     e.printStackTrace(); } catch (ExecutionException e) {     e.printStackTrace(); } System.out.println("正常执行2");//需要等待异步执行结果的获取    }    /**     * 有返回值的异步回调     */    public static void test2(){ try {     CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{  int i=10/0;  //此时的返回值,就是whenComplete,里面的SUCCESS参数  return Thread.currentThread().getName()+":SUCCESS";     });     String s = objectCompletableFuture.whenComplete((SUCCESS, ERROR) -> {     }).exceptionally((e) -> {  //此时的返回值,就是whenComplete,里面的ERROR参数  return "exception:" + e.getMessage();//返回异常信息     }).get();     System.out.println(s); } catch (InterruptedException e) {     e.printStackTrace(); } catch (ExecutionException e) {     e.printStackTrace(); }    }}

理解JMM

JMM:java内存模型,是一种概念,或者约定。
JMM即为JAVA 内存模型(java memory model)。因为在不同的硬件⽣产商和不同的操作系统下,内存的访问逻辑有⼀定的差异,结果就是当你的代码在某个系统环境下运⾏良好,并且线程安全,但是换了个系统就出现各种问题。Java内存模型,就是为了屏蔽系统和硬件的差异,让⼀套代码在不同平台下能到达相同的访问结果。

JMM规定了内存主要划分为主内存和⼯作内存两种。此处的主内存和⼯作内存跟JVM内存划分(堆、栈、⽅法区)是在不同的层次上进⾏的,如果⾮要对应起来,主内存对应的是Java堆中的对象实例部分,⼯作内存对应的是栈中的部分区域,从更底层的来说,主内存对应的是硬件的物理内存,⼯作内存对应的是寄存器和⾼速缓存。

JVM在设计时候考虑到,如果JAVA线程每次读取和写⼊变量都直接操作主内存,对性能影响⽐较⼤,所以每条线程拥有各⾃的⼯作内
存,⼯作内存中的变量是主内存中的⼀份拷贝,线程对变量的读取和写⼊,直接在⼯作内存中操作,⽽不能直接去操作主内存中的变量。但
是这样就会出现⼀个问题,当⼀个线程修改了⾃⼰⼯作内存中变量,对其他线程是不可见的,会导致线程不安全的问题。因为JMM制定了⼀
套标准来保证开发者在编写多线程程序的时候,能够控制什么时候内存会被同步给其他线程
JAVA-JUC并发编程

内存交互操作

内存交互操作有8种,虚拟机实现必须保证每⼀个操作都是原⼦的,不可在分的

  • lock (锁定):作⽤于主内存的变量,把⼀个变量标识为线程独占状态
  • unlock (解锁):作⽤于主内存的变量,它把⼀个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
  • read (读取):作⽤于主内存变量,它把⼀个变量的值从主内存传输到线程的⼯作内存中,以便随后的load动作使⽤
  • load (载⼊):作⽤于⼯作内存的变量,它把read操作从主存中变量放⼊⼯作内存中
  • use (使⽤):作⽤于⼯作内存中的变量,它把⼯作内存中的变量传输给执⾏引擎,每当虚拟机遇到⼀个需要使⽤到变量的值,就会使⽤到这个指令
  • assign (赋值):作⽤于⼯作内存中的变量,它把⼀个从执⾏引擎中接受到的值放⼊⼯作内存的变量副本中
  • store (存储):作⽤于主内存中的变量,它把⼀个从⼯作内存中⼀个变量的值传送到主内存中,以便后续的write使⽤
  • write  (写⼊):作⽤于主内存中的变量,它把store操作从⼯作内存中得到的变量的值放⼊主内存的变量中

JMM对这⼋种指令的使⽤,制定了如下规则:

  • 不允许read和load、store和write操作之⼀单独出现。即使⽤了read必须load,使⽤了store必须write
  • 不允许线程丢弃他最近的assign操作,即⼯作变量的数据改变了之后,必须告知主存
  • 不允许⼀个线程将没有assign的数据从⼯作内存同步回主内存
  • ⼀个新的变量必须在主内存中诞⽣,不允许⼯作内存直接使⽤⼀个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
  • ⼀个变量同⼀时间只有⼀个线程能对其进⾏lock。多次lock后,必须执⾏相同次数的unlock才能解锁
  • 如果对⼀个变量进⾏lock操作,会清空所有⼯作内存中此变量的值,在执⾏引擎使⽤这个变量前,必须重新load或assign操作初始化变量的值
  • 如果⼀个变量没有被lock,就不能对其进⾏unlock操作。也不能unlock⼀个被其他线程锁住的变量
  • 对⼀个变量进⾏unlock操作之前,必须把此变量同步回主内存

JMM对这⼋种操作规则和对就能确定哪⾥操作是线程安全,哪些操作是线程不安全的了。但是这些规则实在复杂,很难在实践中直接分析。所以⼀般我们也不会通过上述规则进⾏分析。更多的时候,使⽤java的happen-before规则来进⾏分析。
JAVA-JUC并发编程
例如下面代码,循环不会停止:

package com.cx;import java.util.concurrent.TimeUnit;public class Test27 {    private static int num=0;    public static void main(String[] args) { new Thread(()->{//线程对主线程内存的变化不可见,即不知道其已经发生了变化     while (num==0){  //System.out.println("start");     } }).start(); try {     TimeUnit.SECONDS.sleep(1);     num=1;//main线程执行 } catch (InterruptedException e) {     e.printStackTrace(); }    }}

模型特征

JAVA-JUC并发编程

Volatile

  • volatile:是java虚拟机提供轻量级的同步机制
  1. 保证可见性
  2. 不保证原子性
  3. 禁止指令重排

保证可见性

package com.cx;import java.util.concurrent.TimeUnit;public class Test27 {//volatile 保证了可见性    private volatile static int num=0;    public static void main(String[] args) { new Thread(()->{     while (num==0){  //System.out.println("start");     } }).start(); try {     TimeUnit.SECONDS.sleep(1);     num=1;//main线程执行     System.out.println(num); } catch (InterruptedException e) {     e.printStackTrace(); }    }}

不保证原子性

package com.cx;import java.util.concurrent.TimeUnit;public class Test28 {    /**     * 加了volatile,不能保证原子性     *就比如A线程将NUM变为100后,从工作内存中放入主内存中,此时NUM=100     *但是此时线程B将NUM变为20后,从工作内存中放入主内存中,此时NUM=20     * 这样连个线程执行完后,理论上是加到2000000,实际达不到     *     */    private  volatile static int num=0;    public static void main(String[] args) { new Thread(()->{     for (int i=0;i<1000000;i++){  add();     } },"A").start(); new Thread(()->{     for (int i=0;i<1000000;i++){  add();     } },"B").start(); while (Thread.activeCount()>2){     Thread.yield(); } System.out.println("num:"+num);    }    public static void add(){     num++;    }}

通过原子类保证原子性

  • java.util.concurrent.atomic
    这些类的底层都直接喝操作系统挂钩!在内存中修改值
package com.cx;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;public class Test28 {    /**     * 加了volatile,不能保证原子性     *就比如A线程将NUM变为100后,从工作内存中放入主内存中,此时NUM=100     *但是此时线程B将NUM变为20后,从工作内存中放入主内存中,此时NUM=20     * 这样连个线程执行完后,理论上是加到2000000,实际达不到,     * 想要达到保证原子性,不用lock和synchronized,     * 那么可以使用 todo 原子类     *     */    private  volatile static AtomicInteger num=new AtomicInteger();    public static void main(String[] args) { new Thread(()->{     for (int i=0;i<1000000;i++){  add();     } },"A").start(); new Thread(()->{     for (int i=0;i<1000000;i++){  add();     } },"B").start(); while (Thread.activeCount()>2){//main gc基本线程     Thread.yield(); } System.out.println("num:"+num);    }    public static void add(){     num.getAndIncrement();//相当于 num+1,底层使用的是CAS    }}

禁止指令重排

指令重排:程序的执行顺序,可以被改变
** 内存屏障**作用:

  1. 保证特定的操作的执行顺序!
  2. 可以保证某些变量的内存可见
    JAVA-JUC并发编程

深入理解CAS

CAS即compare and swap的缩写,比较并且交换,CAS是cpu的并发原语

  • 处理器(包括 Intel 和 Sparc 处理器)使用的最通用的方法是实现名为 比较并转换或 CAS 的原语。(在 Intel 处理器中,比较并交换通过指令的 cmpxchg 系列实现。PowerPC 处理器有一对名为“加载并保留”和“条件存储”的指令,它们实现相同的目地;MIPS 与 PowerPC 处理器相似,除了第一个指令称为“加载链接”。)
  • CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前值。)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置的值即可。”
  • 通常将 CAS 用于同步的方式是从地址 V 读取值 A,执行多步计算来获得新值 B,然后使用 CAS 将 V 的值从 A 改为 B。如果 V 处的值尚未同时更改,则 CAS 操作成功。
  • 类似于 CAS 的指令允许算法执行读-修改-写操作,而无需害怕其他线程同时修改变量,因为如果其他线程修改变量,那么 CAS 会检测它(并失败),算法可以对该操作重新计算。清单 3 说明了 CAS 操作的行为(而不是性能特征),但是 CAS 的价值是它可以在硬件中实现,并且是极轻量级的(在大多数处理器中)。
package com.cx;import java.util.concurrent.atomic.AtomicInteger;public class Test30 {    public static void main(String[] args) { //给一个初始值0 AtomicInteger atomicInteger = new AtomicInteger(0); //todo 有两个参数, (int expect,int update) //可以理解为,如果值为我们期望的值,那么就更新为更新的值 //compareAndSet等同于CAS boolean b = atomicInteger.compareAndSet(0, 1); //打印为true:1 System.out.println(b+":"+atomicInteger.get()); //此时再次使用同样的方法发现 boolean c = atomicInteger.compareAndSet(0, 1); //打印为false:1 System.out.println(c+":"+atomicInteger.get());    }}

可以看到我们使用使用的原子类AtomicInteger 有类似CAS的方法,进一步进入AtomicInteger类:

public class AtomicInteger extends Number implements java.io.Serializable {    private static final long serialVersionUID = 6214790243416807050L;    // setup to use Unsafe.compareAndSwapInt for updates    private static final Unsafe unsafe = Unsafe.getUnsafe();    private static final long valueOffset;    static { try {     valueOffset = unsafe.objectFieldOffset  (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); }    }    private volatile int value;    /**     * Creates a new AtomicInteger with the given initial value.     *     * @param initialValue the initial value     */    public AtomicInteger(int initialValue) { value = initialValue;    }    /**     * Creates a new AtomicInteger with initial value {@code 0}.     */    public AtomicInteger() {    }    ......

此时出现了Unsafe这个类,我们可以通过百度:

Unsafe

  • 在JDK1.5之后,Java程序中才可以使用CAS操作.
    该操作由sun.misc.Unsafe类里面的compareAndSwapInt()compareAndSwapLong()等几个方法包装提供,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令,没有方法调用的过程,或者可以认为是无条件内联进去了。
  • 由于Unsafe类不是提供给用户提供调用的类(Unsafe.getUnsafe()的代码中限制了只有启动类加载器(Bootstrap ClassLoader)加载的Class才能访问它),因此,如果不采用反射手段,我们只能通过其他的Java API来间接使用它,如J.U.C包里面的整数原子类,其中的compareAndSet()和 getAndIncrement()等方法都使用了 Unsafe 类的 CAS 操作

回到我们之前的问题,通过原子类的atomicInteger.getAndIncrement()类似于num++,那么底层是:

public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1);    }

unsafe.getAndAddInt:

    public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do {     var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5;    }

getIntVolatile获取当前对象var1地址中值给到var5,compareAndSwapInt判断var1里的值var2是否与var5相等,如果相等,那么就var5+1,此时修改成功,那么this.compareAndSwapInt(var1, var2, var5, var5 + var4))就返回true,会天哦出循环,否则就一直循环(自旋锁)

CAS

比较当前工作内存中的值,如果这个值是期望的,那么则执执行操作!如果不是就一直循环!

  • 缺点:
    1.循环会耗时
    2.一次性只能保证一个共享变量的原子性
    3.ABA问题

ABA问题

我们期望的值被人动过了,但是动过了之后,又复原了!

package com.cx;import java.util.concurrent.atomic.AtomicInteger;public class Test30 {    public static void main(String[] args) { //========================捣乱的线程======================== AtomicInteger atomicInteger = new AtomicInteger(0); boolean b = atomicInteger.compareAndSet(0, 1); System.out.println(b+":"+atomicInteger.get()); boolean c = atomicInteger.compareAndSet(1, 0); System.out.println(c+":"+atomicInteger.get()); //========================正常的线程======================== boolean d = atomicInteger.compareAndSet(0, 1); System.out.println(d+":"+atomicInteger.get());    }}

JAVA-JUC并发编程

原子引用解决ABA问题

原子引用:带版本号的原子操作

package com.cx;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicStampedReference;public class Test31 {    //注意,如果泛型是一个包装类,注意对象的引用    //AtomicStampedReference(1,2) 初始值是1,版本号是2    static AtomicStampedReference<Integer> atomicStampedReference=new AtomicStampedReference<>(1,2);    public static void main(String[] args) { int stamp1 = atomicStampedReference.getStamp();//获取初始版本号 Integer reference2 = atomicStampedReference.getReference();//获取初始期望值 System.out.println(Thread.currentThread().getName()+",初始期望值是:"+atomicStampedReference.getReference()+",初始版本号是"+stamp1); //我是捣乱的线程,我现在不知道期望值和版本号是多少 new Thread(()->{     //捣乱的线程不知道期望值是多少,版本号是多少,所以     int stamp = atomicStampedReference.getStamp();//获取版本号     try {  TimeUnit.SECONDS.sleep(2);     } catch (InterruptedException e) {  e.printStackTrace();     }     boolean b1 = atomicStampedReference.compareAndSet(1, 3, atomicStampedReference.getStamp(), atomicStampedReference.getStamp()+1);     //查看是否更新成功,新值是多少     System.out.println(Thread.currentThread().getName()+":修改是否成功:"+b1+",现在期望值是:"+atomicStampedReference.getReference()+",现在版本号是:"+atomicStampedReference.getStamp());     boolean b2 = atomicStampedReference.compareAndSet(atomicStampedReference.getReference(), 1, atomicStampedReference.getStamp(), stamp);     System.out.println("我又改回去啦!"+b2); },"A").start(); //我是正常的线程,正常的代码要写成死的 new Thread(()->{     int stamp = atomicStampedReference.getStamp();//获取版本号     Integer reference = atomicStampedReference.getReference();//获取期望值     //期望是1,改为2,并且将版本号+1     boolean b1 = atomicStampedReference.compareAndSet(1, 2, stamp, stamp + 1);     //查看是否更新成功,新值是多少     System.out.println(Thread.currentThread().getName()+":修改是否成功:"+b1+",现在期望值是:"+atomicStampedReference.getReference()+",现在版本号是:"+atomicStampedReference.getStamp()); },"B").start();    }}

注意:所有的相同类型的包装类对象之间值的比较,全部使用equals方法比较

对于Integer var=? 在-128~127之间的赋值,Integer对象是在缓存产生,会复用已有对象,这个区间内的Integer值可以直接使用==进行判断,但是这个区间之外的所有数据,都会在堆上产生,并不会复用已有对象,这是一个大坑,推荐使用equals方法进行判断。

可重入锁

递归锁,;类似与房间里的房门,想进房门,就必须先进大门

synchronized 实现

package com.cx;import java.util.concurrent.TimeUnit;//synchronized 实现public class Test32 {    public static void main(String[] args) { Phones phone = new Phones(); new Thread(()->{     phone.emill();     try {  TimeUnit.SECONDS.sleep(1);     } catch (InterruptedException e) {  e.printStackTrace();     } },"A").start(); new Thread(()->{     phone.emill(); },"B").start();    }}class Phones{    public synchronized void emill(){ System.out.println(Thread.currentThread().getName()+"sms"); call();    }    public synchronized void call(){ System.out.println(Thread.currentThread().getName()+"call");    }}

lock实现

package com.cx;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class Test32 {    public static void main(String[] args) { Phones phone = new Phones(); new Thread(()->{     phone.emill();     try {  TimeUnit.SECONDS.sleep(1);     } catch (InterruptedException e) {  e.printStackTrace();     } },"A").start(); new Thread(()->{     phone.call(); },"B").start();    }}class Phones{    Lock lock=new ReentrantLock();    public void emill(){ try {     lock.lock();//锁必须配对,否则就会死在里面     lock.lock();     System.out.println(Thread.currentThread().getName()+"sms");     call(); } catch (Exception e) {     e.printStackTrace(); } finally {     lock.unlock();     lock.unlock(); }    }    public void call(){ try {     lock.lock();     System.out.println(Thread.currentThread().getName()+"emill"); } catch (Exception e) {     e.printStackTrace(); } finally {     lock.unlock(); }    }}

自旋锁

package com.cx;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicReference;public class Singlock {    public static void main(String[] args) { SpiglockT lock = new SpiglockT(); new Thread(()->{     lock.myLock();     try {  TimeUnit.SECONDS.sleep(5);     } catch (InterruptedException e) {  e.printStackTrace();     }finally {  lock.myUnLock();     } },"A").start(); new Thread(()->{     lock.myLock();     try {  TimeUnit.SECONDS.sleep(5);     } catch (InterruptedException e) {  e.printStackTrace();     }finally {  lock.myUnLock();     } },"B").start();    }}//底层使用CASclass SpiglockT{    //默认是个空线程 atomicReference=null    AtomicReference<Thread> atomicReference=new AtomicReference<>();    //加锁    public void myLock(){ Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"--> myLock"); //自旋锁 //如果线程是null,就把atomicReference更新为thread 那么就退出循环 while (!atomicReference.compareAndSet(null,thread)){ }    }    //解锁    public void myUnLock(){ Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"--> myUnLock"); //如果线程是thread,就把atomicReference更新为null atomicReference.compareAndSet(thread,null);    }}

死锁排查

package com.cx;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class DiedLock {    public static void main(String[] args) { TestT testT = new TestT("A","B"); TestT testT2 = new TestT("B","A"); new Thread(()->{     testT.run1(); },"A").start(); new Thread(()->{     testT2.run1(); },"B").start();    }}class TestT{    private String t1;    private String t2;    public TestT(String t1,String t2) { this.t1=t1; this.t2=t2;    }    Lock lock=new ReentrantLock();    public void run1(){ //synchronized (this) 锁住的是对象 synchronized (t1){     System.out.println(Thread.currentThread().getName()+"我拿到了"+t1+"我再去拿"+t2);     synchronized (t2){  System.out.println(Thread.currentThread().getName()+"我拿到了"+t2+"我再去拿"+t1);     } }    }}

排查死锁

  • 定位进程号:jps -l(小写的L)
  • 查看死锁问题: jstack 进程号

JAVA-JUC并发编程

JAVA-JUC并发编程

=========================================================
java-JUC并发编程 推荐狂神说

端木振平资料集合分享

书本网