【重学数据结构】队列 Queue
目录
什么是队列?
延迟队列介绍
说明
二叉堆结构
实现延迟队列
实现说明
入队操作
为什么元素是队列头部就要唤醒正在等待的消费者线程?
出队操作
优先级队列是如何实现的?
入队操作
出队操作
自测过程
什么是队列?
队列是一种先进先出的线性数据结构,将元素添加到队列后的操作称为入队,从队列中移除元素的操作称为出队。队列还分为 单端队列(queue) 和 双端队列(deque) 。
- 从理论上讲,队列的一个特征是它没有特定的容量。不管已经包含多少元素,总是可以再添加一个新元素。
- 队列既可以是数组实现也可以是链表实现。所以当我们在 Java 中使用队列的时候,Deque 的实现类就是;LinkedList 和 ArrayDeque的实现类。
- 队列不只是单端从一个口入另外一个口出,也可以是双端队列。例如在 Java 中 Queue 是单端队列接口、Deque 是双端队列接口,都有对应的实现类
延迟队列介绍
这里我们来扩展实现一个延迟队列,并在这个过程中会涉及到阻塞队列、优先队列的使用。通过这样的一个手写源码来学习队列的扩展使用。
说明
DelayQueue 是一个 BlockingQueue(无界阻塞)队列,它封装了一个使用完全二叉堆排序元素的 PriorityQueue(优先队列)。在添加元素时使用 Delay(延迟时间)作为排序条件,延迟最小的元素会优先放到队首。
这个延迟队列中用到的排序方式就是 PriorityQueue 优先队列,它的数据结构是数组实现的队列,但体现形式是一棵二叉堆树结构。在元素存放时,通过对存放元素的比较和替换形成二叉堆结构。
二叉堆结构
二叉堆是一种特殊结构的堆,它的表现形态可以是一棵完整或近似二叉树的结构。我们要实现的延迟队列中的元素存放,使用的就是 PriorityQueue 实现的平衡二叉堆结构,数据以队列形式存放在基础数组中。
父子节点索引关系:
- 假如父节点为queue[n],那么左子节点为queue[2n+1],右子节点为queue[2n+2]
- 任意孩子节点的父节点位置,都是 (n-1)>>>1 相当于减1后除2取整
节点间大小关系:
- 父节点小于等于任意孩子节点
- 同一层级的两个孩子节点大小不需要维护,它是在弹出元素的时候进行判断的
实现延迟队列
实现说明
- 延迟队列的使用,是以在 DelayQueue 中存放实现了 Delayed 延迟接口的对象。因为只有实现这个对象,才能比较出当前元素与所需存放到对应位置的一个比对计算过程。
- 另外这里的核心点包括:PriorityQueue —— 优先队列、ReentrantLock —— 可重入锁、Condition —— 信号量
包含属性
public class DelayQueue implements BlockingQueue{ // 可重入锁,用于保证线程安全 private final ReentrantLock lock = new ReentrantLock(); // 优先级队列 private final PriorityQueue pq = new PriorityQueue(); // 条件变量,用于实现阻塞等待 private final Condition available = lock.newCondition();}
入队操作
@Override public boolean add(E e) { offer(e); return true; } @Override public boolean offer(E e) { lock.lock(); try { pq.offer(e); // 如果新添加的元素成为队列头部(延迟时间最短) // 需要唤醒可能正在等待的消费者线程 if (pq.peek() == e) { available.signal(); } return true; } finally { lock.unlock(); } }
为什么元素是队列头部就要唤醒正在等待的消费者线程?
考虑以下场景:
- 队列中原有元素A的延迟时间是10秒
- 消费者线程正在等待A到期
- 此时插入新元素B,延迟时间只有2秒
如果不唤醒等待的消费者线程:
- 消费者线程会继续等待10秒(基于元素A的延迟时间)
- 但实际上2秒后元素B就可以被消费了
- 这样就造成了8秒的不必要等待
出队操作
@Override public E poll() { lock.lock(); try { E first = pq.peek(); // 如果队列为空,或者头部元素延迟时间未到 if (first == null || first.getDelay(NANOSECONDS) > 0) { return null; } else { return pq.poll(); } } finally { lock.unlock(); } }
为什么没有实现接口,first.getDelay也能正确获取到延迟时间?
优先级队列是如何实现的?
包含属性
public class PriorityQueue implements Queue { // 日志记录器,用于记录队列操作信息 private Logger logger = LoggerFactory.getLogger(PriorityQueue.class); // 队列默认初始容量,使用堆结构的典型初始大小 private static final int DEFAULT_INITIAL_CAPACITY = 11; // 使用Object数组存储队列元素,保存的是Comparable对象 private Object[] queue; // 记录队列中实际元素的数量 private int size;}
入队操作
@Override public boolean offer(E e) { if (e == null) { throw new NullPointerException(); } // 看是否需要扩容 int i = size; if (i >= queue.length) { // i+1表示插入新元素后所需的最小容量 grow(i + 1); } size = i + 1; // 插入元素 if (i == 0) { queue[i] = e; } else { // 目标值上浮 siftUp(i, e); } return true; }
上浮过程
private void siftUp(int k, E x) { Comparable key = (Comparable) x; logger.info(\"【入队】元素:{} 当前队列:{}\", JSON.toJSONString(key), JSON.toJSONString(queue)); while (k > 0) { int parent = (k - 1) >>> 1; logger.info(\"【入队】寻找当前节点的父节点位置。k:{} parent:{}\", k, parent); Object e = queue[parent]; // 如果当前位置元素,大于父节点元素,则退出循环 if (key.compareTo((E) e) >= 0) { logger.info(\"【入队】值比对,父节点:{} 目标节点:{}\", e, key); break; } // 相反父节点位置大于当前位置元素,则进行替换 logger.info(\"【入队】替换过程,父子节点位置替换,继续循环,父节点:{}, 存放到位置:{}\", JSON.toJSONString(e), k); queue[k] = e; k = parent; } // 此时的k为key上浮的最终位置 queue[k] = key; logger.info(\"【入队】完成 Idx:{},Val:{}, 当前队列:{}\", k, JSON.toJSONString(key), JSON.toJSONString(queue));}
上浮过程画图举例说明
出队操作
@Override public E poll() { if (size == 0) { return null; } E result = (E) queue[0]; int s = --size; E x = (E) queue[s]; queue[s] = null; if (s != 0) { //因为堆是一种完全二叉树结构,删除堆顶元素后,为了保持树的结构完整(即完全二叉树的特性), // 我们通常会将最后一个元素移动到堆顶,然后通过下沉操作找到合适的位置。 siftDown(0, x); } return result; }
下沉过程
private void siftDown(int k, E x) { Comparable key = (Comparable) x; // 找到非叶子结点的位置(因为只有非叶子节点才有左右子节点) int notLeaf = size >>> 1; while (k < notLeaf) { int child = (k << 1) + 1; Object c = queue[child]; int right = child + 1; // 左右子节点比对,取最小的节点 if (right < size && ((Comparable) c).compareTo((E) queue[right]) > 0) { logger.info(\"【出队】左右子节点比对,获取最小值。left:{} right:{}\", JSON.toJSONString(c), JSON.toJSONString(queue[right])); // 右子节点小,把right索引值赋给child,child此时就是代表最小的子节点索引值 c = queue[child = right]; } // 目标值与子节点比对,当目标值小于子节点值,退出循环。说明此时目标值所在位置适合,迁移完成。 if (key.compareTo((E) c) <= 0) { break; } // 当目标值大于子节点值,位置替换,继续比较 logger.info(\"【出队】替换过程,节点的值比对。上节点:{},下节点:{} 位置替换\", JSON.toJSONString(queue[k]), JSON.toJSONString(c)); queue[k] = c; k = child; } // 迁移完成,将目标值存放到k位置,k此时应该是一个叶子结点位置或者目标值所在位置适合 logger.info(\"【出队】替换结果,最终更换位置。Idx:{} Val:{}\", k, JSON.toJSONString(key)); queue[k] = key;}
下沉过程画图举例说明
自测过程
package queue.test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import queue.DelayQueue;import queue.Delayed;import queue.Queue;import java.util.concurrent.TimeUnit;public class QueueTest { private static final Logger logger = LoggerFactory.getLogger(QueueTest.class); public static void main(String[] args) throws InterruptedException { Queue queue = new DelayQueue(); queue.add(new Job(\"1号\", 1000L)); queue.add(new Job(\"3号\", 3000L)); queue.add(new Job(\"5号\", 5000L)); queue.add(new Job(\"11号\", 11000L)); queue.add(new Job(\"4号\", 4000L)); queue.add(new Job(\"6号\", 6000L)); queue.add(new Job(\"7号\", 7000L)); queue.add(new Job(\"12号\", 12000L)); queue.add(new Job(\"15号\", 15000L)); queue.add(new Job(\"10号\", 10000L)); queue.add(new Job(\"9号\", 9000L)); queue.add(new Job(\"8号\", 8000L)); while (true) { Job poll = queue.poll(); if (null == poll) { Thread.sleep(10); continue; } logger.info(poll.getName()); } } static class Job implements Delayed { private final String name; private final Long begin; private final Long delayTime; public Job(String name, Long delayTime) { this.name = name; this.begin = System.currentTimeMillis(); this.delayTime = delayTime;//延时时长 } @Override public long getDelay(TimeUnit unit) { return unit.convert(begin + delayTime - System.currentTimeMillis(), TimeUnit.MICROSECONDS); } public String getName() { return name; } @Override public int compareTo(Delayed o) { Job job = (Job) o; return (int) (this.getDelay(TimeUnit.MICROSECONDS) - job.getDelay(TimeUnit.MICROSECONDS)); } }}
测试结果如图:
常见问题
单端队列和双端队列,分别对应的实现类是哪个?
单端队列的接口为Queue:对应实现类有LinkedList、ArrayDeque、PriorityQueue
双端队列的接口为Deque:对应实现类有LinkedList、ArrayDeque
简述延迟队列/优先队列的实现方式
延迟队列是基于优先级队列实现的(依赖PriorityQueue),根据延迟队列的延迟时间在优先级队列中排序,除此之外,为了保证入队和出队的线程安全,引入可重入锁和Condition来唤醒等待消费的线程。
优先队列根据实现了Comparable接口的类进行排序,它的数据结构是数组实现的队列,但体现形式是一棵二叉堆树结构。
二叉堆插入/弹出元素的过程
- 插入过程:首先将元素插入到末尾,然后进行上浮的过程(如果元素大于其父节点元素说明位置合适,否则跟父节点位置进行替换,然后继续循环,直到当前位置的元素大于父节点元素或当前位置索引走到了0就退出for循环。此时的k就是上浮的最终位置,再赋值就好)。
- 弹出过程:首先弹出堆顶元素,然后将最后一个元素移动到堆顶,通过下沉操作找到合适的位置。下沉操作的流程是当前位置索引小于非叶子节点索引就进入while循环,因为只有非叶子节点才有子节点,循环里需要进行左右节点比对选出最小的子节点,然后父节点和子节点比较,大于就交换位置。走出while循环代表迁移完成,此时的k就是下沉的最终位置,再赋值就好。
延迟队列的使用场景
延迟队列可用于定时调度某个时间点执行的任务。例如,定时发送邮件、定时发布社交媒体帖子等。
延迟队列为什么添加信号量
信号量用于线程的等待和唤醒,比如说上文所说的入队操作时就要唤醒线程,因为可以减少线程不必要的等待时间。