Tergel

Redis源码阅读2

单线程一样可以很强

这一篇集中看一下redis中不同命令对应的底层数据存储结构。

redis.c中声明了全局变量redisCommand cmdTable,从这里我们就直接能找到每个命令对应的处理函数,这次就从这里开始。

redis解析命令时,每个参数都对应创建了一个redisObject对象。

typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char storage;  /* If this object is a key, where is the value?
                             * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char vtype; /* If this object is a key, and value is swapped out,
                          * this is the type of the swapped out object. */
    int refcount;
    /* VM fields, this are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObjct) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead. */
    struct redisObjectVM vm;
} robj;

其中ptr保存对应字符串指针 type表明当前object数据类型,在这里就是字符串。

get

static int getGenericCommand(redisClient *c) {
    robj *o;

    // c->argv[1] 就是get对应的key值
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return REDIS_OK;
    // 检查object的数据类型
    if (o->type != REDIS_STRING) {
        // 数据类型不一致,返回错误
        addReply(c,shared.wrongtypeerr);
        return REDIS_ERR;
    } else {
        // 返回数据
        addReplyBulk(c,o);
        return REDIS_OK;
    }
}

static void getCommand(redisClient *c) {
    getGenericCommand(c);
}
static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}

static robj *lookupKeyRead(redisDb *db, robj *key) {
    expireIfNeeded(db,key);
    return lookupKey(db,key);
}

// db->expires中记录key的过期时期
static int expireIfNeeded(redisDb *db, robj *key) {

    time_t when;
    dictEntry *de;

    /* No expire? return ASAP */
    // expires没有元素,或者找不到key对应的过期事件配置时,立即返回
    if (dictSize(db->expires) == 0 ||
       (de = dictFind(db->expires,key)) == NULL) return 0;

    // 检查过期时间
    when = (time_t) dictGetEntryVal(de);
    if (time(NULL) <= when) return 0;

    // 确认过期,从expires中删除key,从db->dict中同样删除
    dictDelete(db->expires,key);
    return dictDelete(db->dict,key) == DICT_OK;
}
static robj *lookupKey(redisDb *db, robj *key) {
    // 从字典中寻找key对应的数据
    dictEntry *de = dictFind(db->dict,key);
    if (de) { // 找到了

        robj *key = dictGetEntryKey(de);
        robj *val = dictGetEntryVal(de);

        if (server.vm_enabled) { // vm相关,略过
            if (key->storage == REDIS_VM_MEMORY ||
                key->storage == REDIS_VM_SWAPPING)
            {
                /* If we were swapping the object out, stop it, this key
                 * was requested. */
                if (key->storage == REDIS_VM_SWAPPING)
                    vmCancelThreadedIOJob(key);
                /* Update the access time of the key for the aging algorithm. */
                key->vm.atime = server.unixtime;
            } else {
                int notify = (key->storage == REDIS_VM_LOADING);

                /* Our value was swapped on disk. Bring it at home. */
                redisAssert(val == NULL);
                val = vmLoadObject(key);
                dictGetEntryVal(de) = val;

                /* Clients blocked by the VM subsystem may be waiting for
                 * this key... */
                if (notify) handleClientsBlockedOnSwappedKey(db,key);
            }
        }
        // 返回给调用函数,在getGenericCommand中对它的数据类型进一步做了检查,通过后返回给客户端
        return val;
    } else {
        return NULL;
    }
}

set

static void setGenericCommand(redisClient *c, int nx) {
    int retval;

    if (nx) deleteIfVolatile(c->db,c->argv[1]);
    // 加入db->dict这个字典中, argv[1]为key, argv[2]为value
    retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
    if (retval == DICT_ERR) {
        if (!nx) {
            /* If the key is about a swapped value, we want a new key object
             * to overwrite the old. So we delete the old key in the database.
             * This will also make sure that swap pages about the old object
             * will be marked as free. */
            if (server.vm_enabled && deleteIfSwapped(c->db,c->argv[1]))
                incrRefCount(c->argv[1]);
            dictReplace(c->db->dict,c->argv[1],c->argv[2]);
            incrRefCount(c->argv[2]);
        } else {
            addReply(c,shared.czero);
            return;
        }
    } else {
        // 修改key/value的引用计数
        incrRefCount(c->argv[1]);
        incrRefCount(c->argv[2]);
    }
    server.dirty++;
    removeExpire(c->db,c->argv[1]);
    // 返回结果给客户端
    addReply(c, nx ? shared.cone : shared.ok);
}

其实看到一半就知道这个key-value是保存在一个字典结构当中了。 redis通过dict.h/dict.c定义了字典结构。

// 一个可以持有key/value指针的节点,同时指向下个节点。很明显这个结构可以组成一个单向链表。
typedef struct dictEntry {
    void *key;
    void *val;
    struct dictEntry *next;
} dictEntry;

// 看名字是对应字典的类型? 包含了散列函数,k/v拷贝,对比key,销毁k/v等函数指针。 看结构可以理解是用于定义一个字典时,提供字典所需的相关函数定义。
typedef struct dictType {
    unsigned int (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

// 一个字典结构, 包含一个dictEntry组成的数组用于存储数据, dictType定义该字典所需函数,size记录字典大小, sizemask用于约束散列函数得出索引值永远在[0,size-1]之间, used记录字典内存消耗, *privdata看起来又是一个上下文数据对象, redis中好像没有用到。
typedef struct dict {
    dictEntry **table;
    dictType *type;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
    void *privdata;
} dict;

// iterator, 没什么可讲的
typedef struct dictIterator {
    dict *ht;
    int index;
    dictEntry *entry, *nextEntry;
} dictIterator;

一些宏


// 散列表初始大小
#define DICT_HT_INITIAL_SIZE 4

// 函数式宏,用于释放entry
#define dictFreeEntryVal(ht, entry) \
    if ((ht)->type->valDestructor) \
        (ht)->type->valDestructor((ht)->privdata, (entry)->val)

// 更新entry内的值
#define dictSetHashVal(ht, entry, _val_) do { \
    if ((ht)->type->valDup) \
        entry->val = (ht)->type->valDup((ht)->privdata, _val_); \
    else \
        entry->val = (_val_); \
} while(0)

// 释放key
#define dictFreeEntryKey(ht, entry) \
    if ((ht)->type->keyDestructor) \
        (ht)->type->keyDestructor((ht)->privdata, (entry)->key)

// 更新entry的key
#define dictSetHashKey(ht, entry, _key_) do { \
    if ((ht)->type->keyDup) \
        entry->key = (ht)->type->keyDup((ht)->privdata, _key_); \
    else \
        entry->key = (_key_); \
} while(0)

// 对比key
#define dictCompareHashKeys(ht, key1, key2) \
    (((ht)->type->keyCompare) ? \
        (ht)->type->keyCompare((ht)->privdata, key1, key2) : \
        (key1) == (key2))

// 计算散列值
#define dictHashKey(ht, key) (ht)->type->hashFunction(key)

// 获取entry的key
#define dictGetEntryKey(he) ((he)->key)
// 获取entry的val
#define dictGetEntryVal(he) ((he)->val)
// 获取当前散列表的大小
#define dictSlots(ht) ((ht)->size)
// 获取当前散列表的消耗
#define dictSize(ht) ((ht)->used)

看到了一种函数宏的写法: do {…} while(0),看网上描述, 这样可以:

  • 创建复合语句块:
  • 这种结构允许在宏中包含多个语句,同时保证它们作为一个整体执行。
  • 避免悬空 else 问题:
  • 在条件语句中使用宏时,这种结构可以防止出现悬空 else 的问题。
  • 允许使用分号结尾:
  • 使用这种结构可以让宏的使用者像普通函数那样在调用后加分号,而不会引起语法错误。
  • 保证只执行一次:
  • while(0) 确保循环体只执行一次,同时又不会被编译器优化掉。
  • 支持 break 语句:
  • 在宏内部,可以使用 break 语句来提前退出宏的执行。

具体的函数实现暂时可以跳过。这里的dict实现,主要是通过dictEntry **table;定义了一个数组,每个数组索引又是一个链表组成。通过散列函数计算得出数组索引,再对链表进行相应操作。