memberlist 的 gossip 实现

Gossip 协议

Gossip协议,也叫做流言算法,或者疫情传播协议,在这个协议当中,所有的节点的角色地位都是一样的,节点间的数据怎么同步呢?大致的步骤如下

  • 随机选取周边的k个节点
  • 向这k个节点进行数据广播同步
  • 这k个节点再次重复刚刚的操作

因此,最终一个消息经过多轮广播之后,所有的节点都同步了此数据。

gossip协议存在三种数据同步方式

  • Push : 节点 A 将数据 (key,value,version) 及对应的版本号推送给 B 节点,B 节点更新 A 中比自己新的数据
  • Pull : A 仅将数据 key, version 推送给 B,B 将本地比 A 新的数据(Key, value, version)推送给 A,A 更新本地
  • Push/Pull : 与 Pull 类似,只是多了一步,A 再将本地比 B 新的数据推送给 B,B 则更新本地

memberlist 的实现

那么现在来分析一下一个Gossip的实现——hashicorp/memberlist

创建一个 Memberlist

1
2
3
4
5
6
7
8
9
10
11
12
13
func Create(conf *Config) (*Memberlist, error) {
m, err := newMemberlist(conf)
if err != nil {
return nil, err
}
if err := m.setAlive(); err != nil {
m.Shutdown()
return nil, err
}
// 重要方法,里面调度了三个重要的,与gossip协议有关的任务
m.schedule()
return m, nil
}

接着这一块代码就很重要了,在scheuld里面调度了三个重要的任务,这三个任务都有各自的指责,现在我们来看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Schedule is used to ensure the Tick is performed periodically. This
// function is safe to call multiple times. If the memberlist is already
// scheduled, then it won't do anything.
func (m *Memberlist) schedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()

// If we already have tickers, then don't do anything, since we're
// scheduled
if len(m.tickers) > 0 {
return
}

// Create the stop tick channel, a blocking channel. We close this
// when we should stop the tickers.
stopCh := make(chan struct{})

// 用于单轮故障检测和gossip协议广播
if m.config.ProbeInterval > 0 {
t := time.NewTicker(m.config.ProbeInterval)
go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
m.tickers = append(m.tickers, t)
}

// 创建一个定时的 push-pull 触发器
if m.config.PushPullInterval > 0 {
go m.pushPullTrigger(stopCh)
}

// 创建一个定时任务,将消息广播给随机的k个节点
if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
t := time.NewTicker(m.config.GossipInterval)
go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
m.tickers = append(m.tickers, t)
}

// If we made any tickers, then record the stopTick channel for
// later.
if len(m.tickers) > 0 {
m.stopTick = stopCh
}
}

probe

首先我们先来分析下m.probe是做什么事情的

这里会通过m.config.ProbeInterval参数来判断是否开启Probe任务,如果设置了的话,则会开启一个定时的TimerTicker

1
2
3
4
5
if m.config.ProbeInterval > 0 {
t := time.NewTicker(m.config.ProbeInterval)
go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
m.tickers = append(m.tickers, t)
}

接下来就是Probe任务的正体了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Tick is used to perform a single round of failure detection and gossip
func (m *Memberlist) probe() {
// Track the number of indexes we've considered probing
numCheck := 0
START:
// 开始本次的循环
m.nodeLock.RLock()

// 确保不会出现无限循环
if numCheck >= len(m.nodes) {
m.nodeLock.RUnlock()
return
}

// 当索引超出节点的数量长度时,重置索引位置
if m.probeIndex >= len(m.nodes) {
m.nodeLock.RUnlock()
m.resetNodes()
m.probeIndex = 0
// 记录循环次数
numCheck++
goto START
}

// 确定是否应该探测该节点
skip := false
var node nodeState

node = *m.nodes[m.probeIndex]
// 节点是不是自己
if node.Name == m.config.Name {
// 如果节点是自己,则跳过
skip = true
} else if node.DeadOrLeft() {
// 如果节点已经死亡或者离开集群了,则跳过
skip = true
}

// Potentially skip
m.nodeLock.RUnlock()
m.probeIndex++
if skip {
numCheck++
goto START
}

// 探查特定节点
m.probeNode(&node)
}

上面这段代码看着其实就挺简单的,其核心就是,判断该节点是否可以进行探测任务。总结下几个步骤

  • 当前循环测试是否超过了节点数量,超过则跳出循环
  • 当前节点的索引是否超过节点数量,是则重置索引位置,重新循环
  • 当前节点是否是自己,或者是否以及处于死亡或者离开的状态,是则跳过重新循环
  • 进行节点探测

接着我们来看probeNode(&node)做的事情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())

// metrics data collection
...

// 开始准备好本次的Ping消息
selfAddr, selfPort := m.getAdvertise()
ping := ping{
SeqNo: m.nextSeqNo(),
Node: node.Name,
SourceAddr: selfAddr,
SourcePort: selfPort,
SourceNode: m.config.Name,
}
// 构建 ack channel
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
// 构建 uack channel
nackCh := make(chan struct{}, m.config.IndirectChecks+1)
// 设置 Probe 对应的 channel 通道
m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)

// 准备发送的事件
sent := time.Now()

// 期望的截止时间
deadline := sent.Add(probeInterval)
// probe的目标节点
addr := node.Address()

// Arrange for our self-awareness to get updated.
var awarenessDelta int
defer func() {
m.awareness.ApplyDelta(awarenessDelta)
}()

// 如果目前节点的状态为存活状态
if node.State == stateAlive {
// 这里直接调用 encodeAndSendMsg 将 ping 信息发出去
if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
// 根据错误原因进行判断,是否需要跳转到对应的处理逻辑
if failedRemote(err) {
goto HANDLE_REMOTE_FAILURE
} else {
return
}
}
} else {
var msgs [][]byte
// 对数据进行编码
if buf, err := encode(pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err)
return
} else {
msgs = append(msgs, buf.Bytes())
}
// 怀疑信息
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
if buf, err := encode(suspectMsg, &s); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err)
return
} else {
msgs = append(msgs, buf.Bytes())
}
// 对需要发送的数据进行压缩
compound := makeCompoundMessage(msgs)
// 调用 rawSendMsgPacket 发送数据
if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
// 根据错误原因进行判断,是否需要跳转到对应的处理逻辑
if failedRemote(err) {
goto HANDLE_REMOTE_FAILURE
} else {
return
}
}
}

// Arrange for our self-awareness to get updated. At this point we've
// sent the ping, so any return statement means the probe succeeded
// which will improve our health until we get to the failure scenarios
// at the end of this function, which will alter this delta variable
// accordingly.
awarenessDelta = -1

// Wait for response or round-trip-time.
select {
// 如果 ack 通道有消息了
case v := <-ackCh:
// ack 相应的处理逻辑
if v.Complete == true {
if m.config.Ping != nil {
rtt := v.Timestamp.Sub(sent)
// 广播通知本次 probe 的结果
m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
}
return
}

// As an edge case, if we get a timeout, we need to re-enqueue it
// here to break out of the select below.
if v.Complete == false {
ackCh <- v
}
// 如果在指定的时间内还没有完成,则结束
case <-time.After(m.config.ProbeTimeout):
// Note that we don't scale this timeout based on awareness and
// the health score. That's because we don't really expect waiting
// longer to help get UDP through. Since health does extend the
// probe interval it will give the TCP fallback more time, which
// is more active in dealing with lost packets, and it gives more
// time to wait for indirect acks/nacks.
m.logger.Printf("[DEBUG] memberlist: Failed ping: %s (timeout reached)", node.Name)
}

// 这里开始处理错误逻辑
HANDLE_REMOTE_FAILURE:
// 获取一些随机活动节点
m.nodeLock.RLock()
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
// 如果是本机节点或者是目标节点或者节点状态不是存活的节点,都不会被选中
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != stateAlive
})
m.nodeLock.RUnlock()

// 尝试间接对节点进行Ping操作,其实这里就是由于可能是自己的问题导致对该节点的访问失败,因此让其他节点代替自己对目标节点发起Ping操作
expectedNacks := 0
selfAddr, selfPort = m.getAdvertise()
ind := indirectPingReq{
SeqNo: ping.SeqNo,
Target: node.Addr,
Port: node.Port,
Node: node.Name,
SourceAddr: selfAddr,
SourcePort: selfPort,
SourceNode: m.config.Name,
}

// 对目前的 k 个节点进行消息发送
for _, peer := range kNodes {
// We only expect nack to be sent from peers who understand
// version 4 of the protocol.
if ind.Nack = peer.PMax >= 4; ind.Nack {
expectedNacks++
}

// 发送消息
if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
}
}

// 因为UDP的不可靠性,这里再次采用一个TCP探测来避免由于网络故障引起的探测失败问题。
// 这里自己直接通过TCP协议进行一次对该节点的直接通信操作
fallbackCh := make(chan bool, 1)

// 首先要判断是否可以开启TCP直接通信操作
disableTcpPings := m.config.DisableTcpPings ||
(m.config.DisableTcpPingsForNode != nil && m.config.DisableTcpPingsForNode(node.Name))
if (!disableTcpPings) && (node.PMax >= 3) {
go func() {
defer close(fallbackCh)
// 直接对目标节点发起TCP通信
didContact, err := m.sendPingAndWaitForAck(node.FullAddress(), ping, deadline)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
} else {
fallbackCh <- didContact
}
}()
} else {
close(fallbackCh)
}

// 如果接收到 ack 回复,则结束
select {
case v := <-ackCh:
if v.Complete == true {
return
}
}

for didContact := range fallbackCh {
// 这里如果TCP探测成功,但是UDP探测失败,则当前的网络状况出现以异常
if didContact {
m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
return
}
// 否则,当TCP探测也失败时,则需要下一步处理——广播自己对该节点的健康状态有所怀疑
}

...

// 无论是UDP还是TCP,都没能接收到目标节点的反馈,广播自己怀疑该节点健康状况有问题.
m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
m.suspectNode(&s)
}

至此,probe任务到这里就讲完了

pushPullTrigger

接着,我们来看看m.pushPullTrigger是做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 这里是 gossip 的 push-pull 定时触发任务,并且这里 memberlist 做了一个优化:根据当前集群节点数据来决定,从而避免影响当前网络状况。
func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
interval := m.config.PushPullInterval

// Use a random stagger to avoid syncronizing
randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
select {
case <-time.After(randStagger):
case <-stop:
return
}

// Tick using a dynamic timer
for {
// pushPullScale 是一个优化点,通过当前集群节点的数量来动态的调整任务延迟时间
tickTime := pushPullScale(interval, m.estNumNodes())
select {
case <-time.After(tickTime):
m.pushPull()
case <-stop:
return
}
}
}

这里的pushPullScale就是一个优化的措施,如果当前集群节点的数量超出了阈值,则进行scale操作。

1
2
3
4
5
6
7
8
9
func pushPullScale(interval time.Duration, n int) time.Duration {
// Don't scale until we cross the threshold
if n <= pushPullScaleThreshold {
return interval
}

multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(pushPullScaleThreshold)) + 1.0
return time.Duration(multiplier) * interval
}

这里是通过 push-pull 操作,与对方节点交换自己的全部信息,这实际上是一个很昂贵的操作,因为要交换的数据可能很多很大。因此在一个 tick 内只会与一个节点进行一次 push-pull 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (m *Memberlist) pushPull() {
// Get a random live node
m.nodeLock.RLock()

// 随机选取一个节点进行全部信息交换
nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.State != stateAlive
})
m.nodeLock.RUnlock()

// If no nodes, bail
if len(nodes) == 0 {
return
}
node := nodes[0]

// 进行 push-pull 操作
if err := m.pushPullNode(node.FullAddress(), false); err != nil {
m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
}
}

当通过随机选取K个节点完成之后,会选择这k个节点的第一个节点进行两个节点间的数据同步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(a Address, join bool) error {
defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())

// 尝试向对方节点发送数据并且接收对方节点的反馈消息,包括两部分
// 一部分是对方已知的集群节点信息,一部分是用户数据信息
remote, userState, err := m.sendAndReceiveState(a, join)
if err != nil {
return err
}

// 这里接收到了对方节点的数据之后,将信息和自己本地的进行merge合并。
if err := m.mergeRemoteState(join, remote, userState); err != nil {
return err
}
return nil
}

那自己节点发送的数据是什么呢?也就是m.sendAndReceiveState这一步做了什么事情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// sendLocalState is invoked to send our local state over a stream connection.
func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error {
// 首先这里设置网络链接的超时时间
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))

// Prepare the local node state
m.nodeLock.RLock()
// 这里将自己所知道的所有集群节点信息进行收集整理,待等下发送给对方节点
localNodes := make([]pushNodeState, len(m.nodes))
for idx, n := range m.nodes {
localNodes[idx].Name = n.Name
localNodes[idx].Addr = n.Addr
localNodes[idx].Port = n.Port
localNodes[idx].Incarnation = n.Incarnation
localNodes[idx].State = n.State
localNodes[idx].Meta = n.Meta
localNodes[idx].Vsn = []uint8{
n.PMin, n.PMax, n.PCur,
n.DMin, n.DMax, n.DCur,
}
}
m.nodeLock.RUnlock()

// Get the delegate state
// 如果存在用户数据信息,则提取出来准备一起发送
var userData []byte
if m.config.Delegate != nil {
userData = m.config.Delegate.LocalState(join)
}

// Create a bytes buffer writer
bufConn := bytes.NewBuffer(nil)

// 数据发送出去之前需要做好的一些编码操作
...

// Get the send buffer
return m.rawSendMsgStream(conn, bufConn.Bytes())
}

因此,到这里,push-pull操作中的push操作就完成了,其pull操作也基本类似,将自己的信息推给对方节点之后,就准备“拉取对方节点的数据”,在接收到对方节点发送来的消息之后,需要将对方节点的数据与自己的进行合并

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// mergeRemoteState is used to merge the remote state with our local state
func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, userBuf []byte) error {
// 对协议进行验证
if err := m.verifyProtocol(remoteNodes); err != nil {
return err
}

// Invoke the merge delegate if any
if join && m.config.Merge != nil {
// 这里传输过来的是对方节点他所已知的集群的成员节点列表信息数据
nodes := make([]*Node, len(remoteNodes))
for idx, n := range remoteNodes {
// 数据转换成对象
...
}
// 广播通知有节点需要合并操作
if err := m.config.Merge.NotifyMerge(nodes); err != nil {
return err
}
}

// 合并节点的状态
m.mergeState(remoteNodes)

// 如果存在用户数据信息,则调用接口用户需要进行数据合并操作
if userBuf != nil && m.config.Delegate != nil {
m.config.Delegate.MergeRemoteState(userBuf, join)
}
return nil
}

// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
for _, r := range remote {
switch r.State {
// 如果是节点存在,则走 aliveNode
case stateAlive:
a := alive{
Incarnation: r.Incarnation,
Node: r.Name,
Addr: r.Addr,
Port: r.Port,
Meta: r.Meta,
Vsn: r.Vsn,
}
m.aliveNode(&a, nil, false)
// 如果节点已经离开,则走 deadNode
case stateLeft:
d := dead{Incarnation: r.Incarnation, Node: r.Name, From: r.Name}
m.deadNode(&d)
case stateDead:
// If the remote node believes a node is dead, we prefer to
// suspect that node instead of declaring it dead instantly
fallthrough
// 如果只是对节点有所怀疑,则广播此怀疑消息
case stateSuspect:
s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
m.suspectNode(&s)
}
}
}

到这里,push-pull的流程就走完了。

Gossip

现在,我们来看最后一步,也就是Gossip,这一步就是将自己的数据随机的广播给多个节点去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())

// Get some random live, suspect, or recently dead nodes
m.nodeLock.RLock()

// 这里随机选择K个节点来进行广播操作
kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
if n.Name == m.config.Name {
return true
}

switch n.State {
case stateAlive, stateSuspect:
return false

// 如果节点处于 dead 状态的话,需要对该节点的上次状态变更时间距离现在时间的时间差,与GossipToTheDeadTimezhi
case stateDead:
return time.Since(n.StateChange) > m.config.GossipToTheDeadTime

default:
return true
}
})
m.nodeLock.RUnlock()

// Compute the bytes available
bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead
if m.config.EncryptionEnabled() {
bytesAvail -= encryptOverhead(m.encryptionVersion())
}

for _, node := range kNodes {
// 这里会收集需要广播出去的数据信息,分为两大部分
// 第一部分,也是最优先收集的:memberlist 协议自身运行时产生的数据信息
// 第二部分,如果用户实现了接口,则收集用户侧需要进行广播的消息数据
msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
if len(msgs) == 0 {
return
}

// 这里的代码就只是将数据发送出去而已
...
}
}