在Channel的初始化过程中会给每个Channel实例构造一个Pipeline,因此研究pipeline的入口就是构造方法。
一 类图
是继承关系上看,pipeline的实现比较简单。
 
二 JavaDoc
 
Netty采用Reactor的线程模型,一个IO线程负责读写,至于怎么操作读写的逻辑则定义在不同的handler中,pipline就是handler的栖身之所,pipeline负责组织和管理着。
将数据流转抽象定义为:
- 入站 从Socket流向Netty的Channel。
- 出站 从Netty的Channel流向Socket。
三 Demo
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 
 | package io.netty.example.basic.pipeline;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.embedded.EmbeddedChannel;
 
 
 
 
 
 
 public class PipelineTest00 {
 
 public static void main(String[] args) {
 EmbeddedChannel ch = new EmbeddedChannel();
 ch.pipeline()
 .addLast(new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 System.out.println("1");
 ctx.fireChannelRead(msg);
 }
 })
 .addLast(new ChannelOutboundHandlerAdapter() {
 @Override
 public void read(ChannelHandlerContext ctx) throws Exception {
 System.out.println("2");
 ctx.read();
 }
 })
 .addLast(new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
 System.out.println("3");
 ctx.fireChannelRegistered();
 }
 })
 .addLast(new ChannelOutboundHandlerAdapter() {
 @Override
 public void read(ChannelHandlerContext ctx) throws Exception {
 System.out.println("4");
 ctx.read();
 }
 })
 .addLast(new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 System.out.println("5");
 ctx.fireChannelRead(msg);
 }
 });
 System.out.println("入站");
 ch.pipeline().fireChannelRead("");
 System.out.println();
 System.out.println("出站");
 ch.pipeline().read();
 }
 }
 
 | 
从demo的输出直观看出:
- 入站事件 pipeline从head发布事件->tail。
- 出站事件 pipeline从tail发布事件->head。
四 构造方法
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 
 | 
 
 
 
 
 
 
 
 
 protected DefaultChannelPipeline(Channel channel) {
 this.channel = ObjectUtil.checkNotNull(channel, "channel");
 succeededFuture = new SucceededChannelFuture(channel, null);
 voidPromise =  new VoidChannelPromise(channel, true);
 
 tail = new TailContext(this);
 head = new HeadContext(this);
 
 head.next = tail;
 tail.prev = head;
 }
 
 | 
内部就是一条双向链表,对于hanlder本身标识它对入站事件还是出站事件感兴趣,然后来了事件就顺着链表以职责链设计模式走一遍,哪个handler感兴趣就自己触发回调。
- 入站事件 从head->tail走一遍。
- 出站事件 从tail->head走一遍。
pipeline并没有将handler直接组织成双链表的节点,而是将handler封装成了HandlerContext。
而二者的区别在于:
- Handler - 执行逻辑处理。
- HandlerContext - 干预事件传播机制。
1 HeadContext
| 12
 3
 4
 5
 6
 
 | HeadContext(DefaultChannelPipeline pipeline) {
 super(pipeline, null, HEAD_NAME, HeadContext.class);
 unsafe = pipeline.channel().unsafe();
 setAddComplete();
 }
 
 | 
2 TailContext
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | TailContext(DefaultChannelPipeline pipeline) {
 
 
 
 super(pipeline, null, TAIL_NAME, TailContext.class);
 
 setAddComplete();
 }
 
 | 
3 AbstractChannelHandlerContext
怎么标识一个handler是属于入站还是出站就是通过这个executionMask来体现的。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | private final int executionMask;
 
 
 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
 
 this.name = ObjectUtil.checkNotNull(name, "name");
 
 this.pipeline = pipeline;
 
 this.executor = executor;
 
 this.executionMask = mask(handlerClass);
 
 ordered = executor == null || executor instanceof OrderedEventExecutor;
 }
 
 | 
通过位运算便可以组合出每个handler的感兴趣事件集合,来标榜每个handler是属于入站类型还是出站类型。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | static final int MASK_EXCEPTION_CAUGHT = 1; static final int MASK_CHANNEL_REGISTERED = 1 << 1;
 static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
 static final int MASK_CHANNEL_ACTIVE = 1 << 3;
 static final int MASK_CHANNEL_INACTIVE = 1 << 4;
 static final int MASK_CHANNEL_READ = 1 << 5;
 static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
 static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
 static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
 static final int MASK_BIND = 1 << 9;
 static final int MASK_CONNECT = 1 << 10;
 static final int MASK_DISCONNECT = 1 << 11;
 static final int MASK_CLOSE = 1 << 12;
 static final int MASK_DEREGISTER = 1 << 13;
 static final int MASK_READ = 1 << 14;
 static final int MASK_WRITE = 1 << 15;
 static final int MASK_FLUSH = 1 << 16;
 
 | 
上面这些枚举也对应着每个API,也可以直接看JavaDoc的说明:
 
五 入站事件
比如ChannelRead属于入站事件。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | 
 
 
 
 
 
 
 @Override
 public final ChannelPipeline fireChannelRead(Object msg) {
 AbstractChannelHandlerContext.invokeChannelRead(head, msg);
 return this;
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
 EventExecutor executor = next.executor();
 
 if (executor.inEventLoop())
 next.invokeChannelRead(m);
 else {
 executor.execute(new Runnable() {
 @Override
 public void run() {
 next.invokeChannelRead(m);
 }
 });
 }
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 
 | private void invokeChannelRead(Object msg) {if (invokeHandler()) {
 try {
 ((ChannelInboundHandler) handler()).channelRead(this, msg);
 } catch (Throwable t) {
 
 this.invokeExceptionCaught(t);
 }
 } else {
 fireChannelRead(msg);
 }
 }
 
 | 
| 12
 3
 4
 5
 
 | @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
 ctx.fireChannelRead(msg);
 }
 
 | 
1 HandlerContext处理事件
HandlerContext本身组合了handler,所以可以执行处理逻辑。
2 HandlerContext干预事件传播
| 12
 3
 4
 5
 6
 
 | @Override
 public ChannelHandlerContext fireChannelRead(final Object msg) {
 this.invokeChannelRead(this.findContextInbound(MASK_CHANNEL_READ), msg);
 return this;
 }
 
 | 
HandlerContext处理完逻辑之后还可以决定是否将事件继续传播下去,还是到此为止。
然后就是周而复始:
- invokeChannelRead
- channelRead
- fireChannelRead
每个入站处理器的逻辑都定义在channelRead(…)被回调,在fireChannelRead向后传播。
从入站事件传播就可以推出来出站事件的逻辑:
- pipline发布的出站事件以tail为起始点,开始传播
- 每个处理器处理完回调逻辑,继续顺着tail->head的方向传播事件
六 出站事件
比如Read属于出站事件。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | 
 
 
 
 
 
 
 @Override
 public Channel read() {
 pipeline.read();
 return this;
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | 
 
 
 
 
 
 
 @Override
 public final ChannelPipeline read() {
 tail.read();
 return this;
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | @Override
 public ChannelHandlerContext read() {
 final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
 EventExecutor executor = next.executor();
 
 if (executor.inEventLoop()) {
 next.invokeRead();
 } else {
 Tasks tasks = next.invokeTasks;
 if (tasks == null) {
 next.invokeTasks = tasks = new Tasks(next);
 }
 executor.execute(tasks.invokeReadTask);
 }
 
 return this;
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 
 | private AbstractChannelHandlerContext findContextOutbound(int mask) { AbstractChannelHandlerContext ctx = this;
 EventExecutor currentExecutor = executor();
 do {
 ctx = ctx.prev;
 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
 return ctx;
 }
 
 |