// 如果是异常,直接不进行下一步的selector优化 // 如果是Class且是系统所提供的selector的class,则进行Netty的selector优化 if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } returnnew SelectorTuple(unwrappedSelector); }
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; // 使用Netty的自定义实现的selectedKeySet(本质是一个数组) final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// 进行相关的优化措施 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run(){ try { // 利用反射获取selectedKeys字段信息 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
// 如果是JDK9且存在Unsafe,则使用原本的sun.misc.Unsafe实现Field的替换 if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); returnnull; } // We could not retrieve the offset, lets try reflection as last-resort. } // 否则使用反射进行操作,替换待优化的Field Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } // 进行相关的selectedKeys替换,替换为Netty的实现 selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); returnnull; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } });
// 如果在优化过程中出现错误,则直接不做优化 if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); returnnew SelectorTuple(unwrappedSelector); } selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); // 返回已优化过的selector,由于已将selectedKeySet替换进了JDK的selector实现中,因此所有的操作都 // 会反应在selectedKeySet中 returnnew SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }
@Override protectedvoidrun(){ for (;;) { try { try { // 这里是Netty的任务执行策略选择 // 官方注释: // Provides the ability to control the behavior of the select loop. For example a blocking select // operation can be delayed or skipped entirely if there are events to process immediately. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // 执行select操作,而此select操作所查询出的事件,要么是在JDK原有的selectKeys中 // 要么是在Netty的自实现SelectKeys中 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 // 如果在期间发生错误,则进行重建Selector rebuildSelector0(); handleLoopException(e); continue; }
privatevoidselect(boolean oldWakenUp)throws IOException { Selector selector = this.selector; try { // 每次select操作都重新计数 int selectCnt = 0; // 本次select操作的开始时间 long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) { // 如果超过任务的执行时间分片 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { // 是否做了一次select操作,如果没有,则执行一次 if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } // 否则直接跳出本次循环 break; }
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call // Selector#wakeup. So we need to check task queue again before executing select operation. // If we don't, the task might be pended until select operation was timed out. // It might be pended until idle timeout if IdleStateHandler existed in pipeline. if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; }
// 在指定的时间内进行select操作,如果select操作正常执行,那么所select出来的事件会在 int selectedKeys = selector.select(timeoutMillis); // 每次select操作都会selectCnt计数器增加 selectCnt ++; // 如果本次select选出了事件或者被唤醒或者有任务准备调度,则跳出循环 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } // 如果线程被中断过 if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // See https://github.com/netty/netty/issues/2426 selectCnt = 1; break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } elseif (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 如果在指定的时间内,selectCnt超过了SELECTOR_AUTO_REBUILD_THRESHOLD,则表示 // 当前JDK的selector epoll空轮训BUG已触发,开启RebuildSelector任务 selector = selectRebuildSelector(selectCnt); // 重置selectCnt计数 selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { } } catch (CancelledKeyException e) { // Harmless exception - log anyway } }
try { // time to close the old selector as everything else is registered to the new one // 重建完毕后,关闭原有的selector oldSelector.close(); } catch (Throwable t) { ... } }