> 文档中心 > redis scan 命令底层原理(为什么会重复扫描?)

redis scan 命令底层原理(为什么会重复扫描?)

文章目录

  • 前言
  • 一、迭代
    • 1. 全遍历
    • 2. 间断遍历
  • 二、scan 扫描原理
    • 1. 扫描算法:
    • 2. 减少重复扫描?
      • 2.1 扩容
      • 2.2 缩容
    • 3. 迭代过程中正在进行rehash
    • 4. 完整的 scan 逻辑
  • 总结

前言

本文参考源码版本为 redis 6.2

迭代器——可在容器(容器可为字典、链表等数据结构)上遍访的接口,设计人员无须关心容器的内容,调用迭代器固定的接口就可遍历数据,在很多高级语言中都有实现。

字典迭代器主要用于迭代字典这个数据结构中的数据,既然是迭代字典中的数据,必然会出现一个问题,迭代过程中,如果发生了数据增删,则可能导致字典触发 rehash 操作,或迭代开始时字典正在进行 rehash 操作,从而导致一条数据可能多次遍历到。

那 redis 如何解决这个问题呢?带着这个疑问,我们接下来一起看下迭代器的实现。


一、迭代器

我们先来看看 redis 中字典迭代器的数据结构

typedef struct dictIterator {    dict *d;  // 迭代的字典    long index;  // 当前迭代的hash表索引值(hash槽)    int table, safe; // table 表示当前迭代的hash表,ht[0] or ht[1];table 表示当前迭代器是否为安全迭代器    dictEntry *entry, *nextEntry; // 当前节点、下个节点    /* unsafe iterator fingerprint for misuse detection. */    long long fingerprint; // 指纹,当字典没有改变时,指纹不变;当字典值发生改变,则指纹也随之改变} dictIterator;

整个数据结构占用了 48 字节,其中:

  • d 字段指向需要迭代的字典;
  • index 字段代表当前读取到 hash 表中哪个索引值;
  • table 字段表示当前正在迭代的 hash 表(即 ht[0] 与 ht[1] 中的 0 和 1);
  • safe 字段表示当前创建的迭代器是否为安全模式;
  • entry 字段表示正在读取的节点数据;
  • nextEntry 字段表示 entry 节点中的 next 字段所指向的数据。

fingerprint 字段是一个 64 位的整数,表示在给定时间内字典的状态。在这里称其为字典的指纹,因为该字段的值为字典(dict结构体)中所有字段值组合在一起生成的 hash 值,所以当字典中数据发生任何变化时,其值都会不同,生成算法不做过多解读,读者可参见源码 dict.c文件中的 dictFingerprint 函数。

为了让迭代过程变得简单,redis 也提供了迭代相关的 API 函数,主要为:

dictIterator *dictGetIterator(dict *d);dictIterator *dictGetSafeIterator(dict *d);dictEntry *dictNext(dictIterator *iter);void dictReleaseIterator(dictIterator *iter);

我们把迭代器遍历数据分为两类:

  • 普通迭代器,只遍历数据;
  • 安全迭代器,遍历的同时删除数据。

1)普通迭代器

普通迭代器迭代字典中数据时,会对迭代器中 fingerprint 字段的值作严格的校验,来保证迭代过程中字典结构不发生任何变化,确保读取出的数据不出现重复

当 redis 执行部分命令时会使用普通迭代器迭代字典数据,例如 sort 命令。sort 命令主要作用是对给定列表、集合、有序集合的元素进行排序,如果给定的是有序集合,其成员名存储用的是字典,分值存储用的是跳跃表,则执行 sort 命令读取数据的时候会用到迭代器来遍历整个字典。

普通迭代器迭代数据的过程比较简单,主要分为如下几个步骤。

  • 调用 dictGetIterator 函数初始化一个普通迭代器,此时会把 iter->safe 值置为 0,表示初始化的迭代器为普通迭代器。
  • 循环调用 dictNext 函数依次遍历字典中 hash 表的节点,首次遍历时会通过 dictFingerprint 函数拿到当前字典的指纹值。
  • 当调用 dictNext 函数遍历完字典 hash 表中节点数据后,释放迭代器时会继续调用dictFingerprint 函数计算字典的指纹值,并与首次拿到的指纹值比较,不相等则输出异常"=== ASSERTION FAILED ===",且退出程序执行。

普通迭代器通过步骤1、步骤3的指纹值对比,来限制整个迭代过程中只能进行迭代操作,即迭代过程中字典数据的修改、添加、删除、查找等操作都不能进行,只能调用dictNext函数迭代整个字典,否则就报异常,由此来保证迭代器取出数据的准确性

值得注意的是

指纹的计算,通过字典的几个属性进行一定规则运算,当字典有增删改等操作时,这些属性就会有变动,从而导致前后指纹不一致,直接抛出异常。

2)安全迭代器

安全迭代器和普通迭代器迭代数据原理类似,也是通过循环调用 dictNext 函数依次遍历字典中 hash 表的节点。

安全迭代器确保读取数据的准确性,不是通过限制字典的部分操作来实现的,而是通过限制rehash 的进行来确保数据的准确性,因此迭代过程中可以对字典进行增删改查等操作

在字典进行增删查改的时候,都会调用 _dictRehashStep 来尝试进行 rehash,源码为:

static void _dictRehashStep(dict *d) {    if (d->iterators == 0) dictRehash(d,1);}

原理是,如果当前字典有安全迭代器运行,则不进行渐进式 rehash 操作,rehash 操作暂停,字典中数据就不会被重复遍历,由此确保了读取数据的准确性。

当 redis 执行部分命令时会使用安全迭代器迭代字典数据,例如 keys 命令。keys 命令主要作用是通过模式匹配,返回给定模式的所有 key 列表,遇到过期的键则会进行删除操作。redis 数据键值对都存储在字典中,因此 keys 命令会通过安全迭代器来遍历整个字典。安全迭代器整个迭代过程也较为简单,主要分如下几个步骤:

  • 第1步:调用 dictGetSafeIterator 函数初始化一个安全迭代器,此时会把 iter->safe 值置为1,表示初始化的迭代器为安全迭代器
  • 第2步:循环调用 dictNext 函数依次遍历字典中 hash 表的节点,首次遍历时会把字典中iterators 字段进行加 1 操作,确保迭代过程中渐进式 rehash 操作会被中断执行。
  • 第3步:当调用 dictNext 函数遍历完字典 hash 表中节点数据后,释放迭代器时会把字典中iterators 字段进行减 1 操作,确保迭代后渐进式 rehash 操作能正常进行。

安全迭代器是通过步骤1、步骤3中对字典的 iterators 字段进行修改,使得迭代过程中渐进式 rehash 操作被中断,由此来保证迭代器读取数据的准确性。

1. 全遍历

一次命令执行就遍历完整个数据库。比如执行 keys 命令进行一次数据库全遍历。

以上介绍的普通迭代器和安全迭代器都是一次性获取所有数据,由于 redis 单线程的原因,会造成一定的阻塞。

我们接下来看看,间断遍历如何在安全迭代器的基础上,解决这个阻塞问题。

2. 间断遍历

前文讲解了“全遍历”字典的实现,但有一个问题凸显出来,当数据库中有海量数据时,执行keys命令进行一次数据库全遍历,耗时肯定不短,会造成短暂的 redis 不可用,所以在 redis 在 2.8.0 版本后新增了 scan 操作,也就是“间断遍历”。

而 dictScan 是“间断遍历”中的一种实现,主要在迭代字典中数据时使用,例如 hscan 命令迭代整个数据库中的 key,以及 zscan 命令迭代有序集合所有成员与值时,都是通过 dictScan 函数来实现的字典遍历。dictScan 遍历字典过程中是可以进行 rehash 操作的,通过算法来保证所有的数据能被遍历到。

dictScan 函数间断遍历字典过程中会遇到如下 3 种情况。

  • 从迭代开始到结束,散列表没有进行 rehash 操作。
  • 从迭代开始到结束,散列表进行了扩容或缩容操作,且恰好为两次迭代间隔期间完成了 rehash 操作。
  • 从迭代开始到结束,某次或某几次迭代时散列表正在进行 rehash 操作。

总之,间断遍历的思想是每次命令执行只取部分数据,分多次遍历

二、scan 扫描原理

扫描算法的核心思想是:利用同模效应,减少扩容或者缩容过程中的 hash 槽重复扫描。

比如现在有,容量为8的 hash table,其遍历顺序为:
在这里插入图片描述
即 hash 槽下标遍历顺序:
redis scan 命令底层原理(为什么会重复扫描?)
从二进制变化来看,其实就是从左往右依次累加1,进位向右累计

而落实到具体实现上来看就是,将二进制反转后再进行常规+1,示例如下:

1 1 0 0  =>  反转0 0 1 1  => +1 操作0 1 0 0  => 再反转0 0 1 0

也就是说,12 (1 1 0 0) 之后的下一个遍历槽位是 2 (0 0 1 0)。

1. 扫描算法:

1)反转、+1、再反转

 /* Set unmasked bits so incrementing the reversed cursor  * operates on the masked bits */ v |= ~m0; /* Increment the reverse cursor */ v = rev(v); v++; v = rev(v);

2)反转算法

static unsigned long rev(unsigned long v) {    unsigned long s = CHAR_BIT * sizeof(v); // bit size; must be power of 2    unsigned long mask = ~0UL;    while ((s >>= 1) > 0) { mask ^= (mask << s); v = ((v >> s) & mask) | ((v << s) & ~mask);    }    return v;}

此算法是直接引用 位反转算法,具体可以点击查看,我们只需了解原理即可,不必深究。

2. 减少重复扫描?

上文提到,dictScan 函数间断遍历字典过程中会遇到如下 3 种情况。

  • 从迭代开始到结束,散列表没有进行 rehash 操作。
  • 从迭代开始到结束,散列表进行了扩容或缩容操作,且恰好为两次迭代间隔期间完成了rehash 操作。
  • 从迭代开始到结束,某次或某几次迭代时散列表正在进行 rehash 操作。

第一种情况最容易理解,常规扫描完即可。后面两种情况较为复杂。

下面主要以第二种情况主要说明:

比如我现在已经取槽位 12 (1 1 0 0) 数据,准备下一个槽位 2 (0 0 1 0) 数;碰巧,在这两次操作之间,完成了一次 rehash 扩容。

2.1 扩容

在这里插入图片描述
如上,我们即将遍历槽位6,此时,已经完成了一次扩容,容量从 8 扩容到 16;我们剩下待遍历的数据槽位有 6、1、5、3、7,原槽位上的数据在扩容后可能分散到两个槽位,比如槽位 6 的数据可能分散到新表的槽位 6、14。

在这里插入图片描述
对比两张图,可以发现,扩容一倍后,新表待遍历槽位都是我们目前还没有遍历的数据;换句话说,扩容操作不会重复遍历数据,同时也不会遗漏数据

2.2 缩容

我们以 hash table 容量由 16 缩容为 8 为例,如下图:
在这里插入图片描述
目前,我们刚遍历完槽位 6,即将遍历槽位 14,此时,完成了一次缩容 rehash;

由于槽位 14 已经超过目前缩容后的哈希表容量,所以会进行一次取模操作,即 14 % 8 = 6,因此,将读取槽位 6 上的数据;

值得注意的是,缩容后槽位 6 上会映射了原表槽位6、14的数据,因此,这里可能存在重复取原表槽位 6 的数据。

不过,庆幸的是,之后待扫描的数据都是没有处理过的,也就是不再有重复数据处理。

你可能已经注意到了,缩容操作可能引起合并槽位数据;但是,在原表处理的时候,两个可能合并的槽位都是连续处理的,这就意味着,在扫描的时候即使存在缩容操作,也最多重复扫描原表一个槽位的数据。将重复扫描的数据量降到了最低

简单总结下,缩容操作会重复扫描数据(最多重复扫描一个槽位),但不会遗漏数据

3. 迭代过程中正在进行rehash

从迭代开始到结束,某次或某几次迭代时散列表正在进行 rehash 操作,rehash 操作中会同时并存两个 hash 表:一张为扩容或缩容后的表 ht[1],一张为老表 ht[0], ht[0] 的数据通过渐进式 rehash 会逐步迁移到ht[1]中,最终完成整个迁移过程。

因为大小两表并存,所以需要从 ht[0] 和 ht[1] 中都取出数据,整个遍历过程为:先找到两个散列表中更小的表,先对小的 hash 表遍历,然后对大的 hash 表遍历,迭代的代码如下:

 t0 = &d->ht[0]; t1 = &d->ht[1]; //  确保始终先处理小表 if (t0->size > t1->size) {     t0 = &d->ht[1];     t1 = &d->ht[0]; } m0 = t0->sizemask; m1 = t1->sizemask;  if (bucketfn) bucketfn(privdata, &t0->table[v & m0]); de = t0->table[v & m0]; // 迭代第一张表 while (de) {      next = de->next;     fn(privdata, de);     de = next; } do {     if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);     de = t1->table[v & m1];     // 迭代第二张表     while (de) {  next = de->next;  fn(privdata, de);  de = next;     }     v |= ~m1;     v = rev(v);     v++;     v = rev(v); } while (v & (m0 ^ m1));

结合 rehash 中游标变更算法,为了让大家更好地理解该算法及整个迭代过程,举个例子简单讲解这种情况下迭代的顺序:假设 hash 表大小为8,扩容到 16,迭代从始至终每次迭代都在进行 rehash 操作,接下来两张表数据遍历次序如下图所示:
在这里插入图片描述
如果是缩容操作,其实和扩容操作遍历顺序是一样的,因为在处理的时候总是先处理小表,然后再处理大表。

值得注意的是,因为后台线程也会定期执行 rehash 操作,此过程会导致部分数据重复访问。

为啥会重复?

你想,一个用户线程和一个后台线程同时处理,比如,刚扫描完一个小表的 hash 槽,准备继续扫描对应大表的 hash 槽,此时,后台线程将一部分刚才扫描过的 hash 槽部分数据 rehash 到对应大表上,这种情况下,便会出现部分重复数据。

当然,这种同模的小-大表连续处理,间隙基本很短,因此,重复概率较小,影响不大。

来看看后台线程什么时候执行:

1)server.c#databasesCron

定时任务定期处理

   /* Rehash */   if (server.activerehashing) {for (j = 0; j < dbs_per_call; j++) {    int work_done = incrementallyRehash(rehash_db);    if (work_done) { /* If the function did some work, stop here, we'll do  * more at the next cron loop. */ break;    } else { /* If this db didn't need rehash, we'll try the next one. */ rehash_db++; rehash_db %= server.dbnum;    }}   }

2)处理指定的 dbid

int incrementallyRehash(int dbid) {    /* Keys dictionary */    if (dictIsRehashing(server.db[dbid].dict)) { dictRehashMilliseconds(server.db[dbid].dict,1); return 1; /* already used our millisecond for this loop... */    }    /* Expires */    if (dictIsRehashing(server.db[dbid].expires)) { dictRehashMilliseconds(server.db[dbid].expires,1); return 1; /* already used our millisecond for this loop... */    }    return 0;}

3)最终,通过调用 dictRehashMilliseconds 执行 rehash 操作

可以指定 rehash 的时间,后台线程指定的是 1ms

int dictRehashMilliseconds(dict *d, int ms) {    if (d->iterators > 0) return 0;    long long start = timeInMilliseconds();    int rehashes = 0;    while(dictRehash(d,100)) { rehashes += 100; if (timeInMilliseconds()-start > ms) break;    }    return rehashes;}

4. 完整的 scan 逻辑

1) 一次 dictScan 调用会扫描一个 hash 槽数据

一般情况下确实如此,但是,当正在 rehash 时,小表扫描一个 hash 槽数据,而大表则扫描小表对应的多个 hash 槽数据。

unsigned long dictScan(dict *d,  unsigned long v,  dictScanFunction *fn,  dictScanBucketFunction* bucketfn,  void *privdata){    dictht *t0, *t1;    const dictEntry *de, *next;    unsigned long m0, m1;    if (dictSize(d) == 0) return 0;    d->iterators++; // 安全迭代器,当次遍历结束后会-1 if (!dictIsRehashing(d)) { // 如果 rehash 已经完成,也就是只有 ht[0] 有数据 t0 = &(d->ht[0]); m0 = t0->sizemask; if (bucketfn) bucketfn(privdata, &t0->table[v & m0]); de = t0->table[v & m0]; // v & m0 是为了避免缩容后超出hash表最大值 while (de) { // 扫描此 hash 槽上的所有数据     next = de->next;     fn(privdata, de);     de = next; } v |= ~m0; // 二进制反转并+1 v = rev(v); v++; v = rev(v);    } else { // 如果正在 rehash,则需要处理两张表 ht[0]、ht[1]的数据 t0 = &d->ht[0]; t1 = &d->ht[1]; // 确保 t0 是小表 if (t0->size > t1->size) {     t0 = &d->ht[1];     t1 = &d->ht[0]; } m0 = t0->sizemask; m1 = t1->sizemask; if (bucketfn) bucketfn(privdata, &t0->table[v & m0]); de = t0->table[v & m0]; while (de) { // 处理小表     next = de->next;     fn(privdata, de);     de = next; }  // 循环处理,处理大表,比如,size = 4 的 hash 表,当前槽位 1 // 需要处理对应扩容一倍后,槽位 1、5 do {     if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);     de = t1->table[v & m1];     while (de) {  next = de->next;  fn(privdata, de);  de = next;     }   v |= ~m1;     v = rev(v);     v++;     v = rev(v); } while (v & (m0 ^ m1));    } d->iterators--; // 对应前面 ++ 操作    return v;}
  • 变量 d 是当前迭代的字典;
  • 变量 v 标识迭代开始的游标(即 hash 表中数组索引,也就是我们常说的槽位),每次遍历后会返回新的游标值,整个遍历过程都是围绕这个游标值的改动进行,来保证所有的数据能被遍历到;
  • fn 是函数指针,每遍历一个节点则调用该函数处理;
  • bucketfn 函数在整理碎片时调用;
  • privdata 是回调函数 fn 所需参数

2)db.c#scanGenericCommand

客户端命令入口解析:

redis 127.0.0.1:6379> scan 0 COUNT 5

即,扫描游标从 0 开始的 5 条数据;不过,这里并不是严格意义上只返回 5 条数据,因为每一次扫描都要处理整个槽位的数据,因此是有可能多的。

具体是通过下面代码片段调用 dictScan:

    if (ht) { void *privdata[2]; // count 为 scan 命令传入的值,当hash表处于病态时(例如大部分节点为空),最大迭代次数为 10 * count long maxiterations = count*10; // list *keys = listCreate(); 创建新的 list 用来返回 keys privdata[0] = keys; // o != null 则为 Hash, Set or Zset object,反之为 NULL 则表示处理当前 db privdata[1] = o; do {     cursor = dictScan(ht, cursor, scanCallback, NULL, privdata); } while (cursor && maxiterations-- && listLength(keys) < (unsigned long)count);    }

可见,do {} while() 语句中,通过循环调用 dictScan 来满足需要的 count 数量;也正因为如此,如果客户端传入的 count 过大,将会长时间阻塞;

由于 redis 是单线程处理,这种阻塞会影响其他命令处理,因此,需尽量避免 count 过大。


总结

本文先后介绍了 redis 的普通和安全两种迭代器:

  • 普通迭代器在迭代过程中不能进行增删改操作
  • 安全迭代器在迭代过程中可以进行增删改操作,但最初只能一次性迭代完整个字典,可能造成阻塞
  • 在安全迭代器基础上,演变出了间断遍历(scan 命令),解决了这种长时间阻塞问题。

关于间断遍历原理,文中已经详细描述,此处不在赘述。

参考文献:

Finally Redis collections are iterable(Antirez)
Add SCAN command #579
Fix dictScan(): It can’t scan all buckets when dict is shrinking. #4907
《Redis 设计与源码分析》【陈雷】

笑话娱乐网站