/** * Channel注册复用器是NioEventLoop线程的异步任务 * 这里是为了保证注册复用器的操作先于bind * - 复用器注册完触发ChannelInitializer方法回调 向pipeline中添加必要的handler 保证后续发生读写时 Channel都能依赖上完整的handler链 * - 前一个复用器注册时异步执行 * - 如果已经复用器注册已经完成 pipeline中handler已经初始化好 向NioEventLoop提交任务让它执行bind * - 如果复用器注册还没完成 说明这个任务还在NioEventLoop的任务队列taskQueue中 这个时候再向NioEventLoop提交一个异步任务 这两个任务的顺序通过任务队列保证了相对顺序 */ if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromisepromise= channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); // 此时Channel已经跟NioEventLoop线程绑定了 给boss线程添加个任务 让boss线程执行bind操作 这个地方也是真正启动boss线程的时机 return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. finalPendingRegistrationPromisepromise=newPendingRegistrationPromise(channel); regFuture.addListener(newChannelFutureListener() { // nio线程注册复用器是异步操作 给这个操作加个回调 等nio线程完成注册复用器之后 让它执行doBind0(...) @Override publicvoidoperationComplete(ChannelFuture future)throws Exception { Throwablecause= future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered();
// If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. /** * // 执行到这 说明后续可以进行NioSocketChannel::connect()方法或者NioServerSocketChannel::bind()方法 * 两种情况 * -1 register动作是在eventLoop中发起 那么到这里的时候 register一定已经完成了 * -2 如果register任务已经提交到eventLoop中 也就是进到了eventLoop中的taskQueue中 由于后续的connect和bind方法也会进入到同一个eventLoop的taskQueue中 所以一定会先执行register成功 再执行connect和bind方法 */ return regFuture; }
// AbstractChannel.java::AbstractUnsafe /** * - NioEventLoop线程执行 Jdk的Channel注册到复用器上 * - 发布事件 * - 发布handlerAdd事件 触发ChannelInitializer方法执行 * - 发布register事件 */ privatevoidregister0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) return; booleanfirstRegistration= neverRegistered; /** * 实际的注册 * jdk底层操作 将channel注册到selector复用器上 */ AbstractChannel.this.doRegister(); neverRegistered = false; AbstractChannel.this.registered = true; // 标识Channel跟NioEventLoop绑定成功
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener.
safeSetSuccess(promise); // 设置当前promise状态为success 当前register()方法是在eventLoop中的线程中执行的 需要通知提交register操作的那个线程 /** * 发布register事件 * 让pipeline中handler关注channelRegistered(...)的handler执行 */ pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { // active指channel已经打开 if (firstRegistration) { // 如果该channel是第一次执行register 那么往pipeline中丢一个fireChannelActive事件 /** * 发布active事件 * 让pipeline中handler关注invokeChannelActive(...)的handler执行 */ pipeline.fireChannelActive(); } elseif (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 this.beginRead(); // 该channel已经register过了 让该channel立马去监听通道中的OP_READ事件 } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
/** * Channel中发生的读写数据操作都会按照InBound和OutBound类型交给pipeline 让关注的handler执行 */ privatestaticvoiddoBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. /** * 提交异步任务让NioEventLoop线程执行bind * 这个地方的bind对应OS的Socket编程中的bind和listen * 此时pipeline的handler(head workerHandler ServerBootstrapAcceptor tail) bind操作属于OutBound类型 因此从tail往前找handler headHandler负责真正执行bind和listen * NioEventLoop执行真正的bind操作 添加监听器处理异步操作结果 NioEventLoop执行bind结束后 它自己知道bind结果 处理ChannelFutureListener.CLOSE_ON_FAILURE的逻辑 */ channel.eventLoop().execute(newRunnable() { @Override publicvoidrun() { if (regFuture.isSuccess()) channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); else promise.setFailure(regFuture.cause()); } }); }
if (!promise.setUncancellable() || !ensureOpen(promise)) return;
// See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. }