距离上次看nio相关知识想来也快一年了,感觉对于nio的理解总是停留在IO复用的io模型,知其然但不知其所以然,故而今天来解开Java NIO的神秘面纱。
首先来回顾下NIO基本概念,Java NIO主要由Buffer、Channel、Selector三大组件组成。其他组件比如Pipe、FileLock只不过是这三个组件的公共工具类。
Buffer是与NIO Channel交互的载体,提供了一系列便于操作内存块的方法。读数据是从Channel读取到Buffer中,写数据是从Buffer写入到Channel。
使用Buffer进行读写数据通常需要4步:
将数据写入到Buffer
- 调用buffer.flip()
- 从Buffer中读取数据
- 调用buffer.clear()或者buffer.compact()
![](/IO/Java-NIO/buffers-modes.png)
TCP/IP 三次握手
![](/IO/Java-NIO/tcp-shake.png)
Java NIO API基本原理Selector.open
-> Pipe.open 创建一个Selector实例 初始化Selector(windows为WindowsSelectorImpl Linux为PollSelectorImpl) Selector同时会初始化并持有PollArrayWapper
类
(功能近似于一个数组,保存注册的Socket句柄、感兴趣的事件掩码以及调用系统调用poll后的返回的就绪事件掩码)ServerSocketChannel.open
初始化channel,构造ServerSocketChannelImpl
对象ServerSocketChannel.register
注册需要监听的通道 将channel(socket)以及感兴趣的事件注册到pollArray
中
Selector.select
获取已经就绪的通道
以Linux为例:
Selector.open
1 | public static Selector open() throws IOException { |
从上可知Selecotr
是由SelectorProvider.openSelector
提供。则要初始化Selector必须先拿到SelectorProvier。
获取SelectorProvider
1 | public static SelectorProvider provider() { |
- 通过获取系统变量
java.nio.channels.spi.SelectorProvider
的方式获取SelectorProvider - 通过SPI扩展加载获取SelectorProvider
- 通过
sun.nio.ch.DefaultSelectorProvider.create()
获取
不一样的DefaultSelectorProvider
通过阅读JDK源码 我们可以找到linux
、macosx
、windows
三个版本的实现
linux版本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public class DefaultSelectorProvider {
/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }
"unchecked") (
private static SelectorProvider createProvider(String cn) {
Class<SelectorProvider> c;
try {
c = (Class<SelectorProvider>)Class.forName(cn);
} catch (ClassNotFoundException x) {
throw new AssertionError(x);
}
try {
return c.newInstance();
} catch (IllegalAccessException | InstantiationException x) {
throw new AssertionError(x);
}
}
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
}macosx版本:
1
2
3
4
5
6
7
8
9
10
11
12
13public class DefaultSelectorProvider {
/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
return new sun.nio.ch.KQueueSelectorProvider();
}
}windows版本:
1
2
3
4
5
6
7
8
9
10
11
12
13public class DefaultSelectorProvider {
/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
return new sun.nio.ch.WindowsSelectorProvider();
}
}以Linux为例 我们来接着一探究竟,通过源码可知Linux使用的Provider为:
EPollSelectorProvider
EPollSelectorProvider
1
2
3
4
5
6
7
8
9
10public class EPollSelectorProvider
extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}显然Linux下
Selector selector = Selector.open()
最终初始化了一个EPollSelectorImpl
对象
Selector的真身EPollSelectorImpl
EPollSelectorImpl初始化过程,会建立一个管道pipe 并初始化一个EPollArrayWrapper数组保存pollfd, 并初始化一个维持文件描述符与SelectorKeyImpl
映射关系的map
1 | EPollSelectorImpl(SelectorProvider sp) throws IOException { |
关键的EPollArrayWrapper
EPollArrayWrapper实际通过JNI操作本地epoll_event
的数据结构,其定义如下
1 | typedef union epoll_data { |
1 |
|
epollCreate()
epollCreate通过JNI调用系统方法epoll_create
并返回句柄 完成epoll的初始化工作1
2
3
4
5
6
7
8
9
10
11
12
13JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
/*
* epoll_create expects a size as a hint to the kernel about how to
* dimension internal structures. We can't predict the size in advance.
*/
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
在上面EpollSelectorImpl初始化过程中,我们看到pollWrapper.initInterrupt(fd0, fd1);
的调用1
2
3
4
5void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
1 | JNIEXPORT void JNICALL |
而epollCtl实际通过JNI调用了系统方法epoll_ctl(epfd, (int)opcode, (int)fd, &event)
来将需要监听的句柄加入到epoll的数据结构中epoll_ctl
函数用于管理文件描述符的事件集 使用此函数可以注册、修改、删除一个或多个事件
Selector.select()
1 | public int select(long timeout) |
Selector.select()最终调用了pollArrayWrapper.poll()方法 将已经就绪的fd添加到pollWrapper里的数组中1
2
3
4
5
6
7
8
9
10
11
12int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
PollArrayWrapper.poll()方法通过JNI调用系统方法epoll_wait
来实现 获取准备就绪的句柄1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout, jint epfd)
{
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}epoll_wait
函数负责检测事件
Selector.select()最终通过EPollArrayWrapper.poll()方法获取准备就绪的fd并添加到pollWrapper数组中,然后调用updateSelectedKeys()
方法更新Selector持有的Map<Integer,SelectionKeyImpl> fdToKey
中的
SelectionKeyImpl中事件状态
1 | private int updateSelectedKeys() { |
从pollWrapper中拿到准备就绪的fd,根据Map<Integer,SelectionKeyImpl> fdToKey
的映射关系查到对应的SelectionKeyImpl并更新其持有Channel中事件的状态SelectionKeyImpl.channel.translateAndSetReadyOps(rOps, ski)
,
准备就绪的fd对应的SelectionKeyImpl会被放入SelectorImpl
中的Set<SelectionKey> selectedKeys
中,这样用户级别的API selector.selectedKes()
就可以拿到准备就绪的fd 进行业务处理
ServerSocketChannel.register()
ServerSocketChannel.open()会初始化ServerSocketChannelImpl
对象1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
//如果channel和selector已经注册过 则直接添加感兴趣的事件和附件
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
//如果没有注册过 先通过selector.register注册
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
接下来我们来看下selector.register的注册过程
1 | protected final SelectionKey register(AbstractSelectableChannel ch, |
如果未注册过调用selector.register() 构建SelectionKey 注册感兴趣事件和attachment附件 并将新建的SelectionKey添加到pollWrapper的数组中
Selector.wakeup
1 | public Selector wakeup() { |
1 | //EPollArrayWrapper |
EPollArrayWrapper.c1
2
3
4
5
6
7
8
9JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
int fakebuf[1];
fakebuf[0] = 1;
if (write(fd, fakebuf, 1) < 0) {
JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
}
}
linux的poll实现
linux中有系统调用poll方法,定义如下:
1 | int poll (struct pollfd *fds, unsigned int nfds, int timeout); |
上述pollfd结构体定义如下:
1 | typedef struct pollfd { |
int fd:一个文件描述句柄,代表一个Channel连接
short events:该文件描述符感兴趣的事件,如POLLIN表示该文件描述符有读事件,POLLOUT表示该文件描述符可写。
short revents:代表该文件描述符当前已有的事件,如有读事件则值为POLLIN,有读写事件则为POLLIN和POLLOUT的并集
整体的意思就是:你指定了结构体列表的起始地址和要监控的结构体个数,linux系统就会为你在timeout时间内监控上述结构体列表中的文件描述符的相关事件,并把发生的事件写入到上述的short revents属性中。
所以我们在执行一次poll之后,要想获取所有的发生了事件的文件描述符,则需要遍历整个pollfd列表,依次判断上述的short revents是否不等于0,不等于0代表发生了事件。
jdk的poll实现概述
jdk要做的事情就是准备参数数据,然后去调用上述poll方法,这就要用到JNI来实现。jdk使用PollSelectorImpl来实现上述poll调用。
3.1 pollfd参数
jdk需要将java层面接收到的一个Channel连接映射到一个pollfd结构体,PollSelectorImpl针对此创建了一个AllocatedNativeObject 对象,该对象不是在堆中,它内部使用Unsafe类直接操作内存地址。它就是专门用来存放上述一个个pollfd结构体的内容,通过固定的offset来获取每个结构体的数据内容。
所以在调用上述poll方法的时候,直接传递的是AllocatedNativeObject对象的内存地址
注册Channel要做的事:其实就是将Channel的相关数据填充到上述AllocatedNativeObject的内存地址上,下次调用poll的时候,自然就会被监控
取消Channel注册要做的事:其实就是从上述AllocatedNativeObject的内存地址上移除该Channel代表的pollfd结构体PollSelectorImpl代码分析
PollSelectorImpl的创建过程有如下2个内容
1 创建了pipe,得到读写文件描述符,并注册到了PollArrayWrapper中
2 创建了PollArrayWrapper
PollArrayWrapper pollWrapper:内部创建了一个上述介绍的AllocatedNativeObject对象(用于存放注册的Channel),而pollWrapper则更像是一个工具类,来方便的用户操作AllocatedNativeObject对象,pollWrapper把普通的操作都转化成对内存的操作
我们知道PollSelectorImpl在select过程的阻塞时间受控于所注册的Channel的事件,一旦有事件才会进行返回,没有事件的话就一直阻塞,为了可以允许手动控制这种局面的话,就额外增加了一个监控,即对pipe的读监控。对pipe的读文件描述符即fd0注册到PollArrayWrapper中的第一个位置,如果我们对pipe的写文件描述符fd1进行写数据操作,则pipe的读文件描述符必然会收到读事件,即可以使PollSelectorImpl不再阻塞,立即返回。
来看下初始化注册f0的代码
1 | void initInterrupt(int fd0, int fd1) { |
即将fd0存放到PollArrayWrapper的AllocatedNativeObject中,并关注POLLIN即读事件。并将pipe的写文件描述符保存到interruptFD属性中
Selector对外提供了wakeup方法,来看下PollSelectorImpl的实现
1 | public void interrupt() { |
这里就是对上述pipe的写文件描述符执行interrupt操作,来看看底层实现代码是:
1 | JNIEXPORT void JNICALL |
这里就是简单的对pipe的写文件描述符写入数据用来触发pipe的读文件描述符的读事件而已。
至此,PollSelectorImpl的初始化过程就完成了。
- 注册和取消注册Channel过程
注册Channel其实就是向PollSelectorImpl中的PollArrayWrapper存放该Channel的fd、关注的事件信息,来看下实现代码
1 | protected void implRegister(SelectionKeyImpl ski) { |
将channel存储到PollArrayWrapper中的AllocatedNativeObject中
1 | /** |
存储的信息是:Channel的fd,关注的事件(初始是0)而channel的关注事件是后来才设置到PollArrayWrapper的AllocatedNativeObject中的
1 | void putEventOps(int i, int event) { |
不同的Selector实现,上述实现过程也是不一样的。
再来看看取消注册Channel
1 | protected void implDereg(SelectionKeyImpl ski) throws IOException { |
其实就是将最后一个直接覆盖到要删除的那个,以及更新相关数据的变化。
- doSelect实现过程
1 | protected int doSelect(long timeout) |
第一步:就是处理那些取消了的Channel,即遍历Selector的Set
第二步:就是使用pollWrapper执行poll过程,该过程即是准备好参数,然后调用linux的系统调用poll方法,如下1
2
3
4
5int poll(int numfds, int offset, long timeout) {
return poll0(pollArrayAddress + (offset * SIZE_POLLFD),
numfds, timeout);
}
private native int poll0(long pollAddress, int numfds, long timeout);
这里将AllocatedNativeObject的内存地址作为pollAddress,已注册的所有的Channel的数量作为numfds,timeout是用户传递的参数,然后就开始JNI调用
再看下native方法实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21JNIEXPORT jint JNICALL
Java_sun_nio_ch_PollArrayWrapper_poll0(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout)
{
struct pollfd *a;
int err = 0;
a = (struct pollfd *) jlong_to_ptr(address);
if (timeout <= 0) { /* Indefinite or no wait */
RESTARTABLE (poll(a, numfds, timeout), err);
} else { /* Bounded wait; bounded restarts */
err = ipoll(a, numfds, timeout);
}
if (err < 0) {
JNU_ThrowIOExceptionWithLastError(env, "Poll failed");
}
return (jint)err;
}
先将内存地址作为address转换成pollfd结构体地址,然后调用ipoll,在ipoll中我们就会见到linux的系统调用poll1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29static int
ipoll(struct pollfd fds[], unsigned int nfds, int timeout)
{
jlong start, now;
int remaining = timeout;
struct timeval t;
int diff;
gettimeofday(&t, NULL);
start = t.tv_sec * 1000 + t.tv_usec / 1000;
for (;;) {
int res = poll(fds, nfds, remaining);
if (res < 0 && errno == EINTR) {
if (remaining >= 0) {
gettimeofday(&t, NULL);
now = t.tv_sec * 1000 + t.tv_usec / 1000;
diff = now - start;
remaining -= diff;
if (diff < 0 || remaining <= 0) {
return 0;
}
start = now;
}
} else {
return res;
}
}
}
至此linux系统开始为上述所有的Channel进行监控事件。
在发生了事件之后,会有2次遍历所有注册的Channel集合:
一次就是在linux底层poll调用的时候会遍历,将产生的事件值存放到pollfd结构体的revents地址中
另一次就是在java层面,获取产生的事件时,会遍历上述每一个结构体,拿到revents地址中的数据
第三步:一旦第二步返回就说明有事件或者超时了,一旦有事件,则linux的poll调用会把产生的事件遍历的赋值到poll调用指定的地址上,即我们指定的一个个pollfd结构体,映射到java对象就是PollArrayWrapper的AllocatedNativeObject,这时候我们获取事件就是遍历底层的每一个地址,拿到pollfd结构体中的revents,如果revents不为0代表发生了事件,还要与Channel关注的事件进行相&操作,不为0代表发生了Channel关注的事件了,并清空pollfd结构体中的revents数据供下次使用,代码如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28/**
* Copy the information in the pollfd structs into the opss
* of the corresponding Channels. Add the ready keys to the
* ready queue.
*/
protected int updateSelectedKeys() {
int numKeysUpdated = 0;
// Skip zeroth entry; it is for interrupts only
for (int i=channelOffset; i<totalChannels; i++) {
int rOps = pollWrapper.getReventOps(i);
if (rOps != 0) {
SelectionKeyImpl sk = channelArray[i];
pollWrapper.putReventOps(i, 0);//清理已有事件
if (selectedKeys.contains(sk)) {
if (sk.channel.translateAndSetReadyOps(rOps, sk)) {
numKeysUpdated++;
}
} else {
sk.channel.translateAndSetReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
这里就是通过指针操作直接获取对应底层结构体的revents数据。
第四步:上面提到了Selector也会注册一个fd用于监听,并且注册的位置时第一个即0,这里会取出该fd的发生事件,然后读取内容忽略掉即可,不然后仍然会触发该事件。代码如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17static native boolean drain(int fd) throws IOException;
JNIEXPORT jboolean JNICALL
Java_sun_nio_ch_IOUtil_drain(JNIEnv *env, jclass cl, jint fd)
{
char buf[128];
int tn = 0;
for (;;) {
int n = read(fd, buf, sizeof(buf));
tn += n;
if ((n < 0) && (errno != EAGAIN))
JNU_ThrowIOExceptionWithLastError(env, "Drain");
if (n == (int)sizeof(buf))
continue;
return (tn > 0) ? JNI_TRUE : JNI_FALSE;
}
}