1 向NioEventLoop提交任务
API |
实现类 |
execute |
SingleThreadEventExecutor |
submit |
AbstractEventExecutor |
schedule |
AbstractScheduledEventExecutor |
scheduleAtFixedRate |
AbstractScheduledEventExecutor |
scheduleWithFixedDelay |
AbstractScheduledEventExecutor |
register |
SingleThreadEventLoop |
提交的普通任务都将缓存在taskQueue这种队列中。
1.1 execute
将接收到的任务存放在taskQueue这个队列中。
java1 2 3 4 5 6 7
| @Override public void execute(Runnable task) { ObjectUtil.checkNotNull(task, "task"); this.execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); }
|
java1 2 3 4 5
| private void execute(Runnable task, boolean immediate) { this.addTask(task);
}
|
java1 2 3 4 5 6
| protected void addTask(Runnable task) { ObjectUtil.checkNotNull(task, "task"); if (!this.offerTask(task)) { reject(task); } }
|
java1 2 3 4 5 6
| final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } return this.taskQueue.offer(task); }
|
1.2 submit
区别于execute的唯一地方是AbstractEventExecutor先将submit提交过来的任务进行封装,封装成RunnableFuture类型,最终还是调用的execute流程。
java1 2 3 4
| @Override public Future<?> submit(Runnable task) { return (Future<?>) super.submit(task); }
|
1.3 schedule相关
1.3.1 任务分类
1.3.1.1 一次性定时任务
java1 2 3 4 5 6 7 8
| @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return this.schedule(new ScheduledFutureTask<Void>( this, command, deadlineNanos(unit.toNanos(delay)))); }
|
1.3.1.2 周期性定时任务
java1 2 3 4 5 6
| @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return this.schedule(new ScheduledFutureTask<Void>( this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); }
|
java1 2 3 4 5 6
| @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { return this.schedule(new ScheduledFutureTask<Void>( this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); }
|
1.3.2 通用
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduleFromEventLoop(task); } else { final long deadlineNanos = task.deadlineNanos(); if (beforeScheduledTaskSubmitted(deadlineNanos)) {
this.execute(task); } else {
lazyExecute(task); } }
return task; }
|
1.4 IO任务register
java1 2 3 4
| @Override public ChannelFuture register(Channel channel) { return this.register(new DefaultChannelPromise(channel, this)); }
|
2 任务的执行
同任务的提交一样,任务都被缓存在NioEventLoop组件的taskQueue队列中,那执行的线程就负责取队列。
在NioEventLoop的继承体系中没有出现Thread身影,但是执行任务的本体肯定是Thread实例,所有一定有NioEventLoop的父类持有了Thread。
将来调用了Thread::start()->等待CPU回调->Thread::run()作为entry point执行->NioEvnetLoop的run()
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
private volatile Thread thread;
protected abstract void run();
|
在NioEventLoop实现中会关注父类抽象的run()方法
java1 2 3 4 5 6
| @Override protected void run() { ranTasks = super.runAllTasks(0); }
|