请选择 进入手机版 | 继续访问电脑版

Redis中国用户组(CRUG)论坛

 找回密码
 立即注册

扫一扫,访问微社区

搜索
热搜: 活动 交友 discuz
查看: 2807|回复: 0

Redis源码解析(十五)--- aof-append only file解析

[复制链接]
  • TA的每日心情
    开心
    2019-4-19 23:07
  • 签到天数: 95 天

    [LV.6]常住居民II

    406

    主题

    515

    帖子

    3981

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    3981

    最佳新人活跃会员宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2016-4-5 10:16:41 | 显示全部楼层 |阅读模式

    继续学习redis源码下的Data数据相关文件的代码分析,今天我看的是一个叫aof的文件,这个名字是append only file的简称,意为只进行追加文件操作。这里的文件追加记录是为了记录数据操作的改变,用于异常情况下的数据恢复,类似于之前我说的redo、undo日志的作用。我们都知道,redis作为一个内存数据库,数据的每次操作改变是先放在内存中,等到内存数据满了,再刷新到磁盘文件中,达到持久化的目的。所以aof的操作模式,也是采用了这样的方式。这里引入了一个block块的概念,其实就是一个缓冲区块。关于块的一些定义如下:

    /* The AOF rewrite code below uses a simple buffer block to accumulate
       the stream of data-change records. Once the buffered data reaches a
       certain size it is persisted to a file in append-only fashion. Since
       appending cannot grow a single block without bound, Redis chains
       fixed-size blocks in a list: when one block fills up, data continues
       in a freshly allocated block. Each block is 10 MB. */
    #define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */

    /* Standard AOF rewrite buffer block. */
    typedef struct aofrwblock {
        /* Bytes of this block already used, and bytes still free. */
        unsigned long used, free;
        /* Payload storage, AOF_RW_BUF_BLOCK_SIZE (10 MB) bytes. */
        char buf[AOF_RW_BUF_BLOCK_SIZE];
    } aofrwblock;
    复制代码

    也就是说,每个块的大小默认为10M,这个大小说大不大,说小不小了,如果填入的数据超出长度了,系统会动态申请一个新的缓冲块,在server端是通过一个块链表的形式,组织整个块的:
    1. /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */  
    2. /* 在缓冲区中追加数据,如果超出空间,会新申请一个缓冲块 */  
    3. void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {  
    4.     listNode *ln = listLast(server.aof_rewrite_buf_blocks);  
    5.     //定位到缓冲区的最后一块,在最后一块的位置上进行追加写操作  
    6.     aofrwblock *block = ln ? ln->value : NULL;  
    7.   
    8.     while(len) {  
    9.         /* If we already got at least an allocated block, try appending
    10.          * at least some piece into it. */  
    11.         if (block) {  
    12.             //如果当前的缓冲块的剩余空闲能支持len长度的内容时,直接写入  
    13.             unsigned long thislen = (block->free < len) ? block->free : len;  
    14.             if (thislen) {  /* The current block is not already full. */  
    15.                 memcpy(block->buf+block->used, s, thislen);  
    16.                 block->used += thislen;  
    17.                 block->free -= thislen;  
    18.                 s += thislen;  
    19.                 len -= thislen;  
    20.             }  
    21.         }  
    22.   
    23.         if (len) { /* First block to allocate, or need another block. */  
    24.             int numblocks;  
    25.             //如果不够的话,需要新创建,进行写操作  
    26.             block = zmalloc(sizeof(*block));  
    27.             block->free = AOF_RW_BUF_BLOCK_SIZE;  
    28.             block->used = 0;  
    29.             //还要把缓冲块追加到服务端的buffer列表中  
    30.             listAddNodeTail(server.aof_rewrite_buf_blocks,block);  
    31.   
    32.             /* Log every time we cross more 10 or 100 blocks, respectively
    33.              * as a notice or warning. */  
    34.             numblocks = listLength(server.aof_rewrite_buf_blocks);  
    35.             if (((numblocks+1) % 10) == 0) {  
    36.                 int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING :  
    37.                                                          REDIS_NOTICE;  
    38.                 redisLog(level,"Background AOF buffer size: %lu MB",  
    39.                     aofRewriteBufferSize()/(1024*1024));  
    40.             }  
    41.         }  
    42.     }  
    43. }  
    复制代码

    当想要主动的将缓冲区中的数据刷新到持久化到磁盘中时,调用下面的方法:
    /* Write the append only file buffer on disk.
     *
     * Since we are required to write the AOF before replying to the client,
     * and the only way the client socket can get a write is entering when
     * the event loop, we accumulate all the AOF writes in a memory
     * buffer and write it on disk using this function just before entering
     * the event loop again.
     *
     * About the 'force' argument:
     *
     * When the fsync policy is set to 'everysec' we may delay the flush if there
     * is still an fsync() going on in the background thread, since for instance
     * on Linux write(2) will be blocked by the background fsync anyway.
     * When this happens we remember that there is some aof buffer to be
     * flushed ASAP, and will try to do that in the serverCron() function.
     *
     * However if force is set to 1 we'll write regardless of the background
     * fsync. */
    #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
    /* Flush the content of the in-memory AOF buffer to disk. */
    void flushAppendOnlyFile(int force) {
        ssize_t nwritten;
        int sync_in_progress = 0;
        mstime_t latency;

        /* Nothing buffered, nothing to do. */
        if (sdslen(server.aof_buf) == 0) return;

        if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
            sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

        if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
            /* With this append fsync policy we do background fsyncing.
             * If the fsync is still in progress we can try to delay
             * the write for a couple of seconds. */
            if (sync_in_progress) {
                if (server.aof_flush_postponed_start == 0) {
                    /* No previous write postponing, remember that we are
                     * postponing the flush and return. */
                    server.aof_flush_postponed_start = server.unixtime;
                    return;
                } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                    /* We were already waiting for fsync to finish, but for less
                     * than two seconds this is still ok. Postpone again. */
                    return;
                }
                /* Otherwise fall through, and go write since we can't wait
                 * over two seconds. */
                server.aof_delayed_fsync++;
                redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
            }
        }
        /* We want to perform a single write. This should be guaranteed atomic
         * at least if the filesystem we are writing is a real physical one.
         * While this will save us against the server being killed I don't think
         * there is much to do about the whole server stopping for power problems
         * or alike */

        /* The write itself is wrapped in latency monitoring. */
        latencyStartMonitor(latency);
        nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
        latencyEndMonitor(latency);
        /* We want to capture different events for delayed writes:
         * when the delay happens with a pending fsync, or with a saving child
         * active, and when the above two conditions are missing.
         * We also use an additional event name to save all samples which is
         * useful for graphing / monitoring purposes. */
        if (sync_in_progress) {
            latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
        } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
            latencyAddSampleIfNeeded("aof-write-active-child",latency);
        } else {
            latencyAddSampleIfNeeded("aof-write-alone",latency);
        }
        latencyAddSampleIfNeeded("aof-write",latency);

        /* We performed the write so reset the postponed flush sentinel to zero. */
        server.aof_flush_postponed_start = 0;

        if (nwritten != (signed)sdslen(server.aof_buf)) {
            static time_t last_write_error_log = 0;
            int can_log = 0;

            /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
            if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
                can_log = 1;
                last_write_error_log = server.unixtime;
            }

            /* Log the AOF write error and record the error code. */
            if (nwritten == -1) {
                if (can_log) {
                    redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
                        strerror(errno));
                    server.aof_last_write_errno = errno;
                }
            } else {
                if (can_log) {
                    redisLog(REDIS_WARNING,"Short write while writing to "
                                           "the AOF file: (nwritten=%lld, "
                                           "expected=%lld)",
                                           (long long)nwritten,
                                           (long long)sdslen(server.aof_buf));
                }

                /* Try to undo the partial write by truncating the file back
                 * to its pre-write size. */
                if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                    if (can_log) {
                        redisLog(REDIS_WARNING, "Could not remove short write "
                                 "from the append-only file.  Redis may refuse "
                                 "to load the AOF the next time it starts.  "
                                 "ftruncate: %s", strerror(errno));
                    }
                } else {
                    /* If the ftruncate() succeeded we can set nwritten to
                     * -1 since there is no longer partial data into the AOF. */
                    nwritten = -1;
                }
                server.aof_last_write_errno = ENOSPC;
            }

            /* Handle the AOF write error. */
            if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
                /* We can't recover when the fsync policy is ALWAYS since the
                 * reply for the client is already in the output buffers, and we
                 * have the contract with the user that on acknowledged write data
                 * is synched on disk. */
                redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
                exit(1);
            } else {
                /* Recover from failed write leaving data into the buffer. However
                 * set an error to stop accepting writes as long as the error
                 * condition is not cleared. */
                server.aof_last_write_status = REDIS_ERR;

                /* Trim the sds buffer if there was a partial write, and there
                 * was no way to undo it with ftruncate(2). */
                if (nwritten > 0) {
                    server.aof_current_size += nwritten;
                    sdsrange(server.aof_buf,nwritten,-1);
                }
                return; /* We'll try again on the next call... */
            }
        } else {
            /* Successful write(2). If AOF was in error state, restore the
             * OK state and log the event. */
            if (server.aof_last_write_status == REDIS_ERR) {
                redisLog(REDIS_WARNING,
                    "AOF write error looks solved, Redis can write again.");
                server.aof_last_write_status = REDIS_OK;
            }
        }
        server.aof_current_size += nwritten;

        /* Re-use AOF buffer when it is small enough. The maximum comes from the
         * arena size of 4k minus some overhead (but is otherwise arbitrary). */
        if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
            sdsclear(server.aof_buf);
        } else {
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
        }

        /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
         * children doing I/O in the background. */
        if (server.aof_no_fsync_on_rewrite &&
            (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
                return;

        /* Perform the fsync if needed. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* aof_fsync is defined as fdatasync() for Linux in order to avoid
             * flushing metadata. */
            latencyStartMonitor(latency);
            aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
            latencyEndMonitor(latency);
            latencyAddSampleIfNeeded("aof-fsync-always",latency);
            server.aof_last_fsync = server.unixtime;
        } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                    server.unixtime > server.aof_last_fsync)) {
            if (!sync_in_progress) aof_background_fsync(server.aof_fd);
            server.aof_last_fsync = server.unixtime;
        }
    }
    复制代码

    当然也有操作会对数据库中的所有数据做操作记录,便于用此文件进行全盘恢复:
    /* Write a sequence of commands able to fully rebuild the dataset into
     * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
     *
     * In order to minimize the number of commands needed in the rewritten
     * log Redis uses variadic commands when possible, such as RPUSH, SADD
     * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
     * are inserted using a single command. */
    /* Completely rewrite the dataset, key by key, into the given file. */
    int rewriteAppendOnlyFile(char *filename) {
        dictIterator *di = NULL;
        dictEntry *de;
        rio aof;
        FILE *fp;
        char tmpfile[256];
        int j;
        long long now = mstime();

        /* Note that we have to use a different temp name here compared to the
         * one used by rewriteAppendOnlyFileBackground() function. */
        snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
        fp = fopen(tmpfile,"w");
        if (!fp) {
            redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
            return REDIS_ERR;
        }

        rioInitWithFile(&aof,fp);
        if (server.aof_rewrite_incremental_fsync)
            rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
        for (j = 0; j < server.dbnum; j++) {
            char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
            redisDb *db = server.db+j;
            dict *d = db->dict;
            if (dictSize(d) == 0) continue;
            di = dictGetSafeIterator(d);
            if (!di) {
                fclose(fp);
                return REDIS_ERR;
            }

            /* SELECT the new DB */
            if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
            if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

            /* Iterate this DB writing every entry */
            /* Walk every record in the database, emitting the commands
             * needed to rebuild it. */
            while((de = dictNext(di)) != NULL) {
                sds keystr;
                robj key, *o;
                long long expiretime;

                keystr = dictGetKey(de);
                o = dictGetVal(de);
                initStaticStringObject(key,keystr);

                expiretime = getExpire(db,&key);

                /* If this key is already expired skip it */
                if (expiretime != -1 && expiretime < now) continue;

                /* Save the key and associated value */
                if (o->type == REDIS_STRING) {
                    /* Emit a SET command */
                    char cmd[]="*3\r\n$3\r\nSET\r\n";
                    if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                    /* Key and value */
                    if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                    if (rioWriteBulkObject(&aof,o) == 0) goto werr;
                } else if (o->type == REDIS_LIST) {
                    if (rewriteListObject(&aof,&key,o) == 0) goto werr;
                } else if (o->type == REDIS_SET) {
                    if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
                } else if (o->type == REDIS_ZSET) {
                    if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
                } else if (o->type == REDIS_HASH) {
                    if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
                } else {
                    redisPanic("Unknown object type");
                }
                /* Save the expire time */
                if (expiretime != -1) {
                    char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                    if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                    if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                    if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
                }
            }
            dictReleaseIterator(di);
        }

        /* Make sure data will not remain on the OS's output buffers */
        if (fflush(fp) == EOF) goto werr;
        if (fsync(fileno(fp)) == -1) goto werr;
        if (fclose(fp) == EOF) goto werr;

        /* Use RENAME to make sure the DB file is changed atomically only
         * if the generate DB file is ok. */
        if (rename(tmpfile,filename) == -1) {
            redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
            unlink(tmpfile);
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
        return REDIS_OK;

    werr:
        fclose(fp);
        unlink(tmpfile);
        redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
        if (di) dictReleaseIterator(di);
        return REDIS_ERR;
    }
    复制代码

    系统同样开放了后台的此方法操作:
    1. /* This is how rewriting of the append only file in background works:
    2. *
    3. * 1) The user calls BGREWRITEAOF
    4. * 2) Redis calls this function, that forks():
    5. *    2a) the child rewrite the append only file in a temp file.
    6. *    2b) the parent accumulates differences in server.aof_rewrite_buf.
    7. * 3) When the child finishes '2a' it exits.
    8. * 4) The parent will trap the exit code, if it's OK, will append the
    9. *    data accumulated into server.aof_rewrite_buf into the temp file, and
    10. *    finally will rename(2) the temp file in the actual file name.
    11. *    The the new file is reopened as the new append only file. Profit!
    12. */  
    13. /* 后台进行AOF数据文件写入操作 */  
    14. int rewriteAppendOnlyFileBackground(void)  
    复制代码

    原理就是和昨天分析的一样,用的是fork(),创建子进程(fork创建的是进程而非线程),最后开放出API:
    /* aof.c 中的API */  
    void aofRewriteBufferReset(void) /* 释放server中旧的buffer,并创建一份新的buffer */  
    unsigned long aofRewriteBufferSize(void) /* 返回当前AOF的buffer的总大小 */  
    void aofRewriteBufferAppend(unsigned char *s, unsigned long len) /* 在缓冲区中追加数据,如果超出空间,会新申请一个缓冲块 */  
    ssize_t aofRewriteBufferWrite(int fd) /* 将保存内存中的buffer内容写入到文件中,也是分块分块的写入 */  
    void aof_background_fsync(int fd) /* 开启后台线程进行文件同步操作 */  
    void stopAppendOnly(void) /* 停止追加数据操作,这里用的是一个命令模式 */  
    int startAppendOnly(void) /* 开启追加模式 */  
    void flushAppendOnlyFile(int force) /* 刷新缓存区的内容到磁盘中 */  
    sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) /* 根据输入的字符串,进行参数包装,再次输出 */  
    sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) /* 将过期等的命令都转化为PEXPIREAT命令,把时间转化为了绝对时间 */  
    void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) /* 根据cmd的不同操作,进行命令的不同转化 */  
    struct redisClient *createFakeClient(void) /* 命令总是被客户端所执行的,因此要引入客户端的方法 */  
    void freeFakeClientArgv(struct redisClient *c) /* 释放客户端参数操作 */  
    void freeFakeClient(struct redisClient *c) /* 释放客户端参数操作 */  
    int loadAppendOnlyFile(char *filename) /* 加载AOF文件内容 */  
    int rioWriteBulkObject(rio *r, robj *obj) /* 写入bulk对象,分为LongLong对象,和普通的String对象 */  
    int rewriteListObject(rio *r, robj *key, robj *o) /* 写入List列表对象,分为ZIPLIST压缩列表和LINEDLIST普通链表操作 */  
    int rewriteSetObject(rio *r, robj *key, robj *o) /* 写入set对象数据 */  
    int rewriteSortedSetObject(rio *r, robj *key, robj *o) /* 写入排序好的set对象 */  
    static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) /* 写入哈希迭代器当前指向的对象 */  
    int rewriteHashObject(rio *r, robj *key, robj *o) /* 写入哈希字典对象 */  
    int rewriteAppendOnlyFile(char *filename) /* 将数据库的内容按照键值,再次完全重写入文件中 */  
    int rewriteAppendOnlyFileBackground(void) /* 后台进行AOF数据文件写入操作 */  
    void bgrewriteaofCommand(redisClient *c) /* 后台写AOF文件操作命令模式 */  
    void aofRemoveTempFile(pid_t childpid) /* 移除某次子进程ID为childpid所生成的aof文件 */  
    void aofUpdateCurrentSize(void) /* 更新当前aof文件的大小 */  
    void backgroundRewriteDoneHandler(int exitcode, int bysignal) /* 后台子进程写操作完成后的回调方法 */  





    转自:http://blog.csdn.net/androidlushangderen/article/details/40304889
    上一篇:Redis源码分析(十四)--- rdb.c本地数据库操作
    下一篇:Redis源码解析(十六)--- config配置文件

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    Archiver|手机版|小黑屋|Redis中国用户组 ( 京ICP备15003959号

    GMT+8, 2019-11-20 08:20 , Processed in 0.091881 second(s), 31 queries .

    Powered by Discuz! X3.2

    © 2001-2013 Comsenz Inc.

    快速回复 返回顶部 返回列表