Netty源码-06-MpscQueue

在IO线程NioEventLoop中维护了一个队列实现,用于存放非IO任务,一个IO线程负责N个Channel,为了保证一个线程贯穿始终负责一个Channel的所有任务(任务执行次序有先后区分需要),因此可能IO线程自己将待执行的内容封装成异步任务,也有可能其他线程提交任务(立即执行或者定时任务)。

面对这种场景:

  • 多个线程充当生产者
  • 唯一线程充当消费者

Netty选择了JCTools的实现MpscQueue,即多生产者单消费者模型。

一 源码出处

NioEventLoop的构造方法:

java
1
2
// NioEventLoop.java
newTaskQueue(taskQueueFactory), // 非IO任务队列
java
1
2
3
4
5
6
7
8
// NioEventLoop.java
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
java
1
2
3
4
5
6
// NioEventLoop.java
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
java
1
2
3
4
// PlatformDependent.java
public static <T> Queue<T> newMpscQueue() {
return Mpsc.newMpscQueue();
}
java
1
2
3
4
static <T> Queue<T> newMpscQueue() {
return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
: new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
}

二 类图关系

三 实现


Netty源码-06-MpscQueue
https://bannirui.github.io/2023/03/06/Netty源码-06-MpscQueue/
作者
dingrui
发布于
2023年3月6日
许可协议