// SingleThreadEventExecutor.java privatevoidexecute(Runnable task, boolean immediate) { /** * NioEventLoop只有一个线程 且它的阻塞点只有在IO多路复用器操作上 * 因此当前添加任务的线程 * - NioEventLoop线程自己给自己添加任务 说明它压根没有被阻塞 而且肯定已经处于运行中状态 * - 这个线程已经被创建执行 那么这个新添加的任务被放到了非IO任务队列中 迟早会被取出来执行 * - 不是NioEventLoop线程 是其他线程往NioEventLoop添加任务 * - 如果NioEventLoop线程还没被创建执行 那么相当于任务裹挟着线程进行延迟创建并执行任务 * - 非IO任务队列没有任务 也没有IO事件到达时 NioEventLoop线程迟早会阻塞在复用器上 * - 阻塞期间有IO事件到达 退出select阻塞继续工作 * - 有定时任务还可能超时退出select NioEventLoop线程继续工作 * - 没有定时任务就永远阻塞 唤醒的方式 只有外部线程往NioEventLoop添加新任务触发selector复用器的wakeup() */ booleaninEventLoop=super.inEventLoop(); this.addTask(task); // 添加任务到taskQueue中 如果任务队列已经满了 就触发拒绝策略(抛异常) if (!inEventLoop) { this.startThread(); // NioEventLoop线程创建启动的时机就是提交进来的第一个异步任务 if (this.isShutdown()) { booleanreject=false; try { if (removeTask(task)) reject = true; } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) reject(); } }
// Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { }
try { // Run all remaining tasks and shutdown hooks. At this point the event loop // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for // graceful shutdown with quietPeriod. for (;;) if (confirmShutdown()) break;
// Now we want to make sure no more tasks can be added from this point. This is // achieved by switching the state. Any new tasks beyond this point will be rejected. for (;;) { intoldState= state; if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) break; }
// We have the final set of tasks in the queue now, no more can be added, run all remaining. // No need to loop here, this is the final pass. confirmShutdown(); } finally { try { cleanup(); } finally { // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify // the future. The user may block on the future and once it unblocks the JVM may terminate // and start unloading classes. // See https://github.com/netty/netty/issues/6596. FastThreadLocal.removeAll();
case SelectStrategy.BUSY_WAIT: // -3 // fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // -1 任务队列为空 将线程阻塞在复用器上 唤醒时机有两种情况(阻塞期间有IO事件到达 阻塞指定事件后主动结束阻塞开始执行定时任务) longcurDeadlineNanos=super.nextScheduledTaskDeadlineNanos(); // 定时任务队列中下一个待执行定时任务还有多久可以被唤醒执行 -1表示没有定时任务可以执行 if (curDeadlineNanos == -1L) curDeadlineNanos = NONE; // nothing on the calendar // 边界情况 没有定时任务要执行 this.nextWakeupNanos.set(curDeadlineNanos); // 下一次啥时候将线程唤醒 try { if (!super.hasTasks()) strategy = this.select(curDeadlineNanos); // select()方法阻塞 超时时间是为了执行可能存在的定时任务 如果没有定时任务就将一直阻塞在复用器的select()操作上等待被唤醒 } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; }
// SingleThreadEventExecutor.java protectedvoidwakeup(boolean inEventLoop) { // NioEventLoop覆写了这个方法 有自己的特定实现 if (!inEventLoop) { // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there // is already something in the queue. this.taskQueue.offer(WAKEUP_TASK); } }
if (this.needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1);
// NioEventLoop.java privatevoidprocessSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafeunsafe= ch.unsafe(); // Socket上发生的读写最终不是交给Java的Channel处理 而是交给Netty的Channel去处理(Netty的Channel->pipeline->handler) /** * 如果Jdk底层的Channel是不合法的 说明这个channel可能有问题 */ if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop == this) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); } return; } /** * 执行到这 说明当前的Jdk的Channel是合法的 */ try { intreadyOps= k.readyOps(); // Jdk的Channel发生的事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // Jdk的Channel发生了连接事件 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 intops= k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Jdk的Channel发生了写事件 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } /** * 读事件和连接事件 * 如果当前NioEventLoop是worker线程 这里就是op_read事件 * 如果当前NioEventLoop是boss线程 这里就是op_accept事件 * * 无论处理op_read事件还是op_accept事件 都走的unsafe的read()方法 这里unsafe是通过channel获取到的 * 如果处理的是accept事件 这里的channel是NioServerSocketChannel 与之绑定的是{@link io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#unsafe} * 如果处理的是op_read事件 处理的线程是worker线程 这里的channel是{@link io.netty.channel.socket.nio.NioServerSocketChannel} 与之绑定的unsafe对象是{@link io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe} 会进入{@link AbstractNioByteChannel.NioByteUnsafe#read()}方法 */ // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) unsafe.read(); } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
do { fetchedAll = this.fetchFromScheduledTaskQueue(); // 尝试从定时任务队列中找到所有可执行的定时任务放到非IO任务队列taskQueue中 if (this.runAllTasksFrom(this.taskQueue)) ranAtLeastOne = true; } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
// SingleThreadEventExecutor.java /** * 从定时任务队列中找到可执行的定时任务 * 从定时任务中找到可以执行的任务将任务添加到普通任务队列taskQueue中 */ privatebooleanfetchFromScheduledTaskQueue() { if (this.scheduledTaskQueue == null || this.scheduledTaskQueue.isEmpty()) returntrue; // 从定时任务队列中寻找截止时间为nanoTime的任务 longnanoTime= AbstractScheduledEventExecutor.nanoTime(); for (;;) { RunnablescheduledTask=super.pollScheduledTask(nanoTime); // 可执行的定时任务 if (scheduledTask == null) returntrue; // 添加到普通任务队列过程失败就重新添加回定时任务队列中 if (!this.taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. this.scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask); // taskQueue常规任务队列已经满了 再把定时任务放回远处 等待下一轮执行时机 returnfalse; } } }