A Brief Look at How JRaft's Lightweight Object Pool Works

Object pools

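An object pool exists to avoid creating objects over and over: objects are pooled instead. A pooled object is recyclable. When it is no longer in use, it is returned to the pool container, all of its state is cleared, and it waits in the pool to be reused.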

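Object pools are not the right tool for every scenario, though. Their purpose is to reduce the overhead of creating objects that are expensive to construct, such as database connections, HTTP connections, and byte arrays.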
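
To make the get, clear, and reuse cycle described above concrete, here is a minimal sketch of a generic pool. SimplePool, SimplePoolDemo, and the maxIdle parameter are hypothetical names made up for this illustration; they are not part of JRaft, and the sketch is deliberately not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.function.Supplier;

/** Hypothetical minimal pool, for illustration only (not JRaft code, not thread-safe). */
final class SimplePool<T> {
    private final ArrayDeque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final int maxIdle;

    SimplePool(Supplier<T> factory, int maxIdle) {
        this.factory = factory;
        this.maxIdle = maxIdle;
    }

    /** Reuse an idle instance if one exists, otherwise create a new one. */
    T get() {
        T obj = idle.pollFirst();
        return obj != null ? obj : factory.get();
    }

    /** Return an instance to the pool; instances beyond maxIdle are simply dropped. */
    void recycle(T obj) {
        if (idle.size() < maxIdle) {
            idle.addFirst(obj);
        }
    }
}

final class SimplePoolDemo {
    public static void main(String[] args) {
        SimplePool<StringBuilder> pool = new SimplePool<StringBuilder>(StringBuilder::new, 8);
        StringBuilder first = pool.get();
        first.append("payload");
        first.setLength(0);       // clear the state before handing the object back
        pool.recycle(first);
        StringBuilder second = pool.get();
        System.out.println(first == second); // the same instance is handed out again
    }
}
```

The caller clears the object before recycling it here; a real pool has to decide whether that responsibility sits with the caller or with the pool itself.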

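In RheaKV, the distributed KV store built on JRaft, applying Raft logs involves a large number of byte-array operations, so RheaKV implements an object pool, Recyclers, based on Netty's Recycler.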

Source code walkthrough

Initializing an object pool
private static final Logger LOG = LoggerFactory.getLogger(Recyclers.class);

private static final AtomicInteger idGenerator = new AtomicInteger(Integer.MIN_VALUE);

private static final int OWN_THREAD_ID = idGenerator.getAndIncrement();
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
private static final int INITIAL_CAPACITY;

static {
    // Maximum pool capacity per thread
    int maxCapacityPerThread = SystemPropertyUtil.getInt("jraft.recyclers.maxCapacityPerThread", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD);
    if (maxCapacityPerThread < 0) {
        maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
    }

    DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
    // Log the effective setting
    if (LOG.isDebugEnabled()) {
        if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
            LOG.debug("-Djraft.recyclers.maxCapacityPerThread: disabled");
        } else {
            LOG.debug("-Djraft.recyclers.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
        }
    }

    // Initial capacity of each per-thread stack
    INITIAL_CAPACITY = Math.min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
}

// Handle used when the maximum capacity is 0, i.e. pooling is disabled
public static final Handle NOOP_HANDLE = new Handle() {};

private final int maxCapacityPerThread;
// Thread-local: each thread keeps its own stack of pooled objects,
// which avoids contention between threads
private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<Stack<T>>() {

    @Override
    protected Stack<T> initialValue() {
        // Create this thread's object stack
        return new Stack<>(Recyclers.this, Thread.currentThread(), maxCapacityPerThread);
    }
};

protected Recyclers() {
    this(DEFAULT_MAX_CAPACITY_PER_THREAD);
}

protected Recyclers(int maxCapacityPerThread) {
    this.maxCapacityPerThread = Math.max(0, maxCapacityPerThread);
}
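
The capacity rules in the static block can be tried out in isolation. The sketch below reproduces only the arithmetic, and it uses the JDK's Integer.getInteger in place of JRaft's SystemPropertyUtil; CapacityConfigSketch and its method names are hypothetical.

```java
/** Hypothetical stand-alone model of the capacity rules; not JRaft code. */
final class CapacityConfigSketch {
    static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;

    /** Missing or negative values fall back to the default; 0 is kept and disables pooling. */
    static int resolveMaxCapacity(Integer configured) {
        int value = configured == null ? DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD : configured;
        return value < 0 ? DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD : value;
    }

    /** Each stack starts with at most 256 slots and grows on demand up to the maximum. */
    static int initialCapacity(int maxCapacityPerThread) {
        return Math.min(maxCapacityPerThread, 256);
    }

    public static void main(String[] args) {
        // Reads the JVM system property, e.g. -Djraft.recyclers.maxCapacityPerThread=1024
        Integer configured = Integer.getInteger("jraft.recyclers.maxCapacityPerThread");
        int max = resolveMaxCapacity(configured);
        System.out.println("maxCapacityPerThread=" + max + ", initialCapacity=" + initialCapacity(max));
    }
}
```

Starting small and growing lazily means a thread that recycles only a handful of objects never pays for a 4k-slot array.
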
Acquiring an object
public final T get() {
    // Pooling is disabled (max capacity is 0): every request creates a new object
    if (maxCapacityPerThread == 0) {
        return newObject(NOOP_HANDLE);
    }
    // Get the current thread's Stack
    Stack<T> stack = threadLocal.get();
    // Try to take a reusable handle
    DefaultHandle handle = stack.pop();
    if (handle == null) {
        handle = stack.newHandle();
        handle.value = newObject(handle);
    }
    // Hand out the pooled (or freshly created) object
    return (T) handle.value;
}

class Stack<T> {
    ...
    DefaultHandle pop() {
        int size = this.size;
        if (size == 0) {
            if (!scavenge()) {
                return null;
            }
            size = this.size;
        }
        size--;
        DefaultHandle ret = elements[size];
        // The same handle was stored in two Stack<T> instances
        if (ret.lastRecycledId != ret.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        // Reset the recycle markers so a repeated recycle can be detected later
        ret.recycleId = 0;
        ret.lastRecycledId = 0;
        this.size = size;
        return ret;
    }
    ...
}
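
The shape of get() (pop from the thread-local stack, create on a miss) can be shown with a much smaller stand-in. MiniRecycler, its Handle, and PooledBuffer are hypothetical names; unlike the real Recyclers, this sketch has no capacity limit, no cross-thread recycling, and no double-recycle detection.

```java
import java.util.ArrayDeque;
import java.util.function.Function;

/** Hypothetical, simplified stand-in for Recyclers<T>; same-thread recycling only. */
final class MiniRecycler<T> {
    /** Remembers which per-thread stack the pooled value belongs to. */
    static final class Handle<T> {
        private final ArrayDeque<Handle<T>> stack;
        T value;

        Handle(ArrayDeque<Handle<T>> stack) {
            this.stack = stack;
        }

        /** Must be called on the thread that obtained the object. */
        void recycle() {
            stack.push(this);
        }
    }

    private final Function<Handle<T>, T> factory;
    private final ThreadLocal<ArrayDeque<Handle<T>>> threadLocal =
            ThreadLocal.withInitial(ArrayDeque::new);

    MiniRecycler(Function<Handle<T>, T> factory) {
        this.factory = factory;
    }

    T get() {
        ArrayDeque<Handle<T>> stack = threadLocal.get();
        Handle<T> handle = stack.poll();          // null when the stack is empty
        if (handle == null) {
            handle = new Handle<>(stack);
            handle.value = factory.apply(handle); // the object keeps its handle
        }
        return handle.value;
    }
}

/** A pooled object that stores its handle so it can hand itself back. */
final class PooledBuffer {
    final MiniRecycler.Handle<PooledBuffer> handle;
    final byte[] bytes = new byte[1024];

    PooledBuffer(MiniRecycler.Handle<PooledBuffer> handle) {
        this.handle = handle;
    }
}

final class MiniRecyclerDemo {
    public static void main(String[] args) {
        MiniRecycler<PooledBuffer> pool = new MiniRecycler<PooledBuffer>(PooledBuffer::new);
        PooledBuffer first = pool.get();
        first.handle.recycle();
        System.out.println(pool.get() == first); // the recycled buffer is handed out again
    }
}
```

Passing the handle into the factory is what lets each pooled object find its way back to the stack it came from.
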
Releasing an object

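To understand how an object is released, start with the DefaultHandle class. DefaultHandle is the actual holder of the pooled object, and it is the handle through which the object is returned to the pool.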

static final class DefaultHandle implements Handle {
    private int lastRecycledId;
    private int recycleId;

    private Stack<?> stack;
    private Object value;

    DefaultHandle(Stack<?> stack) {
        this.stack = stack;
    }

    public void recycle() {
        // Is the current thread the one that owns this handle's stack?
        Thread thread = Thread.currentThread();
        if (thread == stack.thread) {
            stack.push(this);
            return;
        }
        // we don't want to have a ref to the queue as the value in our weak map
        // so we null it out; to ensure there are no races with restoring it later
        // we impose a memory ordering here (no-op on x86)
        // Not the owning thread: recycle lazily. Fetch the current thread's
        // WeakHashMap of delayed-recycle queues
        Map<Stack<?>, WeakOrderQueue> delayedRecycled = Recyclers.delayedRecycled.get();
        // Check whether this thread already has a delayed-recycle queue for the handle's stack.
        // A WeakOrderQueue is a queue that can be shared between threads
        WeakOrderQueue queue = delayedRecycled.get(stack);
        if (queue == null) {
            delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread));
        }
        queue.add(this);
    }
}

class Stack<T> {
    ...

    void push(DefaultHandle item) {
        // If either id is non-zero, the handle has already been recycled
        if ((item.recycleId | item.lastRecycledId) != 0) {
            throw new IllegalStateException("recycled already");
        }

        // Stamp both ids with OWN_THREAD_ID to record that the handle
        // was recycled directly by its owning thread
        item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

        int size = this.size;
        if (size >= maxCapacity) {
            // Hit the maximum capacity - drop the possibly youngest object.
            return;
        }
        if (size == elements.length) {
            elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity));
        }

        elements[size] = item;
        this.size = size + 1;
    }
}

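Recycling is simple when an object is released on the same thread that acquired it: the handle is pushed straight onto that thread's Stack<T>. If the thread performing the recycle is not the thread that acquired the object, however, the handle must not go into the recycling thread's own stack. It has to be added to a delayed-recycle container, a WeakOrderQueue, and at the same time it is marked as recyclable by any thread (the stack field of DefaultHandle is set to null).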
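
The two paths described above, same-thread push versus cross-thread hand-off, can be sketched as follows. OwnerStackSketch is a hypothetical name, and a ConcurrentLinkedQueue is swapped in for the WeakOrderQueue purely to keep the example short; the real code uses one WeakOrderQueue per recycling thread.

```java
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch of same-thread vs. cross-thread recycling; not JRaft code. */
final class OwnerStackSketch {
    final Thread owner = Thread.currentThread();
    // Touched by the owning thread only, so no synchronization is needed
    final ArrayDeque<Object> local = new ArrayDeque<>();
    // Filled by other threads, drained by the owner
    final ConcurrentLinkedQueue<Object> delayed = new ConcurrentLinkedQueue<>();

    void recycle(Object item) {
        if (Thread.currentThread() == owner) {
            local.push(item);   // fast path: straight onto the owner's stack
        } else {
            delayed.add(item);  // slow path: park it until the owner collects it
        }
    }

    /** Owner-side pop: when the local stack is empty, pull in what other threads handed back. */
    Object pop() {
        if (local.isEmpty()) {
            Object item;
            while ((item = delayed.poll()) != null) {
                local.push(item);
            }
        }
        return local.poll();
    }

    /** Helper for the demo: run a task on another thread and wait for it to finish. */
    static void runOnOtherThread(Runnable task) {
        Thread t = new Thread(task);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        OwnerStackSketch stack = new OwnerStackSketch();
        Object foreign = new Object();
        runOnOtherThread(() -> stack.recycle(foreign));
        System.out.println("parked in delayed queue: " + stack.delayed.size());
        System.out.println("collected by owner: " + (stack.pop() == foreign));
    }
}
```

Keeping the owner's stack single-threaded is the point of the design: only the hand-off structure has to be safe for concurrent use.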

Delayed recycling
private static final class WeakOrderQueue {
    private static final int LINK_CAPACITY = 16;

    // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
    @SuppressWarnings("serial")
    private static final class Link extends AtomicInteger {
        private final DefaultHandle[] elements = new DefaultHandle[LINK_CAPACITY];

        private int readIndex;
        private Link next;
    }

    // chain of data items
    private Link head, tail;
    // pointer to another queue of delayed items for the same stack
    private WeakOrderQueue next;
    // The owning thread is held through a weak reference, so a null referent
    // tells us the thread is gone and its queue can be cleaned up
    private final WeakReference<Thread> owner;
    private final int id = idGenerator.getAndIncrement();

    WeakOrderQueue(Stack<?> stack, Thread thread) {
        head = tail = new Link();
        owner = new WeakReference<>(thread);
        synchronized (stackLock(stack)) {
            next = stack.head;
            stack.head = this;
        }
    }

    private Object stackLock(Stack<?> stack) {
        return stack;
    }

    void add(DefaultHandle handle) {
        // Record this queue's id as the handle's most recent recycler,
        // i.e. who is currently parking the handle
        handle.lastRecycledId = id;

        Link tail = this.tail;
        int writeIndex;
        if ((writeIndex = tail.get()) == LINK_CAPACITY) {
            this.tail = tail = tail.next = new Link();
            writeIndex = tail.get();
        }
        tail.elements[writeIndex] = handle;
        // Null out the handle's owning stack to avoid a memory leak
        handle.stack = null;
        // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
        // this also means we guarantee visibility of an element in the queue if we see the index updated
        tail.lazySet(writeIndex + 1);
    }

    boolean hasFinalData() {
        return tail.readIndex != tail.get();
    }

    // transfer as many items as we can from this queue to the stack, returning true if any were transferred
    // Moves the handles parked in this queue over to the Stack
    @SuppressWarnings("rawtypes")
    boolean transfer(Stack<?> dst) {

        Link head = this.head;
        if (head == null) {
            return false;
        }

        // The read index has reached the capacity of this Link;
        // move on to the next Link if there is one
        if (head.readIndex == LINK_CAPACITY) {
            // There is no next Link
            if (head.next == null) {
                return false;
            }
            // Advance head to the next Link
            this.head = head = head.next;
        }

        final int srcStart = head.readIndex;
        int srcEnd = head.get();
        // Number of handles available for transfer (write index minus read index)
        final int srcSize = srcEnd - srcStart;
        // No handles are waiting in the queue for delayed recycling
        if (srcSize == 0) {
            return false;
        }

        final int dstSize = dst.size;
        final int expectedCapacity = dstSize + srcSize;

        // If the expected size exceeds the stack's current array, grow the stack
        // and cap the transfer at what the stack can actually hold
        if (expectedCapacity > dst.elements.length) {
            final int actualCapacity = dst.increaseCapacity(expectedCapacity);
            srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
        }

        if (srcStart != srcEnd) {
            final DefaultHandle[] srcElems = head.elements;
            final DefaultHandle[] dstElems = dst.elements;
            int newDstSize = dstSize;
            // Transfer the handles
            for (int i = srcStart; i < srcEnd; i++) {
                DefaultHandle element = srcElems[i];
                // The handle has not been recycled into any Stack yet
                if (element.recycleId == 0) {
                    element.recycleId = element.lastRecycledId;
                    // Guard against recycling the same handle twice
                } else if (element.recycleId != element.lastRecycledId) {
                    throw new IllegalStateException("recycled already");
                }
                element.stack = dst;
                dstElems[newDstSize++] = element;
                // Null out the slot to drop the parked handle and help GC
                srcElems[i] = null;
            }
            dst.size = newDstSize;

            if (srcEnd == LINK_CAPACITY && head.next != null) {
                this.head = head.next;
            }

            // Advance the read index
            head.readIndex = srcEnd;
            return true;
        } else {
            // The destination stack is full already.
            return false;
        }
    }
}
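
The core trick in WeakOrderQueue is the chain of fixed-size Links in which the Link's own AtomicInteger value is the write index, published with lazySet after the element has been stored. The sketch below isolates that idea for one producer and one consumer. LinkQueueSketch is a hypothetical name, and declaring next as volatile is a conservative choice of this sketch, not something taken from the original code.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical single-producer / single-consumer chunked queue; not JRaft code. */
final class LinkQueueSketch<E> {
    static final int LINK_CAPACITY = 16;

    /** The AtomicInteger value is the write index; readIndex belongs to the consumer. */
    @SuppressWarnings("serial")
    static final class Link extends AtomicInteger {
        final Object[] elements = new Object[LINK_CAPACITY];
        int readIndex;
        volatile Link next; // assumption of this sketch: made volatile to keep the example simple
    }

    private Link head = new Link(); // consumer side
    private Link tail = head;       // producer side

    /** Producer: store the element first, then publish it by advancing the write index. */
    void add(E item) {
        Link t = tail;
        int writeIndex = t.get();
        if (writeIndex == LINK_CAPACITY) {
            Link fresh = new Link();
            t.next = fresh;
            tail = t = fresh;
            writeIndex = 0;
        }
        t.elements[writeIndex] = item;
        t.lazySet(writeIndex + 1);
    }

    /** Consumer: returns the next published element, or null if none is available. */
    @SuppressWarnings("unchecked")
    E poll() {
        Link h = head;
        if (h.readIndex == LINK_CAPACITY) {
            if (h.next == null) {
                return null;
            }
            head = h = h.next;
        }
        if (h.readIndex == h.get()) {
            return null;
        }
        E item = (E) h.elements[h.readIndex];
        h.elements[h.readIndex++] = null; // clear the slot to help GC
        return item;
    }

    public static void main(String[] args) {
        LinkQueueSketch<Integer> queue = new LinkQueueSketch<>();
        for (int i = 0; i < 40; i++) {
            queue.add(i); // spans three Links of 16 slots each
        }
        int drained = 0;
        while (queue.poll() != null) {
            drained++;
        }
        System.out.println("drained " + drained + " elements");
    }
}
```

Because the reader only trusts slots below the published write index, the producer never needs a lock or a compare-and-set.
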
Back to acquiring an object

The previous section covered delayed recycling and said that the handles in a WeakOrderQueue get transferred onto the Stack<T>. How is that done?

DefaultHandle pop() {
    int size = this.size;
    if (size == 0) {
        // When the Stack<T> is empty, try to transfer some handles over from the WeakOrderQueues
        if (!scavenge()) {
            return null;
        }
        size = this.size;
    }
    size--;
    DefaultHandle ret = elements[size];
    // The handle was recycled more than once
    if (ret.lastRecycledId != ret.recycleId) {
        throw new IllegalStateException("recycled multiple times");
    }
    ret.recycleId = 0;
    ret.lastRecycledId = 0;
    this.size = size;
    return ret;
}

boolean scavenge() {
    // continue an existing scavenge, if any
    // Scan for handles that can be transferred
    if (scavengeSome()) {
        return true;
    }

    // reset our scavenge cursor
    // The scan found nothing from the cursor to the end of the list, so reset
    // the cursor to the head; the next scan starts over from the first WeakOrderQueue
    prev = null;
    cursor = head;
    return false;
}

boolean scavengeSome() {
    WeakOrderQueue cursor = this.cursor;
    if (cursor == null) {
        cursor = head;
        if (cursor == null) {
            return false;
        }
    }
    boolean success = false;
    WeakOrderQueue prev = this.prev;
    do {
        // Try to transfer handles from the current WeakOrderQueue
        if (cursor.transfer(this)) {
            success = true;
            break;
        }

        WeakOrderQueue next = cursor.next;
        // The thread that owned this WeakOrderQueue has been garbage collected
        if (cursor.owner.get() == null) {
            // If the thread associated with the queue is gone, unlink it, after
            // performing a volatile read to confirm there is no data left to collect.
            // We never unlink the first queue, as we don't want to synchronize on updating the head.
            if (cursor.hasFinalData()) {
                for (;;) {
                    if (cursor.transfer(this)) {
                        success = true;
                    } else {
                        break;
                    }
                }
            }
            if (prev != null) {
                prev.next = next;
            }
        } else {
            prev = cursor;
        }
        cursor = next;
    } while (cursor != null && !success);
    this.prev = prev;
    this.cursor = cursor;
    return success;
}
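
The cursor bookkeeping in scavenge() and scavengeSome() is easier to follow in a single-threaded model. In the sketch below, ScavengeSketch and DelayedQueue are hypothetical names, a boolean ownerAlive flag stands in for the WeakReference<Thread> check, and transfer is simplified to move everything at once, so the "final data" drain of the original is not needed.

```java
import java.util.ArrayDeque;

/** Hypothetical, single-threaded model of Stack.scavenge(); not JRaft code. */
final class ScavengeSketch {
    /** Stand-in for WeakOrderQueue; ownerAlive replaces the WeakReference<Thread> check. */
    static final class DelayedQueue {
        final ArrayDeque<String> items = new ArrayDeque<>();
        boolean ownerAlive = true;
        DelayedQueue next;
    }

    final ArrayDeque<String> stack = new ArrayDeque<>();
    DelayedQueue head;
    private DelayedQueue cursor;
    private DelayedQueue prev;

    /** New queues are linked in at the head, as the WeakOrderQueue constructor does. */
    DelayedQueue newQueue() {
        DelayedQueue q = new DelayedQueue();
        q.next = head;
        head = q;
        return q;
    }

    /** Simplified transfer: moves everything and reports whether anything moved. */
    private boolean transfer(DelayedQueue q) {
        boolean moved = !q.items.isEmpty();
        while (!q.items.isEmpty()) {
            stack.push(q.items.poll());
        }
        return moved;
    }

    boolean scavenge() {
        if (scavengeSome()) {
            return true;
        }
        prev = null;   // nothing found: the next scan restarts from the head
        cursor = head;
        return false;
    }

    private boolean scavengeSome() {
        DelayedQueue cursor = this.cursor != null ? this.cursor : head;
        DelayedQueue prev = this.prev;
        boolean success = false;
        while (cursor != null && !success) {
            if (transfer(cursor)) {
                success = true;            // stop here and resume from this queue next time
            } else {
                DelayedQueue next = cursor.next;
                if (!cursor.ownerAlive && prev != null) {
                    prev.next = next;      // unlink a dead, drained queue (never the head)
                } else if (cursor.ownerAlive) {
                    prev = cursor;
                }
                cursor = next;
            }
        }
        this.prev = prev;
        this.cursor = cursor;
        return success;
    }

    public static void main(String[] args) {
        ScavengeSketch s = new ScavengeSketch();
        DelayedQueue q3 = s.newQueue();
        DelayedQueue q2 = s.newQueue();
        DelayedQueue q1 = s.newQueue();    // list is now q1 -> q2 -> q3
        q2.ownerAlive = false;
        q3.items.add("x");
        System.out.println("transferred: " + s.scavenge());
        System.out.println("q2 unlinked: " + (q1.next == q3));
    }
}
```

Remembering the cursor between calls spreads the scanning cost over many pop() calls instead of walking every queue each time.
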
How to construct a case where one object is recycled by multiple Stack<T>s

So far I can think of only one situation in which a DefaultHandle ends up being recycled more than once.

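The recycle method of one DefaultHandle is executed twice, on different threads. The first call goes through the normal flow and puts the handle into a delayed-recycle queue. When the second call runs, it so happens that the WeakOrderQueue the handle entered during the first call is being transferred by some Stack<T> (handles are being moved from the WeakOrderQueue onto the Stack<T>). The two pieces of code to look at are these: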

Thread A: the WeakOrderQueue that received the handle in the first recycle call is now being transferred by a Stack<T>

if (element.recycleId == 0) {
    element.recycleId = element.lastRecycledId; // <-- first
    // Guard against recycling the same handle twice
} else if (element.recycleId != element.lastRecycledId) {
    throw new IllegalStateException("recycled already");
}

Thread B: the second recycle call is running and is adding the handle to a WeakOrderQueue

// Note: handle.recycleId == 0 at this point, because pop() reset recycleId to 0
void add(DefaultHandle handle) {
    handle.lastRecycledId = id; // <-- second
    Link tail = this.tail;
    int writeIndex;
    if ((writeIndex = tail.get()) == LINK_CAPACITY) {
        this.tail = tail = tail.next = new Link();
        writeIndex = tail.get();
    }
    tail.elements[writeIndex] = handle;
    handle.stack = null;
    tail.lazySet(writeIndex + 1);
}

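This is where the problem appears. Thread A reaches first while thread B is in the middle of its recycle call. Once the statement at first has executed, the two ids of the DefaultHandle are: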

recycleId={thread-A}
lastRecycledId={thread-A}

Execution then switches straight to second, after which the two ids of the DefaultHandle are:

recycleId={thread-A}
lastRecycledId={thread-B}

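As a result, the same object ends up being recycled twice. The two ids no longer match, which is exactly the condition that the check in pop() reports as "recycled multiple times".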
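
The interleaving above can be replayed deterministically on one thread by performing the same field writes in the same order. This is only a model of the id bookkeeping: DoubleRecycleReplay and its methods are hypothetical, and the queue ids 1 and 2 are arbitrary stand-ins for {thread-A} and {thread-B}.

```java
/** Hypothetical single-threaded replay of the id bookkeeping; not JRaft code. */
final class DoubleRecycleReplay {
    static final class Handle {
        int recycleId;
        int lastRecycledId;
    }

    /** Mirrors WeakOrderQueue.add(): the queue stamps the handle with its own id. */
    static void addToQueue(Handle h, int queueId) {
        h.lastRecycledId = queueId;
    }

    /** Mirrors the check inside transfer(): adopt lastRecycledId, or fail on a mismatch. */
    static void transferToStack(Handle h) {
        if (h.recycleId == 0) {
            h.recycleId = h.lastRecycledId;
        } else if (h.recycleId != h.lastRecycledId) {
            throw new IllegalStateException("recycled already");
        }
    }

    /** Mirrors the consistency check at the end of pop(). */
    static void checkOnPop(Handle h) {
        if (h.lastRecycledId != h.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        h.recycleId = 0;
        h.lastRecycledId = 0;
    }

    public static void main(String[] args) {
        Handle h = new Handle();
        addToQueue(h, 1);     // the first recycle() parks the handle in queue 1
        transferToStack(h);   // "first": recycleId = lastRecycledId = 1
        addToQueue(h, 2);     // "second": lastRecycledId becomes 2
        try {
            checkOnPop(h);    // recycleId = 1, lastRecycledId = 2
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "recycled multiple times"
        }
    }
}
```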