> 文档中心 > redis集群系列一

redis集群系列一

redis集群

redis集群模式是分布式存储的一个case。

数据分布

分布式数据库要解决将一堆数据按照分区规则映射到不同节点上。常见的分区规则有哈希分区和顺序分区。

分区方式 特点 产品
哈希分区 离散度好;数据分布与业务无关 redis cluster、cassandra
顺序分区 离散度容易倾斜;数据分布与业务有关;可以顺序访问 HBase、Hypertable

哈希分区

redis集群使用的是哈希分区,常见的哈希分区有以下几种:

  • 节点取余
    • hash(key)/N, 但是N变化会导致结果变化,所以当扩容和缩容过程中需要重新计算key的归宿。
    • 一般如果使用该方法会翻倍扩容,减少不必要的key搬迁
  • 一致性哈希分区
    • 为每一个节点分配一个token(数值范围在0-2^32),token会构成一个哈希环
    • 一般会根据key计算一个hash值,然后顺时针找到第一个大于等于hash值的token节点
    • 优点:增减节点只影响相邻节点上的数据
    • 缺点:少量节点的增减会大范围影响数据映射;一般要增减一辈才可以保证数据和负载均衡
  • 虚拟槽分区
    • 将数据映射到一个固定范围的整数集合中,比如redis是0-16383
    • 较多的槽位是为了能较好的拆分数据和集群扩展

redis集群的哈希分区

redis集群所采用的虚拟槽分区

  • 槽的计算方法:CRC16(key)& 16383
  • 优点:节点、槽、数据具有映射关系;节点来维护槽的映射关系,客户端不需要关心

redis集群的功能限制

  1. key批量操作,需要在同一个slot
  2. 多个key需要在一个节点上,才可以使用事务
  3. key是数据分布的最小单元
  4. 单机模式的redis可以支持16个数据库,集群模式只有1个数据库(db 0)
  5. 从节点只能复制主节点的数据

组建集群

集群节点

  1. 至少6个节点,每个节点需要开启集群模式
  2. 每个节点有redis配置文件和集群配置文件
  3. 集群配置文件
    1. 文件路径及名称可以通过cluster-config-file设置
    2. 当集群内节点的信息发生变化,比如添加节点、节点下线、故障转移等,节点会自动保存集群状态到配置文件,如下
    3. 配置文件中节点id是一个40位16进制字符串。节点id在集群初始化的过程中只会创建一次,节点重启之后会重用,redis的运行id每次重启都会变化
5a1acfc7c7c914232e41c4adac0219023843517c 192.168.5.52:6379@16379 slave e49c63b81bc667716ba085a4de7d9f9b3ae1e7aa 0 1652338370000 2 connected77c118e5cae9f962e4e8b17647d79d36f190d221 192.168.3.101:6379@16379 master - 0 1652338369776 4 connected 5461-109211e0a38848c0788eaf74f34e1416cab221371a0f9 192.168.3.102:6379@16379 slave 973ca4aac1373a46f8218e3c1838a1badcc73ffe 0 1652338370777 5 connectede49c63b81bc667716ba085a4de7d9f9b3ae1e7aa 192.168.4.117:6379@16379 master - 0 1652338370000 0 connected 10922-163839a1d88f575f705c32c93e8a3e31a2c328c55567b 192.168.4.122:6379@16379 slave 77c118e5cae9f962e4e8b17647d79d36f190d221 0 1652338371782 4 connected973ca4aac1373a46f8218e3c1838a1badcc73ffe 192.168.5.51:6379@16379 myself,master - 0 1652338367000 1 connected 0-5460vars currentEpoch 5 lastVoteEpoch 0

节点握手

  1. 节点之间相互认识通过gossip协议
  2. 客户端会向节点A发起cluster meet B-ip B-port的命令;cluster meet属于异步操作,执行之后会立刻返回给客户端
+---------------+ +---------------+| | | || | | || |     meet      | || || || |     pong      | || | | ||    nodeA      |     ping      |     nodeB     || || || |     pong      | || | | || |     ping      | || || || |     pong      | || | | || | | || || || |     ...| |+---------------+ +---------------+
  1. 如果在nodeB上执行cluster nodes能看到nodeA,说明B已经认识了A
  2. 一个集群N个节点,相互之间认识是N^2的网络。集群间建立联系并不需要客户端将所有连接全部meet一遍。客户端只需要在任意节点上meet其他节点,之后集群的拓扑会被传播到其他节点。其他节点发现有新的节点加入会主动发起握手流程
  3. 集群节点间关系正常之后,集群还不能正常工作,需要分配所有的槽之后,集群才会进入在线状态

槽位分配

  1. 通过cluster addslots $(seq 0 5461) 可以分配槽
  2. 分配结束后可以通过cluster info查看集群状态;通过cluster nodes可以看到节点和槽的对应关系

主从关系修正

  1. 主节点分配到槽之后,在从节点上执行cluster replicate master-node-id

大集群组建工具

  • 可以使用redis-trib.rb组建大集群
  • redis-trib.rb是redis官方推出的管理redis集群的工具,集成在redis的源码src目录下,是基于redis提供的集群命令封装成简单、便捷、实用的操作工具。redis-trib.rb是redis作者用ruby完成的

节点通信

通信流程

  1. 分布式存储中需要维护节点间元数据的信息。常见的有两种:集中式和P2P。
  2. Redis集群采用的P2P的Gossip协议。
  3. 集群控制面通信
    0. 集群节点会单独开启一个TCP端口用于节点间通信(端口在基础端口上加10000),对应的源码见下
    1. 接收到ping的节点回复pong
#ifndef __CLUSTER_H#define __CLUSTER_H/*----------------------------------------------------------------------------- * Redis cluster data structures, defines, exported API. *----------------------------------------------------------------------------*/#define CLUSTER_SLOTS 16384#define CLUSTER_OK 0   /* Everything looks ok */#define CLUSTER_FAIL 1 /* The cluster can't work */#define CLUSTER_NAMELEN 40    /* sha1 hex length */#define CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */

Gossip消息

  1. 常见的Gossip消息有:ping、pong、meet、fail等
    0. meet消息:用来通知新节点加入,meet结束后新加入节点会周期性向其他节点发送ping、pong
  2. ping消息:集群内的每个节点每秒会向其他节点(部分节点notall)发送ping消息,用来检测其他节点是否在线以及交换彼此的状态信息。ping消息里除了自身节点还有其他节点的状态数据
  3. pong消息:当收到ping、meet消息时,pong会作为响应消息向发送方确认消息正常通信。pong消息内部封装了自身的数据,所以也可以用来向集群的其他节点广播自身的状态更新
  4. fail消息:当节点A判断集群内的另一个节点B下线时,会向集群的其他节点广播fail消息。其他节点收到fail消息会将B节点状态更新为fail状态
  • 消息在代码中的呈现
typedef struct {    char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */    uint32_t totlen;    /* Total length of this message */    uint16_t ver;/* Protocol version, currently set to 1. */    uint16_t port;      /* TCP base port number. */    uint16_t type;      /* Message type */    uint16_t count;     /* Only used for some kind of messages. */    uint64_t currentEpoch;  /* The epoch accordingly to the sending node. */    uint64_t configEpoch;   /* The config epoch if it's a master, or the last   epoch advertised by its master if it is a   slave. */    uint64_t offset;    /* Master replication offset if node is a master or      processed replication offset if node is a slave. */    char sender[CLUSTER_NAMELEN]; /* Name of the sender node */    unsigned char myslots[CLUSTER_SLOTS/8];    char slaveof[CLUSTER_NAMELEN];    char myip[NET_IP_STR_LEN];    /* Sender IP, if not all zeroed. */    char notused1[34];  /* 34 bytes reserved for future usage. */    uint16_t cport;      /* Sender TCP cluster bus port */    uint16_t flags;      /* Sender node flags */    unsigned char state; /* Cluster state from the POV of the sender */    unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */    union clusterMsgData data;} clusterMsg;
  • 消息体在代码中的呈现
    • 有必要温习下union:
      • 一个union可以有多个数据成员;
      • 在任意时刻,联合中只能有一个数据成员可以有值。当给联合中某个成员赋值之后,该联合中的其它成员就变成未定义状态了;
      • union所占内存空间是共用体中所占空间最大的类型的长度。
union clusterMsgData {    /* PING, MEET and PONG */    struct { /* Array of N clusterMsgDataGossip structures */ clusterMsgDataGossip gossip[1];    } ping;    /* FAIL */    struct { clusterMsgDataFail about;    } fail;    /* PUBLISH */    struct { clusterMsgDataPublish msg;    } publish;    /* UPDATE */    struct { clusterMsgDataUpdate nodecfg;    } update;    /* MODULE */    struct { clusterMsgModule msg;    } module;};
  1. 当接收节点收到ping/meet时,会解析消息头和消息体
    • 解析消息头:消息头是发送节点的信息,如果发送节点是新节点而且是meet消息,则加入到本地节点列表;如果是已知节点,则更新发送节点的状态
    • 解析消息体:如果clusterMsgDataGossip数组中节点是新节点,则尝试与新节点的meet流程;如果是已知节点,判断该节点是否下线,如果下线则更新
  2. 处理完之后返回pong
/* Process the gossip section of PING or PONG packets. * Note that this function assumes that the packet is already sanity-checked * by the caller, not in the content of the gossip section, but in the * length. */void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {    ...}

节点选择

  • Gossip协议如果一次广播全部的节点,可能会引发洪泛,占用带宽。
  • 每次ping/pong消息会携带当前节点和部分其他节点的信息,如果携带其他节点比较多,也会增加计算的负担。

那么,节点选择redis代码是如何实现的呢?

  • ping/pong的消息发送在定时任务中,定时任务的默认执行频率是每秒10次(开启debug log的时候还可以看到相关输出)
/* This is executed 10 times every second */void clusterCron(void) {    ...    di = dictGetSafeIterator(server.cluster->nodes);    server.cluster->stats_pfail_nodes = 0;    while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); ... if (node->link == NULL) {     ...     link = createClusterLink(node);     ...     clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?      CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);     ...     serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",      node->name, node->ip, node->cport); } ...    }    ...}

节点每秒发送ping的数量

  • 每秒会从随机的5个邻居节点中选择最久没有收到pong的1个节点去更新
void clusterCron(void) {    ...    if (!(iteration % 10)) { int j;  /* Check a few random nodes and ping the one with the oldest  * pong_received time. */ /*     随机抽查5个节点,向pong_received值最小的发送PING消息     pong_received【接收到PONG的时间】  */ for (j = 0; j nodes);     clusterNode *this = dictGetVal(de);      /* Don't ping nodes disconnected or with a ping currently active. */     /* 跳过无连接或已经发送过PING的节点 */     if (this->link == NULL || this->ping_sent != 0) continue;     /*  跳过myself节点和处于握手状态的节点 */     if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))  continue;   /* 需要再研究,这里是什么意思? */     /* 当min_pong_node为NULL或者min_pong大于当前节点收到的pong的时间的情况下 */     /* menwen-查找出这个5个随机抽查的节点,接收到PONG回复过去最久的节点 */     if (min_pong_node == NULL || min_pong > this->pong_received) {  min_pong_node = this;  min_pong = this->pong_received;     } } /* 如果min_pong_node不为NULL,    向接收到PONG回复过去最久的节点发送PING消息,判断是否可达  */ if (min_pong_node) {     serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);     clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING); }    }    ...}
  • 每100ms会扫描该节点的邻居节点,如果其最近一次收到pong的时间大于cluster_node_timeout/2,则立即发送ping。
void clusterCron(void) {    ...    // 开始迭代所有节点    while((de = dictNext(di)) != NULL) { ... /* 如果当前没有发送PING消息,并且在一定时间内也没有收到PONG回复 */ if (node->link &&     node->ping_sent == 0 &&     (now - node->pong_received) > server.cluster_node_timeout/2) {     /*  给node节点发送一个PING消息 */     clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);     continue; } ...    }
  • 所以一个节点每秒需要发送1+10*num((now - node->pong_received) > server.cluster_node_timeout/2)次ping

消息数据量

  • ping消息主要是消息头clusterMsg和消息体clusterMsgData。
  • clusterMsg 主要大小是unsigned char myslots[CLUSTER_SLOTS/8]),16384/8bytes=2kB
  • clusterMsgData 中会携带几个节点的数据呢,可以围观这个源码func clusterSendPing,我就死皮赖脸的翻译下源码hh
    • 一般是节点总数的1/10,或者至少3个。
    • 为什么这么设定?
      • ping这么多次需要在cluster_node_timeout/2时间内完成,另外一半时间要留给pong的(哈哈哈现有乒乓球名字还是先有redis ping pong?十分的好奇)
      • 任意1个节点最少和其他节点在超时时间内互换4个心跳包(一个节点最久node_timeout/2,就会向其他节点发送一次ping包)
      • 认为一个节点fail,需要收到一个清理fail节点的report,一般下线检测时间是2*cluster_node_timeout,在这个期间会收到其他任意集群节点8个心跳包
      • 所以一个下线检测时间周期中节点共可以收到8N个心跳包,每个心跳包包含下线节点的概率是1/10,所以收到下线报告的期望值为80%N
/* Remove failure reports that are too old, where too old means reasonably * older than the global node timeout. Note that anyway for a node to be * flagged as FAIL we need to have a local PFAIL state that is at least * older than the global node timeout, so we don't just trust the number * of failure reports from other nodes. */void clusterNodeCleanupFailureReports(clusterNode *node) {    list *l = node->fail_reports;    listNode *ln;    listIter li;    clusterNodeFailReport *fr;    mstime_t maxtime = server.cluster_node_timeout *CLUSTER_FAIL_REPORT_VALIDITY_MULT;    mstime_t now = mstime();    listRewind(l,&li);    while ((ln = listNext(&li)) != NULL) { fr = ln->value; if (now - fr->time > maxtime) listDelNode(l,ln);    }}
  • 不得不说,源码的注释写的真好,流畅的英文以及简洁准确的表达hh(以前本科学校操作系统那门课,当时觉得有些作业属于看源码注释补充代码,觉得好像不太对劲,但其实是正道啊,你要把这块写明白,可不就要看别的相关func的代码和注释咩。以及一个初学毛毛爬虫,由浅及深学习嘛)
void clusterSendPing(clusterLink *link, int type) {    ...    /* How many gossip sections we want to add? 1/10 of the number of nodes     * and anyway at least 3. Why 1/10?     *     * If we have N masters, with N/10 entries, and we consider that in     * node_timeout we exchange with each other node at least 4 packets     * (we ping in the worst case in node_timeout/2 time, and we also     * receive two pings from the host), we have a total of 8 packets     * in the node_timeout*2 falure reports validity time. So we have     * that, for a single PFAIL node, we can expect to receive the following     * number of failure reports (in the specified window of time):     *     * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:     *     * PROB = probability of being featured in a single gossip entry,     * which is 1 / NUM_OF_NODES.     * ENTRIES = 10.     * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.     *     * If we assume we have just masters (so num of nodes and num of masters     * is the same), with 1/10 we always get over the majority, and specifically     * 80% of the number of nodes, to account for many masters failing at the     * same time.     *     * Since we have non-voting slaves that lower the probability of an entry     * to feature our node, we set the number of entries per packet as     * 10% of the total nodes we have. */     ...}
  • 节点超时的参数是:cluster_node_timeout默认值为15s(见下)。
  • 关于这个clusterCron函数的源码解读(建议看一下,写的非常好):https://blog.csdn.net/Edidaughter/article/details/116403296
#define CLUSTER_DEFAULT_NODE_TIMEOUT 15000
  • 文章中的源码均来自于redis-5.0.12(不要问我为什么是这个版本,哈哈不重要)
  • understand看C++源码好用,可以戳这个链接下载安装:https://www.macwk.com/soft/understand
  • 看书对于码农会有点枯燥,但感觉有源码对比就有趣很多。纸上得来终觉浅,绝知此事要躬行。【看到代码仿佛书里的不是假的,可考证hh】

哈尔滨心理咨询网