> 技术文档 > JAVAEE--4.多线程案例

JAVAEE--4.多线程案例

设计模式

1.单例模式

        1.1饿汉模式

        1.2懒汉模式(单线程版)

        1.3懒汉模式(多线程版本)

        1.4懒汉模式(多线程版本进阶版)

2.阻塞队列

3.定时器

4.线程池

1.单例模式

        设计模式是\"软性约束\",不是强制的,可以遵守也可以不遵守,按照设计模式写代码使代码不会太差

        框架是\"硬性约束\",必须遵守

        单例模式能保证某个类在程序中只保存唯一一个实例,而不会创建多个

        单例模式具体的实现方式有很多,最常见的是\"饿汉\"和\"懒汉\"两种

1.1饿汉模式
 

饿汉模式的饿代表急迫,一开始就把实例加载到内存中了

class singleton{ private singleton(){}; private static singleton instance=new singleton(); public static singleton getInstance(){ return instance; }}public class demo17 { public static void main(String[] args) { singleton s1=singleton.getInstance(); singleton s2=singleton.getInstance(); System.out.println(s1==s2); }}

饿汉模式属于线程安全的,不涉及修改操作,只有读取操作

注释:1.private singleton(){};将构造方法设为私有,可以使得这个类无法new出来,更安全

        2.只有一个对外的接口,getInstance(){},更安全

        3.这个实例在程序加载的时候就被初始化了,属于类对象

1.2懒汉模式(单线程版)

懒汉模式的懒标识懒加载,提高效率

class singletonLazy{ private singletonLazy(){}; private static singletonLazy instance =null; public static singletonLazy getInstance(){ if(instance==null){ instance=new singletonLazy(); } return instance; }}public class demo18 { public static void main(String[] args) { singletonLazy s1=singletonLazy.getInstance(); singletonLazy s2=singletonLazy.getInstance(); System.out.println(s1==s2); }}

这里是单线程版本的懒汉模式,属于线程不安全

if(instance==null){

instance=new singletonLazy();

}

涉及到判定和修改操作,这个操作因为不是原子性的,所以会导致出现问题

此处实例的初始化并不是程序启动时,而是第一次调用方法的时候初始化实例

 会提升性能,减少加载时间                      

单例模式只能避免别人失误,无法应对别人的故意攻击(如:序列化反序列化,反射)

先判定后修改是常见的线程不安全问题

1.3懒汉模式(多线程版)

class singletonLazy{//多线程版本 private singletonLazy(){}; private static Object Locker=new Object(); private static singletonLazy instance =null; public static singletonLazy getInstance(){ synchronized (Locker){ if(instance==null){ instance=new singletonLazy(); } } return instance; }}public class demo18 { public static void main(String[] args) { singletonLazy s1=singletonLazy.getInstance(); singletonLazy s2=singletonLazy.getInstance(); System.out.println(s1==s2); }}

多线程版本需要考虑到线程的安全性,刚刚提到先修改后判定是不安全的,所以我们给它加上synchronize

把if和new包裹到一起

1.4懒汉模式(多线程版本进阶)

仅仅是上述代码依旧无法解决线程安全问题

上完锁后也可能会造成阻塞状态

会造成性能问题,所以可以在所外进行应该判定if(instance==null)
同时修改操作有可能造成内存可见性问题以及指令重排序问题,所以需要加上volatile关键字

class singletonLazy{//多线程版本进阶版 private singletonLazy(){}; private static Object Locker=new Object(); private static volatile singletonLazy instance =null; public static singletonLazy getInstance(){ if(instance ==null){ synchronized (Locker){ if(instance==null){  instance=new singletonLazy(); } } } return instance; }}public class demo18 { public static void main(String[] args) { singletonLazy s1=singletonLazy.getInstance(); singletonLazy s2=singletonLazy.getInstance(); System.out.println(s1==s2); }}

2.阻塞队列

阻塞队列是一种特殊的队列,遵守\"先进先出的\"原则(线程安全+阻塞特性)

阻塞队列是一种线程安全的数据结构,并且具有以下的特点

1.当队列满的时候,继续入队列就会产生阻塞, 直到有其他线程从队列中取走元素

2.当队列为空的时候,继续出队列也会产生阻塞,直到有其他的线程从队列里插入元素

阻塞队列的一个典型的应用场景就是\"生产者消费者模型\",这是一种非常经典的开发模型

通常提到的\"阻塞队列\"是代码中的一个数据结构

但由于这个东西太好用了,以至于把这样的数据结构单独的封装成一个服务器程序

并且在单独的服务器上进行部署

此时,这样的阻塞队列,有了一个新的名字,\"消息队列\"(Message Queue,MQ)

A只和队列通信,B也只和队列通信

A不知道B的存在,代码中更没有B的影子

B也不知道A的存在,代码中没有A的影子

A的数据量激增的情况下,A往队列中写入的数据变快了,但是B任然可以按照原有的速度来消费数据,阻塞队列抗下这样的压力

生产者/消费者模型        

        这个模型的好处:

1.服务器之间的\"解耦合\"

                                模块之间的关联程度/影响程度

                                更希望见到的是低耦合

        

2.阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力(削峰填谷)   

问题:

1).为啥一个服务器,收到的请求更多,就可能会挂(可能会崩溃)

一台服务器就是一台电脑,上面提供了一些硬件资源(包括但不限于,CPU,内存,硬盘,网络带宽...)

就算你这个机器,配置的再好,硬件资源也是有限的

服务器每次收到一个请求,处理这个请求的过程都需要执行一系列的代码,在执行这些代码过程中,就会需要消耗一定的硬件资源\"CPU,内存,硬盘,网络带宽\"

这些请求消耗的总的硬件资源的量,超过了机器能提供的上限,那么此时机器就会出现问题(卡死,程序直接崩溃等)

2).为什么A和消息队列没有挂,只有B会挂

A的角色是一个\"网关服务器\",收到客户端的请求再把请求转发给其他服务器

这样的服务器里面的代码,做的工作比较简单(单纯的数据转发),消耗的硬件资源通常是更少

处理一个请求消耗的资源更少,同样的配置下,就能支持更多的请求来处理

同理,队列其实也是比较简单的程序,单位请求消耗的硬件资源也是比较少的

B这个服务器,是真正干活的服务器,要真正的完成一系列的业务逻辑

这一系列工作,代码量是非常的庞大,消耗的时间也是很多的,消耗的系统硬件资源也是更多的

类似的,MySQL这样的数据库处理每个请求的时候,做的工作就是比较多的,消耗的硬件资源也是比较多的,因此MySQL也是后端系统重容易挂的部分

对应的,像Redis这种内存数据库,处理请求做的工作远远少于MySQL做的工作,消耗的资源更少,Redis就比MySQL更不容易挂

代价:

1)需要更多的机器

2)A和B之间的通信的延时会变长

标准库中的阻塞队列

在java标准库中内置了阻塞队列,如果我们需要在一些程序中使用阻塞队列,直接使用标准库中的即可

BlockingQueue是一个接口,真正实现的类是Linked/Array/priorityBlockingQueue.

put方法用于阻塞式的入队列,take用于阻塞式的出队列

BlockingQueue也有offer,poll,peek等方法,但是这些方法不带有阻塞性质

生产者消费者模型

public class demo20 {//生产者消费者模型 public static void main(String[] args) throws InterruptedException { BlockingQueue queue =new ArrayBlockingQueue(1000); Thread thread1=new Thread(()->{//生产者 int i=0; while(true){ try {  System.out.println(\"生产元素\"+i);  queue.put(i);  i++; } catch (InterruptedException e) {  e.printStackTrace(); } } }); Thread thread2=new Thread(()->{//消费者 while(true){  try {  int num=queue.take();  System.out.println (\"消费元素\"+num);  Thread.sleep(1000);  } catch (InterruptedException e) {  e.printStackTrace();  } } }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); }}

阻塞队列的实现

1.通过\"循环队列\"的方式来实现

2.使用synchronized进行加锁控制

3.put插入元素的时候,判定如果队列满了,就进行wait.(注意,要在循环中进行wait,被唤醒时候不一定队列就不满了,因为同时是可能唤醒多个线程),以后只要出现wait,就使用while!!

4.take取出元素的时候,判定如果队列为空,就进行wait(也是循环wait)

class MyBlockingQueue{ private String[] data=null; private volatile int head=0; private volatile int tail=0; private volatile int size=0; public MyBlockingQueue(int capacity){ data=new String[capacity]; } public void put(String s) throws InterruptedException {//生产者 synchronized (this){ while(size==data.length){  this.wait(); //满了就不能再放了 } data[tail]=s; size++; tail++; if(tail>=data.length){ tail=0; } this.notify(); } } public String take() throws InterruptedException {//消费者 String s=\"\"; synchronized (this){ while(data.length==0){ this.wait();//如果没有内容的时候就阻塞 } s=data[head]; head++; size--; if(head>=data.length){ head=0; } this.notify(); } return s; }}public class demo21 { public static void main(String[] args) throws InterruptedException { MyBlockingQueue queue =new MyBlockingQueue(1000); Thread thread1=new Thread(()->{//生产者 int i=0; while(true){ try {  System.out.println(\"生产元素\"+i);  queue.put(\"\"+i);  i++; } catch (InterruptedException e) {  e.printStackTrace(); } } }); Thread thread2=new Thread(()->{//消费者 while(true){ try {  String num=queue.take();  System.out.println (\"消费元素\"+num);  Thread.sleep(1000); } catch (InterruptedException e) {  e.printStackTrace(); } } }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); }}

3.定时器(Timer)

        定义:定时器也是软件开发的一个重要组件,类似于一个\"闹钟\",达到一个设定的时间之后,就执行某个制定好的代码

        定时器是一种实际开发中非常常用的组件

        比如网络通信中,如果对方500ms内没有返回数据,则会断开连接尝试重连

比如一个Map,希望里面的KEY在3s之后过期(自动删除)

类似于这样的场景就需要使用到定时器

标准库中的定时器

标准库中提供了Timer类,Timer类的核心方法是schedule

schedule 包含两个参数,第一个参数指定即将要执行的任务代码,第二个参数指定多长时间之后执行(单位为毫秒)

import java.util.Timer;import java.util.TimerTask;public class demo24 { public static void main(String[] args) {//使用定时器 Timer timer=new Timer(); timer.schedule(new TimerTask() { @Override public void run() { System.out.println(\"Hello world3\"); } },3000); timer.schedule(new TimerTask() { @Override public void run() { System.out.println(\"Hello world2\"); } },2000); timer.schedule(new TimerTask() { @Override public void run() { System.out.println(\"Hello world1\"); } },1000); }}

结果:

模拟实现定时器

class MyTimerTask implements Comparable{ Runnable runnable; long time; public MyTimerTask(Runnable runnable,long delay){ this.runnable=runnable; this.time=delay+System.currentTimeMillis(); } public long getTime(){ return time; } public void run(){ runnable.run(); } @Override public int compareTo(MyTimerTask o) { return (int)(this.time-o.time); }}

Task类用来描述一个任务(作为Timer的内部类),里面包含一个runnable和一个time(毫秒时间戳)

这个对象需要放在优先级队列中,因此需要实现Comparable接口

class MyTimer{ private PriorityQueue queue=new PriorityQueue(); private Object Locker=new Object(); MyTimer(){ Thread thread=new Thread(()->{  try { while(true){ synchronized (Locker){ while(queue.isEmpty()){//如果为空的时候就进入阻塞 Locker.wait(); } MyTimerTask currentmyTimeTask=queue.poll(); if(System.currentTimeMillis()>=currentmyTimeTask.getTime()){ currentmyTimeTask.run(); }else{  Locker.wait(currentmyTimeTask.getTime()-System.currentTimeMillis());  queue.offer(currentmyTimeTask); } } }  } catch (InterruptedException e) {  e.printStackTrace();  } }); thread.start(); } public void schedule(Runnable runnable,long delay){ synchronized (Locker){ MyTimerTask myTimerTask=new MyTimerTask(runnable,delay); queue.offer(myTimerTask); Locker.notify(); } }}

Timer实例中,通过priorityQueue来组织若干个Task对象

通过schedule来往队列中插入一个个Task对象

Timer类中存在一个worker线程,一直不停的扫描队首元素,看看是否能执行这个任务

全部代码:

import java.util.PriorityQueue;import java.util.Timer;import java.util.TimerTask;class MyTimerTask implements Comparable{ Runnable runnable; long time; public MyTimerTask(Runnable runnable,long delay){ this.runnable=runnable; this.time=delay+System.currentTimeMillis(); } public long getTime(){ return time; } public void run(){ runnable.run(); } @Override public int compareTo(MyTimerTask o) { return (int)(this.time-o.time); }}class MyTimer{ private PriorityQueue queue=new PriorityQueue(); private Object Locker=new Object(); MyTimer(){ Thread thread=new Thread(()->{  try { while(true){ synchronized (Locker){ while(queue.isEmpty()){//如果为空的时候就进入阻塞 Locker.wait(); } MyTimerTask currentmyTimeTask=queue.poll(); if(System.currentTimeMillis()>=currentmyTimeTask.getTime()){ currentmyTimeTask.run(); }else{  Locker.wait(currentmyTimeTask.getTime()-System.currentTimeMillis());  queue.offer(currentmyTimeTask); } } }  } catch (InterruptedException e) {  e.printStackTrace();  } }); thread.start(); } public void schedule(Runnable runnable,long delay){ synchronized (Locker){ MyTimerTask myTimerTask=new MyTimerTask(runnable,delay); queue.offer(myTimerTask); Locker.notify(); } }}public class demo25 {//模拟实现定时器 public static void main(String[] args) { MyTimer myTimer=new MyTimer(); myTimer.schedule(new TimerTask() { @Override public void run() { System.out.println(\"Hello world3\"); } },3000); myTimer.schedule(new TimerTask() { @Override public void run() { System.out.println(\"Hello world2\"); } },2000); myTimer.schedule(new TimerTask() { @Override public void run() { System.out.println(\"Hello world1\"); } },1000); }}

 定时器的构成:

1.一个带优先级队列(不要使用PriorityBlockingQueue,容易死锁)

2.队列中每一个元素是一个Task对象\\

3.Task中带有一个时间属性,队首元素就是要执行的任务

4.同时有一个worker线程一直扫描队首元素,看队首元素是否需要执行

业界实现定时器,除了基于优先级队列的方法之外,还有一个经典的实现方式,\"时间轮\" ,它也是一个巧妙设计的数据结构

定时器这个东西特别重要,特别常用,尤其是后端开发,和\"阻塞队列\"类似,也会有专门的服务器,就是用来在分布式系统中实现定时器这样的效果

hashmap  => redis

阻塞队列  => 消息队列

定时器  => 定时任务

4.线程池(ThreadPoolExecutor)

最初引入线程是因为线程太重了,频繁的创建销毁进程,开销大

随着业务上对于性能的要求越来越高,线程创建/销毁的频次越来越多,此时线程创建销毁的开销变的比较明显,无法忽略不计了

线程池就是解决上述问题的常见方案

线程池就是把线程提前从系统中申请好,放到一个地方

后面需要使用线程的时候,直接从这个地方来取,而不是系统重新申请

线程用完了之后,也是还是回到刚才的地方

\"池\"---->本质上是为了提高程序的效率

内核态&用户态

操作系统=操作系统内核态+操作系统配套的应用程序

操作系统内核-->操作系统的核心功能部分,负责完成一个操作系统的核心工作(管理)

包括为软件提供稳定的运行环境和管理硬件

对应的,执行的很多代码逻辑都是要用户态的代码和内核态的代码配合完成的

应用程序有很多,这些应用程序都是由内核统一负责管理和服务

内核里的工作十分的繁忙--->所以提交给内核要做的任务可能是不可控的

        从系统创建线程,就相当于让银行的人给我银钱->这样的逻辑就是调用系统api,由系统内核执行一系列逻辑来完成这个过程

        直接从线程池里取,这就相当于是自助复印,整个过程都是纯用户态代码,都是自己控制的,整个过程更可控,效率更高

因此通常认为,纯用户态操作,就比经过内核态操作的效率更高

java标准库中提供了现成的线程池

标准库提供了类ThreadPoolExcutor(构造方法有很多的参数)

带入的包是java.util.concurrent   concurrent的意思是并发

它的构造方法中有7个参数

1.corePoolSize:核心线程数-------正式员工的数量(正式员工,一旦录用,永不辞退)(最少有多少个)

2.maximunPoolSize:最大线程数---正式员工+临时员工的数量(临时工:一段时间不干活,就会被辞退)

最大线程数=核心线程数+非核心线程数

如果核心线程数和最大线程数一样的话,将不会自动扩容

核心线程会始终存在于线程池内部

非核心线程会在繁忙的时候被创建出来,不繁忙了,就会把这些线程真正的释放掉

3.keepAliveTimer:临时工允许的空闲时间

4.unit:keepAliveTime的时间单位,可以是分,是秒,或者是其他

5.workQueue:工作队列--传递任务的阻塞队列,可以传递你的任务Runnable

6.threadFactory:创造线程的工厂,参与具体的线程创建工作,通过不同线程工厂创建出的线程相当于对一些属性进行了不同的初始化设置

\"工厂\"是指\"工厂设计模式\"也是一种常见的设计模式

工厂设计模式是一种在创建类的实力时使用的设计模式,由于构造方法有\"坑\",通过工厂设计模式来填坑

就是Thread类的工厂类,通过这个类完成Thread的实例创建和初始化操作,此时ThreadFactory就可以针对线程池里的线程进行批量的设置属性,此处一般不会进行调整,就使用标准库提供的ThreadFactory的默认值就可以了---->ThreadFactory threadFactory

7.RejectedExecutionHandler:拒绝策略,如果任务量超过公司的负荷该如何处理

         7.1)AbortPolicy:超过负荷直接抛出异常

        7.2)CallerRunsPolicy():调用者负责处理多出来的任务

        7.3)DiscardOldestPolicy():丢弃队列中最老的任务

        7.4)DiscardPolicy():丢弃新来的任务

标准库中的线程池

1.使用Executors.newFixedThreadPool(10)能创建出固定包含十个线程的线程池

2.返回值类型为ExecutorService 

3.通过ExecutorService.submit 可以注册一个任务到线程池中

public class demo22 { public static void main1(String[] args) { ExecutorService service= Executors.newFixedThreadPool(4); // Executors的本质是ThreadPoolExecutor类的封装 //返回的类型是ExecutorService service.submit(new Runnable() {//此处也可以使用lambda @Override public void run() { System.out.println(\"Hello World!\"); } }); service.shutdown();//service是前端线程,得主动结束 } public static void main(String[] args) { ExecutorService service= Executors.newFixedThreadPool(4); // Executors的本质是ThreadPoolExecutor类的封装 //返回的类型是ExecutorService service.submit(()->{//此处可以使用lambda System.out.println(\"Hello World!\"); }); service.shutdown();//service是前端线程,得主动结束 }}

Executors创建线程池的几种方式

1.newFixedThreadPool:创建固定线程数的线程池

核心线程数和最大线程数一样,所以不会扩容

2.newCachedThreadPool:创建线程数目动态增长的线程池

设置了非常大的线程数,自动扩容

3.newSingleThreadExcutor:创建只包含单个线程的线程池

4.newScheduledThreadPool:设定延迟时间后执行命令,或定期执行命令,是进阶版的Timer

Executors的本质上是ThreadPoolExecutor类的封装

这里的i报错了,涉及的知识点是变量捕获,只能捕获final或者事实final

可以

创建一个新的变量id,来得到i的值,并且id是不会改变的,这样即可

问题:使用线程池的时候,需要指定线程个数,线程个数如何指定,指定多少合适?

        实际开发中,更建议的做法是通过实验的方式找到一个合适的线程池的个数的值

        给线程池设置不同的线程数,分别进行性能测试是,关注响应时间/消耗的资源指标,挑选一个比较合适的数值

线程池的模拟实现

import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;class myThreadPool{ BlockingQueue queue=new ArrayBlockingQueue(1000); public myThreadPool(int n){ for(int i=0;i{  while(true){ try { Runnable runnable =queue.take(); runnable.run(); } catch (InterruptedException e) { e.printStackTrace(); }  } }); thread.start(); } } public void sunbmit(Runnable runnable){ try { queue.put(runnable); } catch (InterruptedException e) { e.printStackTrace(); } }}public class demo23 { public static void main(String[] args) { myThreadPool myThreadPool=new myThreadPool( 4); for(int i=0;i{ System.out.println(\"执行任务\"+id+\", \"+Thread.currentThread().getName()); }); } }}