// If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { // 当使用ChannelInitializer辅助类添加handler时候 Channel还没注册到复用器上 这时候进行标识等待回调 newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); // newCtx就是对当前handler的封装 returnthis; }
// DefaultChannelPipeline.java finalvoidinvokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { // 保证了这个方法只会在Channel注册到IO多路复用器上之后才会执行 firstRegistration = false; // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers, // that were added before the registration was done. this.callHandlerAddedForAllHandlers(); } }
privatevoidcallHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered;
// This Channel itself was registered. this.registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; // Null out so it can be GC'ed. this.pendingHandlerCallbackHead = null; }
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside // the EventLoop. PendingHandlerCallbacktask= pendingHandlerCallbackHead; while (task != null) { // 轮询单链表 task.execute(); task = task.next; } }
if (removed) { fireExceptionCaught(newChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", t)); } else { fireExceptionCaught(newChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", t)); } } }
JAVA
1 2 3 4 5 6
// AbstractChannelHandlerContext.java finalvoidcallHandlerAdded()throws Exception { // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates // any pipeline events ctx.handler() will miss them because the state will not allow it. if (setAddComplete()) handler().handlerAdded(this); // NioServerSocketChannel的初始化init时 向pipeline中添加的ChannelInitializer实例 }
JAVA
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// ChannelInitializer.java @Override publicvoidhandlerAdded(ChannelHandlerContext ctx)throws Exception { if (ctx.channel().isRegistered()) { // 执行时机是在Channel注册到复用器之后(发布handlerAdded事件) // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now. removeState(ctx); } } }
JAVA
调用链终于走到了ChannelInitializer中了。
二 ChannelInitializer源码
1 handlerAdded
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// ChannelInitializer.java @Override publicvoidhandlerAdded(ChannelHandlerContext ctx)throws Exception { if (ctx.channel().isRegistered()) { // 执行时机是在Channel注册到复用器之后(发布handlerAdded事件) // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now. removeState(ctx); } } }
JAVA
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
privatebooleaninitChannel(ChannelHandlerContext ctx)throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { this.initChannel((C) ctx.channel()); // initChannel(...)就是交给实例自己去关注实现的点 } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { ChannelPipelinepipeline= ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); // 将ChannelInitializer从pipeline中移除 } } returntrue; } returnfalse; }