在Channel的初始化过程中会给每个Channel实例构造一个Pipeline,因此研究pipeline的入口就是构造方法。
一 类图
是继承关系上看,pipeline的实现比较简单。
二 JavaDoc
Netty采用Reactor的线程模型,一个IO线程负责读写,至于怎么操作读写的逻辑则定义在不同的handler中,pipline就是handler的栖身之所,pipeline负责组织和管理着。
将数据流转抽象定义为:
- 入站 从Socket流向Netty的Channel。
- 出站 从Netty的Channel流向Socket。
三 Demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| package io.netty.example.basic.pipeline;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.embedded.EmbeddedChannel;
public class PipelineTest00 {
public static void main(String[] args) { EmbeddedChannel ch = new EmbeddedChannel(); ch.pipeline() .addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("1"); ctx.fireChannelRead(msg); } }) .addLast(new ChannelOutboundHandlerAdapter() { @Override public void read(ChannelHandlerContext ctx) throws Exception { System.out.println("2"); ctx.read(); } }) .addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("3"); ctx.fireChannelRegistered(); } }) .addLast(new ChannelOutboundHandlerAdapter() { @Override public void read(ChannelHandlerContext ctx) throws Exception { System.out.println("4"); ctx.read(); } }) .addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("5"); ctx.fireChannelRead(msg); } }); System.out.println("入站"); ch.pipeline().fireChannelRead(""); System.out.println(); System.out.println("出站"); ch.pipeline().read(); } }
|
从demo的输出直观看出:
- 入站事件 pipeline从head发布事件->tail。
- 出站事件 pipeline从tail发布事件->head。
四 构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this); head = new HeadContext(this);
head.next = tail; tail.prev = head; }
|
内部就是一条双向链表,对于hanlder本身标识它对入站事件还是出站事件感兴趣,然后来了事件就顺着链表以职责链设计模式走一遍,哪个handler感兴趣就自己触发回调。
- 入站事件 从head->tail走一遍。
- 出站事件 从tail->head走一遍。
pipeline并没有将handler直接组织成双链表的节点,而是将handler封装成了HandlerContext。
而二者的区别在于:
- Handler - 执行逻辑处理。
- HandlerContext - 干预事件传播机制。
1 HeadContext
1 2 3 4 5 6
| HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, HeadContext.class); unsafe = pipeline.channel().unsafe(); setAddComplete(); }
|
2 TailContext
1 2 3 4 5 6 7 8 9
| TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class); setAddComplete(); }
|
3 AbstractChannelHandlerContext
怎么标识一个handler是属于入站还是出站就是通过这个executionMask来体现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private final int executionMask;
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.executionMask = mask(handlerClass); ordered = executor == null || executor instanceof OrderedEventExecutor; }
|
通过位运算便可以组合出每个handler的感兴趣事件集合,来标榜每个handler是属于入站类型还是出站类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| static final int MASK_EXCEPTION_CAUGHT = 1; static final int MASK_CHANNEL_REGISTERED = 1 << 1; static final int MASK_CHANNEL_UNREGISTERED = 1 << 2; static final int MASK_CHANNEL_ACTIVE = 1 << 3; static final int MASK_CHANNEL_INACTIVE = 1 << 4; static final int MASK_CHANNEL_READ = 1 << 5; static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6; static final int MASK_USER_EVENT_TRIGGERED = 1 << 7; static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8; static final int MASK_BIND = 1 << 9; static final int MASK_CONNECT = 1 << 10; static final int MASK_DISCONNECT = 1 << 11; static final int MASK_CLOSE = 1 << 12; static final int MASK_DEREGISTER = 1 << 13; static final int MASK_READ = 1 << 14; static final int MASK_WRITE = 1 << 15; static final int MASK_FLUSH = 1 << 16;
|
上面这些枚举也对应着每个API,也可以直接看JavaDoc的说明:
五 入站事件
比如ChannelRead属于入站事件。
1 2 3 4 5 6 7 8 9 10 11 12 13
|
@Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) next.invokeChannelRead(m); else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { this.invokeExceptionCaught(t); } } else { fireChannelRead(msg); } }
|
1 2 3 4 5
| @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.fireChannelRead(msg); }
|
1 HandlerContext处理事件
HandlerContext本身组合了handler,所以可以执行处理逻辑。
2 HandlerContext干预事件传播
1 2 3 4 5 6
| @Override public ChannelHandlerContext fireChannelRead(final Object msg) { this.invokeChannelRead(this.findContextInbound(MASK_CHANNEL_READ), msg); return this; }
|
HandlerContext处理完逻辑之后还可以决定是否将事件继续传播下去,还是到此为止。
然后就是周而复始:
- invokeChannelRead
- channelRead
- fireChannelRead
每个入站处理器的逻辑都定义在channelRead(…)被回调,在fireChannelRead向后传播。
从入站事件传播就可以推出来出站事件的逻辑:
- pipline发布的出站事件以tail为起始点,开始传播
- 每个处理器处理完回调逻辑,继续顺着tail->head的方向传播事件
六 出站事件
比如Read属于出站事件。
1 2 3 4 5 6 7 8 9 10 11 12 13
|
@Override public Channel read() { pipeline.read(); return this; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13
|
@Override public final ChannelPipeline read() { tail.read(); return this; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Override public ChannelHandlerContext read() { final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRead(); } else { Tasks tasks = next.invokeTasks; if (tasks == null) { next.invokeTasks = tasks = new Tasks(next); } executor.execute(tasks.invokeReadTask); }
return this; }
|
1 2 3 4 5 6 7 8
| private AbstractChannelHandlerContext findContextOutbound(int mask) { AbstractChannelHandlerContext ctx = this; EventExecutor currentExecutor = executor(); do { ctx = ctx.prev; } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND)); return ctx; }
|