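From the operating system's point of view:
1 I/O multiplexing means the OS provides a system call that reports the state of many fds in a single call.
2 Each OS implements it differently. Even when the implementation polls, the polling happens inside the kernel, which avoids one user/kernel transition per fd and is therefore much cheaper than polling from user space.
select: fd count capped by a fixed-size set (FD_SETSIZE); scans every fd, O(N); the fd sets are copied between user and kernel space on each call
poll: no fixed fd limit (the caller passes an array of pollfd); scans every fd, O(N); the array is copied between user and kernel space on each call
epoll: close to O(1) with respect to the number of registered fds, because only ready events are returned
As for Java, it wraps all of this behind one uniform API, but ultimately it depends on the OS implementation.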
I Demo

```java
package debug.io.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class SelectorTest {

    public static void main(String[] args) throws IOException {
        // A channel must be non-blocking before it can be registered.
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.configureBlocking(false);
        channel.bind(new InetSocketAddress(9001));

        Selector selector = Selector.open();
        channel.register(selector, SelectionKey.OP_ACCEPT);

        // Blocks until at least one registered channel is ready.
        int ret = selector.select();
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> it = selectionKeys.iterator();
        while (it.hasNext()) {
            SelectionKey sk = it.next();
            // The selector never removes selected keys on its own.
            it.remove();
            sk.isValid();
            sk.isWritable();
            sk.isReadable();
            sk.isAcceptable();
        }
    }
}
```
Key APIs:
Selector.open()
serverSocketChannel.register()
selector.select()
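Overall execution flow:
open() creates the selector: the JDK adapts to the OS it runs on and instantiates the matching multiplexer implementation.
One selector manages N channels and, at the right moment, tells the application which channels have pending events.
Each channel is associated with the selector through register(),
which asks the selector to watch that channel for a given kind of event (connection, readable, writable).
"The right moment" is the call to select().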
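The flow above can be exercised end to end on the loopback interface. The following is only a minimal sketch: the class name `AcceptOnceSketch`, the use of port 0 (let the OS pick a free port) and the 5 second timeout are assumptions made for illustration, not part of the original demo.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class AcceptOnceSketch {

    /** Returns how many connections were accepted after one select() call. */
    static int acceptOnce() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the OS for any free port (an assumption of this sketch).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            int accepted = 0;
            // A blocking client connect produces the OP_ACCEPT event.
            try (SocketChannel client = SocketChannel.open(
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), port))) {
                // Bounded wait so the sketch cannot hang forever.
                selector.select(5000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        try (SocketChannel peer = server.accept()) {
                            if (peer != null) accepted++;
                        }
                    }
                }
            }
            return accepted;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accepted = " + acceptOnce());
    }
}
```

Compared with the demo, the only structural addition is a client that triggers the event, so that select() has something to report.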
II Source code analysis
1 Creating the Java I/O multiplexer
1.1 Selector.open()

```java
Selector selector = Selector.open();
```
Stepping into it leads to this method:

```java
return SelectorProvider.provider().openSelector();
```
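The concrete implementation is not instantiated directly. Creation goes through an intermediary, the Provider; the extra layer of abstraction presumably exists to offer one uniform API and keep the code extensible.
The I/O multiplexer is implemented by the OS, and each OS implements it differently.
JDK builds are platform-specific: each OS gets its own build.
So at the Java level, each JDK build knows which OS it targets and wraps that system's multiplexer implementation.
It follows that:
The Provider depends on the OS type.
The Provider offers a uniform API for creating the concrete selector implementation.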
1.2 Creating the selector Provider

```java
SelectorProvider.provider()
```

1.2.1 provider() method

```java
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    // 1. Provider named by a system property
                    if (loadProviderFromProperty())
                        return provider;
                    // 2. Provider discovered through SPI
                    if (loadProviderAsService())
                        return provider;
                    // 3. Platform default
                    provider = sun.nio.ch.DefaultSelectorProvider.create();
                    return provider;
                }
            });
    }
}
```
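From the method names and the JavaDoc, the Provider is resolved in this order of priority:
A Provider named by the system property java.nio.channels.spi.SelectorProvider (set with a -D VM argument).
A Provider discovered through the SPI (ServiceLoader) mechanism.
The default Provider that ships with the JDK build.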
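To see which Provider and which selector class this resolution yields on a given machine, a small probe is enough. This is a sketch under the assumption that no custom provider is configured; the class name `ProviderProbe` is made up, and the printed names depend on the OS and the JDK version.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

public class ProviderProbe {

    /** Name of the provider class chosen by SelectorProvider.provider(). */
    static String providerName() {
        return SelectorProvider.provider().getClass().getName();
    }

    /** Name of the selector class that this provider creates. */
    static String selectorName() throws IOException {
        try (Selector selector = SelectorProvider.provider().openSelector()) {
            return selector.getClass().getName();
        }
    }

    public static void main(String[] args) throws IOException {
        // The output varies by platform, so no expected value is listed here.
        System.out.println(providerName());
        System.out.println(selectorName());
    }
}
```

Running it with `-Djava.nio.channels.spi.SelectorProvider=<class name>` should exercise the first branch of provider(), provided the named class is on the class path.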
1.2.2 sun.nio.ch.DefaultSelectorProvider.create()
A global search of the OpenJDK source turns up three implementations of this method, one per platform family.
solaris:

```java
public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}
```

macosx:

```java
public static SelectorProvider create() {
    return new sun.nio.ch.KQueueSelectorProvider();
}
```

windows:

```java
public static SelectorProvider create() {
    return new sun.nio.ch.WindowsSelectorProvider();
}
```
1.3 Creating the Selector through the Provider
Linux:

```java
public class EPollSelectorProvider
    extends SelectorProviderImpl
{
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}
```

macosx:

```java
public class KQueueSelectorProvider
    extends SelectorProviderImpl
{
    public AbstractSelector openSelector() throws IOException {
        return new KQueueSelectorImpl(this);
    }
}
```

windows:

```java
public class WindowsSelectorProvider extends SelectorProviderImpl {

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}
```
1.4 Which selector implementation and which system calls each OS ends up using
Under JDK 8:
Linux -> EPollSelectorProvider -> EPollSelectorImpl
-> EPollArrayWrapper
Java_sun_nio_ch_EPollArrayWrapper_epollCreate -> epoll_create
Java_sun_nio_ch_EPollArrayWrapper_epollCtl -> epoll_ctl
Java_sun_nio_ch_EPollArrayWrapper_epollWait -> epoll_wait
MacOS -> KQueueSelectorProvider -> KQueueSelectorImpl
-> KQueueArrayWrapper
Java_sun_nio_ch_KQueueArrayWrapper_init -> kqueue
Java_sun_nio_ch_KQueueArrayWrapper_register0 -> EV_SET
Java_sun_nio_ch_KQueueArrayWrapper_kevent0 -> kevent
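Windows -> WindowsSelectorProvider -> WindowsSelectorImpl
-> PollArrayWrapper (on Windows this only holds the pollfd array; the PollArrayWrapper_poll0 -> ipoll -> poll chain belongs to the Unix PollSelectorImpl)
Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0 -> select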
Under JDK 11:
Linux -> EPollSelectorProvider -> EPollSelectorImpl
Java_sun_nio_ch_EPoll_create -> epoll_create
Java_sun_nio_ch_EPoll_ctl -> epoll_ctl
Java_sun_nio_ch_EPoll_wait -> epoll_wait
MacOS -> KQueueSelectorProvider -> KQueueSelectorImpl
Java_sun_nio_ch_KQueue_create -> kqueue
Java_sun_nio_ch_KQueue_register -> EV_SET
Java_sun_nio_ch_KQueue_poll -> kevent
Windows -> WindowsSelectorProvider -> WindowsSelectorImpl
Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0 -> select
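2 Java I/O multiplexer workflow
The class hierarchy clearly follows the template method pattern: the common API lives in the top-level abstractions, and the platform-specific details are deferred to the concrete subclasses. If another OS system call becomes available in the future, supporting it only takes a new subclass of SelectorImpl, which also honors the open/closed principle: closed for modification, open for extension.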
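The shape of that pattern can be shown in a few lines. The classes below (`BaseSelector`, `FakeEpollSelector`) are hypothetical stand-ins written for this note, not JDK classes; they only mimic how a final method fixes the sequence while one abstract step is left to the platform subclass.

```java
import java.util.ArrayList;
import java.util.List;

public class TemplateMethodSketch {

    /** Hypothetical stand-in for SelectorImpl: owns the fixed skeleton. */
    abstract static class BaseSelector {
        private boolean inSelect;
        final List<String> trace = new ArrayList<>();

        /** Final template method: subclasses cannot change the sequence. */
        public final int select(long timeout) {
            synchronized (this) {
                if (inSelect) throw new IllegalStateException("select in progress");
                inSelect = true;
                try {
                    trace.add("lock");
                    return doSelect(timeout);
                } finally {
                    inSelect = false;
                    trace.add("unlock");
                }
            }
        }

        /** The only step a platform-specific subclass has to supply. */
        protected abstract int doSelect(long timeout);
    }

    /** Hypothetical platform subclass; a real one would call into the kernel. */
    static final class FakeEpollSelector extends BaseSelector {
        @Override
        protected int doSelect(long timeout) {
            trace.add("epoll_wait(" + timeout + ")");
            return 0;
        }
    }

    public static void main(String[] args) {
        BaseSelector s = new FakeEpollSelector();
        s.select(-1);
        System.out.println(s.trace);
    }
}
```

Adding a new platform in this sketch means adding one more subclass; `BaseSelector` itself stays untouched.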
2.1 select(...) provided by Selector
Abstraction
Taking select() as the example:

```java
// java.nio.channels.Selector
public abstract int select() throws IOException;

// sun.nio.ch.SelectorImpl
@Override
public final int select() throws IOException {
    return this.lockAndDoSelect(null, -1);
}

private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException {
    synchronized (this) {
        this.ensureOpen();
        if (inSelect)
            throw new IllegalStateException("select in progress");
        inSelect = true;
        try {
            synchronized (this.publicSelectedKeys) {
                return this.doSelect(action, timeout);
            }
        } finally {
            inSelect = false;
        }
    }
}

// Implemented by each platform-specific subclass.
protected abstract int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException;
```
2.2 register() provided by AbstractSelector
Abstraction

```java
// java.nio.channels.SelectableChannel
public final SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException {
    return this.register(sel, ops, null);
}

public abstract SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException;

// java.nio.channels.spi.AbstractSelectableChannel
public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException {
    if ((ops & ~validOps()) != 0)
        throw new IllegalArgumentException();
    if (!this.isOpen())
        throw new ClosedChannelException();
    synchronized (regLock) {
        if (isBlocking())
            throw new IllegalBlockingModeException();
        synchronized (keyLock) {
            // Re-check under the lock.
            if (!isOpen())
                throw new ClosedChannelException();
            SelectionKey k = this.findKey(sel);
            if (k != null) {
                // Already registered with this selector: update in place.
                k.attach(att);
                k.interestOps(ops);
            } else {
                // First registration: delegate to the selector.
                k = ((AbstractSelector)sel).register(this, ops, att);
                this.addKey(k);
            }
            return k;
        }
    }
}

// java.nio.channels.spi.AbstractSelector
protected abstract SelectionKey register(AbstractSelectableChannel ch,
                                         int ops, Object att);

// sun.nio.ch.SelectorImpl
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
                                      int ops, Object attachment) {
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    this.implRegister(k);
    this.keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}
```
Nothing in register(…) triggers a system call; the interest is merely recorded in Java-side containers.

```java
// sun.nio.ch.SelectionKeyImpl
@Override
public SelectionKey interestOps(int ops) {
    ensureValid();
    if ((ops & ~channel().validOps()) != 0)
        throw new IllegalArgumentException();
    int oldOps = (int) INTERESTOPS.getAndSet(this, ops);
    if (ops != oldOps) {
        this.selector.setEventOps(this);
    }
    return this;
}

// sun.nio.ch.SelectorImpl
protected abstract void setEventOps(SelectionKeyImpl ski);
```

This is yet another abstract hook that the concrete subclasses have to implement.
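The findKey branch of register() shown earlier can be observed from user code: registering the same channel with the same selector twice should hand back the same key with the new attachment. A minimal sketch, using a Pipe so that no network is needed; the class name `RegisterTwiceSketch` and the attachment strings are made up for illustration.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class RegisterTwiceSketch {

    /** Registers one channel twice and reports whether the key was reused. */
    static boolean reusesKey() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            SelectionKey first =
                    source.register(selector, SelectionKey.OP_READ, "first");
            SelectionKey second =
                    source.register(selector, SelectionKey.OP_READ, "second");
            // Same key object, replaced attachment, still one registration.
            return first == second
                    && "second".equals(first.attachment())
                    && selector.keys().size() == 1;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("key reused: " + reusesKey());
    }
}
```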
2.3 SelectorImpl constructor

```java
// sun.nio.ch.SelectorImpl
private final Set<SelectionKey> keys;
private final Set<SelectionKey> selectedKeys;
private final Set<SelectionKey> publicKeys;          // immutable view
private final Set<SelectionKey> publicSelectedKeys;  // removal allowed, addition not

protected SelectorImpl(SelectorProvider sp) {
    super(sp);
    this.keys = ConcurrentHashMap.newKeySet();
    this.selectedKeys = new HashSet<>();
    this.publicKeys = Collections.unmodifiableSet(keys);
    this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
```
2.4 Getting the ready channels

```java
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

// java.nio.channels.Selector
public abstract Set<SelectionKey> selectedKeys();

// sun.nio.ch.SelectorImpl
@Override
public final Set<SelectionKey> selectedKeys() {
    this.ensureOpen();
    return this.publicSelectedKeys;
}
```
This publicSelectedKeys set is the one defined in the SelectorImpl constructor:

```java
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);

// sun.nio.ch.Util
static <E> Set<E> ungrowableSet(final Set<E> s) {
    return new Set<E>() {
        // Reads and removals are delegated to the backing set.
        public int size()                 { return s.size(); }
        public boolean isEmpty()          { return s.isEmpty(); }
        public boolean contains(Object o) { return s.contains(o); }
        public Object[] toArray()         { return s.toArray(); }
        public <T> T[] toArray(T[] a)     { return s.toArray(a); }
        public String toString()          { return s.toString(); }
        public Iterator<E> iterator()     { return s.iterator(); }
        public boolean equals(Object o)   { return s.equals(o); }
        public int hashCode()             { return s.hashCode(); }
        public void clear()               { s.clear(); }
        public boolean remove(Object o)   { return s.remove(o); }

        public boolean containsAll(Collection<?> coll) {
            return s.containsAll(coll);
        }
        public boolean removeAll(Collection<?> coll) {
            return s.removeAll(coll);
        }
        public boolean retainAll(Collection<?> coll) {
            return s.retainAll(coll);
        }

        // Additions are rejected.
        public boolean add(E o) {
            throw new UnsupportedOperationException();
        }
        public boolean addAll(Collection<? extends E> coll) {
            throw new UnsupportedOperationException();
        }
    };
}
```
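publicSelectedKeys is best seen as a restricted view of selectedKeys rather than a copy: every read and removal is delegated to selectedKeys, while add and addAll throw UnsupportedOperationException.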
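The effect of the ungrowableSet wrapper is visible from user code: the set returned by selectedKeys() accepts removals but rejects additions. A minimal sketch, again using a Pipe to obtain a real key; the class name `SelectedKeysViewSketch` is made up for illustration.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;

public class SelectedKeysViewSketch {

    /** True if the selected-key set rejects additions made by user code. */
    static boolean rejectsAdd() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);
            Set<SelectionKey> selected = selector.selectedKeys();
            try {
                selected.add(key);
                return false;   // not reached if the set is ungrowable
            } catch (UnsupportedOperationException expected) {
                // Removal is still allowed; the key was never in the set,
                // so remove() simply reports false.
                return !selected.remove(key);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("add rejected: " + rejectsAdd());
    }
}
```

Only the selector itself, through its private selectedKeys reference, can put keys into the set.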
Therefore, only these members of each implementation class need attention:
The constructor
The setEventOps(…) method
The doSelect(…) method
3 Platform-specific implementations of the Java I/O multiplexer
3.1 KQueueSelectorImpl on macOS
3.1.1 Constructor

```java
// Maximum number of events returned by one kevent call
private static final int MAX_KEVENTS = 256;

KQueueSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);

    this.kqfd = KQueue.create();
    this.pollArrayAddress = KQueue.allocatePollArray(MAX_KEVENTS);

    try {
        long fds = IOUtil.makePipe(false);
        this.fd0 = (int) (fds >>> 32);
        this.fd1 = (int) fds;
    } catch (IOException ioe) {
        KQueue.freePollArray(pollArrayAddress);
        FileDispatcherImpl.closeIntFD(kqfd);
        throw ioe;
    }

    // Watch the read end of the pipe.
    KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
}
```
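The second half of the constructor creates a pipe (two fds) and registers a read event for its read end, fd0, with the kqueue. Judging from processEvents in 3.1.3.3, where an event on fd0 only sets the interrupted flag, this pipe appears to be the wakeup channel: writing to fd1 makes fd0 readable, which forces a blocked poll to return.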
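One plausible reading of the pipe is that it backs Selector.wakeup(): an event on fd0 is what lets a select() that would otherwise block return. The public contract can at least be observed without touching internals. A minimal sketch; the class name `WakeupSketch` is made up, and no claim is made here about how a particular JDK build implements wakeup() internally.

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class WakeupSketch {

    /** Calls wakeup() first, then select(); returns the elapsed milliseconds. */
    static long selectAfterWakeup() throws IOException {
        try (Selector selector = Selector.open()) {
            // Per the Selector documentation, a wakeup issued while no selection
            // is in progress makes the next selection return immediately.
            selector.wakeup();
            long start = System.nanoTime();
            // No channel is registered, so without the pending wakeup
            // this call would block indefinitely.
            int ready = selector.select();
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            if (ready != 0) throw new IllegalStateException("no channel can be ready");
            return elapsedMs;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("select() returned after " + selectAfterWakeup() + " ms");
    }
}
```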
3.1.2 setEventOps(…) method

```java
@Override
public void setEventOps(SelectionKeyImpl ski) {
    ensureOpen();
    synchronized (updateLock) {
        // Only queue the key; the kernel is told later, in doSelect.
        this.updateKeys.addLast(ski);
    }
}
```
3.1.3 doSelect(…) method

```java
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException {
    assert Thread.holdsLock(this);

    long to = Math.min(timeout, Integer.MAX_VALUE);
    boolean blocking = (to != 0);
    boolean timedPoll = (to > 0);

    int numEntries;
    this.processUpdateQueue();
    this.processDeregisterQueue();
    try {
        this.begin(blocking);

        do {
            long startTime = timedPoll ? System.nanoTime() : 0;
            numEntries = KQueue.poll(kqfd, this.pollArrayAddress, MAX_KEVENTS, to);
            if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
                // Interrupted: retry with the remaining timeout.
                long adjust = System.nanoTime() - startTime;
                to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                if (to <= 0) {
                    // Timeout expired, no retry needed.
                    numEntries = 0;
                }
            }
        } while (numEntries == IOStatus.INTERRUPTED);
        assert IOStatus.check(numEntries);

    } finally {
        end(blocking);
    }
    this.processDeregisterQueue();
    return this.processEvents(numEntries, action);
}
```
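Its main jobs are:
Register the pending events with the kernel multiplexer.
Let the kernel multiplexer report how many events are ready and write them into a block of memory (the KQueueSelectorImpl::pollArrayAddress pointer).
Post-process the kernel's result and keep only the events that are actually of interest.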
3.1.3.1 Registering events with the kernel multiplexer

```java
this.processUpdateQueue();

private void processUpdateQueue() {
    assert Thread.holdsLock(this);

    synchronized (updateLock) {
        SelectionKeyImpl ski;
        while ((ski = this.updateKeys.pollFirst()) != null) {
            if (ski.isValid()) {
                int fd = ski.getFDVal();
                // Add to fdToKey if needed.
                SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
                assert (previous == null) || (previous == ski);

                int newEvents = ski.translateInterestOps();
                int registeredEvents = ski.registeredEvents();
                if (newEvents != registeredEvents) {

                    // Add or delete the read filter.
                    if ((registeredEvents & Net.POLLIN) != 0) {
                        if ((newEvents & Net.POLLIN) == 0) {
                            KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
                        }
                    } else if ((newEvents & Net.POLLIN) != 0) {
                        KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
                    }

                    // Add or delete the write filter.
                    if ((registeredEvents & Net.POLLOUT) != 0) {
                        if ((newEvents & Net.POLLOUT) == 0) {
                            KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
                        }
                    } else if ((newEvents & Net.POLLOUT) != 0) {
                        KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
                    }

                    ski.registeredEvents(newEvents);
                }
            }
        }
    }
}

// sun.nio.ch.KQueue
static native int register(int kqfd, int fd, int filter, int flags);
```

```c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_KQueue_register(JNIEnv *env, jclass clazz, jint kqfd,
                                jint fd, jint filter, jint flags)
{
    struct kevent changes[1];
    int res;

    EV_SET(&changes[0], fd, filter, flags, 0, 0, 0);
    RESTARTABLE(kevent(kqfd, &changes[0], 1, NULL, 0, NULL), res);
    return (res == -1) ? errno : 0;
}
```
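In essence, when user code calls channel.register(selector) to tell the selector which channels to watch, the affected keys are queued in the updateKeys deque. At select time the queue is drained, and each pending change is registered with the kernel multiplexer.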
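The deferred registration described here can be modelled without any native code. The sketch below is a simplified simulation: `Key`, the `POLLIN`/`POLLOUT` values and the `kernelCalls` log are hypothetical stand-ins for SelectionKeyImpl, the Net constants and the real kevent calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class UpdateQueueSketch {
    // Hypothetical event bits standing in for Net.POLLIN / Net.POLLOUT.
    static final int POLLIN = 1, POLLOUT = 4;

    /** Hypothetical key: remembers what the kernel was last told. */
    static final class Key {
        final int fd;
        int interest;      // what the application wants now
        int registered;    // what has been registered so far
        Key(int fd) { this.fd = fd; }
    }

    private final Deque<Key> updateKeys = new ArrayDeque<>();
    /** Log of simulated kernel registrations, in the order they were issued. */
    final List<String> kernelCalls = new ArrayList<>();

    /** Cheap: only records that the key changed. */
    void setEventOps(Key key, int ops) {
        key.interest = ops;
        updateKeys.addLast(key);
    }

    /** Drains the queue and issues one simulated call per changed filter. */
    void processUpdateQueue() {
        Key key;
        while ((key = updateKeys.pollFirst()) != null) {
            if (key.interest == key.registered) continue;
            diff(key, POLLIN, "EVFILT_READ");
            diff(key, POLLOUT, "EVFILT_WRITE");
            key.registered = key.interest;
        }
    }

    private void diff(Key key, int bit, String filter) {
        boolean was = (key.registered & bit) != 0;
        boolean now = (key.interest & bit) != 0;
        if (was && !now) kernelCalls.add("EV_DELETE " + filter + " fd=" + key.fd);
        if (!was && now) kernelCalls.add("EV_ADD " + filter + " fd=" + key.fd);
    }

    public static void main(String[] args) {
        UpdateQueueSketch q = new UpdateQueueSketch();
        Key k = new Key(7);
        q.setEventOps(k, POLLIN);
        q.processUpdateQueue();
        System.out.println(q.kernelCalls);
    }
}
```

Queuing first and diffing later means that several interest changes between two select calls need not each cost a system call.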
3.1.3.2 The kernel multiplexer returns the ready events

```java
numEntries = KQueue.poll(kqfd, this.pollArrayAddress, MAX_KEVENTS, to);

// sun.nio.ch.KQueue
static native int poll(int kqfd, long pollAddress, int nevents, long timeout)
        throws IOException;
```

```c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_KQueue_poll(JNIEnv *env, jclass clazz, jint kqfd, jlong address,
                            jint nevents, jlong timeout)
{
    struct kevent *events = jlong_to_ptr(address);
    int res;
    struct timespec ts;
    struct timespec *tsp;

    if (timeout >= 0) {
        /* milliseconds -> seconds + nanoseconds */
        ts.tv_sec = timeout / 1000;
        ts.tv_nsec = (timeout % 1000) * 1000000;
        tsp = &ts;
    } else {
        /* negative timeout: block indefinitely */
        tsp = NULL;
    }

    res = kevent(kqfd, NULL, 0, events, nevents, tsp);
    if (res < 0) {
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        } else {
            JNU_ThrowIOExceptionWithLastError(env, "kqueue failed");
            return IOS_THROWN;
        }
    }
    return res;
}
```
3.1.3.3 Post-processing the kernel's result

```java
return this.processEvents(numEntries, action);

private int processEvents(int numEntries, Consumer<SelectionKey> action)
        throws IOException {
    assert Thread.holdsLock(this);

    int numKeysUpdated = 0;
    boolean interrupted = false;

    // A file descriptor may be registered with kqueue for several filters and
    // so produce several events; pollCount ensures a key is counted only once.
    this.pollCount++;

    for (int i = 0; i < numEntries; i++) {
        long kevent = KQueue.getEvent(this.pollArrayAddress, i);
        int fd = KQueue.getDescriptor(kevent);
        if (fd == fd0) {
            // Event on the read end of the internal pipe.
            interrupted = true;
        } else {
            SelectionKeyImpl ski = this.fdToKey.get(fd);
            if (ski != null) {
                int rOps = 0;
                short filter = KQueue.getFilter(kevent);
                if (filter == EVFILT_READ) {
                    rOps |= Net.POLLIN;
                } else if (filter == EVFILT_WRITE) {
                    rOps |= Net.POLLOUT;
                }
                int updated = super.processReadyEvents(rOps, ski, action);
                if (updated > 0 && ski.lastPolled != pollCount) {
                    numKeysUpdated++;
                    ski.lastPolled = pollCount;
                }
            }
        }
    }

    if (interrupted) {
        clearInterrupt();
    }
    return numKeysUpdated;
}

// sun.nio.ch.SelectorImpl
protected final int processReadyEvents(int rOps,   // events that actually occurred
                                       SelectionKeyImpl ski,
                                       Consumer<SelectionKey> action) {
    if (action != null) {
        ski.translateAndSetReadyOps(rOps);
        if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
            action.accept(ski);
            ensureOpen();
            return 1;
        }
    } else {
        assert Thread.holdsLock(publicSelectedKeys);
        if (this.selectedKeys.contains(ski)) {
            if (ski.translateAndUpdateReadyOps(rOps)) {
                return 1;
            }
        } else {
            ski.translateAndSetReadyOps(rOps);
            if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                this.selectedKeys.add(ski);
                return 1;
            }
        }
    }
    return 0;
}
```
In the end, the keys of the ready channels land in the selectedKeys set. As the SelectorImpl constructor shows, the publicSelectedKeys set that is later returned to the user is a view over this very selectedKeys.
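From the application's side, this is the set that the usual iterate-and-remove loop consumes. The sketch below sends a short ASCII message through a Pipe and reads it back once the selector reports the source as readable; the class name `SelectedKeysSketch`, the 256-byte buffer and the 5 second timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectedKeysSketch {

    /** Writes a short message into a pipe and reads it back via the selector. */
    static String roundTrip(String msg) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);

            ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) sink.write(out);   // sink is blocking by default

            StringBuilder received = new StringBuilder();
            if (selector.select(5000) > 0) {
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();   // the selector never clears this set for us
                    if (key.isReadable()) {
                        ByteBuffer in = ByteBuffer.allocate(256);
                        source.read(in);
                        in.flip();
                        received.append(StandardCharsets.UTF_8.decode(in));
                    }
                }
            }
            return received.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("ping"));
    }
}
```

Skipping it.remove() would leave the key in selectedKeys, so later passes over the set would see it again even when nothing new has arrived.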
With the KQueue implementation analyzed, the remaining implementations are easier to follow.
3.2 EPollSelectorImpl on Linux
3.2.1 Constructor

```java
// Maximum number of events returned by one epoll_wait call
private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);

EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);

    this.epfd = EPoll.create();
    this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);

    try {
        long fds = IOUtil.makePipe(false);
        this.fd0 = (int) (fds >>> 32);
        this.fd1 = (int) fds;
    } catch (IOException ioe) {
        EPoll.freePollArray(pollArrayAddress);
        FileDispatcherImpl.closeIntFD(epfd);
        throw ioe;
    }

    // Watch the read end of the pipe.
    EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
```
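This is almost identical to the KQueue implementation. The differences are:
It relies on epoll, so it creates an epoll instance.
KQueue returns at most 256 ready events per system call, while EPoll returns at most 1024 (or the process fd limit, if that is lower), even though a process can normally have far more than 1024 fds open. Ready events beyond the cap are picked up by subsequent calls.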
3.2.2 setEventOps(…) method
Same as the KQueue implementation.

```java
@Override
public void setEventOps(SelectionKeyImpl ski) {
    ensureOpen();
    synchronized (updateLock) {
        updateKeys.addLast(ski);
    }
}
```
3.2.3 doSelect(…) method

```java
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException {
    assert Thread.holdsLock(this);

    // epoll_wait timeout is an int
    int to = (int) Math.min(timeout, Integer.MAX_VALUE);
    boolean blocking = (to != 0);
    boolean timedPoll = (to > 0);

    int numEntries;
    processUpdateQueue();
    processDeregisterQueue();
    try {
        begin(blocking);

        do {
            long startTime = timedPoll ? System.nanoTime() : 0;
            numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
            if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
                // Interrupted: retry with the remaining timeout.
                long adjust = System.nanoTime() - startTime;
                to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                if (to <= 0) {
                    // Timeout expired, no retry needed.
                    numEntries = 0;
                }
            }
        } while (numEntries == IOStatus.INTERRUPTED);
        assert IOStatus.check(numEntries);

    } finally {
        end(blocking);
    }
    processDeregisterQueue();
    return processEvents(numEntries, action);
}
```
3.2.3.1 Registering events with the kernel multiplexer

```java
processUpdateQueue();

private void processUpdateQueue() {
    assert Thread.holdsLock(this);

    synchronized (updateLock) {
        SelectionKeyImpl ski;
        while ((ski = updateKeys.pollFirst()) != null) {
            if (ski.isValid()) {
                int fd = ski.getFDVal();
                // Add to fdToKey if needed.
                SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
                assert (previous == null) || (previous == ski);

                int newEvents = ski.translateInterestOps();
                int registeredEvents = ski.registeredEvents();
                if (newEvents != registeredEvents) {
                    if (newEvents == 0) {
                        // Remove from epoll.
                        EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
                    } else {
                        if (registeredEvents == 0) {
                            // Add to epoll.
                            EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
                        } else {
                            // Modify the registered events.
                            EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
                        }
                    }
                    ski.registeredEvents(newEvents);
                }
            }
        }
    }
}

// sun.nio.ch.EPoll
static native int ctl(int epfd, int opcode, int fd, int events);
```

```c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPoll_ctl(JNIEnv *env, jclass clazz, jint epfd,
                          jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    res = epoll_ctl(epfd, (int)opcode, (int)fd, &event);
    return (res == 0) ? 0 : errno;
}
```
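This is almost identical to the KQueue implementation. The differences are:
EPoll registers events with the epoll_ctl system call.
EPoll's data structure is epoll_event, whose fields are assigned by hand rather than through a macro.
epoll_ctl can only change one fd per call, whereas the kevent API accepts a whole changelist: several kevent structs could be filled in with EV_SET and registered in a single call. (The JDK's KQueue.register shown in 3.1.3.1 still submits one change per kevent call.)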
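The ADD/MOD/DEL decision in the EPoll processUpdateQueue boils down to a small pure function. The sketch below isolates it; the constant values are those of the Linux `<sys/epoll.h>` header, while `NO_OP` and the class name `EpollOpSketch` are hypothetical additions for illustration.

```java
public class EpollOpSketch {
    // Values of the Linux constants from <sys/epoll.h>.
    static final int EPOLL_CTL_ADD = 1, EPOLL_CTL_DEL = 2, EPOLL_CTL_MOD = 3;
    // Hypothetical marker: nothing needs to be sent to the kernel.
    static final int NO_OP = 0;

    /** Picks the epoll_ctl opcode from the old and new interest masks. */
    static int chooseOp(int registeredEvents, int newEvents) {
        if (newEvents == registeredEvents) return NO_OP;
        if (newEvents == 0) return EPOLL_CTL_DEL;
        return registeredEvents == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    }

    public static void main(String[] args) {
        System.out.println(chooseOp(0, 1));   // first registration
        System.out.println(chooseOp(1, 5));   // interest changed
        System.out.println(chooseOp(5, 0));   // interest dropped
    }
}
```

Because epoll keeps one event mask per fd, a change is a single MOD, whereas the KQueue code has to add or delete the read and write filters separately.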
3.2.3.2 The kernel multiplexer returns the ready events

```java
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);

// sun.nio.ch.EPoll
static native int wait(int epfd, long pollAddress, int numfds, int timeout)
        throws IOException;
```

```c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPoll_wait(JNIEnv *env, jclass clazz, jint epfd,
                           jlong address, jint numfds, jint timeout)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res = epoll_wait(epfd, events, numfds, timeout);
    if (res < 0) {
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        } else {
            JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
            return IOS_THROWN;
        }
    }
    return res;
}
```
Almost identical to KQueue, apart from the name of the system call.
3.2.3.3 Post-processing the kernel's result

```java
return processEvents(numEntries, action);

private int processEvents(int numEntries, Consumer<SelectionKey> action)
        throws IOException {
    assert Thread.holdsLock(this);

    boolean interrupted = false;
    int numKeysUpdated = 0;
    for (int i = 0; i < numEntries; i++) {
        long event = EPoll.getEvent(pollArrayAddress, i);
        int fd = EPoll.getDescriptor(event);
        if (fd == fd0) {
            // Event on the read end of the internal pipe.
            interrupted = true;
        } else {
            SelectionKeyImpl ski = fdToKey.get(fd);
            if (ski != null) {
                int rOps = EPoll.getEvents(event);
                numKeysUpdated += processReadyEvents(rOps, ski, action);
            }
        }
    }

    if (interrupted) {
        clearInterrupt();
    }
    return numKeysUpdated;
}
```

processReadyEvents(…) is the shared SelectorImpl method already listed in 3.1.3.3.
Almost identical to KQueue.
3.3 PollSelectorImpl implementation
3.3.1 Constructor

```java
PollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);

    // User-space array of pollfd entries; no kernel object is created.
    int size = pollArrayCapacity * SIZE_POLLFD;
    this.pollArray = new AllocatedNativeObject(size, false);

    try {
        long fds = IOUtil.makePipe(false);
        this.fd0 = (int) (fds >>> 32);
        this.fd1 = (int) fds;
    } catch (IOException ioe) {
        pollArray.free();
        throw ioe;
    }

    // Slot 0 of the array is reserved for the read end of the pipe.
    synchronized (this) {
        setFirst(fd0, Net.POLLIN);
    }
}
```
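This differs a lot from KQueue and EPoll: Poll creates no kernel-side instance:
KQueue creates a kqueue instance
EPoll creates an epoll instance
From this one can already guess:
The system call takes no instance handle as a parameter
There is no separate structure that holds only the ready channels. poll returns how many fds are ready and flags them in the revents field of the array the caller passed in, so the caller has to scan that array to find out which ones they are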
3.3.2 setEventOps(…) method

```java
@Override
public void setEventOps(SelectionKeyImpl ski) {
    ensureOpen();
    synchronized (updateLock) {
        updateKeys.addLast(ski);
    }
}
```

Same as EPoll.
3.3.3 doSelect(…) method

```java
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException {
    assert Thread.holdsLock(this);

    // poll timeout is an int
    int to = (int) Math.min(timeout, Integer.MAX_VALUE);
    boolean blocking = (to != 0);
    boolean timedPoll = (to > 0);

    processUpdateQueue();
    processDeregisterQueue();
    try {
        begin(blocking);

        int numPolled;
        do {
            long startTime = timedPoll ? System.nanoTime() : 0;
            numPolled = poll(pollArray.address(), pollArraySize, to);
            if (numPolled == IOStatus.INTERRUPTED && timedPoll) {
                // Interrupted: retry with the remaining timeout.
                long adjust = System.nanoTime() - startTime;
                to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                if (to <= 0) {
                    // Timeout expired, no retry needed.
                    numPolled = 0;
                }
            }
        } while (numPolled == IOStatus.INTERRUPTED);
        assert numPolled <= pollArraySize;

    } finally {
        end(blocking);
    }

    processDeregisterQueue();
    return processEvents(action);
}
```
3.3.3.1 Registering events with the kernel multiplexer

```java
processUpdateQueue();

private void processUpdateQueue() {
    assert Thread.holdsLock(this);

    synchronized (updateLock) {
        SelectionKeyImpl ski;
        while ((ski = updateKeys.pollFirst()) != null) {
            int newEvents = ski.translateInterestOps();
            if (ski.isValid()) {
                int index = ski.getIndex();
                assert index >= 0 && index < pollArraySize;
                if (index > 0) {
                    // Already present in the poll array.
                    assert pollKeys.get(index) == ski;
                    if (newEvents == 0) {
                        this.remove(ski);
                    } else {
                        this.update(ski, newEvents);
                    }
                } else if (newEvents != 0) {
                    this.add(ski, newEvents);
                }
            }
        }
    }
}

// Appends a pollfd entry to the user-space array; no system call involved.
private void add(SelectionKeyImpl ski, int ops) {
    expandIfNeeded();

    int index = pollArraySize;
    assert index > 0;
    putDescriptor(index, ski.getFDVal());
    putEventOps(index, ops);
    putReventOps(index, 0);
    ski.setIndex(index);
    pollArraySize++;

    pollKeys.add(ski);
    assert pollKeys.size() == pollArraySize;
}
```
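Compared with EPoll the difference is obvious: registration involves no system call. User space merely lays out entries in the event structure the kernel expects (pollfd) inside one array, the memory that pollArray points to.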
3.3.3.2 The kernel multiplexer returns the ready events

```java
numPolled = poll(pollArray.address(), pollArraySize, to);

private static native int poll(long pollAddress, int numfds, int timeout);
```

```c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_PollSelectorImpl_poll(JNIEnv *env, jclass clazz,
                                      jlong address, jint numfds,
                                      jint timeout)
{
    struct pollfd *a;
    int res;

    a = (struct pollfd *) jlong_to_ptr(address);
    res = poll(a, numfds, timeout);
    if (res < 0) {
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        } else {
            JNU_ThrowIOExceptionWithLastError(env, "poll failed");
            return IOS_THROWN;
        }
    }
    return (jint) res;
}
```
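Difference between Poll and EPoll:
EPoll reports both how many events are ready and which fds they belong to.
Poll's return value only says how many fds are ready; to learn which ones, user space has to scan the whole pollfd array and check each revents field.
With many registered fds and only a few ready ones, that scan costs O(N) on every call, whereas the work with EPoll is proportional to the number of ready events.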
3.3.3.3 Post-processing the kernel's result

```java
return processEvents(action);

private int processEvents(Consumer<SelectionKey> action)
        throws IOException {
    assert Thread.holdsLock(this);
    assert pollArraySize > 0 && pollArraySize == pollKeys.size();

    int numKeysUpdated = 0;
    // Scan every slot; slot 0 is the internal pipe and is handled below.
    for (int i = 1; i < pollArraySize; i++) {
        int rOps = getReventOps(i);
        if (rOps != 0) {
            SelectionKeyImpl ski = pollKeys.get(i);
            assert ski.getFDVal() == getDescriptor(i);
            if (ski.isValid()) {
                numKeysUpdated += processReadyEvents(rOps, ski, action);
            }
        }
    }

    // Check for an event on the internal pipe.
    if (getReventOps(0) != 0) {
        assert getDescriptor(0) == fd0;
        clearInterrupt();
    }

    return numKeysUpdated;
}
```

processReadyEvents(…) is the shared SelectorImpl method already listed in 3.1.3.3.
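The difference between Poll and EPoll here:
Poll only knows how many events are ready, not which ones, so the user-space code has to scan for them.
EPoll gets the ready events back from the kernel directly, so user space just reads them.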
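The two post-processing styles can be contrasted on plain arrays. This is a simulation only: `fds`, `revents` and `readyFds` are hypothetical arrays standing in for the native pollfd and epoll_event buffers, and the class name `ReadyScanSketch` is made up.

```java
import java.util.ArrayList;
import java.util.List;

public class ReadyScanSketch {

    /** poll style: every slot of the revents array has to be inspected. */
    static List<Integer> scanPollArray(int[] fds, int[] revents) {
        List<Integer> ready = new ArrayList<>();
        for (int i = 0; i < fds.length; i++) {
            if (revents[i] != 0) ready.add(fds[i]);
        }
        return ready;
    }

    /** epoll style: the kernel already returned only the ready entries. */
    static List<Integer> readEpollResult(int[] readyFds, int numEntries) {
        List<Integer> ready = new ArrayList<>();
        for (int i = 0; i < numEntries; i++) ready.add(readyFds[i]);
        return ready;
    }

    public static void main(String[] args) {
        int[] fds = {3, 5, 7, 9};
        int[] revents = {0, 1, 0, 4};   // only fd 5 and fd 9 have events
        System.out.println(scanPollArray(fds, revents));
        System.out.println(readEpollResult(new int[] {5, 9}, 2));
    }
}
```

Both calls yield the same fds; what differs is that the first loop runs once per registered fd and the second once per ready fd.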
3.4 WindowsSelectorImpl on Windows
3.4.1 Constructor

```java
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate.
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
```
3.4.2 setEventOps(…) method

```java
@Override
public void setEventOps(SelectionKeyImpl ski) {
    ensureOpen();
    synchronized (updateLock) {
        updateKeys.addLast(ski);
    }
}
```
3.4.3 doSelect(…) method

```java
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException {
    assert Thread.holdsLock(this);
    this.timeout = timeout;   // set selector timeout
    processUpdateQueue();
    processDeregisterQueue();
    if (interruptTriggered) {
        resetWakeupSocket();
        return 0;
    }
    // Adjust the number of helper threads to the number of channels,
    // then start them polling.
    adjustThreadsCount();
    finishLock.reset();
    startLock.startThreads();
    try {
        begin();
        try {
            // The main thread polls its own slice.
            subSelector.poll();
        } catch (IOException e) {
            finishLock.setException(e);
        }
        // The main thread is done; wait for the helper threads.
        if (threads.size() > 0)
            finishLock.waitForHelperThreads();
    } finally {
        end();
    }
    finishLock.checkForException();
    processDeregisterQueue();
    int updated = updateSelectedKeys(action);
    resetWakeupSocket();
    return updated;
}
```
3.4.3.1 Registering events with the kernel multiplexer

```java
processUpdateQueue();

private void processUpdateQueue() {
    assert Thread.holdsLock(this);

    synchronized (updateLock) {
        SelectionKeyImpl ski;

        // New registrations
        while ((ski = newKeys.pollFirst()) != null) {
            if (ski.isValid()) {
                growIfNeeded();
                channelArray[totalChannels] = ski;
                ski.setIndex(totalChannels);
                pollWrapper.putEntry(totalChannels, ski);
                totalChannels++;
                MapEntry previous = fdMap.put(ski);
                assert previous == null;
            }
        }

        // Changes to existing registrations
        while ((ski = updateKeys.pollFirst()) != null) {
            int events = ski.translateInterestOps();
            int fd = ski.getFDVal();
            if (ski.isValid() && fdMap.containsKey(fd)) {
                int index = ski.getIndex();
                assert index >= 0 && index < totalChannels;
                pollWrapper.putEventOps(index, events);
            }
        }
    }
}
```
3.4.3.2 The kernel multiplexer returns the ready events

```java
subSelector.poll();

// Poll for the main thread
private int poll() throws IOException {
    return poll0(pollWrapper.pollArrayAddress,
                 Math.min(totalChannels, MAX_SELECTABLE_FDS),
                 readFds, writeFds, exceptFds, timeout);
}

// Poll for a helper thread
private int poll(int index) throws IOException {
    return poll0(pollWrapper.pollArrayAddress +
                     (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
                 Math.min(MAX_SELECTABLE_FDS,
                          totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
                 readFds, writeFds, exceptFds, timeout);
}

private native int poll0(long pollAddress, int numfds,
                         int[] readFds, int[] writeFds, int[] exceptFds,
                         long timeout);
```

```c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
                                   jlong pollAddress, jint numfds,
                                   jintArray returnReadFds, jintArray returnWriteFds,
                                   jintArray returnExceptFds, jlong timeout)
{
    DWORD result = 0;
    pollfd *fds = (pollfd *) pollAddress;
    int i;
    FD_SET readfds, writefds, exceptfds;
    struct timeval timevalue, *tv;
    static struct timeval zerotime = {0, 0};
    int read_count = 0, write_count = 0, except_count = 0;

    ...

    /* Translate the pollfd array into the three fd sets select() expects. */
    for (i = 0; i < numfds; i++) {
        if (fds[i].events & POLLIN) {
            readfds.fd_array[read_count] = fds[i].fd;
            read_count++;
        }
        if (fds[i].events & (POLLOUT | POLLCONN)) {
            writefds.fd_array[write_count] = fds[i].fd;
            write_count++;
        }
        exceptfds.fd_array[except_count] = fds[i].fd;
        except_count++;
    }

    readfds.fd_count = read_count;
    writefds.fd_count = write_count;
    exceptfds.fd_count = except_count;

    ...

    if ((result = select(0, &readfds, &writefds, &exceptfds, tv))
                                                         == SOCKET_ERROR) {
        ...
    }
}
```
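The two poll overloads above split the poll array into slices of at most MAX_SELECTABLE_FDS entries, one slice for the main thread and one per helper thread. The arithmetic can be checked in isolation. This is a sketch under assumptions: the value 1024 for MAX_SELECTABLE_FDS is assumed because the excerpt does not show the constant's definition, the class name `FdSliceSketch` is made up, and in the real selector totalChannels also counts the wakeup sockets.

```java
public class FdSliceSketch {
    // Assumed value; the excerpt above does not show the constant's definition.
    static final int MAX_SELECTABLE_FDS = 1024;

    /** Slots polled by the main thread's sub-selector, as in poll(). */
    static int mainSlice(int totalChannels) {
        return Math.min(totalChannels, MAX_SELECTABLE_FDS);
    }

    /** Slots polled by helper thread `index`, mirroring poll(int index). */
    static int helperSlice(int totalChannels, int index) {
        return Math.min(MAX_SELECTABLE_FDS,
                totalChannels - (index + 1) * MAX_SELECTABLE_FDS);
    }

    public static void main(String[] args) {
        int total = 2500;
        System.out.println(mainSlice(total));
        System.out.println(helperSlice(total, 0));
        System.out.println(helperSlice(total, 1));
    }
}
```

With 2500 slots the slices add up to the total again, which is how the selector works around the per-call limit of select() by spreading the fds over several threads.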
3.4.3.3 Post-processing the kernel's result

```java
int updated = updateSelectedKeys(action);

// sun.nio.ch.WindowsSelectorImpl
private int updateSelectedKeys(Consumer<SelectionKey> action) {
    updateCount++;
    int numKeysUpdated = 0;
    // Results of the main thread first, then those of each helper thread.
    numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
    for (SelectThread t : threads) {
        numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
    }
    return numKeysUpdated;
}

// Methods of the inner class SubSelector
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
    int numKeysUpdated = 0;
    numKeysUpdated += processFDSet(updateCount, action, readFds,
                                   Net.POLLIN,
                                   false);
    numKeysUpdated += processFDSet(updateCount, action, writeFds,
                                   Net.POLLCONN |
                                   Net.POLLOUT,
                                   false);
    numKeysUpdated += processFDSet(updateCount, action, exceptFds,
                                   Net.POLLIN |
                                   Net.POLLCONN |
                                   Net.POLLOUT,
                                   true);
    return numKeysUpdated;
}

private int processFDSet(long updateCount,
                         Consumer<SelectionKey> action,
                         int[] fds, int rOps,
                         boolean isExceptFds) {
    int numKeysUpdated = 0;
    // fds[0] holds the number of ready descriptors that follow.
    for (int i = 1; i <= fds[0]; i++) {
        int desc = fds[i];
        if (desc == wakeupSourceFd) {
            synchronized (interruptLock) {
                interruptTriggered = true;
            }
            continue;
        }
        MapEntry me = fdMap.get(desc);
        // If me is null, the key was deregistered in the previous
        // processDeregisterQueue.
        if (me == null)
            continue;
        SelectionKeyImpl sk = me.ski;

        // The descriptor may be in the exceptfds set because there is
        // out-of-band data queued to the socket. If so, discard it.
        if (isExceptFds &&
            (sk.channel() instanceof SocketChannelImpl) &&
            discardUrgentData(desc))
        {
            continue;
        }

        int updated = processReadyEvents(rOps, sk, action);
        if (updated > 0 && me.updateCount != updateCount) {
            me.updateCount = updateCount;
            numKeysUpdated++;
        }
    }
    return numKeysUpdated;
}
```

processReadyEvents(…) is the shared SelectorImpl method already listed in 3.1.3.3.