请选择 进入手机版 | 继续访问电脑版

Redis中国用户组(CRUG)论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

搜索
热搜: 活动 交友 discuz
查看: 594|回复: 0

Redis源码分析(三十)--- pubsub发布订阅模式

[复制链接]
  • TA的每日心情
    开心
    2016-10-27 12:14
  • 签到天数: 89 天

    [LV.6]常住居民II

    357

    主题

    456

    帖子

    3590

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    3590

    最佳新人活跃会员宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2016-4-13 15:53:10 | 显示全部楼层 |阅读模式

    今天学习了Redis中比较高大上的名词,“发布订阅模式”,发布订阅模式这个词在我最开始接触听说的时候是在JMS(Java Message Service)java消息服务中听说的。这个名次用通俗的一点话说,就是我订阅了这类消息,当只有这类的消息进行广播发送的时候,我才会,其他的消息直接过滤,保证了一个高效的传输效率。下面切入正题,学习一下Redis是如何实现这个发布订阅模式的。先看看里面的简单的API构造;

    1. <span style="background-color: inherit; line-height: 26px;"><font face="Arial">/*-----------------------------------------------------------------------------
    2. * Pubsub low level API
    3. *----------------------------------------------------------------------------*/  
    4. void freePubsubPattern(void *p) /* 释放发布订阅的模式 */  
    5. int listMatchPubsubPattern(void *a, void *b) /* 发布订阅模式是否匹配 */  
    6. int clientSubscriptionsCount(redisClient *c) /* 返回客户端的所订阅的数量,包括channels + patterns管道和模式 */  
    7. int pubsubSubscribeChannel(redisClient *c, robj *channel) /* Client订阅一个Channel管道 */  
    8. int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) /* 取消订阅Client中的Channel */  
    9. int pubsubSubscribePattern(redisClient *c, robj *pattern) /* Client客户端订阅一种模式 */  
    10. int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) /* Client客户端取消订阅pattern模式 */  
    11. int pubsubUnsubscribeAllChannels(redisClient *c, int notify) /* 客户端取消自身订阅的所有Channel */  
    12. int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) /* 客户端取消订阅所有的pattern模式 */  
    13. int pubsubPublishMessage(robj *channel, robj *message) /* 为所有订阅了Channel的Client发送消息message */  
    14.   
    15. /* ------------PUB/SUB API ---------------- */  
    16. void subscribeCommand(redisClient *c) /* 订阅Channel的命令 */  
    17. void unsubscribeCommand(redisClient *c) /* 取消订阅Channel的命令 */  
    18. void psubscribeCommand(redisClient *c) /* 订阅模式命令 */  
    19. void punsubscribeCommand(redisClient *c) /* 取消订阅模式命令 */  
    20. void publishCommand(redisClient *c) /* 发布消息命令 */  
    21. void pubsubCommand(redisClient *c) /* 发布订阅命令 */  </font></span><span style="font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px; font-size: 9px; background-color: inherit;"> </span>
    复制代码

    在这里面出现了高频的词Pattern(模式)和Channel(频道,叫管道比较别扭),也就是说,后续所有的关于发布订阅的东东都是基于这2者展开进行的。现在大致讲解一下在Redis中是如何实现此中模式的:

    1.在RedisClient 内部维护了一个pubsub_channels的Channel列表,记录了此客户端所订阅的频道

    2.在Server服务端,同样维护着一个类似的变量叫做,pubsub_channels,这是一个dict字典变量,每一个Channel对应着一批订阅了此频道的Client,也就是Channel-->list of Clients

    3.当一个Client publish一个message的时候,会先去服务端的pubsub_channels找相应的Channel,遍历里面的Client,然后发送通知,即完成了整个发布订阅模式。

        我们可以简单的看一下Redis订阅一个Channel的方法实现;

    1. /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
    2. * 0 if the client was already subscribed to that channel. */  
    3. /* Client订阅一个Channel管道 */  
    4. int pubsubSubscribeChannel(redisClient *c, robj *channel) {  
    5.     struct dictEntry *de;  
    6.     list *clients = NULL;  
    7.     int retval = 0;  
    8.   
    9.     /* Add the channel to the client -> channels hash table */  
    10.     //在Client的字典pubsub_channels中添加Channel  
    11.     if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {  
    12.         retval = 1;  
    13.         incrRefCount(channel);  
    14.         /* Add the client to the channel -> list of clients hash table */  
    15.         //添加Clietn到server中的pubsub_channels,对应的列表中  
    16.         de = dictFind(server.pubsub_channels,channel);  
    17.         if (de == NULL) {  
    18.             //如果此频道的Client列表为空,则创建新列表并添加  
    19.             clients = listCreate();  
    20.             dictAdd(server.pubsub_channels,channel,clients);  
    21.             incrRefCount(channel);  
    22.         } else {  
    23.             //否则,获取这个频道的客户端列表,在尾部添加新的客户端  
    24.             clients = dictGetVal(de);  
    25.         }  
    26.         listAddNodeTail(clients,c);  
    27.     }  
    28.     /* Notify the client */  
    29.     //添加给回复客户端  
    30.     addReply(c,shared.mbulkhdr[3]);  
    31.     addReply(c,shared.subscribebulk);  
    32.     addReplyBulk(c,channel);  
    33.     addReplyLongLong(c,clientSubscriptionsCount(c));  
    34.     return retval;  
    35. }  
    复制代码
    添加操作主要分2部,Client自身的内部维护的pubsub_channels的添加,是一个dict字典对象,然后,是server端维护的pubsub_channels中的client列表的添加。在进行Channel频道的删除的时候,也是执行的这2步骤操作:
    1. /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
    2. * 0 if the client was not subscribed to the specified channel. */  
    3. /* 取消订阅Client中的Channel */  
    4. int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {  
    5.     struct dictEntry *de;  
    6.     list *clients;  
    7.     listNode *ln;  
    8.     int retval = 0;  
    9.   
    10.     /* Remove the channel from the client -> channels hash table */  
    11.     incrRefCount(channel); /* channel may be just a pointer to the same object
    12.                             we have in the hash tables. Protect it... */  
    13.     //字典删除Client中pubsub_channels中的Channel  
    14.     if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {  
    15.         retval = 1;  
    16.         /* Remove the client from the channel -> clients list hash table */  
    17.         //再移除Channel对应的Client列表  
    18.         de = dictFind(server.pubsub_channels,channel);  
    19.         redisAssertWithInfo(c,NULL,de != NULL);  
    20.         clients = dictGetVal(de);  
    21.         ln = listSearchKey(clients,c);  
    22.         redisAssertWithInfo(c,NULL,ln != NULL);  
    23.         listDelNode(clients,ln);  
    24.         if (listLength(clients) == 0) {  
    25.             /* Free the list and associated hash entry at all if this was
    26.              * the latest client, so that it will be possible to abuse
    27.              * Redis PUBSUB creating millions of channels. */  
    28.             dictDelete(server.pubsub_channels,channel);  
    29.         }  
    30.     }  
    31.     /* Notify the client */  
    32.     if (notify) {  
    33.         addReply(c,shared.mbulkhdr[3]);  
    34.         addReply(c,shared.unsubscribebulk);  
    35.         addReplyBulk(c,channel);  
    36.         addReplyLongLong(c,dictSize(c->pubsub_channels)+  
    37.                        listLength(c->pubsub_patterns));  
    38.   
    39.     }  
    40.     decrRefCount(channel); /* it is finally safe to release it */  
    41.     return retval;  
    42. }  
    复制代码

    里面还有对应的模式的订阅和取消订阅的操作,原理和channel完全一致,二者的区别在于,pattern是用来匹配的Channel的,这个是什么意思呢。在后面会做出答案,接着看。最后看一个最最核心的方法,客户端发步消息方法:
    1. /* Publish a message */  
    2. /* 为所有订阅了Channel的Client发送消息message */  
    3. int pubsubPublishMessage(robj *channel, robj *message) {  
    4.     int receivers = 0;  
    5.     struct dictEntry *de;  
    6.     listNode *ln;  
    7.     listIter li;  
    8.   
    9.     /* Send to clients listening for that channel */  
    10.     //找到Channel所对应的dictEntry  
    11.     de = dictFind(server.pubsub_channels,channel);  
    12.     if (de) {  
    13.         //获取此Channel对应的客户单列表  
    14.         list *list = dictGetVal(de);  
    15.         listNode *ln;  
    16.         listIter li;  
    17.   
    18.         listRewind(list,&li);  
    19.         while ((ln = listNext(&li)) != NULL) {  
    20.             //依次取出List中的客户单,添加消息回复  
    21.             redisClient *c = ln->value;  
    22.   
    23.             addReply(c,shared.mbulkhdr[3]);  
    24.             addReply(c,shared.messagebulk);  
    25.             addReplyBulk(c,channel);  
    26.             //添加消息回复  
    27.             addReplyBulk(c,message);  
    28.             receivers++;  
    29.         }  
    30.     }  
    31.     /* Send to clients listening to matching channels */  
    32.     /* 发送给尝试匹配该Channel的客户端消息 */  
    33.     if (listLength(server.pubsub_patterns)) {  
    34.         listRewind(server.pubsub_patterns,&li);  
    35.         channel = getDecodedObject(channel);  
    36.         while ((ln = listNext(&li)) != NULL) {  
    37.             pubsubPattern *pat = ln->value;  
    38.               
    39.             //客户端的模式如果匹配了Channel,也会发送消息  
    40.             if (stringmatchlen((char*)pat->pattern->ptr,  
    41.                                 sdslen(pat->pattern->ptr),  
    42.                                 (char*)channel->ptr,  
    43.                                 sdslen(channel->ptr),0)) {  
    44.                 addReply(pat->client,shared.mbulkhdr[4]);  
    45.                 addReply(pat->client,shared.pmessagebulk);  
    46.                 addReplyBulk(pat->client,pat->pattern);  
    47.                 addReplyBulk(pat->client,channel);  
    48.                 addReplyBulk(pat->client,message);  
    49.                 receivers++;  
    50.             }  
    51.         }  
    52.         decrRefCount(channel);  
    53.     }  
    54.     return receivers;  
    55. }  
    复制代码

    pattern的作用就在上面体现了,如果某种pattern匹配了Channel频道,则模式的客户端也会接收消息。在server->pubsub_patterns中,pubsub_patterns是一个list列表,里面的每一个pattern只对应一个Client,就是上面的pat->client,这一点和Channel还是有本质的区别的。讲完发布订阅模式的基本操作后,顺便把与此相关的notify通知类也稍稍讲讲,通知只有3个方法,
    1. /* ----------------- API ------------------- */  
    2. int keyspaceEventsStringToFlags(char *classes) /* 键值字符类型转为对应的Class类型 */  
    3. sds keyspaceEventsFlagsToString(int flags) /* 通过输入的flag值类,转为字符类型*/  
    4. void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) /* 发布通知方法,分为2类,keySpace的通知,keyEvent的通知 */  
    复制代码

    涉及到string To flag 和flag To String 的转换,也不知道这个会在哪里用到;
    1. /* Turn a string representing notification classes into an integer
    2. * representing notification classes flags xored.
    3. *
    4. * The function returns -1 if the input contains characters not mapping to
    5. * any class. */  
    6. /* 键值字符类型转为对应的Class类型 */  
    7. int keyspaceEventsStringToFlags(char *classes) {  
    8.     char *p = classes;  
    9.     int c, flags = 0;  
    10.   
    11.     while((c = *p++) != '\0') {  
    12.         switch(c) {  
    13.         case 'A': flags |= REDIS_NOTIFY_ALL; break;  
    14.         case 'g': flags |= REDIS_NOTIFY_GENERIC; break;  
    15.         case '
    16. [color=#000][font=Arial]应该是响应键盘输入的类型和Redis类型之间的转换。在notify的方法还有一个event事件的通知方法:[/font][/color]
    17. [code]/* The API provided to the rest of the Redis core is a simple function:
    18. *
    19. * notifyKeyspaceEvent(char *event, robj *key, int dbid);
    20. *
    21. * 'event' is a C string representing the event name.
    22. * 'key' is a Redis object representing the key name.
    23. * 'dbid' is the database ID where the key lives.  */  
    24. /* 发布通知方法,分为2类,keySpace的通知,keyEvent的通知 */   
    25. void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {  
    26.     sds chan;  
    27.     robj *chanobj, *eventobj;  
    28.     int len = -1;  
    29.     char buf[24];  
    30.   
    31.     /* If notifications for this class of events are off, return ASAP. */  
    32.     if (!(server.notify_keyspace_events & type)) return;  
    33.   
    34.     eventobj = createStringObject(event,strlen(event));  
    35.       
    36.     //2种的通知形式,略有差别  
    37.     /* __keyspace@<db>__:<key> <event> notifications. */  
    38.     if (server.notify_keyspace_events & REDIS_NOTIFY_KEYSPACE) {  
    39.         chan = sdsnewlen("__keyspace@",11);  
    40.         len = ll2string(buf,sizeof(buf),dbid);  
    41.         chan = sdscatlen(chan, buf, len);  
    42.         chan = sdscatlen(chan, "__:", 3);  
    43.         chan = sdscatsds(chan, key->ptr);  
    44.         chanobj = createObject(REDIS_STRING, chan);  
    45.         //上述几步操作,组件格式字符串,最后发布消息,下面keyEvent的通知同理  
    46.         pubsubPublishMessage(chanobj, eventobj);  
    47.         decrRefCount(chanobj);  
    48.     }  
    49.   
    50.     /* __keyevente@<db>__:<event> <key> notifications. */  
    51.     if (server.notify_keyspace_events & REDIS_NOTIFY_KEYEVENT) {  
    52.         chan = sdsnewlen("__keyevent@",11);  
    53.         if (len == -1) len = ll2string(buf,sizeof(buf),dbid);  
    54.         chan = sdscatlen(chan, buf, len);  
    55.         chan = sdscatlen(chan, "__:", 3);  
    56.         chan = sdscatsds(chan, eventobj->ptr);  
    57.         chanobj = createObject(REDIS_STRING, chan);  
    58.         pubsubPublishMessage(chanobj, key);  
    59.         decrRefCount(chanobj);  
    60.     }  
    61.     decrRefCount(eventobj);  
    62. }  
    复制代码

    有keySpace和keyEvent的2种事件通知。具体怎么用,等后面碰到的时候在看看。


    转自:http://blog.csdn.net/androidlushangderen/article/details/40780019
    上一篇:Redis源码分析(二十九)--- bio后台I/O服务的实现
    下一篇:

    : flags |= REDIS_NOTIFY_STRING; break;  
            case 'l': flags |= REDIS_NOTIFY_LIST; break;  
            case 's': flags |= REDIS_NOTIFY_SET; break;  
            case 'h': flags |= REDIS_NOTIFY_HASH; break;  
            case 'z': flags |= REDIS_NOTIFY_ZSET; break;  
            case 'x': flags |= REDIS_NOTIFY_EXPIRED; break;  
            case 'e': flags |= REDIS_NOTIFY_EVICTED; break;  
            case 'K': flags |= REDIS_NOTIFY_KEYSPACE; break;  
            case 'E': flags |= REDIS_NOTIFY_KEYEVENT; break;  
            default: return -1;  
            }  
        }  
        return flags;  
    }  [/code]
    应该是响应键盘输入的类型和Redis类型之间的转换。在notify的方法还有一个event事件的通知方法:
    [        DISCUZ_CODE_25        ]
    有keySpace和keyEvent的2种事件通知。具体怎么用,等后面碰到的时候在看看。


    转自:http://blog.csdn.net/androidlushangderen/article/details/40780019
    上一篇:Redis源码分析(二十九)--- bio后台I/O服务的实现
    下一篇:Redis源码分析(三十一)--- latency延迟分析处理

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    阿里云
    阿里云

    Archiver|手机版|小黑屋|Redis中国用户组 ( 京ICP备15003959号

    GMT+8, 2017-1-17 05:45 , Processed in 0.106943 second(s), 32 queries .

    Powered by Discuz! X3.2

    © 2001-2013 Comsenz Inc.

    快速回复 返回顶部 返回列表