这篇博客想讲一下 Redis 命令的注册、解析和执行。

本文采用的源码是 Redis 5.0.0

首先来看下 Redis 命令的结构:redisCommand

typedef void redisCommandProc(client *c);
typedef int *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
struct redisCommand {
    char *name;
    redisCommandProc *proc; // 执行函数
    int arity; // 参数数量
    char *sflags; // 标记,一个 char 代表一个标志 
    int flags;    // 实际的标记,从 sflags 中获得

    redisGetKeysProc *getkeys_proc; // 获得 key 参数

    int firstkey; // 第一个是 key 的参数 
    int lastkey;  // 最后一个是 key 的参数
    int keystep;  // 第一个和最后一个 key 之间的步数 
    long long microseconds, calls; // 命令的总执行时间、调用次数
}

Redis 所有的命令,都保存在数组 redisCommandTable。这里以 GET 命令为例:

  • 名称:get
  • 执行函数:getCommand
  • 参数个数:2
  • 标记:rF
  • 实际标记:0
  • 不用于集群重定向
  • 第一个为 key 的参数,是第 1 个参数
  • 最后一个为 key 的参数,也是第 1 个参数
  • 上述两个参数的间隔为 1
  • 总执行时间为 0
  • 调用次数为 0
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    // ...
}

Redis 的 main 函数(server.c)中,调用 pupulateCommandTable 填充了命令表:

void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable+j;
        char *f = c->sflags;
        int retval1, retval2;

        while(*f != '\0') {
            switch(*f) {

            // 计算实际的 flag
            case 'w': c->flags |= CMD_WRITE; break;
            case 'r': c->flags |= CMD_READONLY; break;
            case 'm': c->flags |= CMD_DENYOOM; break;
            case 'a': c->flags |= CMD_ADMIN; break;
            case 'p': c->flags |= CMD_PUBSUB; break;
            case 's': c->flags |= CMD_NOSCRIPT; break;
            case 'R': c->flags |= CMD_RANDOM; break;
            case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
            case 'l': c->flags |= CMD_LOADING; break;
            case 't': c->flags |= CMD_STALE; break;
            case 'M': c->flags |= CMD_SKIP_MONITOR; break;
            case 'k': c->flags |= CMD_ASKING; break;
            case 'F': c->flags |= CMD_FAST; break;
            default: serverPanic("Unsupported command flag"); break;
            }
            f++;
        }

        // 将命令加入 server.commands 哈希表
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        // 将命令加入 server.orig_commands 哈希表,这张表不会受到命令重命名的影响
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
    }
}

命令表保存在 server 变量中,它是一个全局变量:

struct redisServer server; /* Server global state */

struct redisServer {
    // ...
    dict *commands;             /* Command table */
    dict *orig_commands;        /* Command table before command renaming. */
    // ...
}

Redis 命令执行的函数是 processCommands,它会先调用lookupCommand 查找命令,然后 call 执行具体的操作。

lookupCommand就是一个字典查找操作,具体实现可以看这里

// server.c
struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}

// dict.c
void *dictFetchValue(dict *d, const void *key) {
    dictEntry *he;

    he = dictFind(d,key);
    return he ? dictGetVal(he) : NULL;
}

call 函数:


void call(client *c, int flags) {
    long long dirty, start, duration;
    int client_old_flags = c->flags;
    struct redisCommand *real_cmd = c->cmd;

    // ...

    // 重新初始化标志位,并为命令传播创建数组
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArray prev_also_propagate = server.also_propagate;
    redisOpArrayInit(&server.also_propagate);


    // 执行命令,记录执行时间
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;

    // ...

    // 如有需要,记录慢日志。填充统计数据
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    }
    if (flags & CMD_CALL_STATS) {
        // 记录真正被执行的命令,防止 MULTI-EXEC/EXPIRE/GEOADD 等
        real_cmd->microseconds += duration;
        real_cmd->calls++;
    }

    // 将命令传播给 AOF 和副本
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        // ...
    }

   // ...

   server.also_propagate = prev_also_propagate;
   server.stat_numcommands++;
}

那么 processCommand 在哪里触发呢?答案是 networking.c 里。

整个函数的调用关系是:

readQueryFromClient -> processInputBufferAndReplicate -> processInputBuffer -> processCommand

processInputBuffer 函数根据请求类型进行数据读取处理,processMultibulkBuffer 是针对多个命令请求的数据读取解析,并且放到命令字段argv中。

void processInputBuffer(client *c) {
    server.current_client = c;

    // 处理,直到 buffer 为空
    while(c->qb_pos < sdslen(c->querybuf)) {
        // client 暂停
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        // client 正在处理其他事情
        if (c->flags & CLIENT_BLOCKED) break;

        // 有 lua 脚本正在执行
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        // 结果返回给 client 之后,就关闭连接,设置 CLIENT_CLOSE_AFTER_REPLY 状态
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        // 判断请求类型
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            // 这里读取buff中的数据,并且根据argc来填写argv数据
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        // 批量处理,这时长度可能小于 0
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // 命令执行之后,重置 client
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    // 更新 master 的 replica 偏移量
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                }

                /* Don't reset the client structure for clients blocked in a
                 * module blocking command, so that the reply callback will
                 * still be able to access the client argv and argc field.
                 * The client will be reset in unblockClientFromModule(). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the active client, to be
             * freed. */
            if (server.current_client == NULL) break;
        }
    }

    /* Trim to pos */
    if (server.current_client != NULL && c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }

    server.current_client = NULL;
}

那么 processInputBuffer 什么时候触发呢,这时候事件模型就登场了。To be continued…