gossip
一.Gossip 协议的执行过程:
Gossip 过程是由种子节点发起,当一个种子节点有状态需要更新到网络中的其他节点时,它会随机的选择周围几个节点散播消息,收到消息的节点也会重复该过程,直至最终网络中所有的节点都收到了消息。这个过程可能需要一定的时间,由于不能保证某个时刻所有节点都收到消息,但是理论上最终所有节点都会收到消息,因此它是一个最终一致性协议。
二.算法原理
三.Gossip类型
Gossip 有两种类型:
- Anti-Entropy(反熵):以固定的概率传播所有的数据。Anti-Entropy 是 SI model,节点只有两种状态,Suspective 和 Infective,叫做 simple epidemics。
- Rumor-Mongering(谣言传播):仅传播新到达的数据。Rumor-Mongering 是 SIR model,节点有三种状态,Suspective,Infective 和 Removed,叫做 complex epidemics。
其实,Anti-entropy 反熵是一个很奇怪的名词,之所以定义成这样,Jelasity 进行了解释,因为 entropy 是指混乱程度(disorder),而在这种模式下可以消除不同节点中数据的 disorder,因此 Anti-entropy 就是 anti-disorder。换句话说,它可以提高系统中节点之间的 similarity。
在 SI model 下,一个节点会把所有的数据都跟其他节点共享,以便消除节点之间数据的任何不一致,它可以保证最终、完全的一致。由于在 SI model 下消息会不断反复的交换,因此消息数量是非常庞大的,无限制的(unbounded),这对一个系统来说是一个巨大的开销。
但是在 Rumor Mongering (SIR Model) 模型下,消息可以发送得更频繁,因为消息只包含最新 update,体积更小。而且,一个 Rumor 消息在某个时间点之后会被标记为 removed,并且不再被传播,因此,SIR model 下,系统有一定的概率会不一致。
而由于,SIR Model 下某个时间点之后消息不再传播,因此消息是有限的,系统开销小。
四.Gossip 中的通信模式
具体而言Gossip Protocol的通信模式可以分为Push-based、Pull-based和Pull/Push三种。
Push-based Gossip Protocol的具体工作流程如下:
- 网络中的某个节点随机的选择其他b个节点作为传输对象。
- 该节点向其选中的b个节点传输相应的数据(key,value,version)。
- 接收到信息的节点更新比自己新的数据,重复完成相同的工作。
Pull-based Gossip Protol的协议过程刚好相反:
- 某个节点V随机的选择b个节点将数据(key, version)推送。
- 收到请求的节点将本地比节点V新的数据(key, value, version)推送给 V,V 更新本新的数据(Key, value, version)推送给 V,V 更新本地。
五.Gossip性能分析
Gossip协议的分析是基于流行病学(Epidemiology)研究的。因此在分析Gossip的性能之前,需要首先介绍一下流行病学中基本的模型。
- (n+1)个人均匀的分布在一起
- 每一对人群之间的传染概率是β,显然0<β<1.
- 任意时刻,某个人要么处于infected的状态要么处于uninfected的状态.
- 一旦某个人从uninfected状态转变成为infected状态,其一直停留在infected状态。
六.gossip配置源码分析
type Config struct {
// 节点的名字,其在列表中唯一
Name string
//Transport是一个用于提供与其他节点通信的关联,如果保留为nil,则默认为memberlist。
//使用此结构中的BindAddr和BindPort创建NetTransport。
Transport Transport
// 与绑定到哪个地址和侦听的端口有关的配置。该端口用于UDP和TCP八卦。
// 默认其他节点在这个端口上运行,但不是必须
BindAddr string
BindPort int
// 与向其他群集成员advertise的地址有关的配置。用于NAT遍历。
AdvertiseAddr string
AdvertisePort int
// ProtocolVersion is the configured protocol version that we
// will _speak_. This must be between ProtocolVersionMin and
// ProtocolVersionMax.
ProtocolVersion uint8
// TCPTimeout是与远程节点建立流连接以进行完全状态同步以及流读写操作的超时。
TCPTimeout time.Duration
// 间接检测是在直接检测失败的情况下,要求节点执行间接检测的节点数。成员列表等待来自任何单个间接节点的ack,因此增加这个数目将增加以带宽为代价的间接探测成功的可能性。
indirectChecks int
// ReTrimeMult是对通过八卦广播的消息尝试重传次数的乘数。重传的实际计数使用公式计算:
// Retransmits = RetransmitMult * log(N+1)
//这允许重传与簇大小适当地缩放。RetransmitMult ,失败的广播越有可能以增加的带宽为代价收敛。 //
RetransmitMult int
// 无法访问的节点在声明death之前被认为是suspicion,超出suspicion时间则被认为节点death
// 计算公式如下
// SuspicionTimeout = SuspicionMult * log(N+1) * ProbeInterval
SuspicionMult int
// ProbeInterval和ProbeTimeout用于配置探测成员列表。
//
// ProbeInterval是随机节点探测之间的间隔。设置
//更低会导致memberlist(更频繁)的进行随机节点间的探测
//
// ProbeTimeout是等待来自被探测节点的确认的超时,取决于您网络上的RTT(往返时间)。
ProbeInterval time.Duration
ProbeTimeout time.Duration
// disableTcpPings will turn off the fallback TCP pings that are attempted
// if the direct UDP ping fails. These get pipelined along with the
// indirect UDP pings.
DisableTcpPings bool
// AwarenessMaxMultiplier will increase the probe interval if the node
// becomes aware that it might be degraded and not meeting the soft real
// time requirements to reliably probe other nodes.
AwarenessMaxMultiplier int
// GossipInterval and GossipNodes被用于配置gossip的成员列表
//GossipNodes是发送八卦消息的随机节点数
GossipInterval time.Duration
GossipNodes int
GossipToTheDeadTime time.Duration
}
EventDelegate是一个更简单的委托,仅用于接收有关成员加入和离开的通知。
type EventDelegate interface {
//当检测到某个节点已加入时,将调用NotifyJoin。
//不得修改Node参数。
NotifyJoin(* Node)
//当检测到一个节点离开时,调用NotifyLeave。
//不得修改Node参数。
NotifyLeave(* Node)
//检测到节点时调用NotifyUpdate
//更新,通常涉及元数据。Node参数
//不得修改。
NotifyUpdate(* 节点)
}
transport用于抽象与其他对等方的通信。假设分组接口是尽力而为,并且假定流接口是可靠的。
type Transport interface {
// finalAdvertiseAddr给出用户配置的值(可能是空的),并返回所需的IP和端口,
//以便向集群的其余部分进行advertise。
FinalAdvertiseAddr(ip string, port int) (net.IP, int, ERROR)
// WriteTo在传输中帮助在探测期间进行准确的RTT测量
WriteTo(b []byte, addr string) (time.Time, error)
// PacketCh返回可被读取以接收来自其他对等体的传入分组的信道。
PacketCh() <-chan *Packet
// 传输超时控制
DialTimeout(addr string, timeout time.Duration) (net.Conn, error)
// StreamCh返回一个可以读取的通道来处理来自其他对等点的传入流连接
StreamCh() <-chan net.Conn
// 关闭节点
Shutdown() error
}
七.gossip流程分析
1 )节点状态
节点的state有3种
- alive 节点是”活的”
- suspect 对于PingMsg没有应答或应答超时,这个节点的状态是”可疑的”
- dead 节点”已死亡”
- 如果节点B无法被对节点A发出的PingMsg(这里是作者自定义的udp协议,不是ICMP)进行响应,或者响应超时,它会被节点A标为suspect, 如果suspect持续一段时间(或它收到足够多的其它节点关于B的SuspectMsg),节点A会在集群中广播SuspectMsg,告知集群中的其它节点,节点B很可疑
2)如果B收到了针对它的SuspectMsg,这显然是对它的不利言论,B可以通过发送AliveMsg告知对方, “I’m alive”。那么在对方节点看来B的state从suspect变为alive
- 如果一段时间内,B的状态仍然是suspect, 那么对节点A而言,B的状态会被置为dead
- 如果节点B在down掉一段时间后,重新上线,它可以通过与种子节点的Gossip(push/pull) 重新被认为alive
2.节点动作
state.go
// Schedule is used to ensure the Tick is performed periodically. This
// function is safe to call multiple times. If the memberlist is already
// scheduled, then it won't do anything.
func (m *Memberlist) schedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()
// If we already have tickers, then don't do anything, since we're
// scheduled
if len(m.tickers) > 0 {
return
}
// Create the stop tick channel, a blocking channel. We close this
// when we should stop the tickers.
stopCh := make(chan struct{})
// Create a new probeTicker
if m.config.ProbeInterval > 0 {
t := time.NewTicker(m.config.ProbeInterval)
// 动作1
go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
m.tickers = APPend(m.tickers, t)
}
// Create a push pull ticker if needed
if m.config.PushPullInterval > 0 {
// 动作2
go m.pushPullTrigger(stopCh)
}
// Create a gossip ticker if needed
if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
t := time.NewTicker(m.config.GossipInterval)
// 动作3
go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
m.tickers = append(m.tickers, t)
}
// If we made any tickers, then record the stopTick channel for
// later.
if len(m.tickers) > 0 {
m.stopTick = stopCh
}
}
-
动作1 probe
周期性的探测所有集群中状态为alive和suspect的节点
-
动作2 pushpull
周期性的从已知的alive的集群节点中选1个节点进行push/pull交换信息
交换的信息包含2种
a) 集群信息
b) 用户自定义的状态信息
-
动作3 gossip
其实它是广播所有处于dead的节点(只广播一次)
注意
这里需要说明的是
1)广播其实也是一种Gossip,发布者并不把消息发给集群中的每一个节点,而是随机挑选n个(默认是3个),将消息发送出去
2)处于dead状态的节点,仍然会被保留在集群信息中一段时间,以便Push/Pull的时候,这个状态能够被扩散出去