请选择 进入手机版 | 继续访问电脑版

Redis中国用户组(CRUG)论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

搜索
热搜: 活动 交友 discuz
查看: 566|回复: 0

Redis源码分析(二十七)--- rio系统I/O的封装

[复制链接]
  • TA的每日心情
    开心
    前天 12:10
  • 签到天数: 91 天

    [LV.6]常住居民II

    359

    主题

    464

    帖子

    3683

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    3683

    最佳新人活跃会员宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2016-4-11 10:09:06 | 显示全部楼层 |阅读模式

    I/O操作对于每个系统来说都是必不可少的一部分。而且I/O操作的好坏,在一定程度上也会影响着系统的效率问题。今天我学习了一下在Redis中的I/O是怎么处理的,同样的,Redis在他自己的系统中,也封装了一个I/O层。简称RIO。得先看看RIO中有什么东西喽:

    1. struct _rio {  
    2.     /* Backend functions.
    3.      * Since this functions do not tolerate short writes or reads the return
    4.      * value is simplified to: zero on error, non zero on complete success. */  
    5.     /* 数据流的读方法 */  
    6.     size_t (*read)(struct _rio *, void *buf, size_t len);  
    7.     /* 数据流的写方法 */  
    8.     size_t (*write)(struct _rio *, const void *buf, size_t len);  
    9.     /* 获取当前的读写偏移量 */  
    10.     off_t (*tell)(struct _rio *);  
    11.     /* The update_cksum method if not NULL is used to compute the checksum of
    12.      * all the data that was read or written so far. The method should be
    13.      * designed so that can be called with the current checksum, and the buf
    14.      * and len fields pointing to the new block of data to add to the checksum
    15.      * computation. */  
    16.     /* 当读入新的数据块的时候,会更新当前的校验和 */  
    17.     void (*update_cksum)(struct _rio *, const void *buf, size_t len);  
    18.   
    19.     /* The current checksum */  
    20.     /* 当前的校验和 */  
    21.     uint64_t cksum;  
    22.   
    23.     /* number of bytes read or written */  
    24.     /* 当前读取的或写入的字节大小 */  
    25.     size_t processed_bytes;  
    26.   
    27.     /* maximum single read or write chunk size */  
    28.     /* 最大的单次读写的大小 */  
    29.     size_t max_processing_chunk;  
    30.   
    31.     /* Backend-specific vars. */  
    32.     /* rio中I/O变量 */  
    33.     union {  
    34.         //buffer结构体  
    35.         struct {  
    36.             //buffer具体内容  
    37.             sds ptr;  
    38.             //偏移量  
    39.             off_t pos;  
    40.         } buffer;  
    41.         //文件结构体  
    42.         struct {  
    43.             FILE *fp;  
    44.             off_t buffered; /* Bytes written since last fsync. */  
    45.             //同步的最小大小  
    46.             off_t autosync; /* fsync after 'autosync' bytes written. */  
    47.         } file;  
    48.     } io;  
    49. };  
    复制代码

    里面除了3个必须的方法,read,write方法,还有获取偏移量的tell方法,还有2个结构体变量,一个buffer结构体,一个file结构体,作者针对不同的I/O情况,做了不同的处理,当执行临时的I/O操作时,都与rio.buffer打交道,当与文件进行I/O操作时,则执行与rio.file之间的操作。下面看看rio统一定义的读写方法:
    1. /* The following functions are our interface with the stream. They'll call the
    2. * actual implementation of read / write / tell, and will update the checksum
    3. * if needed. */  
    4. /* rio的写方法 */  
    5. static inline size_t rioWrite(rio *r, const void *buf, size_t len) {  
    6.     while (len) {  
    7.         //判断当前操作字节长度是否超过最大长度  
    8.         size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;  
    9.         //写入新的数据时,更新校验和  
    10.         if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);  
    11.         //执行写方法  
    12.         if (r->write(r,buf,bytes_to_write) == 0)  
    13.             return 0;  
    14.         buf = (char*)buf + bytes_to_write;  
    15.         len -= bytes_to_write;  
    16.         //操作字节数增加  
    17.         r->processed_bytes += bytes_to_write;  
    18.     }  
    19.     return 1;  
    20. }  
    21.   
    22. /* rio的读方法 */  
    23. static inline size_t rioRead(rio *r, void *buf, size_t len) {  
    24.     while (len) {  
    25.         //判断当前操作字节长度是否超过最大长度  
    26.         size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;  
    27.         //读数据方法  
    28.         if (r->read(r,buf,bytes_to_read) == 0)  
    29.             return 0;  
    30.         //读数据时,更新校验和  
    31.         if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);  
    32.         buf = (char*)buf + bytes_to_read;  
    33.         len -= bytes_to_read;  
    34.         r->processed_bytes += bytes_to_read;  
    35.     }  
    36.     return 1;  
    37. }  
    复制代码

    这里有一个比较不错的地方,每次当有数据发生改变的时候,Redis都会做一个计算校验和的处理算法,表明了数据操作的改变动作,用的算法就是之前介绍过CRC64算法,针对RIO的buffer IO和File IO,Redis定义了2个RIO结构体:
    1. /* 根据上面描述的方法,定义了BufferRio */  
    2. static const rio rioBufferIO = {  
    3.     rioBufferRead,  
    4.     rioBufferWrite,  
    5.     rioBufferTell,  
    6.     NULL,           /* update_checksum */  
    7.     0,              /* current checksum */  
    8.     0,              /* bytes read or written */  
    9.     0,              /* read/write chunk size */  
    10.     { { NULL, 0 } } /* union for io-specific vars */  
    11. };  
    12.   
    13. /* 根据上面描述的方法,定义了FileRio */  
    14. static const rio rioFileIO = {  
    15.     rioFileRead,  
    16.     rioFileWrite,  
    17.     rioFileTell,  
    18.     NULL,           /* update_checksum */  
    19.     0,              /* current checksum */  
    20.     0,              /* bytes read or written */  
    21.     0,              /* read/write chunk size */  
    22.     { { NULL, 0 } } /* union for io-specific vars */  
    23. };  
    复制代码

    里面分别定义了相对应的读写方法,比如buffer的Read方法和File的Read方法:
    1. /* Returns 1 or 0 for success/failure. */  
    2. /* 读取rio中的buffer内容到传入的参数 */  
    3. static size_t rioBufferRead(rio *r, void *buf, size_t len) {  
    4.     if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)  
    5.         return 0; /* not enough buffer to return len bytes. */  
    6.     memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);  
    7.     r->io.buffer.pos += len;  
    8.     return 1;  
    9. }  
    复制代码
    1. /* Returns 1 or 0 for success/failure. */  
    2. /* 读取rio中的fp文件内容 */  
    3. static size_t rioFileRead(rio *r, void *buf, size_t len) {  
    4.     return fread(buf,len,1,r->io.file.fp);  
    5. }
    复制代码

    作用的rio的对象变量不一样,最后在Redis的声明中给出了4种不同类型数据的写入方法:
    1. /* rio写入不同类型数据方法,最终调用的是riowrite方法 */  
    2. size_t rioWriteBulkCount(rio *r, char prefix, int count);  
    3. size_t rioWriteBulkString(rio *r, const char *buf, size_t len);  
    4. size_t rioWriteBulkLongLong(rio *r, long long l);  
    5. size_t rioWriteBulkDouble(rio *r, double d);
    复制代码

    举其中的一个方法实现:
    1. /* Write multi bulk count in the format: "*<count>\r\n". */  
    2. /* rio写入不同类型数据方法,调用的是riowrite方法 */  
    3. size_t rioWriteBulkCount(rio *r, char prefix, int count) {  
    4.     char cbuf[128];  
    5.     int clen;  
    6.   
    7.     cbuf[0] = prefix;  
    8.     clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);  
    9.     cbuf[clen++] = '\r';  
    10.     cbuf[clen++] = '\n';  
    11.     if (rioWrite(r,cbuf,clen) == 0) return 0;  
    12.     return clen;  
    13. }  
    复制代码

    调用的还是里面的rioWrite方法,根据你定义的是buffer IO还是File IO,.各自有各自不同的实现而已。在文件的write方法时,有一个细节,当你把内容读入到rio.file.buffer时,buffer超过给定的同步最小字节,你得必须将buffer内容刷新到文件中了。

    1. /* Returns 1 or 0 for success/failure. */  
    2. /* 将buf写入rio中的file文件中 */  
    3. static size_t rioFileWrite(rio *r, const void *buf, size_t len) {  
    4.     size_t retval;  
    5.   
    6.     retval = fwrite(buf,len,1,r->io.file.fp);  
    7.     r->io.file.buffered += len;  
    8.   
    9.     if (r->io.file.autosync &&  
    10.         r->io.file.buffered >= r->io.file.autosync)  
    11.     {  
    12.         //判读是否需要同步  
    13.         fflush(r->io.file.fp);  
    14.         aof_fsync(fileno(r->io.file.fp));  
    15.         r->io.file.buffered = 0;  
    16.     }  
    17.     return retval;  
    18. }  
    复制代码
    转自:http://blog.csdn.net/androidlushangderen/article/details/40706751
    上一篇:Redis源码分析(二十六)--- slowLog和hyperloglog
    下一篇:Redis源码分析(二十八)--- object创建和释放redisObject对象

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    阿里云
    阿里云

    Archiver|手机版|小黑屋|Redis中国用户组 ( 京ICP备15003959号

    GMT+8, 2017-5-26 05:55 , Processed in 0.112084 second(s), 31 queries .

    Powered by Discuz! X3.2

    © 2001-2013 Comsenz Inc.

    快速回复 返回顶部 返回列表