1 NioEventLoopGroup
1 2 3 4 5 6 7 8 9 10
|
/**
 * Creates a group for the requested number of threads without supplying an executor.
 *
 * @param nThreads thread count, forwarded unchanged to the (int, Executor) constructor
 */
public NioEventLoopGroup(int nThreads) {
    // The cast pins overload resolution to the (int, Executor) constructor.
    this(nThreads, (Executor) null);
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
/**
 * Creates a group that uses the provider returned by {@link SelectorProvider#provider()}.
 *
 * @param nThreads thread count, forwarded unchanged
 * @param executor executor forwarded unchanged; the single-argument constructor passes {@code null}
 */
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(
            nThreads,
            executor,
            SelectorProvider.provider());
}
|
1 2 3
/**
 * Creates a group that uses {@code DefaultSelectStrategyFactory.INSTANCE} as its select
 * strategy factory.
 *
 * @param nThreads         thread count, forwarded unchanged
 * @param executor         executor forwarded unchanged
 * @param selectorProvider provider forwarded unchanged
 */
public NioEventLoopGroup(
        int nThreads,
        Executor executor,
        final SelectorProvider selectorProvider) {
    this(
            nThreads,
            executor,
            selectorProvider,
            DefaultSelectStrategyFactory.INSTANCE);
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
/**
 * Creates a group with an explicit selector provider and select strategy factory, using
 * {@code RejectedExecutionHandlers.reject()} as the rejection handler.
 *
 * @param nThreads              thread count, forwarded unchanged to the superclass
 * @param executor              executor forwarded to the superclass; it is {@code null} when
 *                              this constructor is reached through the single-argument
 *                              constructor
 * @param selectorProvider      provider that creates the Java NIO multiplexer (selector)
 *                              implementation
 * @param selectStrategyFactory factory for the select strategy. A NioEventLoop worker thread
 *                              has to deal with both I/O work and ordinary tasks and will
 *                              block on the selector; the strategy decides how one select
 *                              call chooses between them.
 */
public NioEventLoopGroup(
        int nThreads,
        Executor executor,
        final SelectorProvider selectorProvider,
        final SelectStrategyFactory selectStrategyFactory) {
    // Parameter notes live in the Javadoc: trailing "//" comments on a collapsed signature
    // line would comment out every parameter that follows them.
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
/**
 * Forwards to the superclass, substituting {@code DEFAULT_EVENT_LOOP_THREADS} when
 * {@code nThreads} is {@code 0}.
 *
 * @param nThreads requested thread count; {@code 0} selects the default
 * @param executor executor forwarded unchanged (may be {@code null})
 * @param args     extra arguments forwarded unchanged; when called from NioEventLoopGroup
 *                 they are [SelectorProvider, SelectStrategyFactory, RejectedExecutionHandler]
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // Notes are kept out of the signature line so that no "//" comment can swallow the
    // remaining parameters or the body.
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
/**
 * Forwards to the four-argument constructor with
 * {@code DefaultEventExecutorChooserFactory.INSTANCE} as the chooser factory.
 *
 * @param nThreads thread count, forwarded unchanged
 * @param executor executor forwarded unchanged (may be {@code null})
 * @param args     extra arguments forwarded unchanged; when called from NioEventLoopGroup
 *                 they are [SelectorProvider, SelectStrategyFactory, RejectedExecutionHandler]
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    // Notes are kept out of the signature line so that no "//" comment can swallow the
    // remaining parameters or the body.
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
|
/**
 * Builds the group: creates {@code nThreads} children through {@code newChild}, installs the
 * chooser, wires up termination tracking, and publishes a read-only view of the children.
 *
 * @param nThreads       number of EventLoops (children) in the group; must be positive
 * @param executor       executor handed to every child; when {@code null} a
 *                       {@code ThreadPerTaskExecutor} built from the default thread factory
 *                       is used instead
 * @param chooserFactory factory for the chooser that picks a child on each request
 *                       (DefaultEventExecutorChooserFactory.INSTANCE on the default path)
 * @param args           extra arguments passed to {@code newChild}; when called from
 *                       NioEventLoopGroup they are
 *                       [SelectorProvider, SelectStrategyFactory, RejectedExecutionHandler]
 * @throws IllegalArgumentException if {@code nThreads} is not positive
 * @throws IllegalStateException    if creating any child throws an {@code Exception}
 */
protected MultithreadEventExecutorGroup(
        int nThreads,
        Executor executor,
        EventExecutorChooserFactory chooserFactory,
        Object... args) {
    // Fail fast: a negative size would otherwise surface as NegativeArraySizeException, and
    // zero would build an empty group whose chooser fails on the first next() call.
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
    }

    this.children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i++) {
        boolean success = false;
        try {
            children[i] = this.newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // Child i could not be created: shut down the children created so far ...
                for (int j = 0; j < i; j++) {
                    children[j].shutdownGracefully();
                }

                // ... and wait for each of them to terminate before the failure propagates.
                for (int j = 0; j < i; j++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Restore the interrupt flag and stop waiting.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    this.chooser = chooserFactory.newChooser(children);

    // Completes the group's termination future once every child has reported termination.
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e : children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // Insertion-ordered, unmodifiable view of the children.
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
|
一般而言,bossGroup和workerGroup的区别仅在于构造时指定的nThreads不同。
2 公共成员/方法/组件
SelectorProvider是java提供的类,屏蔽了OS的平台差异,对于我们用户而言,可以将其当成黑盒直接使用。
SelectorProvider::provider返回SelectorProvider实例(Holder.INSTANCE),再由该实例的openSelector方法创建多路复用器(Selector)的具体实现。
2.1.1 提供器
1 2 3 4 5 6 7 8 9 10 11
|
/**
 * Returns the provider instance held in {@code Holder.INSTANCE}.
 *
 * @return the shared {@code SelectorProvider}
 */
public static SelectorProvider provider() {
    final SelectorProvider shared = Holder.INSTANCE;
    return shared;
}
|
2.1.2 创建Selector
1 2
|
/**
 * Creates a selector; concrete providers supply the implementation.
 *
 * @return the created selector
 * @throws IOException declared for implementations to signal an I/O failure
 */
public abstract AbstractSelector openSelector() throws IOException;
|
2.2 SelectStrategyFactory
DefaultSelectStrategyFactory.INSTANCE
select策略。在Netty中,NioEventLoop这个工作线程需要处理的事件包括IO任务和普通任务,线程会阻塞在Selector多路复用器上;该策略决定每一轮循环如何在IO任务和普通任务之间取舍:有普通任务(hasTasks为true)时返回selectSupplier.get()的结果,否则返回SelectStrategy.SELECT。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
/**
 * Chooses the strategy value for the next loop iteration.
 *
 * @param selectSupplier supplier consulted only when tasks are pending
 * @param hasTasks       whether tasks are pending
 * @return {@code selectSupplier.get()} when {@code hasTasks} is true, otherwise
 *         {@code SelectStrategy.SELECT}
 * @throws Exception whatever {@code selectSupplier.get()} throws
 */
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    if (!hasTasks) {
        return SelectStrategy.SELECT;
    }
    return selectSupplier.get();
}
|
2.3 RejectedExecutionHandlers
拒绝策略:taskQueue队列已满、任务无法加入时,直接抛出RejectedExecutionException。
1 2 3
| public static RejectedExecutionHandler reject() { return REJECT; }
|
1 2 3 4 5 6
| private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() { @Override public void rejected(Runnable task, SingleThreadEventExecutor executor) { throw new RejectedExecutionException(); } };
|
2.4 EventExecutorChooserFactory
线程选择器,从NioEventLoopGroup的children数组中选择一个NioEventLoop实例,达到负载均衡效果。
1 2 3 4 5 6 7 8 9
|
/**
 * Tests whether {@code val} equals its own lowest set bit, i.e. has at most one bit set.
 *
 * <p>Note that this is also true for {@code 0} and {@code Integer.MIN_VALUE}.
 *
 * @param val value to test
 * @return {@code true} when {@code val} has no more than one bit set
 */
private static boolean isPowerOfTwo(int val) {
    // Integer.lowestOneBit(val) is val & -val.
    return Integer.lowestOneBit(val) == val;
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
/**
 * Picks a chooser implementation based on the number of executors.
 *
 * @param executors executors the chooser will select from
 * @return a {@code PowerOfTwoEventExecutorChooser} when {@code executors.length} passes
 *         {@code isPowerOfTwo}, otherwise a {@code GenericEventExecutorChooser}
 */
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (!isPowerOfTwo(executors.length)) {
        return new GenericEventExecutorChooser(executors);
    }
    return new PowerOfTwoEventExecutorChooser(executors);
}
|
2.4.1 PowerOfTwoEventExecutorChooser
1 2 3 4 5 6 7 8
|
/**
 * Returns the next executor, advancing the shared counter by one.
 *
 * @return the executor at index {@code counter & (length - 1)}
 */
@Override
public EventExecutor next() {
    // Subtraction binds tighter than '&', so the mask is (length - 1).
    final int mask = this.executors.length - 1;
    return this.executors[idx.getAndIncrement() & mask];
}
|
2.4.2 GenericEventExecutorChooser
1 2 3 4 5 6 7
|
/**
 * Returns the next executor, advancing the shared counter by one.
 *
 * @return the executor at index {@code |counter % length|}
 */
@Override
public EventExecutor next() {
    final long ticket = idx.getAndIncrement();
    // The remainder can be negative once the counter wraps, hence the abs().
    final int slot = (int) Math.abs(ticket % this.executors.length);
    return this.executors[slot];
}
|
2.5 ThreadPerTaskExecutor
每提交一个任务,就通过threadFactory新建并启动一个线程来执行它;NioEventLoop借助它来创建并启动自己所绑定的那个线程,而不是用它逐个执行taskQueue里面的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
/**
 * Runs {@code command} on a new thread obtained from {@code threadFactory}.
 *
 * @param command task to hand to the new thread
 */
@Override
public void execute(Runnable command) {
    final Thread worker = threadFactory.newThread(command);
    worker.start();
}
|
2.6 newChild方法
主要就是用来实例化NioEventLoop。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
/**
 * Creates one {@code NioEventLoop} from the positional {@code args}.
 *
 * <p>Expected layout: {@code args[0]} SelectorProvider, {@code args[1]} SelectStrategyFactory,
 * {@code args[2]} RejectedExecutionHandler, optional {@code args[3]} task queue factory,
 * optional {@code args[4]} tail task queue factory. Missing optional entries become
 * {@code null}.
 *
 * @param executor executor passed to the new event loop
 * @param args     positional arguments as described above
 * @return the new event loop
 * @throws Exception declared by the signature
 */
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    final SelectorProvider provider = (SelectorProvider) args[0];
    final SelectStrategyFactory strategyFactory = (SelectStrategyFactory) args[1];
    final RejectedExecutionHandler rejectionHandler = (RejectedExecutionHandler) args[2];

    final int count = args.length;
    final EventLoopTaskQueueFactory queueFactory =
            count > 3 ? (EventLoopTaskQueueFactory) args[3] : null;
    final EventLoopTaskQueueFactory tailQueueFactory =
            count > 4 ? (EventLoopTaskQueueFactory) args[4] : null;

    return new NioEventLoop(
            this,
            executor,
            provider,
            strategyFactory.newSelectStrategy(),
            rejectionHandler,
            queueFactory,
            tailQueueFactory);
}
|
3 NioEventLoop
1 2 3 4 5 6 7 8 9 10 11 12
| private static Queue<Runnable> newTaskQueue( EventLoopTaskQueueFactory queueFactory) { if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); } return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS); }
|
3.2 构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
/**
 * Creates the event loop: builds both task queues, stores the provider and select strategy,
 * and opens the selector.
 *
 * @param parent                   owning group
 * @param executor                 executor passed to the superclass
 * @param selectorProvider         provider to store; must not be {@code null}
 * @param strategy                 select strategy to store; must not be {@code null}
 * @param rejectedExecutionHandler handler passed to the superclass
 * @param taskQueueFactory         factory for the task queue, or {@code null} for the default
 * @param tailTaskQueueFactory     factory for the tail task queue, or {@code null} for the default
 */
NioEventLoop(
        NioEventLoopGroup parent,
        Executor executor,
        SelectorProvider selectorProvider,
        SelectStrategy strategy,
        RejectedExecutionHandler rejectedExecutionHandler,
        EventLoopTaskQueueFactory taskQueueFactory,
        EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(
            parent,
            executor,
            false,
            newTaskQueue(taskQueueFactory),
            newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler);

    provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");

    // Opened only after the fields above are assigned; the original statement order is kept.
    final SelectorTuple opened = openSelector();
    selector = opened.selector;
    unwrappedSelector = opened.unwrappedSelector;
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
/**
 * Initialises the executor's state.
 *
 * @param parent          owning group, passed to the superclass
 * @param executor        executor wrapped through {@code ThreadExecutorMap.apply}
 * @param addTaskWakesUp  flag stored as-is
 * @param taskQueue       task queue; must not be {@code null}
 * @param rejectedHandler rejection handler; must not be {@code null}
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        Executor executor,
        boolean addTaskWakesUp,
        Queue<Runnable> taskQueue,
        RejectedExecutionHandler rejectedHandler) {
    super(parent);

    this.addTaskWakesUp = addTaskWakesUp;
    maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;

    this.executor = ThreadExecutorMap.apply(executor, this);
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
|
1 2 3
/**
 * Stores the owning group.
 *
 * @param parent group this executor belongs to; stored without validation
 */
protected AbstractEventExecutor(EventExecutorGroup parent) {
    this.parent = parent;
}
|
4 组件示意图