type Lessor interface { // SetRangeDeleter lets the lessor create TxnDeletes to the store. // Lessor deletes the items in the revoked or expired lease by creating // new TxnDeletes. SetRangeDeleter(rd RangeDeleter)
SetCheckpointer(cp Checkpointer)
// Grant grants a lease that expires at least after TTL seconds. Grant(id LeaseID, ttl int64) (*Lease, error) // Revoke revokes a lease with given ID. The item attached to the // given lease will be removed. If the ID does not exist, an error // will be returned. Revoke(id LeaseID) error
// Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set the expiry of leases to less than the full TTL when possible. Checkpoint(id LeaseID, remainingTTL int64) error
// Attach attaches given leaseItem to the lease with given LeaseID. // If the lease does not exist, an error will be returned. Attach(id LeaseID, items []LeaseItem) error
// GetLease returns LeaseID for given item. // If no lease found, NoLease value will be returned. GetLease(item LeaseItem) LeaseID
// Detach detaches given leaseItem from the lease with given LeaseID. // If the lease does not exist, an error will be returned. Detach(id LeaseID, items []LeaseItem) error
// Promote promotes the lessor to be the primary lessor. Primary lessor manages // the expiration and renew of leases. // Newly promoted lessor renew the TTL of all lease to extend + previous TTL. Promote(extend time.Duration)
// Demote demotes the lessor from being the primary lessor. Demote()
// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist, // an error will be returned. Renew(id LeaseID) (int64, error)
// Lookup gives the lease at a given lease id, if any Lookup(id LeaseID) *Lease
// Leases lists all leases. Leases() []*Lease
// ExpiredLeasesC returns a chan that is used to receive expired leases. ExpiredLeasesC() <-chan []*Lease
// Recover recovers the lessor state from the given backend and RangeDeleter. Recover(b backend.Backend, rd RangeDeleter)
// Stop stops the lessor for managing leases. The behavior of calling Stop multiple // times is undefined. Stop() }
// Remove it from the map object.
l := le.leaseMap[id]
if l == nil {
	le.mu.Unlock()
	return ErrLeaseNotFound
}
defer close(l.revokec)
// unlock before doing external work
le.mu.Unlock()
le.mu.Lock()
defer le.mu.Unlock()
delete(le.leaseMap, l.ID)
// lease deletion needs to be in the same backend transaction with the
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
// A lease Renew can only happen on the leader node.
if !le.isPrimary() {
	// forward renew request to primary instead of returning error.
	le.mu.RUnlock()
	return -1, ErrNotPrimary
}

demotec := le.demotec

// Check whether the corresponding lease exists.
l := le.leaseMap[id]
if l == nil {
	le.mu.RUnlock()
	return -1, ErrLeaseNotFound
}
// Determine whether the lease's remaining TTL needs to be reset.
clearRemainingTTL := le.cp != nil && l.remainingTTL > 0

le.mu.RUnlock()
// If the lease has expired, it must go through the normal lease-expiry flow.
if l.expired() {
	select {
	// An expired lease might be pending for revoking or going through
	// quorum to be revoked. To be accurate, renew request must wait for the
	// deletion to complete.
	case <-l.revokec:
		return -1, ErrLeaseNotFound
	// The expired lease might fail to be revoked if the primary changes.
	// The caller will retry on ErrNotPrimary.
	case <-demotec:
		return -1, ErrNotPrimary
	case <-le.stopC:
		return -1, ErrNotPrimary
	}
}
// 这里是进行撤销已经过期的租约 func(le *lessor)revokeExpiredLeases() { var ls []*Lease
// rate limit revokeLimit := leaseRevokeRate / 2
le.mu.RLock() // 如果是 Leader 节点 if le.isPrimary() { // 这里找出过期的租约列表 ls = le.findExpiredLeases(revokeLimit) } le.mu.RUnlock()
iflen(ls) != 0 { select { // 如果该租约管理者已经停止了,结束,生命也不做 case <-le.stopC: return // 将过期的租约发送到 channel 中 case le.expiredC <- ls: default: // the receiver of expiredC is probably busy handling // other stuff // let's try this next time after 500ms } } }
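for {
	// The lessor calls its own expireExists method to check whether any
	// lease has expired; if one has, it also checks whether the next one has
	// expired too, which decides whether this loop needs to continue.
	l, ok, next := le.expireExists()
	if !ok && !next {
		break
	}
	if !ok {
		continue
	}
	if next {
		continue
	}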
func (le *lessor) expireExists() (l *Lease, ok bool, next bool) {
	// If there are no elements that need an expiry notification, finish.
	if le.leaseExpiredNotifier.Len() == 0 {
		return nil, false, false
	}

	// Poll the lease ID at the top of the heap.
	item := le.leaseExpiredNotifier.Poll()
	// Find the lease object.
	l = le.leaseMap[item.id]
	if l == nil {
		// If it does not exist, it has to be unregistered.
		le.leaseExpiredNotifier.Unregister() // O(log N)
		return nil, false, true
	}
	now := time.Now()
	// Check whether the current time is earlier than the lease's target
	// expiry time; if it is, no lease has expired at this point.
	if now.UnixNano() < item.time /* expiration time */ {
		// Candidate expirations are caught up, reinsert this item
		// and no need to revoke (nothing is expiry)
		return l, false, false
	}
// When a lease's deadline should be persisted to preserve the remaining TTL across leader elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
func(le *lessor)checkpointScheduledLeases() { var cps []*pb.LeaseCheckpoint