if (regFuture.isDone()) { if (!regFuture.isSuccess()) { return regFuture; } returnthis.doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); // 触发了NioSocketChannel绑定的NioEventLoop线程启动 } else { // Registration future is almost always fulfilled already, but just in case it's not. finalPendingRegistrationPromisepromise=newPendingRegistrationPromise(channel); regFuture.addListener(newChannelFutureListener() { @Override publicvoidoperationComplete(ChannelFuture future)throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure. Throwablecause= future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } }); return promise; } }
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) { // Resolver has no idea about what to do with the specified remote address or it's resolved already. doConnect(remoteAddress, localAddress, promise); // 通过向NioEventLoop线程提交任务方式触发线程启动 并让该线程真正发起系统调用connect return promise; }
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) { finalThrowableresolveFailureCause= resolveFuture.cause();
if (resolveFailureCause != null) { // Failed to resolve immediately channel.close(); promise.setFailure(resolveFailureCause); } else { // Succeeded to resolve immediately; cached? (or did a blocking lookup) doConnect(resolveFuture.getNow(), localAddress, promise); } return promise; }
// Wait until the name resolution is finished. resolveFuture.addListener(newFutureListener<SocketAddress>() { @Override publicvoidoperationComplete(Future<SocketAddress> future)throws Exception { if (future.cause() != null) { channel.close(); promise.setFailure(future.cause()); } else { doConnect(future.getNow(), localAddress, promise); } } }); } catch (Throwable cause) { promise.tryFailure(cause); } return promise; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
// Bootstrap.java privatestaticvoiddoConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. finalChannelchannel= connectPromise.channel(); // 确保IO线程的执行权利 提交异步任务 channel.eventLoop().execute(newRunnable() { @Override publicvoidrun() { if (localAddress == null) { channel.connect(remoteAddress, connectPromise); } else { channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }); }
@Override public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
if (isNotValidPromise(promise, false)) { // cancelled return promise; }
// AbstractNioUnsafe privatevoidfulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (promise == null) { // Closed via cancellation and the promise has been notified already. return; }
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. // We still need to ensure we call fireChannelActive() in this case. booleanactive= isActive();
// trySuccess() will return false if a user cancelled the connection attempt. booleanpromiseSet= promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && active) { pipeline().fireChannelActive(); // 向NioSocketChannel的pipeline发布一个active事件 head->tail 现在pipeline中有3个handler head-bizHandler-tail 最终实现在head中 }
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { close(voidPromise()); } }