        /* Remove events scheduled for deletion. */
        /**
         * Lazy deletion in action:
         * the API that deletes a time event never physically removes the node,
         * it only marks its id as AE_DELETED_EVENT_ID (-1), i.e. a logical delete.
         * Here, while walking the timer list, any node found in that deleted state
         * is checked and, when safe, actually freed.
         */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
            /**
             * refcount counts how many in-flight (recursive) timerProc calls are
             * still using this event, i.e. the timer is still running. While it is
             * non-zero the node must not be freed, so the deletion is deferred
             * again to a later pass.
             */
            if (te->refcount) {
                te = next;
                continue;
            }
            /* Classic doubly-linked-list unlink of node te. */
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            /* About to free te: run the finalizer callback so the owner can
             * perform its custom destruction logic. */
            if (te->finalizerProc) {
                te->finalizerProc(eventLoop, te->clientData);
                now = getMonotonicUs();
            }
            zfree(te);
            te = next;
            continue;
        }
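        /**
         * For reference, a minimal sketch of the deleting side of this lazy scheme
         * (based on aeDeleteTimeEvent in this file; simplified, not the verbatim code):
         *
         *   int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) {
         *       aeTimeEvent *te = eventLoop->timeEventHead;
         *       while (te) {
         *           if (te->id == id) {
         *               te->id = AE_DELETED_EVENT_ID;  // logical delete only
         *               return AE_OK;                  // freeing happens later, above
         *           }
         *           te = te->next;
         *       }
         *       return AE_ERR;                         // no event with that id
         *   }
         */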
        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        /**
         * For normal time events, i.e. those whose id != -1:
         * before the list walk started, a snapshot maxId of the highest existing id
         * was taken, so during this walk we should never meet a node with id > maxId.
         * This branch is purely defensive programming.
         */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
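        /**
         * A sketch of how that snapshot is taken before the loop (illustrative;
         * the next-id counter of the event loop is sampled once):
         *
         *   long long maxId = eventLoop->timeEventNextId - 1;
         *   te = eventLoop->timeEventHead;
         *   while (te) { ... }   // the walk shown in this excerpt
         */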
        /**
         * This time event is due, so it can be fired now.
         */
        if (te->when <= now) {
            int retval;
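            /**
             * What follows in the original function is roughly this (sketch, not the
             * verbatim code): the handler runs with refcount held, and its return
             * value decides whether the timer is periodic or one-shot:
             *
             *   te->refcount++;
             *   retval = te->timeProc(eventLoop, te->id, te->clientData);
             *   te->refcount--;
             *   processed++;
             *   now = getMonotonicUs();
             *   if (retval != AE_NOMORE)
             *       te->when = now + retval * 1000;    // re-arm retval ms later
             *   else
             *       te->id = AE_DELETED_EVENT_ID;      // one-shot: lazy-delete it
             */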
    /* Note that we want to call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    /**
     * What does this condition mean?
     * <ul>
     * <li>maxfd != -1 means the eventLoop currently manages at least one file (I/O) event</li>
     * <li>or the caller's flags request it:<ul>
     *     <li>time events should be processed</li>
     *     <li>and we are allowed to use the multiplexer's timed blocking</li></ul>
     * </li>
     * </ul>
     * In other words the multiplexer's capabilities are used for two things:
     * <ul>
     * <li>registering I/O events with the kernel multiplexer, which tracks them and reports the ready ones</li>
     * <li>using the multiplexer's timed blocking as a fairly precise sleep until the next timer fires</li>
     * </ul>
     * So this if simply decides whether the multiplexer needs to be entered at all.
     */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp;
        int64_t usUntilTimer = -1;
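        /**
         * In the full function, usUntilTimer is filled in right here from the nearest
         * pending timer; a minimal sketch of that helper (illustrative, assuming the
         * aeTimeEvent list and getMonotonicUs() used elsewhere in this file):
         *
         *   static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
         *       aeTimeEvent *te = eventLoop->timeEventHead, *earliest = NULL;
         *       if (te == NULL) return -1;        // no timers: the poll may block forever
         *       while (te) {
         *           if (!earliest || te->when < earliest->when) earliest = te;
         *           te = te->next;
         *       }
         *       monotime now = getMonotonicUs();
         *       return (now >= earliest->when) ? 0 : earliest->when - now;
         *   }
         *
         *   if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
         *       usUntilTimer = usUntilEarliestTimer(eventLoop);
         */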
        if (usUntilTimer >= 0) {
            tv.tv_sec = usUntilTimer / 1000000;
            tv.tv_usec = usUntilTimer % 1000000;
            tvp = &tv;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* The event loop itself may also carry AE_DONT_WAIT, in which case the
         * multiplexer poll timeout is forced to 0 (non-blocking). */
        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }
        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        /**
         * Enter the multiplexer's poll call. Whether it blocks, and for how long,
         * is controlled by tvp:
         * <ul>
         * <li>tvp == NULL: block until at least one event is ready</li>
         * <li>tvp == 0: return immediately, i.e. a non-blocking poll</li>
         * <li>tvp > 0: block for at most that long</li>
         * </ul>
         * The point of this design is to serve timers on time while still spending
         * idle time inside the kernel wait, which keeps overall throughput high.
         */
        numevents = aeApiPoll(eventLoop, tvp);
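        /**
         * For the epoll backend (ae_epoll.c) the tvp semantics above map onto
         * epoll_wait's millisecond timeout roughly like this (simplified sketch,
         * not the verbatim implementation):
         *
         *   static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
         *       aeApiState *state = eventLoop->apidata;
         *       int retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
         *               tvp ? (tvp->tv_sec * 1000 + (tvp->tv_usec + 999) / 1000) : -1);
         *       // retval > 0: copy ready fds/masks into eventLoop->fired[]
         *       // tvp == NULL becomes -1, i.e. "block until an event arrives"
         *       return retval > 0 ? retval : 0;
         *   }
         */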
        /* After sleep callback. */
        /* Right after the multiplexer poll returns is the hook point for the
         * aftersleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
        for (j = 0; j < numevents; j++) {
            /* A ready event: the file (I/O) event that was registered on the eventLoop. */
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            /**
             * The ready state of this I/O event:
             * <ul>
             * <li>AE_READABLE (1): readable</li>
             * <li>AE_WRITABLE (2): writable</li>
             * </ul>
             */
            int mask = eventLoop->fired[j].mask;
            /* Counts how many handlers have been invoked for this fd. */
            int fired = 0; /* Number of events fired for current fd. */
            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsyncing a file to disk,
             * before replying to a client. */
            /**
             * When the file event was registered, the caller chose which event types to
             * monitor; as far as the kernel multiplexer is concerned there are only two:
             * <ul>
             * <li>readable</li>
             * <li>writable</li>
             * </ul>
             * The normal order is read first, then write. For certain high-performance
             * scenarios, however, Redis lets the caller request write-before-read by
             * setting AE_BARRIER on the registered mask. Since the ready fd is about to
             * be served below, the order has to be decided first.
             *
             * The way this is coded is quite elegant. There are only two possible
             * orders, read-then-write or write-then-read, yet instead of
             *
             *   if (read_first) { read(); write(); }
             *   else            { write(); read(); }
             *
             * the write path is fixed in the middle, a read path is placed both before
             * and after it, and the invert flag selects which of the two read paths runs.
             */
            int invert = fe->mask & AE_BARRIER;
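            /**
             * For context, this is roughly how a caller asks for the inverted order
             * when registering the event (illustrative only; the handler name is a
             * placeholder, aeCreateFileEvent is the real registration API):
             *
             *   aeCreateFileEvent(eventLoop, fd, AE_WRITABLE | AE_BARRIER,
             *                     writeHandler, clientData);
             */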
            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            /* Normal order: read first, then write. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                /* Invoke the read callback for this fd. */
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }
            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    /* Invoke the write callback for this fd. The second check avoids
                     * calling the same handler twice when it serves both read and write. */
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            /* Inverted order: write first, then read. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    /* Invoke the read callback for this fd. */
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
    }
    /* Check time events */
    /**
     * The caller's flags asked for time events to be processed as well.
     */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}