星空网站建设

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 85|回复: 0

新闻速看来聊聊 Redis 集群数据迁移

[复制链接]
  • TA的每日心情
    难过
    16 小时前
  • 签到天数: 101 天

    [LV.6]常住居民II

    5万

    主题

    90

    回帖

    15万

    积分

    超级版主

    Rank: 8Rank: 8

    积分
    157398
    发表于 2025-6-9 16:08:36 | 显示全部楼层 |阅读模式
    一、详解redis cluster数据迁移过程时至今日,人们对nebula也有着很多的依赖性,对产品的预期也越来越高。悦数图数据库是一款完全自主研发的国产图数据库和原生分布式图数据库,具有高性能,易扩展,安全稳定,自主可控的特点.万亿级数据仅需毫秒级查询延时,应用于金融风控,实时推荐,知识图谱等业务场景。https://www.yueshu.com.cn/


    1. 节点基本结构定义
    redis集群提供16384个slot,我们可以按需分配给节点上,后续进行键值对存储时,我们就可以按照算法将键值对存到对应slot上的redis服务器上:



    集群节点本质就是通过slots这个数组记录当前节点的所管理的情况,这里我们可以看到slots是一个char 数组,长度为REDIS_CLUSTER_SLOTS(16384)除8,这样做的原因是因为:

    char占1个字节,每个字节8位。
    每个char可以记录8个slot的情况,如果是自己的slot则对应char的某一个位置记录为1:
    我们以node-1为例,因为它负责0-5460的节点,所以它的slots0-5460都为1,对应的图解如下所示,可以看到笔者这里省略了后半部分,仅仅表示了0-15位置为1:



    对此我们也给出这段redis中节点的定义,即位于cluster.h中的clusterNode这个结构体中,可以看slots这段定义:

    复制
    typedef struct clusterNode {
      //......
        //记录集群负责的槽,总的为16384
        unsigned char slots[REDIS_CLUSTER_SLOTS/8];
        //......
    }
    1.
    2.
    3.
    4.
    5.
    6.
    2. 设置slot后续节点迁移
    以本文示例为例,我们希望后续节点2的数据全部存到节点1中,那么我们首先需要键入如下两条配置:

    复制
    # 在节点1上执行,将节点2数据导入到节点1上
    CLUSTER SETSLOT 3 IMPORTING node2
    # 在节点2上执行,将自己的数据迁移到节点1
    CLUSTER SETSLOT 3 MIGRATING node1
    1.
    2.
    3.
    4.
    这两条指比较终都会被各自的服务端解析,并调用clusterCommand执行,我们以节点1导入为例,假设我们执行clusterCommand解析到setslot 关键字和importing关键字,即知晓要导入其他节点的数据。对应的节点1就会通过importing_slots_from数组标记自己将导入这个slot的数据,而节点2也会通过migrating_slots_to数组标记自己要将数据导出给其他节点的slot:



    对此我们给出clusterCommand的执行流程,可以看到该函数解析出migrating或者importing关键字时就会将对的migrating_slots_to或者importing_slots_from数组对应slot位置的索引位置设置为当前上述命令传入的node id:

    复制
    void clusterCommand(redisClient *c) {
         //......

            if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {//处理迁出的逻辑
                //看看自己是否有迁出的slot,没有则报错
                if (server.cluster->slots[slot] != myself) {
                    addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
                    return;
                }
                //查看自己是否知晓这个node id,如果没有则报错
                if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                    addReplyErrorFormat(c,"I don't know about node %s",
                        (char*)c->argv[4]->ptr);
                    return;
                }
                //标记迁出到slot为传入的node
                server.cluster->migrating_slots_to[slot] = n;
            } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {//处理迁入的逻辑
                //查看迁入的slot是否已经配置,如果有则报错
                if (server.cluster->slots[slot] == myself) {
                    addReplyErrorFormat(c,
                        "I'm already the owner of hash slot %u",slot);
                    return;
                }
                //查看自己是否知晓要迁入数据的node的信息,如果不知道则报错
                if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                    addReplyErrorFormat(c,"I don't know about node %s",
                        (char*)c->argv[3]->ptr);
                    return;
                }
                //标记迁入slot位置为传入的nodeid
                server.cluster->importing_slots_from[slot] = n;
            } //......
    }
    1.
    2.
    3.
    4.
    5.
    6.
    7.
    8.
    9.
    10.
    11.
    12.
    13.
    14.
    15.
    16.
    17.
    18.
    19.
    20.
    21.
    22.
    23.
    24.
    25.
    26.
    27.
    28.
    29.
    30.
    31.
    32.
    33.
    34.
    3. 请求重定向问题
    后续的我们假设还是将set key value请求发送到节点2,因为上述命令的原因,节点会返回move/ask告知客户端这个键值对现在要存到节点1上。对应节点1收到这个key请求时,通过key计算得slot正是自己,它就会将这个键值对存储到自己的数据库中:



    这里我们以节点1的角度查看这个问题,当客户端收到move指令后,继续向节点1发送指令,节点1通过收到指令调用processCommand,其内部调用getNodeByQuery获取当前key对应的slot,发现是自己则直接存储数据到当前节点的内存数据库中:

    复制
    int processCommand(redisClient *c) {
        //......
        //如果开启了集群模式,且发送者不是master且参数带key则进入逻辑
        if (server.cluster_enabled &&
            !(c->flags & REDIS_MASTER) &&
            !(c->flags & REDIS_LUA_CLIENT &&
              server.lua_caller->flags & REDIS_MASTER) &&
            !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
        {
            int hashslot;

            if (server.cluster->state != REDIS_CLUSTER_OK) {
               //......
            } else {
                int error_code;
                //查找键值对对应的slot和这个slot负责的节点
                clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
                //如果为空且或者非自己,则转交出去给别人处理
                if (n == NULL || n != server.cluster->myself) {
                    flagTransaction(c);
                    clusterRedirectClient(c,n,hashslot,error_code);
                    return REDIS_OK;
                }
            }
        }
    //......
    //将键值对存储到当前数据库中
    }
    1.
    2.
    3.
    4.
    5.
    6.
    7.
    8.
    9.
    10.
    11.
    12.
    13.
    14.
    15.
    16.
    17.
    18.
    19.
    20.
    21.
    22.
    23.
    24.
    25.
    26.
    27.
    28.
    我们以节点的视角再次直接步入getNodeByQuery查看这段逻辑,可以看到其内部会基于key计算slot然后将得到对应的node,然后进行如下判断:

    如果本次客户端请求是一个批量的请求,且首个key定位不到响应的slot,直接返回错误。
    如果key的slot属于当前节点,且当前节点正在迁出并且当前节点查不到这个key,则响应一个ask标识告知客户端到迁出的节点询问一下是否有数据。
    如果是key属于当前节点且正在进行导入,且key定位不到则响应异常,反之说明当前节点导入成功,直接返回当前节点信息。
    如果定位到的slot属于别的节点,则响应一个move告知客户端到别的节点获取键值对。
    对应的我们给出这段代码函数getNodeByQuery,对应的逻辑和笔者上述给出的核心分支一致:

    复制
    clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
       //.......

       
        //如果是exec命令则用mstate封装这些命令
        if (cmd->proc == execCommand) {
            /* If REDIS_MULTI flag is not set EXEC is just going to return an
             * error. */
            if (!(c->flags & REDIS_MULTI)) return myself;
            ms = &c->mstate;
        } else {
            //为了一个原子指向,创建一个假的multi记录这些指令
            ms = &_ms;
            _ms.commands = &mc;
            //命令个数1
            _ms.count = 1;
            //命令参数
            mc.argv = argv;
            //命令参数个数
            mc.argc = argc;
            //对应的命令
            mc.cmd = cmd;
        }

       
        //遍历命令
        for (i = 0; i < ms->count; i++) {
          //.......
            //解析出key以及个数
            keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
            for (j = 0; j < numkeys; j++) {
                //拿到key
                robj *thiskey = margv[keyindex[j]];
                //计算slot
                int thisslot = keyHashSlot((char*)thiskey->ptr,
                                           sdslen(thiskey->ptr));
       
                if (firstkey == NULL) {
                  
                    firstkey = thiskey;
                    slot = thisslot;
                    //拿着slot找到对应的集群节点
                    n = server.cluster->slots[slot];

                    //如果当前查询的key是首个key且找不到,则将错误码设置为REDIS_CLUSTER_REDIR_DOWN_UNBOUND并返回空
                    if (n == NULL) {
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = REDIS_CLUSTER_REDIR_DOWN_UNBOUND;
                        return NULL;
                    }

                    //如果就是当前节点正在做迁出或者迁入,则migrating_slot/importing_slot设置为1
                    if (n == myself &&
                        server.cluster->migrating_slots_to[slot] != NULL)
                    {
                        migrating_slot = 1;
                    } else if (server.cluster->importing_slots_from[slot] != NULL) {
                        importing_slot = 1;
                    }
                } else {
                 //.......
                }

             
                //如果正在做迁出或者嵌入找不到当前db找不到key的位置,则missing_keys++
                if ((migrating_slot || importing_slot) &&
                    lookupKeyRead(&server.db[0],thiskey) == NULL)
                {
                    missing_keys++;
                }
            }
            getKeysFreeResult(keyindex);
        }

       
        //所有key都没有对应slot节点,直接返回当前节点
        if (n == NULL) return myself;

      //.......
        //正在迁出且key找不到位置,错误码设置为ask并返回迁出的目标节点,让客户端到别的节点尝试看看
        if (migrating_slot && missing_keys) {
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK;
            return server.cluster->migrating_slots_to[slot];
        }

    //如果是节点正在导入且key找不到则返回,标识当前集群不稳定
        if (importing_slot &&
            (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
        {
            if (multiple_keys && missing_keys) {
                if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
                return NULL;
            } else {
             //反之说明导入成功则告知自己可以找到这个键值对
                return myself;
            }
        }

      //......
        //返回其他节点,error_code设置为move
        if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
        return n;
    }
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    快速回复 返回顶部 返回列表