请选择 进入手机版 | 继续访问电脑版

Redis中国用户组(CRUG)论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

搜索
热搜: 活动 交友 discuz
查看: 465|回复: 0

Redis源码分析(二十)--- ae事件驱动

[复制链接]
  • TA的每日心情
    开心
    2017-3-20 10:39
  • 签到天数: 90 天

    [LV.6]常住居民II

    358

    主题

    462

    帖子

    3650

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    3650

    最佳新人活跃会员宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2016-4-7 10:22:25 | 显示全部楼层 |阅读模式

    事件驱动这个名词出现的越来越频繁了,听起来非常高大上,今天本人把Redis内部的驱动模型研究了一番,感觉收获颇丰啊。一个ae.c主程序,加上4个事件类型的文件,让你彻底弄清楚,Redis是如何处理这些事件的。在Redis的事件处理中,用到了epoll,select,kqueue和evport,evport可能大家会陌生许多。前面3个都是非常常见的事件,在libevent的事件网络库中也都有出现。作者在写这个事件驱动模型的时候,也说了,这只是为了简单的复用了,设计的一个小小的处理模型:


    1. <pre name="code" class="cpp" style="white-space: pre-wrap;">/* A simple event-driven programming library. Originally I wrote this code
    2. * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
    3. * it in form of a library for easy reuse.
    4. *
    5. * ae是作者写的一个简单的事件驱动库,后面进行了转化,变得更为简单的复用</pre><p><span style="font-family: Arial;">所以不是很复杂。在了解整个事件驱动的模型前,有先了解一些定义的事件结构体,事件类型总共2个一个FileEvent,TimeEvent:</span></p><p style="font-family: Arial;"></p><p style="font-family: Arial;"></p><pre name="code" class="cpp" style="white-space: pre-wrap;">/* File event structure */
    6. /* 文件事件结构体 */
    7. typedef struct aeFileEvent {
    8.         //只为读事件或者写事件中的1种
    9.     int mask; /* one of AE_(READABLE|WRITABLE) */
    10.     //读方法
    11.     aeFileProc *rfileProc;
    12.     //写方法
    13.     aeFileProc *wfileProc;
    14.     //客户端数据
    15.     void *clientData;
    16. } aeFileEvent;

    17. /* Time event structure */
    18. /* 时间事件结构体 */
    19. typedef struct aeTimeEvent {
    20.         //时间事件id
    21.     long long id; /* time event identifier. */
    22.     //时间秒数
    23.     long when_sec; /* seconds */
    24.     //时间毫秒
    25.     long when_ms; /* milliseconds */
    26.     //时间事件中的处理函数
    27.     aeTimeProc *timeProc;
    28.     //被删除的时候将会调用的方法
    29.     aeEventFinalizerProc *finalizerProc;
    30.     //客户端数据
    31.     void *clientData;
    32.     //时间结构体内的下一个结构体
    33.     struct aeTimeEvent *next;
    34. } aeTimeEvent;

    35. /* A fired event */
    36. /* fired结构体,用来表示将要被处理的文件事件 */
    37. typedef struct aeFiredEvent {
    38.         //文件描述符id
    39.     int fd;
    40.     int mask;
    41. } aeFiredEvent;</pre>
    复制代码

    FireEvent只是用来标记要处理的文件Event。


    这些事件都存在于一个aeEventLoop的结构体内:


    1. /* State of an event based program */
    2. typedef struct aeEventLoop {
    3.         //目前创建的最高的文件描述符
    4.     int maxfd;   /* highest file descriptor currently registered */
    5.     int setsize; /* max number of file descriptors tracked */
    6.     //下一个时间事件id
    7.     long long timeEventNextId;
    8.     time_t lastTime;     /* Used to detect system clock skew */
    9.     //3种事件类型
    10.     aeFileEvent *events; /* Registered events */
    11.     aeFiredEvent *fired; /* Fired events */
    12.     aeTimeEvent *timeEventHead;
    13.     //事件停止标志符
    14.     int stop;
    15.     //这里存放的是event API的数据,包括epoll,select等事件
    16.     void *apidata; /* This is used for polling API specific data */
    17.     aeBeforeSleepProc *beforesleep;
    18. } aeEventLoop;
    复制代码

    在每种事件内部,都有定义相应的处理函数,把函数当做变量一样存在结构体中。下面看下ae.c中的一些API的组成:



    1. /* Prototypes */
    2. aeEventLoop *aeCreateEventLoop(int setsize); /* 创建aeEventLoop,内部的fileEvent和Fired事件的个数为setSize个 */
    3. void aeDeleteEventLoop(aeEventLoop *eventLoop); /* 删除EventLoop,释放相应的事件所占的空间 */
    4. void aeStop(aeEventLoop *eventLoop); /* 设置eventLoop中的停止属性为1 */
    5. int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
    6.         aeFileProc *proc, void *clientData); /* 在eventLoop中创建文件事件 */
    7. void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); /* 删除文件事件 */
    8. int aeGetFileEvents(aeEventLoop *eventLoop, int fd); //根据文件描述符id,找出文件的属性,是读事件还是写事件
    9. long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
    10.         aeTimeProc *proc, void *clientData,
    11.         aeEventFinalizerProc *finalizerProc); /* 在eventLoop中添加时间事件,创建的时间为当前时间加上自己传入的时间 */
    12. int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); //根据时间id,删除时间事件,涉及链表的操作
    13. int aeProcessEvents(aeEventLoop *eventLoop, int flags); /* 处理eventLoop中的所有类型事件 */
    14. int aeWait(int fd, int mask, long long milliseconds); /* 让某事件等待 */
    15. void aeMain(aeEventLoop *eventLoop); /* ae事件执行主程序 */
    16. char *aeGetApiName(void);
    17. void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); /* 每次eventLoop事件执行完后又重新开始执行时调用 */
    18. int aeGetSetSize(aeEventLoop *eventLoop); /* 获取eventLoop的大小 */
    19. int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); /* EventLoop重新调整大小 */
    复制代码

    无非涉及一些文件,时间事件的添加,修改等,都是在eventLoop内部的修改,我们来看下最主要,最核心的方法:



    1. /* ae事件执行主程序 */
    2. void aeMain(aeEventLoop *eventLoop) {
    3.     eventLoop->stop = 0;
    4.     //如果eventLoop中的stop标志位不为1,就循环处理
    5.     while (!eventLoop->stop) {
    6.             //每次eventLoop事件执行完后又重新开始执行时调用
    7.         if (eventLoop->beforesleep != NULL)
    8.             eventLoop->beforesleep(eventLoop);
    9.         //while循环处理所有的evetLoop的事件
    10.         aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    11.     }
    12. }
    复制代码

    道理很简单通过,while循环,处理eventLoop中的所有类型事件,截取部分processEvents()代码:



    1. numevents = aeApiPoll(eventLoop, tvp);
    2.         for (j = 0; j < numevents; j++) {
    3.             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    4.             int mask = eventLoop->fired[j].mask;
    5.             int fd = eventLoop->fired[j].fd;
    6.             int rfired = 0;

    7.             /* note the fe->mask & mask & ... code: maybe an already processed
    8.              * event removed an element that fired and we still didn't
    9.              * processed, so we check if the event is still valid. */
    10.             if (fe->mask & mask & AE_READABLE) {
    11.                 rfired = 1;
    12.                 //根据掩码计算判断是否为ae读事件,调用时间中的读的处理方法
    13.                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    14.             }
    15.             if (fe->mask & mask & AE_WRITABLE) {
    16.                 if (!rfired || fe->wfileProc != fe->rfileProc)
    17.                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    18.             }
    19.             processed++;
    20.         }
    21.     }
    复制代码

    ae中创建时间事件都是以当前时间为基准创建的;



    1. /* 在eventLoop中添加时间事件,创建的时间为当前时间加上自己传入的时间 */
    2. long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
    3.         aeTimeProc *proc, void *clientData,
    4.         aeEventFinalizerProc *finalizerProc)
    5. {
    6.     long long id = eventLoop->timeEventNextId++;
    7.     aeTimeEvent *te;

    8.     te = zmalloc(sizeof(*te));
    9.     if (te == NULL) return AE_ERR;
    10.     te->id = id;
    11.     aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    12.     te->timeProc = proc;
    13.     te->finalizerProc = finalizerProc;
    14.     te->clientData = clientData;
    15.     //新加的变为timeEvent的头部
    16.     te->next = eventLoop->timeEventHead;
    17.     eventLoop->timeEventHead = te;
    18.    
    19.     //返回新创建的时间事件的id
    20.     return id;
    21. }
    复制代码

        下面说说如何调用事件API库里的方法呢。首先隆重介绍什么是epoll,poll,select,kqueu和evport。这些都是一种事件模型。


    select事件的模型

    (1)创建所关注的事件的描述符集合(fd_set),对于一个描述符,可以关注其上面的读(read)、写(write)、异常(exception)事件,所以通常,要创建三个fd_set, 一个用来收集关注读事件的描述符,一个用来收集关注写事件的描述符,另外一个用来收集关注 异常事件的描述符集合。
    (2)轮询所有fd_set中的每一个fd ,检查是否有相应的事件发生,如果有,就进行处理。
       
    poll和上面的区别是可以复用文件描述符,上面对一个文件需要轮询3个文件描述符集合,而poll只需要一个,效率更高
    epoll是poll的升级版本,把描述符列表交给内核,一旦有事件发生,内核把发生事件的描述符列表通知给进程,这样就避免了轮询整个描述符列表。效率极大提高

    evport这个出现的比较少,大致意思是evport将某一个对象的特定 event 与 Event port 相关联:

    在了解了3种事件模型的原理之后,我们看看ae.c在Redis中是如何调用的呢,


    //这里存放的是event API的数据,包括epoll,select等事件   
    1. void *apidata; /* This is used for polling API specific data */
    复制代码

    就是上面这个属性,在上面的4种事件中,分别对应着3个文件,分别为ae_poll.c,ae_select.c,但是他们的API结构是类似的,我举其中一个例子,epoll的例子,首先都会有此事件特定的结构体:



    1. typedef struct aeApiState {
    2.     int epfd;
    3.     struct epoll_event *events;
    4. } aeApiState;
    复制代码

    还有共同套路的模板方法:



    1. static int aeApiCreate(aeEventLoop *eventLoop)
    2. static int aeApiResize(aeEventLoop *eventLoop, int setsize)
    3. static void aeApiFree(aeEventLoop *eventLoop)
    4. static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
    5. static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
    6. static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
    7. static char *aeApiName(void)
    复制代码

    在创建的时候赋值到eventloop的API data里面去:




    1. <pre name="code" class="cpp" style="white-space: pre-wrap;"> state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    2.     if (state->epfd == -1) {
    3.         zfree(state->events);
    4.         zfree(state);
    5.         return -1;
    6.     }
    7.     //最后将state的数据赋值到eventLoop的API data中
    8.     eventLoop->apidata = state;
    9.     return 0;</pre><pre name="code" class="cpp" style="white-space: pre-wrap;"></pre>
    复制代码

    在取出事件的poll方法的时候是这些方法的一个区分点:



    1. static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    2.     aeApiState *state = eventLoop->apidata;
    3.     int retval, numevents = 0;

    4.     retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
    5.             tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    6.     if (retval > 0) {
    7. .....
    复制代码

    而在select中的poll方法是这样的:



    1. static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    2.     aeApiState *state = eventLoop->apidata;
    3.     int retval, j, numevents = 0;

    4.     memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    5.     memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    6.     retval = select(eventLoop->maxfd+1,
    7.                 &state->_rfds,&state->_wfds,NULL,tvp);
    8. ......
    复制代码

    最后都是基于state中的事件和eventLoop之间的转化实现操作。传入eventLoop中的信息,传入state的信息,经过内部的处理得出终的事件结果。调用就这么简单。


    转自:http://blog.csdn.net/androidlushangderen/article/details/40474815
    上一篇:Redis源码分析(十九)--- replication主从数据复制的实现
    下一篇:Redis源码分析(二十一)--- anet网络通信的封装
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    阿里云
    阿里云

    Archiver|手机版|小黑屋|Redis中国用户组 ( 京ICP备15003959号

    GMT+8, 2017-3-29 15:20 , Processed in 0.110971 second(s), 32 queries .

    Powered by Discuz! X3.2

    © 2001-2013 Comsenz Inc.

    快速回复 返回顶部 返回列表