Netty为了提高系统的吞吐,大量使用异步线程模型。
一 Demo java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class FutureTest00 { public static void main (String[] args) throws InterruptedException, ExecutionException { CountDownLatch latch = new CountDownLatch (3 ); EventLoopGroup group = new DefaultEventLoopGroup (3 ); Future<Long> f = group.submit(() -> { System.out.println("task..." ); Thread.sleep(100_000 ); return 100L ; }); new Thread (() -> { try { Long ans = f.get(); System.out.println("get..." + Thread.currentThread().getName() + " " + ans); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException (e); } finally { latch.countDown(); } }, "get" ).start(); new Thread (() -> { try { Long ans = f.sync().getNow(); System.out.println("sync..." + Thread.currentThread().getName() + " " + ans); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { latch.countDown(); } }, "sync" ).start(); new Thread (() -> { f.addListener(future -> { System.out.println("future..." + Thread.currentThread().getName() + " " + f.get()); latch.countDown(); }); }, "listen" ).start(); latch.await(); group.shutdownGracefully(); } }
异步线程模型一定是依托于多线程实现的。
提交任务的线程负责提交任务,有专门的线程去关注任务过程,对于结果而言就有两种方式获取。
提交任务的线程自己去取,但是不知道什么时候执行线程才执行结束,所以可以阻塞等待执行线程的结果。
任务提交线程不要干等,通过监听器的回调机制,执行线程负责执行过程,自然知道什么时候执行结束,所以主动权交给执行线程,等有结果了让执行线程按照监听器的定义处理结果。
二 类图
三 任务提交流程 我们要关注ftask的实现类型是什么。
java 1 2 3 4 5 6 7 public <T> Future<T> submit (Callable<T> task) { if (task == null ) throw new NullPointerException (); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
java 1 2 3 4 5 @Override protected final <T> RunnableFuture<T> newTaskFor (Callable<T> callable) { return new PromiseTask <T>(this , callable); }
四 sync阻塞等待 java
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public V get () throws InterruptedException, ExecutionException { Object result = this .result; if (!isDone0(result)) { this .await(); result = this .result; } if (result == SUCCESS || result == UNCANCELLABLE) { return null ; } Throwable cause = cause0(result); if (cause == null ) { return (V) result; } if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException (cause); }
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override public Promise<V> await () throws InterruptedException { if (this .isDone()) { return this ; } if (Thread.interrupted()) { throw new InterruptedException (toString()); } checkDeadLock(); synchronized (this ) { while (!isDone()) { incWaiters(); try { wait(); } finally { decWaiters(); } } } return this ; }
现在获取任务的线程已经阻塞了,只能等待异步线程执行完任务之后,通过notify或者notifyAll唤醒这个阻塞线程了。
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public void run () { try { if (setUncancellableInternal()) { V result = runTask(); setSuccessInternal(result); } } catch (Throwable e) { setFailureInternal(e); } }
java 1 2 3 4 5 6 7 8 9 10 11 protected final Promise<V> setSuccessInternal (V result) { super .setSuccess(result); clearTaskAfterCompletion(true , COMPLETED); return this ; }
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public Promise<V> setSuccess (V result) { if (this .setSuccess0(result)) { return this ; } throw new IllegalStateException ("complete already: " + this ); }
java 1 2 3 4 5 6 7 8 9 private boolean setSuccess0 (V result) { return this .setValue0(result == null ? SUCCESS : result); }
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private boolean setValue0 (Object objResult) { if (RESULT_UPDATER.compareAndSet(this , null , objResult) || RESULT_UPDATER.compareAndSet(this , UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { this .notifyListeners(); } return true ; } return false ; }
唤醒阻塞的线程。
java 1 2 3 4 5 6 7 private synchronized boolean checkNotifyWaiters () { if (waiters > 0 ) { notifyAll(); } return listeners != null ; }
五 监听器回调 java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void notifyListeners () { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1 ); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return ; } } safeExecute(executor, new Runnable () { @Override public void run () { notifyListenersNow(); } }); }
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private void notifyListenersNow () { Object listeners; synchronized (this ) { if (notifyingListeners || this .listeners == null ) { return ; } notifyingListeners = true ; listeners = this .listeners; this .listeners = null ; } for (;;) { if (listeners instanceof DefaultFutureListeners) { notifyListeners0((DefaultFutureListeners) listeners); } else { notifyListener0(this , (GenericFutureListener<?>) listeners); } synchronized (this ) { if (this .listeners == null ) { notifyingListeners = false ; return ; } listeners = this .listeners; this .listeners = null ; } } }
java 1 2 3 4 5 6 7 private void notifyListeners0 (DefaultFutureListeners listeners) { GenericFutureListener<?>[] a = listeners.listeners(); int size = listeners.size(); for (int i = 0 ; i < size; i ++) { notifyListener0(this , a[i]); } }
java 1 2 3 4 5 6 7 8 9 private static void notifyListener0 (Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()" , t); } } }