publicvoidrecycle(){ // 判断分配 Handle 的线程是否是当前线程 Thread thread = Thread.currentThread(); if (thread == stack.thread) { stack.push(this); return; } // we don't want to have a ref to the queue as the value in our weak map // so we null it out; to ensure there are no races with restoring it later // we impose a memory ordering here (no-op on x86) // 如果不是当前线程,则需要延迟回收,获取当前线程存储的延迟回收WeakHashMap Map<Stack<?>, WeakOrderQueue> delayedRecycled = Recyclers.delayedRecycled.get(); // 当前 handler 所在的 stack 是否已经在延迟回收的任务队列中 // 并且 WeakOrderQueue是一个多线程间可以共享的Queue WeakOrderQueue queue = delayedRecycled.get(stack); if (queue == null) { delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread)); } queue.add(this); } }
int size = this.size; if (size >= maxCapacity) { // Hit the maximum capacity - drop the possibly youngest object. return; } if (size == elements.length) { elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity)); }
// Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex. @SuppressWarnings("serial") privatestaticfinalclassLinkextendsAtomicInteger{ privatefinal DefaultHandle[] elements = new DefaultHandle[LINK_CAPACITY];
privateint readIndex; private Link next; }
// chain of data items private Link head, tail; // pointer to another queue of delayed items for the same stack private WeakOrderQueue next; // 将持有该Queue的对象进行设置为虚引用,就可以通过 null 判断对该线程的资源进行回收 privatefinal WeakReference<Thread> owner; privatefinalint id = idGenerator.getAndIncrement();
WeakOrderQueue(Stack<?> stack, Thread thread) { head = tail = new Link(); owner = new WeakReference<>(thread); synchronized (stackLock(stack)) { next = stack.head; stack.head = this; } }
Link tail = this.tail; int writeIndex; if ((writeIndex = tail.get()) == LINK_CAPACITY) { this.tail = tail = tail.next = new Link(); writeIndex = tail.get(); } tail.elements[writeIndex] = handle; // handler 的持有者设置为 null,避免了内存泄漏 handle.stack = null; // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread; // this also means we guarantee visibility of an element in the queue if we see the index updated tail.lazySet(writeIndex + 1); }
// transfer as many items as we can from this queue to the stack, returning true if any were transferred // 进行数据的转移,将 Queue 中的暂存的对象移到 Stack 中 @SuppressWarnings("rawtypes") booleantransfer(Stack<?> dst){
Link head = this.head; if (head == null) { returnfalse; }
// 读指针的位置已经到达了每个 Node 的存储容量,如果还有下一个节点,进行节点转移 if (head.readIndex == LINK_CAPACITY) { // 没有下一个节点的话 if (head.next == null) { returnfalse; } // 重新设置头结点所在的位置 this.head = head = head.next; }
WeakOrderQueue next = cursor.next; // 如果 WeakOrderQueue 的实际持有线程因GC回收了 if (cursor.owner.get() == null) { // If the thread associated with the queue is gone, unlink it, after // performing a volatile read to confirm there is no data left to collect. // We never unlink the first queue, as we don't want to synchronize on updating the head. if (cursor.hasFinalData()) { for (;;) { if (cursor.transfer(this)) { success = true; } else { break; } } } if (prev != null) { prev.next = next; } } else { prev = cursor; } cursor = next; } while (cursor != null && !success); this.prev = prev; this.cursor = cursor; return success; }