【源码篇】LinkedBlockingQueue源码超详细解读
文章目录
-
- ⭐️导读
- 🚩基本构造
- 📖核心源码解读
-
- `put`方法
- `poll/take`方法
- `drainTo`方法
- `remove`方法
- ⚠️注意事项
- 🚗应用场景
⭐️导读
LinkedBlockingQueue和它的名字一样,它是一个由链表实现的有界阻塞队列,该队列按照先进先出的逻辑对队列进行排序。该队列使用了两把锁分别控制放入和取出,大大提高了并发效率,下面将对它的源码进行详细解读。
🚩基本构造
阻塞队列本质上也可算是集合,因此最上层也实现了Collection接口,并提供一些集合的常用方法,AbstractQueue
是提供Queue
的基本实现,我们重点关注Queue
、BlockingQueue
提供的api。
Queue
类提供最基本的API
// 插入,失败返回异常boolean add(E e);// 插入,失败返回falseboolean offer(E e);// 移除队列头部元素,队列为空异常E remove();// 移除队列头部元素,队列为空返回nullE poll();// 查看头部元素,队列为空异常E element();// 查看头部元素,队列为空返回nullE peek();
BlockingQueue
提供阻塞操作相关API
// 将元素插入队列,如果队列没有可用空间则等待void put(E e);// 将元素插入队列,如果队列没用可用空间则等待设定的时间boolean offer(E e, long timeout, TimeUnit unit);// 移除头部元素,如果没有可用则等待E take();// 移除头部元素,如果没有可用则等待设定的时间E poll(long timeout, TimeUnit unit);// 返回剩余可插入的元素数量int remainingCapacity();// 从队列中取出全部的元素并插入到指定集合中int drainTo(Collection<? super E> c);// 从队列中取出指定数量的元素并插入到指定集合中int drainTo(Collection<? super E> c, int maxElements);
📖核心源码解读
LinkedBlockingQueue
分别使用了一个读锁和一个写锁来控制并发,并使用Condition
来控制他们的执行过程
// 读锁private final ReentrantLock takeLock = new ReentrantLock();// 队列不为空的Conditionprivate final Condition notEmpty = takeLock.newCondition();// 写锁private final ReentrantLock putLock = new ReentrantLock();// 队列没有满的Conditionprivate final Condition notFull = putLock.newCondition();
put
方法
将元素插入队列,如果队列没有可用空间则等待
public void put(E e) throws InterruptedException { // 如果元素是null抛出异常 if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); // 使用写锁 final ReentrantLock putLock = this.putLock; // 元素数量计数器 final AtomicInteger count = this.count; // 获得锁 putLock.lockInterruptibly(); try { // 判断队列是否已满,如果已满写线程等待 while (count.get() == capacity) { notFull.await(); } // 将尾部元素指向当前元素 enqueue(node); // 元素数量+1并返回操作前的数量 c = count.getAndIncrement(); // 如果元素数量没有满,则唤醒notFull.wait(),表示当前队列未满 if (c + 1 < capacity) notFull.signal(); } finally { // 解锁 putLock.unlock(); } if (c == 0) // 如果操作前元素数量为0,则通知写线程 signalNotEmpty();}
此处signalNotEmpty();
就是通知被阻塞的读线程(如take/poll方法),队列里有数据了,赶紧消费
poll/take
方法
poll 查看头部元素,队列为空异常
take 移除并返回头部元素,如果没有可用则等待
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; // 等待纳秒数 long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; // 使用读锁 final ReentrantLock takeLock = this.takeLock; // 加锁 takeLock.lockInterruptibly(); try { // 如果当前队列中没有元素则等待指定时长 while (count.get() == 0) { if (nanos <= 0) // 等待超时,直接返回null return null; nanos = notEmpty.awaitNanos(nanos); } // 移除队列头中的节点并返回 x = dequeue(); // 元素-1 c = count.getAndDecrement(); // 如果队列中有数据,则通知其他线程该队列不为空 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 通知其他线程表示该队列未满 if (c == capacity) signalNotFull(); return x;}
此处signalNotFull();
是通知阻塞的写入线程(如put/offer),表示队列没满,可以写入
take逻辑与poll类似,只是等待策略不相同,take方法如下
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 等待逻辑与poll不一样,此处表示如果没有数据则一直等待 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x;}
drainTo
方法
从队列中取出全部的元素并插入到指定集合中
public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; boolean signalNotFull = false; // 使用读锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 读取数量不能大于剩余元素数量 int n = Math.min(maxElements, count.get()); // 从头部开始读取,h表示当前头节点 Node<E> h = head; int i = 0; try { while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null; h.next = h; h = p; ++i; } return n; } finally { if (i > 0) { // 有读取出来元素,则更新最新头节点 head = h; // 队列是否还有空间 signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); }}
remove
方法
public boolean remove(Object o) { if (o == null) return false; // 加锁 fullyLock(); try { // 遍历全部元素 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { // 移除链接 unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); }}
注意一下此处的加锁逻辑
void fullyLock() { putLock.lock(); takeLock.lock();}
可以看到,remove方法会将读写锁都上锁,并且会扫描整个链表,时间复杂度为O(n)+悲观锁
。
一般情况下不建议使用remove方法,该方法性能较差,会阻塞所有核心逻辑。
⚠️注意事项
使用LinkedBlockingQueue
时要额外注意影响性能的方法
如:remove
/contains
/toArray
/toString
/clear
以上方法的时间复杂度均为O(n)+悲观锁
,如非必要最好不要使用
🚗应用场景
LinkedBlockingQueue本质上就是个内存级队列,它同样可以达到削峰填谷的目的,使用得当可以给系统减轻不小的压力。
- 调度外部服务,防止调用过于频繁,可以放入队列中,等待消费,并用
drainTo
归集然后统一请求。 - 令牌桶,可以通过产生令牌和分发令牌的方式控制业务/接口最大并发。
- 使用对象池化技术来减轻jvm回收的压力,将池化对象放入队列中。
下面使用LinkedBlockingQueue实现一个对象池,使用对象池可以防止频繁创建/回收对象,减少gc次数,池化对象长期存储在老年代中,对象数量可控
ResourcePool
对象池抽象类,实现该类就能初始化一个对象池
public abstract class ResourcePool<T extends ResourceModel> { private final LinkedBlockingQueue<T> queue; public ResourcePool(int poolMax) { queue = new LinkedBlockingQueue<>(poolMax); for (int i = 0; i < poolMax; i++) { T model = createResource(); model.pool = this; model.invalid = true; queue.add(model); } } public T getResource() { try { do { T t = queue.take(); if (t.invalid) { t.invalid = false; return open(t); } } while (true); } catch (InterruptedException e) { throw new RuntimeException(e); } } protected T open(T t) { return t; } protected abstract T createResource(); public void free(T t) { if (!t.invalid) { t.invalid = true; queue.offer(close(t)); } } protected T close(T t) { return t; }}
ResourceModel
抽象对象
public abstract class ResourceModel implements Closeable { ResourcePool pool; boolean invalid; @Override public void close() throws IOException { pool.free(this); }}
TestModel
对象实例
@Setterpublic class TestModel extends ResourceModel { public TestModel(String name, int age) { this.name = name; this.age = age; } private String name; private int age;}
TestPool
对象池实例
public class TestPool extends ResourcePool<TestModel> { public TestPool(int poolMax) { super(poolMax); } // 创建对象实例 @Override protected TestModel createResource() { return new TestModel("", 0); } // 获得对象的前置操作 @Override protected TestModel open(TestModel testModel) { return super.open(testModel); } // 对象回收后操作 @Override protected TestModel close(TestModel testModel) { testModel.setAge(0); testModel.setName(""); return super.close(testModel); }}
使用方式1
public static void main(String[] args) throws IOException { TestPool testPool = new TestPool(30); // 从池中获得一个对象 TestModel model = testPool.getResource(); // 回收对象 model.close();}
使用方式2
public static void main(String[] args) throws IOException { TestPool testPool = new TestPool(30); try(TestModel model = testPool.getResource()) { }}