请选择 进入手机版 | 继续访问电脑版

Redis中国用户组(CRUG)论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

搜索
热搜: 活动 交友 discuz
查看: 657|回复: 0

Redis源码分析(二十九)--- bio后台I/O服务的实现

[复制链接]
  • TA的每日心情
    开心
    2016-10-27 12:14
  • 签到天数: 89 天

    [LV.6]常住居民II

    357

    主题

    456

    帖子

    3594

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    3594

    最佳新人活跃会员宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2016-4-12 10:35:48 | 显示全部楼层 |阅读模式

    在Redis系统中也存在后台服务的概念,background Service,后台线程在Redis中的表现主要为background I/O Service,有了后台线程的支持,系统在执行的效率上也势必会有不一样的提高。在Redis代码中,描述了此功能的文件为bio.c,同样借此机会学习一下,在C语言中的多线程编程到底是怎么一回事。我们先来看看,在Redis中的background job的工作形式;

    1. /* Background I/O service for Redis.  
    2. *  
    3. * 后台I/O服务  
    4. * This file implements operations that we need to perform in the background.  
    5. * Currently there is only a single operation, that is a background close(2)  
    6. * system call. This is needed as when the process is the last owner of a  
    7. * reference to a file closing it means unlinking it, and the deletion of the  
    8. * file is slow, blocking the server.  
    9. *  
    10. * In the future we'll either continue implementing new things we need or  
    11. * we'll switch to libeio. However there are probably long term uses for this  
    12. * file as we may want to put here Redis specific background tasks (for instance  
    13. * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL  
    14. * implementation).  
    15. *  
    16. * DESIGN  
    17. * ------  
    18. *  
    19. * The design is trivial, we have a structure representing a job to perform  
    20. * and a different thread and job queue for every job type.  
    21. * Every thread wait for new jobs in its queue, and process every job  
    22. * sequentially.  
    23. *  
    24. * Jobs of the same type are guaranteed to be processed from the least  
    25. * recently inserted to the most recently inserted (older jobs processed  
    26. * first).  
    27. *  
    28. * Currently there is no way for the creator of the job to be notified about  
    29. * the completion of the operation, this will only be added when/if needed.  
    30. *  
    31. * 作者定义了一个结构体代表一个工作,每个线程等待从相应的job Type工作队列中获取一个job,每个job的排列的都按照时间  
    32. * 有序排列的  
    33. * ----------------------------------------------------------------------------
    复制代码

    这里总共与2种Background I/O Type:
    1. /* Background job opcodes */  
    2. /* 定义了2种后台工作的类别 */  
    3. #define REDIS_BIO_CLOSE_FILE    0 /* Deferred close(2) syscall.文件的关闭 */  
    4. #define REDIS_BIO_AOF_FSYNC     1 /* Deferred AOF fsync.AOF文件的同步 */   
    5. /* BIO后台操作类型总数为2个 */  
    6. #define REDIS_BIO_NUM_OPS       2  
    复制代码

    一个是AOF文件的同步操作,AOF就是“Append ONLY File”的缩写,记录每次的数据改变的写操作,用于数据的恢复。还有一个我好像没碰到过,CLOSE FILE,难道是异步关闭文件的意思。
    1. static pthread_t bio_threads[REDIS_BIO_NUM_OPS]; /* 定义了bio线程组变量 */  
    2. static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; /* 线程相对应的mutex变量,用于同步操作 */  
    3. static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];  
    4. static list *bio_jobs[REDIS_BIO_NUM_OPS]; /* 每种job类型都是一个列表 */  
    5. /* The following array is used to hold the number of pending jobs for every
    6. * OP type. This allows us to export the bioPendingJobsOfType() API that is
    7. * useful when the main thread wants to perform some operation that may involve
    8. * objects shared with the background thread. The main thread will just wait
    9. * that there are no longer jobs of this type to be executed before performing
    10. * the sensible operation. This data is also useful for reporting. */  
    11. static unsigned long long bio_pending[REDIS_BIO_NUM_OPS];   /* 此类型job等待执行的数量 */  
    12.   
    13. /* This structure represents a background Job. It is only used locally to this
    14. * file as the API does not expose the internals at all. */  
    15. /* background Job结构体 */  
    16. struct bio_job {  
    17.     //job创建的时间  
    18.     time_t time; /* Time at which the job was created. */  
    19.     /* Job specific arguments pointers. If we need to pass more than three
    20.      * arguments we can just pass a pointer to a structure or alike. */  
    21.     /* job特定参数指针 */  
    22.     void *arg1, *arg2, *arg3;  
    23. };  
    复制代码

    上面声明了一些变量,包括bio_threads线程数组,总数2个,bio_jobs列表数组,存放每种Type的job。下面我们看主要的一些方法:
    1. /* Exported API */  
    2. void bioInit(void); /* background I/O初始化操作 */  
    3. void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); /* 创建后台job,通过传入的3个参数初始化 */  
    4. unsigned long long bioPendingJobsOfType(int type); /* 返回type类型的job正在等待被执行的个数 */  
    5. void bioWaitPendingJobsLE(int type, unsigned long long num); /* 返回type类型的job正在等待被执行的个数 */  
    6. time_t bioOlderJobOfType(int type);   
    7. void bioKillThreads(void); /* 杀死后台所有线程 */  
    复制代码

    首先看初始化操作;
    1. /* Initialize the background system, spawning the thread. */  
    2. /* background I/O初始化操作 */  
    3. void bioInit(void) {  
    4.     pthread_attr_t attr;  
    5.     pthread_t thread;  
    6.     size_t stacksize;  
    7.     int j;  
    8.   
    9.     /* Initialization of state vars and objects */  
    10.     for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {  
    11.         pthread_mutex_init(&bio_mutex[j],NULL);  
    12.         pthread_cond_init(&bio_condvar[j],NULL);  
    13.         //创建每个job类型的List列表  
    14.         bio_jobs[j] = listCreate();  
    15.         bio_pending[j] = 0;  
    16.     }  
    17.   
    18.     /* Set the stack size as by default it may be small in some system */  
    19.     //设置线程栈空间  
    20.     pthread_attr_init(&attr);  
    21.     pthread_attr_getstacksize(&attr,&stacksize);  
    22.     if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */  
    23.     while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;  
    24.     pthread_attr_setstacksize(&attr, stacksize);  
    25.   
    26.     /* Ready to spawn our threads. We use the single argument the thread
    27.      * function accepts in order to pass the job ID the thread is
    28.      * responsible of. */  
    29.     for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {  
    30.         void *arg = (void*)(unsigned long) j;  
    31.         //创建2个线程,专门运行相应类型的job  
    32.         if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {  
    33.             redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");  
    34.             exit(1);  
    35.         }  
    36.         //赋值到相应的Thread中  
    37.         bio_threads[j] = thread;  
    38.     }  
    39. }  
    复制代码

    也就是说,执行完上述的操作之后,在bio_threads线程中就运行着2个线程,从各自的job列表中取出相应的等待执行的jo;
    1. /* 创建后台job,通过传入的3个参数初始化 */  
    2. void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {  
    3.     struct bio_job *job = zmalloc(sizeof(*job));  
    4.   
    5.     job->time = time(NULL);  
    6.     job->arg1 = arg1;  
    7.     job->arg2 = arg2;  
    8.     job->arg3 = arg3;  
    9.     pthread_mutex_lock(&bio_mutex[type]);  
    10.     //加入相对应的job type列表  
    11.     listAddNodeTail(bio_jobs[type],job);  
    12.     //等待的job数量增加1  
    13.     bio_pending[type]++;  
    14.     pthread_cond_signal(&bio_condvar[type]);  
    15.     pthread_mutex_unlock(&bio_mutex[type]);  
    16. }  
    复制代码

    简洁的创建background job操作,上面利用了mutex变量实现了线程同步操作,保证线程安全。下面看一下最重要的执行background Job的操作实现(省略了部分代码):
    1. /* 执行后台的job,参数内包含着哪种type */  
    2. void *bioProcessBackgroundJobs(void *arg) {  
    3.     ......  
    4.     while(1) {  
    5.         listNode *ln;  
    6.   
    7.         /* The loop always starts with the lock hold. */  
    8.         if (listLength(bio_jobs[type]) == 0) {  
    9.             pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);  
    10.             continue;  
    11.         }  
    12.         /* Pop the job from the queue. */  
    13.         //从工作列表中取出第一个job  
    14.         ln = listFirst(bio_jobs[type]);  
    15.         job = ln->value;  
    16.         /* It is now possible to unlock the background system as we know have
    17.          * a stand alone job structure to process.*/  
    18.         pthread_mutex_unlock(&bio_mutex[type]);  
    19.   
    20.         /* Process the job accordingly to its type. */  
    21.         //执行具体的工作  
    22.         if (type == REDIS_BIO_CLOSE_FILE) {  
    23.             close((long)job->arg1);  
    24.         } else if (type == REDIS_BIO_AOF_FSYNC) {  
    25.             aof_fsync((long)job->arg1);  
    26.         } else {  
    27.             redisPanic("Wrong job type in bioProcessBackgroundJobs().");  
    28.         }  
    29.         zfree(job);  
    30.   
    31.         /* Lock again before reiterating the loop, if there are no longer
    32.          * jobs to process we'll block again in pthread_cond_wait(). */  
    33.         pthread_mutex_lock(&bio_mutex[type]);  
    34.         listDelNode(bio_jobs[type],ln);  
    35.         bio_pending[type]--;  
    36.     }  
    37. }  
    复制代码

    while循环,从队列中取出一个,执行一个操作。当然,如果想马上停止一切后台线程,可以执行下面的方法,调用
    pthread_cancel:

    1. /* Kill the running bio threads in an unclean way. This function should be
    2. * used only when it's critical to stop the threads for some reason.
    3. * Currently Redis does this only on crash (for instance on SIGSEGV) in order
    4. * to perform a fast memory check without other threads messing with memory. */  
    5. /* 杀死后台所有线程 */  
    6. void bioKillThreads(void) {  
    7.     int err, j;  
    8.   
    9.     for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {  
    10.         //调用pthread_cancel方法kill当前的后台线程  
    11.         if (pthread_cancel(bio_threads[j]) == 0) {  
    12.             if ((err = pthread_join(bio_threads[j],NULL)) != 0) {  
    13.                 redisLog(REDIS_WARNING,  
    14.                     "Bio thread for job type #%d can be joined: %s",  
    15.                         j, strerror(err));  
    16.             } else {  
    17.                 redisLog(REDIS_WARNING,  
    18.                     "Bio thread for job type #%d terminated",j);  
    19.             }  
    20.         }  
    21.     }  
    22. }  
    复制代码


    转自:http://blog.csdn.net/androidlushangderen/article/details/40737093
    上一篇:Redis源码分析(二十八)--- object创建和释放redisObject对象
    下一篇:

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    阿里云
    阿里云

    Archiver|手机版|小黑屋|Redis中国用户组 ( 京ICP备15003959号

    GMT+8, 2017-2-20 05:03 , Processed in 0.109584 second(s), 32 queries .

    Powered by Discuz! X3.2

    © 2001-2013 Comsenz Inc.

    快速回复 返回顶部 返回列表