一 定义 摘自源码JavaDoc:
java
EventExecutor是NioEventLoop的抽象。
NioEventLoopGroup是NioEventLoop管理器。
二 类图 从继承关系上可以看出NioEventLoopGroup具有任务管理功能,类似线程池的一种实现。
三 源码 1 事件循环器选择 1.1 Demo java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package io.netty.example.basic.eventloopgroup;import io.netty.channel.EventLoop;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;public class NioEventLoopGroupTest00 { public static void main (String[] args) { EventLoopGroup group = new NioEventLoopGroup (3 ); for (int i = 0 ; i < 4 ; i++) { EventLoop el = group.next(); System.out.println(el); } } }
执行结果为:
shell 1 2 3 4 io.netty.channel.nio.NioEventLoop@2ac1fdc4 io.netty.channel.nio.NioEventLoop@5f150435 io.netty.channel.nio.NioEventLoop@1c53fd30 io.netty.channel.nio.NioEventLoop@2ac1fdc4
从控制台的输出可以看出NioEventLoopGroup的管理功能:
NioEventLoopGroup构造方法形参可以控制定义EventLoop数量。
next()方法从EventLoop集合中选择一个。
next()方法选择策略具有一定负载均衡功能,通过定义计数器进行集合取模。
1.2 构造方法 1.2.1 NioEventLoopGroup java 1 2 3 public NioEventLoopGroup (int nThreads) { this (nThreads, (Executor) null ); }
java 1 2 3 4 5 6 7 public NioEventLoopGroup (int nThreads, Executor executor) { this (nThreads, executor, SelectorProvider.provider()); }
java 1 2 3 public NioEventLoopGroup (int nThreads, Executor executor, final SelectorProvider selectorProvider) { this (nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); }
java 1 2 3 4 5 6 7 public NioEventLoopGroup (int nThreads, Executor executor, // null final SelectorProvider selectorProvider, // 创建Java的NIO复用器 final SelectStrategyFactory selectStrategyFactory ) { super (nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
1.2.2 父类构造方法 MultithreadEventLoopGroup
java 1 2 3 4 5 6 protected MultithreadEventLoopGroup (int nThreads, Executor executor, // null Object... args // [SelectorProvider SelectStrategyFactory RejectedExecutionHandlers] ) { super (nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
继续调用到MultithreadEventExecutorGroup的构造方法:
java 1 2 3 4 5 6 protected MultithreadEventExecutorGroup (int nThreads, Executor executor, // null Object... args // [SelectorProvider SelectStrategyFactory RejectedExecutionHandlers] ) { this (nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 protected MultithreadEventExecutorGroup (int nThreads, // 标识着group中有几个EventLoop Executor executor, // null EventExecutorChooserFactory chooserFactory, // DefaultEventExecutorChooserFactory.INSTANCE Object... args // [SelectorProvider SelectStrategyFactory RejectedExecutionHandlers] ) { if (executor == null ) executor = new ThreadPerTaskExecutor (this .newDefaultThreadFactory()); this .children = new EventExecutor [nThreads]; for (int i = 0 ; i < nThreads; i ++) { boolean success = false ; try { children[i] = this .newChild(executor, args); success = true ; } catch (Exception e) { throw new IllegalStateException ("failed to create a child event loop" , e); } finally { if (!success) { for (int j = 0 ; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0 ; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break ; } } } } }
其中比较重要的几个点:
1.2.3 线程执行器Executor java 1 2 3 if (executor == null ) executor = new ThreadPerTaskExecutor (this .newDefaultThreadFactory());
ThreadFactory仅仅是线程创建的顶层抽象,提供一些必要线程属性:
非守护线程(main线程结束任务线程可以继续执行)。
线程优先级。
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor (ThreadFactory threadFactory) { this .threadFactory = threadFactory; } @Override public void execute (Runnable command) { threadFactory.newThread(command).start(); } }
也就是将来在某个时机触发executor的execute()方法,为任务创建线程并执行。此处的线程并非直接使用的Java实现,而是在此基础上做了一些优化 。
1.2.2 事件循环器 java 1 2 3 4 5 6 7 8 9 10 11 12 13 this .children = new EventExecutor [nThreads];for (int i = 0 ; i < nThreads; i ++) { children[i] = this .newChild(executor, args);
1.2.4 线程选择器 java 1 2 3 4 5 6 7 this .chooser = chooserFactory.newChooser(children);
netty中提供了2种实现
PowerOfTwoEventExecutorChooser
GenericEventExecutorChooser
1.3 next方法 原理比较简单,本质就是对数组长度取模,在一定场景下用位运算替代取模运算。
也就是EventLoopGroup这个管理器按照轮询方式从NioEventLoop数组中选择事件循环器执行操作,起到了一定的负载均衡效果。
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private final EventExecutor[] executors; @Override public EventExecutor next () { return this .executors[idx.getAndIncrement() & this .executors.length - 1 ]; }@Override public EventExecutor next () { return this .executors[(int ) Math.abs(idx.getAndIncrement() % this .executors.length)]; }
2 任务提交 2.1 Demo java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package io.netty.example.basic.eventloopgroup;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import java.util.concurrent.TimeUnit;public class NioEventLoopGroupTest01 { public static void main (String[] args) { EventLoopGroup group = new NioEventLoopGroup (); group.execute(() -> { System.out.println("execute task..." ); }); group.submit(() -> { System.out.println("submit task..." ); }); group.schedule(() -> { System.out.println("schedule task..." ); }, 5_000 , TimeUnit.MILLISECONDS); System.out.println("main thread end" ); } }
执行结果:
shell 1 2 3 main thread end submit task... execute task...
上面已经分析过:
EventLoopGroup仅仅是一个管理器,本身不具备执行任务能力。
管理器暴露next()选择其内部真正的执行单元(事件循环器)进行工作。
每个EventLoop都是一个线程,真正处理任务的执行单元,非守护线程。
NioEventLoop是特定处理NIO多路复用这种场景的具体实现,因此还通过组合了Java中Selector多路复用器实现基于事件的响应模式React。
2.2 任务执行单元 下面这几个方法都是EventLoopGroup管理器通过选择器chooser选择真正的执行单元EventLoop事件循环器来处理的:
execute(…)方法
submit(…)方法
schedule(…)方法
scheduleAtFixedRate(…)方法
scheduleWithFixedDelay(…)方法
2.2.1 execute(…) 最终真正执行execute方法的就是EventLoop。
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 group.execute(() -> { System.out.println("execute task..." ); });public void execute (Runnable command) { this .next().execute(command); }@Override public EventLoop next () { return (EventLoop) super .next(); }@Override public EventExecutor next () { return this .chooser.next(); }