publicHashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, long maxPendingTimeouts){
if (threadFactory == null) { thrownew NullPointerException("threadFactory"); } if (unit == null) { thrownew NullPointerException("unit"); } if (tickDuration <= 0) { thrownew IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { thrownew IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); }
privatestatic HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { thrownew IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { thrownew IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); }
// 对数组长度进行 2 的指数幂,找到最接近此数字的 2 的倍数 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i++) { // 初始化每一个 HashedWheelBucket 对象,该对象其实本质就是一个 linked-list wheel[i] = new HashedWheelBucket(); } return wheel; }
@Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit){ if (task == null) { thrownew NullPointerException("task"); } if (unit == null) { thrownew NullPointerException("unit"); }
// 这里根据 pendingTimeoutsCount 的数值和 maxPendingTimeouts 相比,判断是否需要拒绝添加当前任务 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); thrownew RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); }
// 触发事件轮算法的正式运行 start();
// Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; }
publicvoidstart(){ switch (workerStateUpdater.get(this)) { case WORKER_STATE_INIT: // 如果还没有被初始化过,这将 workerThread 跑起来 if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: thrownew IllegalStateException("cannot be started once stopped"); default: thrownew Error("Invalid WorkerState"); }
// 这里之所以要加上训话,是为了避免 linux 的 spurious wakeups // https://en.m.wikipedia.org/wiki/Spurious_wakeup while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
@Override publicvoidrun(){ // 这里设置时间参考点 startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; }