请选择 进入手机版 | 继续访问电脑版

Redis中国用户组(CRUG)论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

搜索
热搜: 活动 交友 discuz
查看: 407|回复: 0

Redis源码分析(二十二)--- networking网络协议传输

[复制链接]
  • TA的每日心情
    开心
    2017-3-20 10:39
  • 签到天数: 90 天

    [LV.6]常住居民II

    358

    主题

    462

    帖子

    3650

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    3650

    最佳新人活跃会员宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2016-4-8 10:25:59 | 显示全部楼层 |阅读模式

    上次我只分析了Redis网络部分的代码一部分,今天我把networking的代码实现部分也学习了一遍,netWorking的代码更多偏重的是Client客户端的操作。里面addReply()系列的方法操作是主要的部分。光光这个系列的方法,应该占据了一半的API的数量。我把API分成了3个部分:

    1. /* ------------ API ---------------------- */  
    2. void *dupClientReplyValue(void *o)  /* 复制value一份 */  
    3. int listMatchObjects(void *a, void *b) /* 比价2个obj是否相等 */  
    4. robj *dupLastObjectIfNeeded(list *reply) /* 返回回复列表中最后一个元素对象 */  
    5. void copyClientOutputBuffer(redisClient *dst, redisClient *src) /* 将源Client的输出buffer复制给目标Client */  
    6. static void acceptCommonHandler(int fd, int flags) /* 网络连接后的调用方法 */  
    7. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)  
    8. void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)  
    9. void disconnectSlaves(void) /* 使server的slave失去连接 */  
    10. void replicationHandleMasterDisconnection(void)  
    11. void flushSlavesOutputBuffers(void) /* 从方法将会在freeMemoryIfNeeded(),释放内存空间函数,将存在内存中数据操作结果刷新到磁盘中 */  
    12. int processEventsWhileBlocked(void)  
    13.   
    14. /* ------------- addReply API -----------------   */  
    15. int _addReplyToBuffer(redisClient *c, char *s, size_t len) /* 往客户端缓冲区中添加内容 */  
    16. void _addReplyObjectToList(redisClient *c, robj *o) /* robj添加到reply的列表中 */  
    17. void _addReplySdsToList(redisClient *c, sds s) /* 在回复列表中添加Sds字符串对象 */  
    18. void _addReplyStringToList(redisClient *c, char *s, size_t len) /* 在回复列表中添加字符串对象,参数中已经给定字符的长度 */  
    19. void addReply(redisClient *c, robj *obj) /* 在redisClient的buffer中写入数据,数据存在obj->ptr的指针中 */  
    20. void addReplySds(redisClient *c, sds s) /* 在回复中添加Sds字符串,下面的额addReply()系列方法原理基本类似 */  
    21. void addReplyString(redisClient *c, char *s, size_t len)  
    22. void addReplyErrorLength(redisClient *c, char *s, size_t len)  
    23. void addReplyError(redisClient *c, char *err) /* 往Reply中添加error类的信息 */  
    24. void addReplyErrorFormat(redisClient *c, const char *fmt, ...)  
    25. void addReplyStatusLength(redisClient *c, char *s, size_t len)  
    26. void addReplyStatus(redisClient *c, char *status)  
    27. void addReplyStatusFormat(redisClient *c, const char *fmt, ...)  
    28. void *addDeferredMultiBulkLength(redisClient *c) /* 在reply list 中添加一个空的obj对象 */  
    29. void setDeferredMultiBulkLength(redisClient *c, void *node, long length)  
    30. void addReplyDouble(redisClient *c, double d) /* 在bulk reply中添加一个double类型值,bulk的意思为大块的,bulk reply的意思为大数据量的回复 */  
    31. void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix)  
    32. void addReplyLongLong(redisClient *c, long long ll)  
    33. void addReplyMultiBulkLen(redisClient *c, long length)  
    34. void addReplyBulkLen(redisClient *c, robj *obj) /* 添加bulk 大块的数据的长度 */  
    35. void addReplyBulk(redisClient *c, robj *obj) /* 将一个obj的数据,拆分成大块数据的添加 */  
    36. void addReplyBulkCBuffer(redisClient *c, void *p, size_t len)  
    37. void addReplyBulkCString(redisClient *c, char *s)  
    38. void addReplyBulkLongLong(redisClient *c, long long ll)  
    39.   
    40. /* ------------- Client API -----------------   */   
    41. redisClient *createClient(int fd) /* 创建redisClient客户端,1.建立连接,2.设置数据库,3.属性设置 */  
    42. int prepareClientToWrite(redisClient *c) /* 此方法将会被调用于Client准备接受新数据之前调用,在fileEvent为客户端设定writer的handler处理事件 */  
    43. static void freeClientArgv(redisClient *c)  
    44. void freeClient(redisClient *c) /* 释放freeClient,要分为Master和Slave2种情况作不同的处理 */  
    45. void freeClientAsync(redisClient *c)  
    46. void freeClientsInAsyncFreeQueue(void) /* 异步的free客户端 */  
    47. void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 将Client中的reply数据存入文件中 */  
    48. void resetClient(redisClient *c)  
    49. int processInlineBuffer(redisClient *c) /* 处理redis Client的内链的buffer,就是c->querybuf */  
    50. static void setProtocolError(redisClient *c, int pos)  
    51. int processMultibulkBuffer(redisClient *c) /* 处理大块的buffer */  
    52. void processInputBuffer(redisClient *c) /* 处理redisClient的查询buffer */  
    53. void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 从Client获取查询query语句 */  
    54. void getClientsMaxBuffers(unsigned long *longest_output_list,  
    55.                           unsigned long *biggest_input_buffer) /* 获取Client中输入buffer和输出buffer的最大长度值 */  
    56. void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port) /* 格式化ip,port端口号的输出,ip:port */  
    57. int genClientPeerId(redisClient *client, char *peerid, size_t peerid_len) /* 获取Client客户端的ip,port地址信息 */  
    58. char *getClientPeerId(redisClient *c) /* 获取c->peerid客户端的地址信息 */  
    59. sds catClientInfoString(sds s, redisClient *client) /* 格式化的输出客户端的属性信息,直接返回一个拼接好的字符串 */  
    60. sds getAllClientsInfoString(void) /* 获取所有Client客户端的属性信息,并连接成一个总的字符串并输出 */  
    61. void clientCommand(redisClient *c) /* 执行客户端的命令的作法 */  
    62. void rewriteClientCommandVector(redisClient *c, int argc, ...) /* 重写客户端的命令集合,旧的命令集合的应用计数减1,新的Command  Vector的命令集合增1 */  
    63. void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) /* 重写Client中的第i个参数 */  
    64. unsigned long getClientOutputBufferMemoryUsage(redisClient *c) /* 获取Client中已经用去的输出buffer的大小 */  
    65. int getClientType(redisClient *c)  
    66. int getClientTypeByName(char *name) /* Client中的名字的3种类型,normal,slave,pubsub */  
    67. char *getClientTypeName(int class)  
    68. int checkClientOutputBufferLimits(redisClient *c) /* 判断Clint的输出缓冲区的已经占用大小是否超过软限制或是硬限制 */  
    69. void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) /* 异步的关闭Client,如果缓冲区中的软限制或是硬限制已经到达的时候,缓冲区超出限制的结果会导致释放不安全, */  
    复制代码

    我们从最简单的_addReplyToBuffer在缓冲区中添加回复数据开始说起,因为后面的各种addReply的方法都或多或少的调用了和这个歌方法。
    1. /* -----------------------------------------------------------------------------
    2. * Low level functions to add more data to output buffers.
    3. * -------------------------------------------------------------------------- */  
    4. /* 往客户端缓冲区中添加内容 */  
    5. int _addReplyToBuffer(redisClient *c, char *s, size_t len) {  
    6.     size_t available = sizeof(c->buf)-c->bufpos;  
    7.   
    8.     if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;  
    9.   
    10.     /* If there already are entries in the reply list, we cannot
    11.      * add anything more to the static buffer. */  
    12.      //如果当前的reply已经存在内容,则操作出错  
    13.     if (listLength(c->reply) > 0) return REDIS_ERR;  
    14.   
    15.     /* Check that the buffer has enough space available for this string. */  
    16.     if (len > available) return REDIS_ERR;  
    17.   
    18.     memcpy(c->buf+c->bufpos,s,len);  
    19.     c->bufpos+=len;  
    20.     return REDIS_OK;  
    21. }  
    复制代码

    最直接影响的一句话,就是memcpy(c->buf+c->bufpos,s,len);所以内容是加到c->buf中的,这也就是客户端的输出buffer,添加操作还有另外一种形式是添加对象类型:
    1. /* robj添加到reply的列表中 */  
    2. void _addReplyObjectToList(redisClient *c, robj *o) {  
    3.     robj *tail;  
    4.   
    5.     if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;  
    6.   
    7.     if (listLength(c->reply) == 0) {  
    8.         incrRefCount(o);  
    9.         //在回复列表汇总添加robj内容  
    10.         listAddNodeTail(c->reply,o);  
    11.         c->reply_bytes += zmalloc_size_sds(o->ptr);  
    12.     } else {  
    13.         tail = listNodeValue(listLast(c->reply));  
    14.   
    15.         /* Append to this object when possible. */  
    16.         if (tail->ptr != NULL &&  
    17.             sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)  
    18.         {  
    19.             c->reply_bytes -= zmalloc_size_sds(tail->ptr);  
    20.             tail = dupLastObjectIfNeeded(c->reply);  
    21.             tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));  
    22.             c->reply_bytes += zmalloc_size_sds(tail->ptr);  
    23.         } else {  
    24.             incrRefCount(o);  
    25.             listAddNodeTail(c->reply,o);  
    26.             c->reply_bytes += zmalloc_size_sds(o->ptr);  
    27.         }  
    28.     }  
    29.     asyncCloseClientOnOutputBufferLimitReached(c);  
    30. }  
    复制代码

    把robj对象加载reply列表中,并且改变reply的byte大小,最后还调用了一个asyncCloseClientOnOutputBufferLimitReached(c);方法,这个方法我是在这个文件的最底部找到的,一开始还真不知道什么意思,作用就是当添加完数据后,当客户端的输出缓冲的大小超出限制时,会被异步关闭:
    1. /* Asynchronously close a client if soft or hard limit is reached on the
    2. * output buffer size. The caller can check if the client will be closed
    3. * checking if the client REDIS_CLOSE_ASAP flag is set.
    4. *
    5. * Note: we need to close the client asynchronously because this function is
    6. * called from contexts where the client can't be freed safely, i.e. from the
    7. * lower level functions pushing data inside the client output buffers. */  
    8. /* 异步的关闭Client,如果缓冲区中的软限制或是硬限制已经到达的时候,缓冲区超出限制的结果会导致释放不安全, */  
    9. void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {  
    10.     redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));  
    11.     if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;  
    12.     if (checkClientOutputBufferLimits(c)) {  
    13.         sds client = catClientInfoString(sdsempty(),c);  
    14.   
    15.         freeClientAsync(c);  
    16.         redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);  
    17.         sdsfree(client);  
    18.     }  
    19. }  
    复制代码

    在addReply方法调用的时候,有时是需要一个前提的,我说的是在写数据事件发生的时候,你得先对写的文件创建一个监听事件:
    1. /* 在回复中添加Sds字符串 */  
    2. void addReplySds(redisClient *c, sds s) {  
    3.     //在调用添加操作之前,都要先执行prepareClientToWrite(c),设置文件事件的写事件  
    4.     if (prepareClientToWrite(c) != REDIS_OK) {  
    5.         /* The caller expects the sds to be free'd. */  
    6.         sdsfree(s);  
    7.         return;  
    8.     }  
    9.     if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {  
    10.         sdsfree(s);  
    11.     } else {  
    12.         /* This method free's the sds when it is no longer needed. */  
    13.         _addReplySdsToList(c,s);  
    14.     }  
    15. }  
    复制代码

    在这个prepareClientToWrite()里面是干嘛的呢?
    1. /* This function is called every time we are going to transmit new data
    2. * to the client. The behavior is the following:
    3. *
    4. * If the client should receive new data (normal clients will) the function
    5. * returns REDIS_OK, and make sure to install the write handler in our event
    6. * loop so that when the socket is writable new data gets written.
    7. *
    8. * If the client should not receive new data, because it is a fake client,
    9. * a master, a slave not yet online, or because the setup of the write handler
    10. * failed, the function returns REDIS_ERR.
    11. *
    12. * Typically gets called every time a reply is built, before adding more
    13. * data to the clients output buffers. If the function returns REDIS_ERR no
    14. * data should be appended to the output buffers. */  
    15. /* 此方法将会被调用于Client准备接受新数据之前调用,在fileEvent为客户端设定writer的handler处理事件 */  
    16. int prepareClientToWrite(redisClient *c) {  
    17.     if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;  
    18.     if ((c->flags & REDIS_MASTER) &&  
    19.         !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;  
    20.     if (c->fd <= 0) return REDIS_ERR; /* Fake client */  
    21.     if (c->bufpos == 0 && listLength(c->reply) == 0 &&  
    22.         (c->replstate == REDIS_REPL_NONE ||  
    23.          c->replstate == REDIS_REPL_ONLINE) &&  
    24.         //在这里创建写的文件事件  
    25.         aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,  
    26.         sendReplyToClient, c) == AE_ERR) return REDIS_ERR;  
    27.     return REDIS_OK;  
    28. }  
    复制代码

    在addReply的方法里提到了一个addReplyBulk类型方法,Bulk的中文意思为大块的,说明addReplyBulk添加的都是一些比较大块的数据,找一个方法看看:
    1. /* Add a Redis Object as a bulk reply */  
    2. /* 将一个obj的数据,拆分成大块数据的添加 */  
    3. void addReplyBulk(redisClient *c, robj *obj) {  
    4.     //reply添加长度  
    5.     addReplyBulkLen(c,obj);  
    6.     //reply添加对象  
    7.     addReply(c,obj);  
    8.     addReply(c,shared.crlf);  
    9. }  
    复制代码

    将原本一个robj的数据拆分成可3个普通的addReply的方法调用。就变成了数据量变大了的数据。大数据的回复一个比较不好的地方是到时解析的时候或者是Data的复制的时候会比较耗时。在networking的方法里还提供了freeClient()的操作:
    1. /* 释放freeClient,要分为Master和Slave2种情况作不同的处理 */  
    2. void freeClient(redisClient *c) {  
    3.     listNode *ln;  
    4.   
    5.     /* If this is marked as current client unset it */  
    6.     if (server.current_client == c) server.current_client = NULL;  
    7.   
    8.     /* If it is our master that's beging disconnected we should make sure
    9.      * to cache the state to try a partial resynchronization later.
    10.      *
    11.      * Note that before doing this we make sure that the client is not in
    12.      * some unexpected state, by checking its flags. */  
    13.     if (server.master && c->flags & REDIS_MASTER) {  
    14.         redisLog(REDIS_WARNING,"Connection with master lost.");  
    15.         if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|  
    16.                           REDIS_CLOSE_ASAP|  
    17.                           REDIS_BLOCKED|  
    18.                           REDIS_UNBLOCKED)))  
    19.         {  
    20.             //如果是Master客户端,需要做缓存Client的处理,可以迅速重新启用  
    21.             replicationCacheMaster(c);  
    22.             return;  
    23.         }  
    24.     }  
    复制代码

    ...后面代码略去了

       当Client中的输出buffer数据渐渐变多了的时候就要准备持久化到磁盘文件了,要调用下面这个方法了,

    1. /* Helper function used by freeMemoryIfNeeded() in order to flush slave
    2. * output buffers without returning control to the event loop. */  
    3. /* 从方法将会在freeMemoryIfNeeded(),释放内存空间函数,将存在内存中数据操作结果刷新到磁盘中 */  
    4. void flushSlavesOutputBuffers(void) {  
    5.     listIter li;  
    6.     listNode *ln;  
    7.   
    8.     listRewind(server.slaves,&li);  
    9.     while((ln = listNext(&li))) {  
    10.         redisClient *slave = listNodeValue(ln);  
    11.         int events;  
    12.   
    13.         events = aeGetFileEvents(server.el,slave->fd);  
    14.         if (events & AE_WRITABLE &&  
    15.             slave->replstate == REDIS_REPL_ONLINE &&  
    16.             listLength(slave->reply))  
    17.         {  
    18.             //在这里调用了write的方法  
    19.             sendReplyToClient(server.el,slave->fd,slave,0);  
    20.         }  
    21.     }  
    22. }  
    复制代码

    这个方法的核心调用又在sendReplyToClient()方法,就是把Client的reply内容和buf内容存入文件。以上就是我的理解了,代码量有点大,的确看的我头有点大。


    转自:http://blog.csdn.net/androidlushangderen/article/details/40541517
    上一篇:Redis源码分析(二十一)--- anet网络通信的封装
    下一篇:Redis源码分析(二十三)--- CRC循环冗余算法和RAND随机数算法



    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    阿里云
    阿里云

    Archiver|手机版|小黑屋|Redis中国用户组 ( 京ICP备15003959号

    GMT+8, 2017-3-29 15:20 , Processed in 0.116193 second(s), 32 queries .

    Powered by Discuz! X3.2

    © 2001-2013 Comsenz Inc.

    快速回复 返回顶部 返回列表