Netty是如何解决Java epoll中的BUG的

问题的由来

在Java原生的NIO(Linux下基于epoll实现)中,存在一个著名的bug——epoll空轮询:Selector在没有任何就绪事件的情况下不断从select()中返回,导致CPU占用率达到100%

Netty是如何解决这个问题的

这里需要深入跟踪一下NioEventLoop类的实现

静态代码块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static {
    // Define "sun.nio.ch.bugLevel" as an empty string when it has not been set by the user.
    // A SecurityException while doing so is only logged at debug level.
    final String bugLevelProperty = "sun.nio.ch.bugLevel";
    if (SystemPropertyUtil.get(bugLevelProperty) == null) {
        try {
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    System.setProperty(bugLevelProperty, "");
                    return null;
                }
            });
        } catch (final SecurityException securityException) {
            logger.debug("Unable to get/set System Property: " + bugLevelProperty, securityException);
        }
    }

    // Threshold of consecutive premature select() returns that triggers a Selector rebuild
    // (the epoll empty-poll workaround). Defaults to 512; a configured value below
    // MIN_PREMATURE_SELECTOR_RETURNS is replaced by 0.
    final int configuredThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    SELECTOR_AUTO_REBUILD_THRESHOLD =
            configuredThreshold < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : configuredThreshold;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}

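在静态代码块中可以看到一个很重要的属性io.netty.selectorAutoRebuildThreshold(默认值为512),它是Netty解决JDK中epoll空轮询bug的关键:它规定了select操作连续过早返回多少次后触发Selector重建。如果配置的值小于MIN_PREMATURE_SELECTOR_RETURNS,则会被置为0,即关闭自动重建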

Netty对于Selector的优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
 * Opens a new JDK {@link Selector} and, unless disabled or impossible, swaps its internal
 * {@code selectedKeys} / {@code publicSelectedKeys} fields for a {@code SelectedSelectionKeySet}.
 *
 * @return a {@code SelectorTuple} holding only the plain selector when the optimization is disabled
 *         or fails, otherwise the plain selector together with a
 *         {@code SelectedSelectionKeySetSelector} wrapper around it
 * @throws ChannelException if the provider fails to open a selector
 */
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// Open the platform-specific Selector implementation supplied by the SelectorProvider
// (e.g. epoll-based on Linux).
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

// The selected-key-set optimization has been disabled: return the plain JDK selector.
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}

// Load the sun.nio.ch.SelectorImpl class without initializing it.
// Any failure is returned as a value instead of being thrown.
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});

// If loading failed (a Throwable came back) or the opened selector is not an instance of
// SelectorImpl, skip the optimization and return the plain selector.
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}

final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
// Netty's own selected-key set (array-backed according to the article's author).
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

// Try to install selectedKeySet into the JDK selector. The result is null on success,
// otherwise the Throwable describing why the replacement failed.
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// Look up the two key-set fields of SelectorImpl via reflection.
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

// On Java 9 or newer with Unsafe available, replace the fields through sun.misc.Unsafe.
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
// Otherwise fall back to plain reflection: make both fields accessible first.
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
// Replace both key-set fields with Netty's implementation.
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});

// If the replacement reported an Exception, give up on the optimization.
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// Return the optimized selector. Because selectedKeySet now sits inside the JDK selector,
// the keys it selects are recorded in selectedKeySet.
return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

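Netty对selector的优化,本质上只是对selectedKeys的优化:把JDK的SelectorImpl中的selectedKeys和publicSelectedKeys两个字段(通过Unsafe或反射)替换为Netty自己实现的SelectedSelectionKeySet,从而由原来的集合(Set)改为用数组来管理就绪的SelectionKey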

NioEventLoop所执行的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
 * Event-loop body: repeatedly selects I/O events, processes the selected keys, runs queued
 * tasks, and checks for shutdown. Returns only once {@code confirmShutdown()} succeeds while
 * the loop is shutting down.
 */
@Override
protected void run() {
for (;;) {
try {
try {
// Choose what to do in this iteration according to the select strategy.
// Official comment on SelectStrategy:
// Provides the ability to control the behavior of the select loop. For example a blocking select
// operation can be delayed or skipped entirely if there are events to process immediately.
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
// Perform the select. wakenUp is reset to false and its previous value is handed to select().
// The selected events end up either in the JDK's own selected-key set or in
// Netty's replacement set.
select(wakenUp.getAndSet(false));
// If wakenUp was set to true again in the meantime, wake the selector up explicitly.
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
// An error occurred while selecting: rebuild the Selector, report the error, start over.
rebuildSelector0();
handleLoopException(e);
continue;
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
// The loop handles both I/O events and queued (non-I/O) tasks. ioRatio determines how the
// thread's time is split between the two.
// With ioRatio == 100 no time budget is computed for the tasks.
if (ioRatio == 100) {
try {
// Process the events selected in this iteration.
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// Run all queued tasks without a time limit.
runAllTasks();
}
}
// Otherwise the tasks get a time budget derived from the time spent on I/O.
else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// Task budget = ioTime * (100 - ioRatio) / ioRatio.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
Netty的select操作(对selector的select操作的包装)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
 * Wraps {@link Selector#select(long)} and counts how often it returns without a reason.
 * When the count reaches {@code SELECTOR_AUTO_REBUILD_THRESHOLD} within one call, the selector
 * is rebuilt via {@code selectRebuildSelector(int)} to escape the epoll empty-poll spin.
 *
 * @param oldWakenUp the previous value of the {@code wakenUp} flag as passed by the caller;
 *                   when {@code true} the loop exits after the first blocking select
 * @throws IOException if a select operation on the underlying selector fails
 */
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        // The counter starts from zero on every call.
        int selectCnt = 0;
        // Start time of this select round and the deadline derived from delayNanos(...)
        // (presumably the delay until the next scheduled task -- confirm against delayNanos).
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            // Remaining time until the deadline, rounded to the nearest millisecond.
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                // Deadline reached: make sure at least one (non-blocking) select happened, then leave.
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            // Blocking select limited to the remaining time; every call increments the counter.
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                // See https://github.com/netty/netty/issues/2426
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // select() returned early too many times in a row: treat it as the JDK epoll
                // empty-poll bug, rebuild the selector and restart counting.
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }

        // Report premature returns that stayed below the rebuild threshold.
        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely {} times in a row for {}.",
                    selectCnt - 1, selector);
        }
    } catch (CancelledKeyException e) {
        // Harmless exception - log anyway
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
    }
}

因此,Netty解决epoll空轮询问题采取的是重建selector的方案:对select操作进行计数,如果发生了空轮询,那么在极短的时间内就会执行很多次select操作;一旦selectCnt达到了触发rebuildSelector的阈值(SELECTOR_AUTO_REBUILD_THRESHOLD),就触发重建selector的操作

Netty的selector重建过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private void rebuildSelector0() {
// 旧的selector
final Selector oldSelector = selector;
// 新的selector
final SelectorTuple newSelectorTuple;

if (oldSelector == null) {
return;
}

// 重新创建一个selector
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}

// Register all channels to the new Selector.
int nChannels = 0;
// 遍历所有oldSelector中的事件列表
for (SelectionKey key: oldSelector.keys()) {
// 事件的附件信息
Object a = key.attachment();
try {
// 判断事件是否有效以及是否在新的selector中
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
// key注销
key.cancel();
// 重新将key注册到新的selector中
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}

// 重新赋值selector对象
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;

try {
// time to close the old selector as everything else is registered to the new one
// 重建完毕后,关闭原有的selector
oldSelector.close();
} catch (Throwable t) {
...
}
}

通过创建新的selector,并将oldSelector上仍然有效的SelectionKey(连同其interestOps和附件)重新注册到newSelector上,完成对epoll空轮询bug的规避。因此,Netty是通过对JDK的select操作进行一层包装,记录一次select过程中select(timeout)在超时之前过早返回的次数,以此来判断是否触发了epoll空轮询bug,并在达到阈值时重建selector