前文已经了解过NioEventLoopGroup和NioEventLoop。
在Netty中使用的是Reactor线程模型(IO多路复用器+多个线程),真正处理业务流程的每个worker都是单线程的:一个线程处理多个Channel,而一个Channel始终由同一个特定的线程进行处理。
在这样的情况下,如果某个Channel的业务流程耗时较久或者阻塞,那么绑定在当前线程上所有的任务都会受到影响,这样的场景如何处理呢?
再看一下EventLoop的继承关系:
从名字就可以看出来EventLoop实现分为两类:
非IO事件循环线程 DefaultEventLoop
IO事件循环线程,在非IO事件循环基础之上增加了对网络IO多路复用器的支持
NioEventLoop 从API上看就是多了register(…)Channel的支持
KQueueEventLoop
EpollEventLoop
…
NioEventLoop底层依赖JDK的Selector,而Selector的具体实现取决于操作系统
MacOSX -> kqueue
Linux -> epoll
Windows -> select/poll
一 Demo 将一整条业务流程上比较耗时的部分拆开,使用适当的EventLoop来处理,尽量让每个线程处理的内容都短小,提升处理效率。
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package io.netty.example.basic.eventloop;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class EventLoopGroupTest01 { public static void main (String[] args) throws InterruptedException { EventLoopGroup bizGroup = new DefaultEventLoopGroup (); new ServerBootstrap () .group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline() .addLast("handler1" , new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } }) .addLast(bizGroup, "handler1" , new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { } }); } }) .bind(8080 ) .sync(); } }
二 DefaultEventLoop java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package io.netty.example.basic.eventloop;import io.netty.channel.DefaultEventLoopGroup;import io.netty.channel.EventLoopGroup;public class EventLoopGroupTest00 { public static void main (String[] args) { EventLoopGroup group = new DefaultEventLoopGroup (); group.next().execute(()-> System.out.println("execute..." )); System.out.println(); } }
1 DefaultEventLoopGroup构造方法 NioEventLoop的实现是比DefaultEventLoop更丰富的,因此跟踪DefaultEventLoop源码就会简单很多。
/**
 * Creates a group without specifying a thread count.
 *
 * <p>Delegates to {@code DefaultEventLoopGroup(int)} with {@code 0}.
 */
public DefaultEventLoopGroup() {
    this(0);
}
/**
 * Creates a group with the given thread count and no thread factory.
 *
 * @param nThreads requested number of threads, forwarded unchanged
 */
public DefaultEventLoopGroup(int nThreads) {
    // The cast selects the (int, ThreadFactory) overload for the null argument.
    this(nThreads, (ThreadFactory) null);
}
/**
 * Creates a group with the given thread count and thread factory.
 *
 * <p>Both values are handed to the superclass constructor; no extra
 * {@code args} are supplied.
 *
 * @param nThreads      requested number of threads
 * @param threadFactory factory for the threads; may be {@code null}
 */
public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    super(nThreads, threadFactory);
}
/**
 * Forwards to the superclass constructor, substituting
 * {@code DEFAULT_EVENT_LOOP_THREADS} when {@code nThreads} is {@code 0}.
 *
 * @param nThreads      requested number of threads; {@code 0} selects the default
 * @param threadFactory factory for the threads; passed through unchanged
 * @param args          extra arguments, passed through unchanged
 */
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads != 0 ? nThreads : DEFAULT_EVENT_LOOP_THREADS, threadFactory, args);
}
/**
 * Adapts a {@link ThreadFactory} to an executor and delegates to the
 * executor-based constructor.
 *
 * <p>A {@code null} factory results in a {@code null} executor; otherwise the
 * factory is wrapped in a {@code ThreadPerTaskExecutor}.
 *
 * @param nThreads      number of children to create
 * @param threadFactory factory to wrap, or {@code null}
 * @param args          extra arguments, passed through unchanged
 */
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    this(nThreads, threadFactory != null ? new ThreadPerTaskExecutor(threadFactory) : null, args);
}
/**
 * Delegates to the full constructor using
 * {@code DefaultEventExecutorChooserFactory.INSTANCE} as the chooser factory.
 *
 * @param nThreads number of children to create
 * @param executor executor for the children; annotated as {@code null} on the
 *                 call path traced by the article
 * @param args     extra arguments; the article annotates them as
 *                 [SelectorProvider, SelectStrategyFactory, RejectedExecutionHandlers].
 *                 NOTE(review): the DefaultEventLoopGroup constructors pass no
 *                 extra args, so that list presumably describes the NIO path - confirm
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 protected MultithreadEventExecutorGroup (int nThreads, // 标识着group中有几个EventLoop Executor executor, // null EventExecutorChooserFactory chooserFactory, // DefaultEventExecutorChooserFactory.INSTANCE Object... args // [SelectorProvider SelectStrategyFactory RejectedExecutionHandlers] ) { if (executor == null ) executor = new ThreadPerTaskExecutor (this .newDefaultThreadFactory()); this .children = new EventExecutor [nThreads]; for (int i = 0 ; i < nThreads; i ++) { boolean success = false ; try { children[i] = this .newChild(executor, args); success = true ; } catch (Exception e) { throw new IllegalStateException ("failed to create a child event loop" , e); } finally { if (!success) { for (int j = 0 ; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0 ; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break ; } } } } } this .chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener <Object>() { @Override public void operationComplete (Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) terminationFuture.setSuccess(null ); } }; for (EventExecutor e: children) e.terminationFuture().addListener(terminationListener); Set<EventExecutor> childrenSet = new LinkedHashSet <EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
MultithreadEventExecutorGroup是父类,因此整体流程都是一样的,区别在于创建EventLoop的实现上。
java 1 2 3 protected abstract EventExecutor newChild (Executor executor, Object... args) throws Exception;
java 1 2 3 4 5 @Override protected EventExecutor newChild (Executor executor, Object... args) throws Exception { return new DefaultEventExecutor (this , executor, (Integer) args[0 ], (RejectedExecutionHandler) args[1 ]); }
/**
 * Creates an executor that belongs to {@code parent}.
 *
 * <p>Delegates to the superclass constructor, always passing {@code true} for
 * its third argument (the {@code addTaskWakesUp} flag).
 *
 * @param parent                   group that owns this executor
 * @param executor                 executor used to run this executor's thread
 * @param maxPendingTasks          requested capacity for pending tasks
 * @param rejectedExecutionHandler handler, passed through unchanged
 */
public DefaultEventExecutor(EventExecutorGroup parent,
                            Executor executor,
                            int maxPendingTasks,
                            RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, true, maxPendingTasks, rejectedExecutionHandler);
}
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected SingleThreadEventExecutor (EventExecutorGroup parent, // EventLoop线程归属的管理器 Executor executor, // 线程执行器 boolean addTaskWakesUp, // EventLoop是单线程 不能让一个线程没有任务时候处于空转状态 以事件响应机制来驱动线程执行 所以需要一定机制让那个线程阻塞/唤起 在NioEventLoop中利用IO多路复用器机制实现 在DefaultEventLoop中使用阻塞队列机制实现 addTaskWakesUp为true 表示使用阻塞队列实现 int maxPendingTasks, RejectedExecutionHandler rejectedHandler ) { super (parent); this .addTaskWakesUp = addTaskWakesUp; this .maxPendingTasks = Math.max(16 , maxPendingTasks); this .executor = ThreadExecutorMap.apply(executor, this ); this .taskQueue = this .newTaskQueue(this .maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler" ); }
java 1 2 3 4 protected Queue<Runnable> newTaskQueue (int maxPendingTasks) { return new LinkedBlockingQueue <Runnable>(maxPendingTasks); }
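NioEventLoop中任务队列的实现与DefaultEventLoop不同:DefaultEventLoop如上所示使用的是阻塞队列LinkedBlockingQueue,而NioEventLoop重写了newTaskQueue(…),使用的是非阻塞的MPSC(多生产者单消费者)队列。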
2 线程轮询 java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override protected void run () { for (;;) { Runnable task = this .takeTask(); if (task != null ) { task.run(); updateLastExecutionTime(); } if (confirmShutdown()) { break ; } } }
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 protected Runnable takeTask () { assert inEventLoop () ; if (!(taskQueue instanceof BlockingQueue)) { throw new UnsupportedOperationException (); } BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this .taskQueue; for (;;) { ScheduledFutureTask<?> scheduledTask = super .peekScheduledTask(); if (scheduledTask == null ) { Runnable task = null ; try { task = taskQueue.take(); if (task == WAKEUP_TASK) { task = null ; } } catch (InterruptedException e) { } return task; } else { long delayNanos = scheduledTask.delayNanos(); Runnable task = null ; if (delayNanos > 0 ) { try { task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { return null ; } } if (task == null ) { fetchFromScheduledTaskQueue(); task = taskQueue.poll(); } if (task != null ) { return task; } } } }
三 工作流程图