Etcd3 租约机制解析

租约机制

相比 Etcd2 为每个 Key 单独设置 TTL 的机制,Etcd3 引入了租约(Lease)机制:一个租约可以绑定多个 Key,租约被撤销或过期时,绑定在它上面的 Key 会被一并删除。这样服务端就不再需要单独管理每一个 Key 的过期事件,化零为整,降低了服务端处理过期 Key 的开销。

具体实现

既然实现了租约机制,就一定需要一个管理者(Lessor)来统一管理这些租约,其接口定义如下:

// Lessor manages leases: it grants, renews and revokes them, attaches keys
// to them, and — while it is the primary lessor — tracks their expiration.
type Lessor interface {
	// ---- wiring and recovery ----

	// SetRangeDeleter sets the RangeDeleter the lessor uses to create
	// TxnDeletes against the store; keys belonging to revoked or expired
	// leases are removed through those TxnDeletes.
	SetRangeDeleter(rd RangeDeleter)

	// SetCheckpointer sets the Checkpointer that lease checkpoints are sent to.
	SetCheckpointer(cp Checkpointer)

	// Recover restores the lessor's state from backend b and RangeDeleter rd.
	Recover(b backend.Backend, rd RangeDeleter)

	// ---- lease lifecycle ----

	// Grant creates a lease with the given ID that expires no sooner than
	// ttl seconds from now.
	Grant(id LeaseID, ttl int64) (*Lease, error)

	// Renew renews the lease with the given ID and returns the renewed TTL.
	// An error is returned if the ID does not exist.
	Renew(id LeaseID) (int64, error)

	// Revoke revokes the lease with the given ID and removes every item
	// attached to it. An error is returned if the ID does not exist.
	Revoke(id LeaseID) error

	// Checkpoint records remainingTTL for the lease with the given ID.
	// Promote uses the recorded value to set the lease's expiry below its
	// full TTL when possible.
	Checkpoint(id LeaseID, remainingTTL int64) error

	// ---- key attachment ----

	// Attach attaches items to the lease with the given ID. An error is
	// returned if the lease does not exist.
	Attach(id LeaseID, items []LeaseItem) error

	// Detach detaches items from the lease with the given ID. An error is
	// returned if the lease does not exist.
	Detach(id LeaseID, items []LeaseItem) error

	// GetLease returns the ID of the lease that item is attached to, or
	// NoLease if it is not attached to any lease.
	GetLease(item LeaseItem) LeaseID

	// ---- queries ----

	// Lookup returns the lease with the given ID, if any.
	Lookup(id LeaseID) *Lease

	// Leases returns every lease.
	Leases() []*Lease

	// ExpiredLeasesC returns the channel on which batches of expired leases
	// are delivered.
	ExpiredLeasesC() <-chan []*Lease

	// ---- leadership and shutdown ----

	// Promote makes this lessor the primary lessor, which manages the
	// expiration and renewal of leases. On promotion the TTL of every lease
	// is renewed to extend plus its previous TTL.
	Promote(extend time.Duration)

	// Demote steps this lessor down from being the primary lessor.
	Demote()

	// Stop stops the lessor from managing leases. Calling Stop more than
	// once has undefined behavior.
	Stop()
}

上面是 Lessor 的相关操作接口,下面逐个来看。

Grant(id LeaseID, ttl int64) (*Lease, error)

Grant 用于创建(授予)一个租约,并为其指定生存时间 TTL(单位为秒):

// Grant creates and registers a lease with the given ID and a TTL of ttl
// seconds, raised to le.minLeaseTTL when smaller.
//
// Errors:
//   - ErrLeaseNotFound if id is the reserved NoLease value;
//   - ErrLeaseTTLTooLarge if ttl exceeds MaxLeaseTTL;
//   - ErrLeaseExists if a lease with this ID is already registered.
//
// On the primary lessor the expiry is computed from the current time; on
// any other node the lease is marked as never expiring. The new lease is
// registered with the expiry notifier and persisted to the backend, and on
// the primary it is also scheduled for checkpointing.
func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
	switch {
	case id == NoLease:
		return nil, ErrLeaseNotFound
	case ttl > MaxLeaseTTL:
		return nil, ErrLeaseTTLTooLarge
	}

	lease := &Lease{
		ID:      id,
		ttl:     ttl,
		itemSet: make(map[LeaseItem]struct{}),
		revokec: make(chan struct{}),
	}

	le.mu.Lock()
	defer le.mu.Unlock()

	if _, exists := le.leaseMap[id]; exists {
		return nil, ErrLeaseExists
	}

	// Clamp the requested TTL up to the configured floor.
	if lease.ttl < le.minLeaseTTL {
		lease.ttl = le.minLeaseTTL
	}

	// Only the primary (leader) tracks real deadlines; elsewhere the lease is
	// kept alive indefinitely.
	if !le.isPrimary() {
		lease.forever()
	} else {
		lease.refresh(0)
	}

	le.leaseMap[id] = lease
	le.leaseExpiredNotifier.RegisterOrUpdate(&LeaseWithTime{id: lease.ID, time: lease.expiry.UnixNano()})
	lease.persistTo(le.b)

	leaseTotalTTLs.Observe(float64(lease.ttl))
	leaseGranted.Inc()

	// The primary queues the lease so its remaining TTL gets checkpointed.
	if le.isPrimary() {
		le.scheduleCheckpointIfNeeded(lease)
	}
	return lease, nil
}

// scheduleCheckpointIfNeeded pushes lease onto the checkpoint priority queue
// (le.leaseCheckpointHeap) with a due time of one checkpointInterval from
// now. Nothing is queued when no Checkpointer is configured, or when the
// lease's remaining TTL does not exceed the interval (in whole seconds).
func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) {
	if le.cp == nil {
		return
	}

	intervalSecs := int64(le.checkpointInterval.Seconds())
	if lease.RemainingTTL() <= intervalSecs {
		return
	}

	if le.lg != nil {
		le.lg.Debug("Scheduling lease checkpoint",
			zap.Int64("leaseID", int64(lease.ID)),
			zap.Duration("intervalSeconds", le.checkpointInterval),
		)
	}

	dueAt := time.Now().Add(le.checkpointInterval).UnixNano()
	heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{id: lease.ID, time: dueAt})
}

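注册租约时,会先做数据校验(租约 ID 不能为 NoLease、TTL 不能超过 MaxLeaseTTL、同一 ID 的租约不能重复创建),再做初始化(TTL 小于最小值时修正为最小值;Leader 节点上计算过期时间,非 Leader 节点上设为永不过期),然后将租约注册到过期通知中并持久化。如果当前是 Leader 节点,还会把租约放入 checkpoint 堆中:checkpoint 机制负责定期把租约的剩余 TTL 持久化,以便 Leader 切换或重启后能恢复剩余时间;而过期租约的移除由后文的 revokeExpiredLeases 负责。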

Revoke(id LeaseID) error

既然有租约的申请,那么必然有租约的撤销(Revoke):

// Revoke removes the lease with the given ID together with every key
// attached to it.
//
// It returns ErrLeaseNotFound if no such lease is registered. The lessor
// lock is released while the key deletions are issued through a TxnDelete
// obtained from le.rd, then re-acquired to drop the lease from leaseMap and
// from the backend's lease bucket. If no RangeDeleter is set, it returns nil
// without deleting anything (the lease stays in leaseMap).
//
// l.revokec is always closed when Revoke returns after finding the lease;
// Renew waits on that channel for expired leases. Deferred calls run LIFO,
// so on the full path revokec is closed only after the deferred
// le.mu.Unlock has run.
func (le *lessor) Revoke(id LeaseID) error {
le.mu.Lock()

// Look up the lease; it is removed from leaseMap only further below.
l := le.leaseMap[id]
if l == nil {
le.mu.Unlock()
return ErrLeaseNotFound
}
defer close(l.revokec)
// unlock before doing external work
le.mu.Unlock()

if le.rd == nil {
return nil
}

// A lease can carry many keys, so they are all deleted in one batch txn.
txn := le.rd()

// Sort the keys so that every member deletes them in the same order.
keys := l.Keys()
sort.StringSlice(keys).Sort()
for _, key := range keys {
// Delete the key.
txn.DeleteRange([]byte(key), nil)
}

le.mu.Lock()
defer le.mu.Unlock()
delete(le.leaseMap, l.ID)
// lease deletion needs to be in the same backend transaction with the
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))

txn.End()

leaseRevoked.Inc()
return nil
}

租约的创建、撤销代码都比较简单:创建时要确保同一 ID 的租约只会被创建一次,并把租约持久化到后端存储;撤销时则按固定顺序删除租约绑定的所有 Key,并在同一个后端事务中删除持久化的租约记录。

上面只介绍了租约的创建与撤销,但租约是可以被延长(续约)的,那么续约又是怎么做的呢?

Renew(id LeaseID) (int64, error)

Renew 即续约操作,只能在 Leader 节点上执行:它会以当前时间为起点重新计算租约的过期时间,并更新过期通知中该租约的到期时间。

// Renew extends the lease with the given ID by recomputing its expiry from
// the current time, and returns the lease's TTL.
//
// Results:
//   - (-1, ErrNotPrimary) if this lessor is not the primary, or if it is
//     demoted or stopped while waiting on an expired lease;
//   - (-1, ErrLeaseNotFound) if the lease is unknown, or if it had already
//     expired and was then revoked.
//
// An expired lease is never extended: Renew blocks until the lease is
// revoked (revokec closed), the lessor is demoted (demotec), or stopped
// (stopC). When a checkpointer is set and the lease carries a non-zero
// checkpointed remaining TTL, a checkpoint with Remaining_TTL 0 is sent
// first to clear it.
func (le *lessor) Renew(id LeaseID) (int64, error) {
le.mu.RLock()

// Only the primary (leader) lessor may renew.
if !le.isPrimary() {
// forward renew request to primary instead of returning error.
// NOTE(review): this function does return ErrNotPrimary; the note above
// presumably means the caller forwards the request to the primary.
le.mu.RUnlock()
return -1, ErrNotPrimary
}

// Capture demotec under the lock so the wait below sees this term's channel.
demotec := le.demotec

// Look up the lease.
l := le.leaseMap[id]
if l == nil {
le.mu.RUnlock()
return -1, ErrLeaseNotFound
}
// Decide, under the lock, whether a checkpointed remaining TTL must be cleared.
clearRemainingTTL := le.cp != nil && l.remainingTTL > 0

le.mu.RUnlock()
// An expired lease cannot be renewed; wait for the normal expiry/revoke path.
if l.expired() {
select {
// A expired lease might be pending for revoking or going through
// quorum to be revoked. To be accurate, renew request must wait for the
// deletion to complete.
case <-l.revokec:
return -1, ErrLeaseNotFound
// The expired lease might fail to be revoked if the primary changes.
// The caller will retry on ErrNotPrimary.
case <-demotec:
return -1, ErrNotPrimary
case <-le.stopC:
return -1, ErrNotPrimary
}
}

// Reset the checkpointed remaining TTL to 0 through the checkpointer
// (per the original author this goes through Raft so all members agree).
if clearRemainingTTL {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
}

le.mu.Lock()

// Recompute the expiry from now and update this lease's deadline in the
// expiry notifier, as Grant does for a new lease.
l.refresh(0)
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.mu.Unlock()

leaseRenewed.Inc()
return l.ttl, nil
}

至此,租约本身的相关操作大致就是这些。Etcd 将具体的 Key 绑定在租约上,那么这个绑定是怎么做的?

Attach(id LeaseID, items []LeaseItem) error

Key 绑定租约,是通过把 Key 作为一个附件(attachment)附加到 Lease 上实现的:

// Attach binds items to the lease identified by id. Each item is added to
// the lease's itemSet, and the lessor's itemMap records which lease the
// item now belongs to. It returns ErrLeaseNotFound if no such lease exists.
// The lessor lock and the lease's own lock are both held while mutating.
func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
	le.mu.Lock()
	defer le.mu.Unlock()

	lease := le.leaseMap[id]
	if lease == nil {
		return ErrLeaseNotFound
	}

	lease.mu.Lock()
	for i := range items {
		item := items[i]
		lease.itemSet[item] = struct{}{} // forward: lease -> items
		le.itemMap[item] = id            // reverse: item -> lease
	}
	lease.mu.Unlock()

	return nil
}

Key 绑定到租约的代码很简单:把 Key 放入对应 Lease 的 itemSet 中,同时在 Lessor 的 itemMap 中记录 Key 到租约 ID 的反向映射。这就像 Netty 或 JDK 中可以在 Channel 上附带一些信息(attachment)一样,把信息的生命周期与 Channel 的生命周期绑定在一起。

租约过期是怎么实现的

上面分析了租约的创建、撤销、续约,以及如何把 Key 绑定到租约上,但还没有看到租约过期是如何被扫描出来的。这就要回到 Lessor 的创建过程了:

// newLessor constructs a lessor over backend b.
//
// A zero CheckpointInterval or ExpiredLeasesRetryInterval in cfg falls back
// to defaultLeaseCheckpointInterval or defaultExpiredleaseRetryInterval,
// respectively. After initAndRecover runs (presumably reloading persisted
// leases from b — confirm), the background runLoop goroutine is started; it
// exits when stopC is closed.
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
	cpInterval := cfg.CheckpointInterval
	if cpInterval == 0 {
		cpInterval = defaultLeaseCheckpointInterval
	}

	retryInterval := cfg.ExpiredLeasesRetryInterval
	if retryInterval == 0 {
		retryInterval = defaultExpiredleaseRetryInterval
	}

	le := &lessor{
		lg: lg,
		b:  b,

		// lease ID -> lease
		leaseMap: make(map[LeaseID]*Lease),
		// attached item (key) -> lease ID
		itemMap: make(map[LeaseItem]LeaseID),
		// lease expiry deadlines, consulted by revokeExpiredLeases
		leaseExpiredNotifier: newLeaseExpiredNotifier(),
		// scheduled checkpoints, consulted by checkpointScheduledLeases
		leaseCheckpointHeap: make(LeaseQueue, 0),

		minLeaseTTL:               cfg.MinLeaseTTL,
		checkpointInterval:        cpInterval,
		expiredLeaseRetryInterval: retryInterval,

		// expiredC is a small buffered chan to avoid unnecessary blocking.
		expiredC: make(chan []*Lease, 16),
		stopC:    make(chan struct{}),
		doneC:    make(chan struct{}),
	}
	le.initAndRecover()

	// The background loop that periodically revokes expired leases and
	// sends due checkpoints.
	go le.runLoop()

	return le
}

// runLoop is the lessor's background worker. Each pass revokes expired
// leases and then sends due checkpoints, after which it pauses for 500ms.
// It returns once stopC is closed, and closes doneC on the way out.
func (le *lessor) runLoop() {
	defer close(le.doneC)

	const pause = 500 * time.Millisecond
	for {
		le.revokeExpiredLeases()
		le.checkpointScheduledLeases()

		select {
		case <-le.stopC:
			return
		case <-time.After(pause):
		}
	}
}

revokeExpiredLeases

// revokeExpiredLeases hands expired leases to whoever consumes expiredC.
//
// Only the primary collects leases, at most leaseRevokeRate/2 per call
// (rate limit). The hand-off never blocks: if stopC is closed it gives up,
// and if expiredC has no room the batch is dropped. Dropped leases stay in
// leaseMap, so a later pass finds them again once their retry deadline in
// the expiry notifier comes due.
func (le *lessor) revokeExpiredLeases() {
	limit := leaseRevokeRate / 2

	var batch []*Lease
	le.mu.RLock()
	if le.isPrimary() {
		batch = le.findExpiredLeases(limit)
	}
	le.mu.RUnlock()

	if len(batch) == 0 {
		return
	}

	select {
	case <-le.stopC:
		return
	case le.expiredC <- batch:
	default:
		// the receiver of expiredC is probably busy handling
		// other stuff
		// let's try this next time after 500ms
	}
}

// findExpiredLeases walks the expiry notifier via expireExists and returns
// up to limit leases whose expiry has passed. It stops early when the
// notifier is empty or its earliest deadline is still in the future.
// It is called with le.mu held (revokeExpiredLeases holds the read lock).
func (le *lessor) findExpiredLeases(limit int) []*Lease {
	batch := make([]*Lease, 0, 16)

	for {
		lease, due, skip := le.expireExists()
		switch {
		case !due && !skip:
			// Notifier empty, or nothing is due yet.
			return batch
		case !due || skip:
			// A stale entry was dropped; look at the next one.
			continue
		}

		// The notifier's deadline says "due"; confirm against the lease itself.
		if !lease.expired() {
			continue
		}
		batch = append(batch, lease)
		if len(batch) == limit {
			// reach expired limit
			return batch
		}
	}
}

// expireExists examines the earliest entry in leaseExpiredNotifier and
// reports one of four outcomes as (l, ok, next):
//
//   - (nil, false, false): the notifier is empty.
//   - (nil, false, true): the entry's lease is no longer in leaseMap
//     (already revoked); the entry is unregistered and the caller should
//     look at the next one.
//   - (l, false, false): the earliest deadline is still in the future, so
//     nothing is due yet.
//   - (l, true, false): the deadline has passed; the entry is rescheduled
//     expiredLeaseRetryInterval from now and l is returned for revocation.
//
// Why an expired entry is rescheduled instead of dropped: the lease stays
// in leaseMap until Revoke deletes it, the batch handed to expiredC may be
// dropped when that channel is full, and a revoke may fail. Rescheduling
// (1) moves the entry off the head, so findExpiredLeases advances to the
// next candidate instead of seeing the same lease again, and (2) makes the
// lease show up again after the retry interval if it still has not been
// revoked. Once it is revoked, the nil-lease branch unregisters the entry.
//
// NOTE(review): Poll appears to peek at the earliest entry without removing
// it (removal happens via Unregister, rescheduling via RegisterOrUpdate) —
// confirm against the notifier implementation.
func (le *lessor) expireExists() (l *Lease, ok bool, next bool) {
	if le.leaseExpiredNotifier.Len() == 0 {
		return nil, false, false
	}

	head := le.leaseExpiredNotifier.Poll()
	lease := le.leaseMap[head.id]
	if lease == nil {
		// Already revoked: discard the stale entry.
		le.leaseExpiredNotifier.Unregister() // O(log N)
		return nil, false, true
	}

	now := time.Now()
	if deadline := head.time; now.UnixNano() < deadline {
		// Earliest deadline not reached yet, so nothing is due.
		return lease, false, false
	}

	// Due: push the deadline out by the retry interval (see doc comment).
	head.time = now.Add(le.expiredLeaseRetryInterval).UnixNano()
	le.leaseExpiredNotifier.RegisterOrUpdate(head)
	return lease, true, false
}

而消费 expiredC 这个 Channel、真正撤销过期租约的代码在 etcd/etcdserver/server.go:972 附近:

// Fragment from etcdserver: revokes a batch of expired leases in parallel.
// NOTE(review): `leases` is presumably a batch read from the lessor's
// ExpiredLeasesC(); it is defined outside this excerpt.
// Each lease is revoked through s.LeaseRevoke in its own goroutine. The
// buffered channel c acts as a semaphore limiting in-flight revokes to
// maxPendingRevokes, and the loop gives up once s.stopping fires.
s.goAttach(func() {
// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, lease := range leases {
// Acquire a semaphore slot, or stop if the server is shutting down.
select {
case c <- struct{}{}:
case <-s.stopping:
return
}
lid := lease.ID
s.goAttach(func() {
// Context derived via s.authStore.WithRoot (presumably root credentials).
ctx := s.authStore.WithRoot(s.ctx)
// Revoke the lease; failures are only logged.
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
if lerr == nil {
leaseExpired.Inc()
} else {
lg.Warn(
"failed to revoke lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(lerr),
)
}
// Release the semaphore slot.
<-c
})
}
})

checkpointScheduledLeases

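Checkpoint 机制的目的是在 Leader 选举和重启时保留租约的剩余 TTL:Leader 会定期把租约的剩余时间通过 Checkpointer 持久化,新 Leader 上任(Promote)时就可以据此把租约的过期时间设置为剩余时间,而不是重置为完整的 TTL,避免租约因 Leader 切换而被意外延长。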


// checkpointScheduledLeases sends due lease checkpoints to the Checkpointer.
// When a lease's deadline should be persisted to preserve the remaining TTL
// across leader elections and restarts, the lessor checkpoints the lease
// through the Checkpointer.
//
// Work is done in batches of up to maxLeaseCheckpointBatchSize, at most
// leaseCheckpointRate/2 batches per call (rate limit). It stops as soon as
// a batch comes back smaller than the maximum. Batches are only collected
// while this lessor is primary.
func (le *lessor) checkpointScheduledLeases() {
	// rate limit
	for i := 0; i < leaseCheckpointRate/2; i++ {
		// Declared per iteration: if the lessor is demoted between
		// iterations, isPrimary() is false and cps must come out empty.
		// Otherwise the previous, already-sent batch would be re-sent, and
		// the loop would keep going because that batch was full.
		var cps []*pb.LeaseCheckpoint

		le.mu.Lock()
		if le.isPrimary() {
			cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
		}
		le.mu.Unlock()

		if len(cps) != 0 {
			le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
		}
		if len(cps) < maxLeaseCheckpointBatchSize {
			return
		}
	}
}

// findDueScheduledCheckpoints pops entries from le.leaseCheckpointHeap whose
// scheduled time has arrived. It returns at most checkpointLimit
// checkpoints, each carrying the lease's remaining TTL in whole seconds,
// rounded up.
//
// A popped entry is discarded without a checkpoint when:
//   - its lease no longer exists;
//   - the lease is already at or past its expiry;
//   - the lease still has its full TTL remaining.
//
// It returns nil when no Checkpointer is configured. It is called with le.mu
// held (see checkpointScheduledLeases).
func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCheckpoint {
	if le.cp == nil {
		return nil
	}

	now := time.Now()
	nowNano := now.UnixNano()
	due := []*pb.LeaseCheckpoint{}

	for le.leaseCheckpointHeap.Len() > 0 && len(due) < checkpointLimit {
		head := le.leaseCheckpointHeap[0]
		if head.time /* next checkpoint time */ > nowNano {
			break
		}
		heap.Pop(&le.leaseCheckpointHeap)

		lease, ok := le.leaseMap[head.id]
		if !ok || !now.Before(lease.expiry) {
			continue
		}

		remaining := int64(math.Ceil(lease.expiry.Sub(now).Seconds()))
		if remaining >= lease.ttl {
			continue
		}

		if le.lg != nil {
			le.lg.Debug("Checkpointing lease",
				zap.Int64("leaseID", int64(head.id)),
				zap.Int64("remainingTTL", remaining),
			)
		}
		due = append(due, &pb.LeaseCheckpoint{ID: int64(head.id), Remaining_TTL: remaining})
	}
	return due
}