请选择 进入手机版 | 继续访问电脑版

Redis中国用户组(CRUG)论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

搜索
热搜: 活动 交友 discuz
查看: 725|回复: 0

Redis源码分析(十二)--- redis-check-dump本地数据库检测

[复制链接]
  • TA的每日心情
    奋斗
    2017-6-13 10:19
  • 签到天数: 92 天

    [LV.6]常住居民II

    360

    主题

    465

    帖子

    3718

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    3718

    最佳新人活跃会员宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2016-4-1 10:02:43 | 显示全部楼层 |阅读模式

    这个文件我在今天分析学习的时候,一直有种似懂非懂的感觉,代码量700+的代码,最后开放给系统的就是一个process()方法。这里说的说的数据库检测,是针对key的检测,会用到,下面提到的结构体:

    1. /* Data type to hold opcode with optional key name an success status */  
    2. /* 用于key的检测时使用,后续检测操作都用到了entry结构体 */  
    3. typedef struct {  
    4.     //key的名字  
    5.     char* key;  
    6.     //类型  
    7.     int type;  
    8.     //是否是成功状态  
    9.     char success;  
    10. } entry;  
    复制代码

    后续所涉及到的很多API都是与这个结构体相关,此代码最终检测的其实是一个叫dump.rdb的文件,在检测的后面还会加上循环冗余校验CRC64。下面亮出API:
    1. int checkType(unsigned char t) /* 每当添加一个新的obj类型时,都要检测这个类型是否合理 */  
    2. int readBytes(void *target, long num) /* 在当前文件偏移量位置往后读取num个字节位置 */  
    3. int processHeader(void) /* 读取快照文件的头部,检测头部名称或版本号是否正确 */  
    4. int loadType(entry *e) /* 为entry赋上obj的Type */  
    5. int peekType() /* 弹出版本号 */  
    6. int processTime(int type) /* 去除用来表示时间的字节 */  
    7. uint32_t loadLength(int *isencoded) /* 分type读取长度 */  
    8. char *loadIntegerObject(int enctype) /* 根据当前整型的编码方式,获取数值,以字符形式返回 */  
    9. char* loadLzfStringObject() /* 获得解压后的字符串 */  
    10. char* loadStringObject() /* 获取当前文件信息字符串对象 */  
    11. int processStringObject(char** store) /* 将字符串对象赋给所传入的参数 */  
    12. double* loadDoubleValue() /* 文件中读取double类型值 */  
    13. int processDoubleValue(double** store) /* 对double类型进行赋予给参数 */  
    14. int loadPair(entry *e) /* 读取键值对 */  
    15. entry loadEntry() /* 获取entry的key结构体 */  
    16. void printCentered(int indent, int width, char* body) /* 输出界面对称的信息 */  
    17. void printValid(uint64_t ops, uint64_t bytes) /* 输出有效信息 */  
    18. void printSkipped(uint64_t bytes, uint64_t offset) /* 输出Skipped跳过bytes字节信息 */  
    19. void printErrorStack(entry *e) /* 输出错误栈的信息 */  
    20. void process(void) /* process方法是执行检测的主要方法 */  
    复制代码

    方法里面好多loadXXX()方法,这几个load方法的确比较有用,在这个检测文件中,编写者又很人性化的构造了error的结构体,用于模拟错误信息栈的输出。
    1. /* Hold a stack of errors */  
    2. /* 错误信息结构体 */  
    3. typedef struct {  
    4.     //具体的错误信息字符串  
    5.     char error[16][1024];  
    6.     //内部偏移量  
    7.     size_t offset[16];  
    8.     //错误信息等级  
    9.     size_t level;  
    10. } errors_t;  
    11. static errors_t errors;  
    复制代码

    不同的level等级对应不同的出错信息。在API里有个比较关键的方法,loadEntry,获取key相关的结构体;
    1. /* 获取entry的key结构体 */  
    2. entry loadEntry() {  
    3.     entry e = { NULL, -1, 0 };  
    4.     uint32_t length, offset[4];  
    5.   
    6.     /* reset error container */  
    7.     errors.level = 0;  
    8.   
    9.     offset[0] = CURR_OFFSET;  
    10.     //此处赋值type  
    11.     if (!loadType(&e)) {  
    12.         return e;  
    13.     }  
    14.   
    15.     offset[1] = CURR_OFFSET;  
    16.     if (e.type == REDIS_SELECTDB) {  
    17.         if ((length = loadLength(NULL)) == REDIS_RDB_LENERR) {  
    18.             SHIFT_ERROR(offset[1], "Error reading database number");  
    19.             return e;  
    20.         }  
    21.         if (length > 63) {  
    22.             SHIFT_ERROR(offset[1], "Database number out of range (%d)", length);  
    23.             return e;  
    24.         }  
    25.     } else if (e.type == REDIS_EOF) {  
    26.         if (positions[level].offset < positions[level].size) {  
    27.             SHIFT_ERROR(offset[0], "Unexpected EOF");  
    28.         } else {  
    29.             e.success = 1;  
    30.         }  
    31.         return e;  
    32.     } else {  
    33.         /* optionally consume expire */  
    34.         if (e.type == REDIS_EXPIRETIME ||  
    35.             e.type == REDIS_EXPIRETIME_MS) {  
    36.             if (!processTime(e.type)) return e;  
    37.             if (!loadType(&e)) return e;  
    38.         }  
    39.   
    40.         offset[1] = CURR_OFFSET;  
    41.         //调用loadPair为Entry赋值key  
    42.         if (!loadPair(&e)) {  
    43.             SHIFT_ERROR(offset[1], "Error for type %s", types[e.type]);  
    44.             return e;  
    45.         }  
    46.     }  
    47.   
    48.     /* all entries are followed by a valid type:
    49.      * e.g. a new entry, SELECTDB, EXPIRE, EOF */  
    50.     offset[2] = CURR_OFFSET;  
    51.     if (peekType() == -1) {  
    52.         SHIFT_ERROR(offset[2], "Followed by invalid type");  
    53.         SHIFT_ERROR(offset[0], "Error for type %s", types[e.type]);  
    54.         e.success = 0;  
    55.     } else {  
    56.         e.success = 1;  
    57.     }  
    58.   
    59.     return e;  
    60. }  
    复制代码

    其中里面的关键的赋值key,value在loadPair()方法:
    1. /* 读取键值对 */  
    2. int loadPair(entry *e) {  
    3.     uint32_t offset = CURR_OFFSET;  
    4.     uint32_t i;  
    5.   
    6.     /* read key first */  
    7.     //首先从文件中读取key值  
    8.     char *key;  
    9.     if (processStringObject(&key)) {  
    10.         e->key = key;  
    11.     } else {  
    12.         SHIFT_ERROR(offset, "Error reading entry key");  
    13.         return 0;  
    14.     }  
    15.   
    16.     uint32_t length = 0;  
    17.     if (e->type == REDIS_LIST ||  
    18.         e->type == REDIS_SET  ||  
    19.         e->type == REDIS_ZSET ||  
    20.         e->type == REDIS_HASH) {  
    21.         if ((length = loadLength(NULL)) == REDIS_RDB_LENERR) {  
    22.             SHIFT_ERROR(offset, "Error reading %s length", types[e->type]);  
    23.             return 0;  
    24.         }  
    25.     }  
    26.   
    27.     //读取key值后面跟着的value值  
    28.     switch(e->type) {  
    29.     case REDIS_STRING:  
    30.     case REDIS_HASH_ZIPMAP:  
    31.     case REDIS_LIST_ZIPLIST:  
    32.     case REDIS_SET_INTSET:  
    33.     case REDIS_ZSET_ZIPLIST:  
    34.     case REDIS_HASH_ZIPLIST:  
    35.         //因为类似ziplist,zipmap等结构体其实是一个个结点连接而成的超级字符串,所以是直接读取  
    36.         if (!processStringObject(NULL)) {  
    37.             SHIFT_ERROR(offset, "Error reading entry value");  
    38.             return 0;  
    39.         }  
    40.     break;  
    41.     case REDIS_LIST:  
    42.     case REDIS_SET:  
    43.         //而上面这2种是传统的结构,要分结点读取  
    44.         for (i = 0; i < length; i++) {  
    45.             offset = CURR_OFFSET;  
    46.             if (!processStringObject(NULL)) {  
    47.                 SHIFT_ERROR(offset, "Error reading element at index %d (length: %d)", i, length);  
    48.                 return 0;  
    49.             }  
    50.         }  
    51.     break;  
    52.     case REDIS_ZSET:  
    53.         for (i = 0; i < length; i++) {  
    54.             offset = CURR_OFFSET;  
    55.             if (!processStringObject(NULL)) {  
    56.                 SHIFT_ERROR(offset, "Error reading element key at index %d (length: %d)", i, length);  
    57.                 return 0;  
    58.             }  
    59.             offset = CURR_OFFSET;  
    60.             if (!processDoubleValue(NULL)) {  
    61.                 SHIFT_ERROR(offset, "Error reading element value at index %d (length: %d)", i, length);  
    62.                 return 0;  
    63.             }  
    64.         }  
    65.     break;  
    66.     case REDIS_HASH:  
    67.         for (i = 0; i < length; i++) {  
    68.             offset = CURR_OFFSET;  
    69.             if (!processStringObject(NULL)) {  
    70.                 SHIFT_ERROR(offset, "Error reading element key at index %d (length: %d)", i, length);  
    71.                 return 0;  
    72.             }  
    73.             offset = CURR_OFFSET;  
    74.             if (!processStringObject(NULL)) {  
    75.                 SHIFT_ERROR(offset, "Error reading element value at index %d (length: %d)", i, length);  
    76.                 return 0;  
    77.             }  
    78.         }  
    79.     break;  
    80.     default:  
    81.         SHIFT_ERROR(offset, "Type not implemented");  
    82.         return 0;  
    83.     }  
    84.     /* because we're done, we assume success */  
    85.     //只要执行过了,我们就认定为成功  
    86.     e->success = 1;  
    87.     return 1;  
    88. }  
    复制代码

    如果e-success=1则说明这个key的检测就过关了。为什么这么说呢,我们来看主检测方法process()方法:
    1. /* process方法是执行检测的主要方法 */  
    2. void process(void) {  
    3.     uint64_t num_errors = 0, num_valid_ops = 0, num_valid_bytes = 0;  
    4.     entry entry;  
    5.     //读取文件头部获取快照文件版本号  
    6.     int dump_version = processHeader();  
    7.   
    8.     /* Exclude the final checksum for RDB >= 5. Will be checked at the end. */  
    9.     if (dump_version >= 5) {  
    10.         if (positions[0].size < 8) {  
    11.             printf("RDB version >= 5 but no room for checksum.\n");  
    12.             exit(1);  
    13.         }  
    14.         positions[0].size -= 8;  
    15.     }  
    16.   
    17.     level = 1;  
    18.     while(positions[0].offset < positions[0].size) {  
    19.         positions[1] = positions[0];  
    20.   
    21.         entry = loadEntry();  
    22.         if (!entry.success) {  
    23.             //如果Entry不为成功状态  
    24.             printValid(num_valid_ops, num_valid_bytes);  
    25.             printErrorStack(&entry);  
    26.             num_errors++;  
    27.             num_valid_ops = 0;  
    28.             num_valid_bytes = 0;  
    29.   
    30.             /* search for next valid entry */  
    31.             uint64_t offset = positions[0].offset + 1;  
    32.             int i = 0;  
    33.               
    34.             //接着寻找后面3个有效entries  
    35.             while (!entry.success && offset < positions[0].size) {  
    36.                 positions[1].offset = offset;  
    37.   
    38.                 /* find 3 consecutive valid entries */  
    39.                 //寻找3个有效的entries  
    40.                 for (i = 0; i < 3; i++) {  
    41.                     entry = loadEntry();  
    42.                     if (!entry.success) break;  
    43.                 }  
    44.                 /* check if we found 3 consecutive valid entries */  
    45.                 if (i < 3) {  
    46.                     offset++;  
    47.                 }  
    48.             }  
    49.   
    50.             /* print how many bytes we have skipped to find a new valid opcode */  
    51.             if (offset < positions[0].size) {  
    52.                 printSkipped(offset - positions[0].offset, offset);  
    53.             }  
    54.   
    55.             positions[0].offset = offset;  
    56.         } else {  
    57.             num_valid_ops++;  
    58.             num_valid_bytes += positions[1].offset - positions[0].offset;  
    59.   
    60.             /* advance position */  
    61.             positions[0] = positions[1];  
    62.         }  
    63.         free(entry.key);  
    64.     }  
    65.   
    66.     /* because there is another potential error,
    67.      * print how many valid ops we have processed */  
    68.     printValid(num_valid_ops, num_valid_bytes);  
    69.   
    70.     /* expect an eof */  
    71.     if (entry.type != REDIS_EOF) {  
    72.         /* last byte should be EOF, add error */  
    73.         errors.level = 0;  
    74.         SHIFT_ERROR(positions[0].offset, "Expected EOF, got %s", types[entry.type]);  
    75.   
    76.         /* this is an EOF error so reset type */  
    77.         entry.type = -1;  
    78.         printErrorStack(&entry);  
    79.   
    80.         num_errors++;  
    81.     }  
    82.   
    83.     /* Verify checksum */  
    84.     //版本号>=5的时候,需要检验校验和  
    85.     if (dump_version >= 5) {  
    86.         uint64_t crc = crc64(0,positions[0].data,positions[0].size);  
    87.         uint64_t crc2;  
    88.         unsigned char *p = (unsigned char*)positions[0].data+positions[0].size;  
    89.         crc2 = ((uint64_t)p[0] << 0) |  
    90.                ((uint64_t)p[1] << 8) |  
    91.                ((uint64_t)p[2] << 16) |  
    92.                ((uint64_t)p[3] << 24) |  
    93.                ((uint64_t)p[4] << 32) |  
    94.                ((uint64_t)p[5] << 40) |  
    95.                ((uint64_t)p[6] << 48) |  
    96.                ((uint64_t)p[7] << 56);  
    97.         if (crc != crc2) {  
    98.             SHIFT_ERROR(positions[0].offset, "RDB CRC64 does not match.");  
    99.         } else {  
    100.             printf("CRC64 checksum is OK\n");  
    101.         }  
    102.     }  
    103.   
    104.     /* print summary on errors */  
    105.     if (num_errors) {  
    106.         printf("\n");  
    107.         printf("Total unprocessable opcodes: %llu\n",  
    108.             (unsigned long long) num_errors);  
    109.     }  
    110. }  
    复制代码

    如果想了解检测的详细原理,事先了解dump.rdb的文件内容结构也许会对我们又很大帮助。


    转自:http://blog.csdn.net/androidlushangderen/article/details/40190195


    上一篇:Redis源码分析(十一)--- memtest内存检测
    下一篇:Redis源码分析(十三)--- redis-benchmark性能测试

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    阿里云
    阿里云

    Archiver|手机版|小黑屋|Redis中国用户组 ( 京ICP备15003959号

    GMT+8, 2017-6-23 20:14 , Processed in 0.134409 second(s), 33 queries .

    Powered by Discuz! X3.2

    © 2001-2013 Comsenz Inc.

    快速回复 返回顶部 返回列表