> 文档中心 > 【源码篇】LinkedBlockingQueue源码超详细解读

【源码篇】LinkedBlockingQueue源码超详细解读

文章目录

    • ⭐️导读
    • 🚩基本构造
    • 📖核心源码解读
      • `put`方法
      • `poll/take`方法
      • `drainTo`方法
      • `remove`方法
    • ⚠️注意事项
    • 🚗应用场景

⭐️导读

LinkedBlockingQueue和它的名字一样,它是一个由链表实现的有界阻塞队列,该队列按照先进先出的逻辑对队列进行排序。该队列使用了两把锁分别控制放入和取出,大大提高了并发效率,下面将对它的源码进行详细解读。

🚩基本构造

【源码篇】LinkedBlockingQueue源码超详细解读

阻塞队列本质上也可算是集合,因此最上层也实现了Collection接口,并提供一些集合的常用方法,AbstractQueue是提供Queue的基本实现,我们重点关注QueueBlockingQueue提供的api。

Queue类提供最基本的API

// 插入,失败返回异常boolean add(E e);// 插入,失败返回falseboolean offer(E e);// 移除队列头部元素,队列为空异常E remove();// 移除队列头部元素,队列为空返回nullE poll();// 查看头部元素,队列为空异常E element();// 查看头部元素,队列为空返回nullE peek();

BlockingQueue提供阻塞操作相关API

// 将元素插入队列,如果队列没有可用空间则等待void put(E e);// 将元素插入队列,如果队列没用可用空间则等待设定的时间boolean offer(E e, long timeout, TimeUnit unit)// 移除头部元素,如果没有可用则等待E take();// 移除头部元素,如果没有可用则等待设定的时间E poll(long timeout, TimeUnit unit);// 返回剩余可插入的元素数量int remainingCapacity();// 从队列中取出全部的元素并插入到指定集合中int drainTo(Collection<? super E> c);// 从队列中取出指定数量的元素并插入到指定集合中int drainTo(Collection<? super E> c, int maxElements);

📖核心源码解读

LinkedBlockingQueue分别使用了一个读锁和一个写锁来控制并发,并使用Condition来控制他们的执行过程

// 读锁private final ReentrantLock takeLock = new ReentrantLock();// 队列不为空的Conditionprivate final Condition notEmpty = takeLock.newCondition();// 写锁private final ReentrantLock putLock = new ReentrantLock();// 队列没有满的Conditionprivate final Condition notFull = putLock.newCondition();

put方法

将元素插入队列,如果队列没有可用空间则等待

public void put(E e) throws InterruptedException {    // 如果元素是null抛出异常    if (e == null) throw new NullPointerException();    int c = -1;    Node<E> node = new Node<E>(e);    // 使用写锁    final ReentrantLock putLock = this.putLock;    // 元素数量计数器    final AtomicInteger count = this.count;    // 获得锁    putLock.lockInterruptibly();    try { // 判断队列是否已满,如果已满写线程等待 while (count.get() == capacity) {     notFull.await(); } // 将尾部元素指向当前元素 enqueue(node); // 元素数量+1并返回操作前的数量 c = count.getAndIncrement(); // 如果元素数量没有满,则唤醒notFull.wait(),表示当前队列未满 if (c + 1 < capacity)     notFull.signal();    } finally { // 解锁 putLock.unlock();    }    if (c == 0) // 如果操作前元素数量为0,则通知写线程 signalNotEmpty();}

此处signalNotEmpty();就是通知被阻塞的读线程(如take/poll方法),队列里有数据了,赶紧消费

poll/take方法

poll 查看头部元素,队列为空异常

take 移除并返回头部元素,如果没有可用则等待

public E poll(long timeout, TimeUnit unit) throws InterruptedException {    E x = null;    int c = -1;    // 等待纳秒数    long nanos = unit.toNanos(timeout);    final AtomicInteger count = this.count;    // 使用读锁    final ReentrantLock takeLock = this.takeLock;    // 加锁    takeLock.lockInterruptibly();    try { // 如果当前队列中没有元素则等待指定时长 while (count.get() == 0) {     if (nanos <= 0)  // 等待超时,直接返回null  return null;     nanos = notEmpty.awaitNanos(nanos); } // 移除队列头中的节点并返回 x = dequeue(); // 元素-1 c = count.getAndDecrement(); // 如果队列中有数据,则通知其他线程该队列不为空 if (c > 1)     notEmpty.signal();    } finally { takeLock.unlock();    }    // 通知其他线程表示该队列未满    if (c == capacity) signalNotFull();    return x;}

此处signalNotFull();是通知阻塞的写入线程(如put/offer),表示队列没满,可以写入

take逻辑与poll类似,只是等待策略不相同,take方法如下

public E take() throws InterruptedException {    E x;    int c = -1;    final AtomicInteger count = this.count;    final ReentrantLock takeLock = this.takeLock;    takeLock.lockInterruptibly();    try { // 等待逻辑与poll不一样,此处表示如果没有数据则一直等待 while (count.get() == 0) {     notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1)     notEmpty.signal();    } finally { takeLock.unlock();    }    if (c == capacity) signalNotFull();    return x;}

drainTo方法

从队列中取出全部的元素并插入到指定集合中

public int drainTo(Collection<? super E> c, int maxElements) {    if (c == null) throw new NullPointerException();    if (c == this) throw new IllegalArgumentException();    if (maxElements <= 0) return 0;    boolean signalNotFull = false;    // 使用读锁    final ReentrantLock takeLock = this.takeLock;    takeLock.lock();    try { // 读取数量不能大于剩余元素数量 int n = Math.min(maxElements, count.get()); // 从头部开始读取,h表示当前头节点 Node<E> h = head; int i = 0; try {     while (i < n) {  Node<E> p = h.next;  c.add(p.item);  p.item = null;  h.next = h;  h = p;  ++i;     }     return n; } finally {     if (i > 0) {  // 有读取出来元素,则更新最新头节点  head = h;  // 队列是否还有空间  signalNotFull = (count.getAndAdd(-i) == capacity);     } }    } finally { takeLock.unlock(); if (signalNotFull)     signalNotFull();    }}

remove方法

public boolean remove(Object o) {    if (o == null) return false;    // 加锁    fullyLock();    try { // 遍历全部元素 for (Node<E> trail = head, p = trail.next;      p != null;      trail = p, p = p.next) {     if (o.equals(p.item)) {  // 移除链接  unlink(p, trail);  return true;     } } return false;    } finally { fullyUnlock();    }}

注意一下此处的加锁逻辑

void fullyLock() {    putLock.lock();    takeLock.lock();}

可以看到,remove方法会将读写锁都上锁,并且会扫描整个链表,时间复杂度为O(n)+悲观锁

一般情况下不建议使用remove方法,该方法性能较差,会阻塞所有核心逻辑。

⚠️注意事项

使用LinkedBlockingQueue时要额外注意影响性能的方法

如:remove/contains/toArray/toString/clear

以上方法的时间复杂度均为O(n)+悲观锁,如非必要最好不要使用

🚗应用场景

LinkedBlockingQueue本质上就是个内存级队列,它同样可以达到削峰填谷的目的,使用得当可以给系统减轻不小的压力。

  1. 调度外部服务,防止调用过于频繁,可以放入队列中,等待消费,并用drainTo归集然后统一请求。
  2. 令牌桶,可以通过产生令牌和分发令牌的方式控制业务/接口最大并发。
  3. 使用对象池化技术来减轻jvm回收的压力,将池化对象放入队列中。

下面使用LinkedBlockingQueue实现一个对象池,使用对象池可以防止频繁创建/回收对象,减少gc次数,池化对象长期存储在老年代中,对象数量可控

ResourcePool 对象池抽象类,实现该类就能初始化一个对象池

public abstract class ResourcePool<T extends ResourceModel> {    private final LinkedBlockingQueue<T> queue;    public ResourcePool(int poolMax) { queue = new LinkedBlockingQueue<>(poolMax); for (int i = 0; i < poolMax; i++) {     T model = createResource();     model.pool = this;     model.invalid = true;     queue.add(model); }    }    public T getResource() { try {     do {  T t = queue.take();  if (t.invalid) {      t.invalid = false;      return open(t);  }     } while (true); } catch (InterruptedException e) {     throw new RuntimeException(e); }    }    protected T open(T t) { return t;    }    protected abstract T createResource();    public void free(T t) { if (!t.invalid) {     t.invalid = true;     queue.offer(close(t)); }    }    protected T close(T t) { return t;    }}

ResourceModel抽象对象

public abstract class ResourceModel implements Closeable {    ResourcePool pool;    boolean invalid;    @Override    public void close() throws IOException { pool.free(this);    }}

TestModel对象实例

@Setterpublic class TestModel extends ResourceModel {    public TestModel(String name, int age) { this.name = name; this.age = age;    }    private String name;    private int age;}

TestPool对象池实例

public class TestPool extends ResourcePool<TestModel> {    public TestPool(int poolMax) { super(poolMax);    }    // 创建对象实例    @Override    protected TestModel createResource() { return new TestModel("", 0);    }    // 获得对象的前置操作    @Override    protected TestModel open(TestModel testModel) { return super.open(testModel);    }    // 对象回收后操作    @Override    protected TestModel close(TestModel testModel) { testModel.setAge(0); testModel.setName(""); return super.close(testModel);    }}

使用方式1

public static void main(String[] args) throws IOException {    TestPool testPool = new TestPool(30);    // 从池中获得一个对象    TestModel model = testPool.getResource();    // 回收对象    model.close();}

使用方式2

public static void main(String[] args) throws IOException {    TestPool testPool = new TestPool(30);    try(TestModel model = testPool.getResource()) {     }}