分类: 架构设计与优化
2023-02-17 14:33:07
作者: 京东物流 郭益如
在分布式系统中, 什么是拜占庭将军问题?产生的场景和解决方案是什么?什么是 Raft 共识算法?Raft 算法是如何解决拜占庭将军问题的?其核心原理和算法逻辑是什么?除了 Raft,还有哪些共识算法?共识问题作为分布式系统的一大难点和痛点,本文主要介绍了其产生的背景、原因,以及通用的 Raft 算法解决方案。
【分布式对等网络中的通信容错问题。
在分布式计算中,不同的计算机通过通讯交换信息达成共识按照一套协作策略行动。有时候,系统中的成员计算机可能出错而发送错误的信息,用于传递信息的通讯网络也可能导致信息损坏,使得网络中不同的成员关于全体协作的策略得出不同结论,从而破坏系统一致性,这就是拜占庭将军问题。
拜占庭将军问题被认为是容错性问题中{BANNED}最佳难的问题类型之一。】
9 位将军兵分 9 路去打仗,他们各自有权力观测敌情并做出行动判断 —— 进攻或撤退,他们必须行动一致,即所有军队一起进攻或者一起撤退,否则部分进攻部分撤退会造成灾难性后果。
前提:
将军之间只能通过信使互相联系,每位将军将自己的判断发送给其他将军,并接收其他将军发送的判断;
收到信息的将军综合所有的判断,当超过半数都选择进攻时,就决定进攻,当超过半数都选择撤退时就决定撤退;
问题是,将军中间可能出现叛徒,他可能会选择相反的结果进行通信(投票),也可能选择性的发送信息,叛徒要达成的目标是:
选择性的发送信息,欺骗某些将军采取进攻的行动;
促成一个错误的决定,比如将军们不希望进攻时进攻;
迷惑某些将军,使得他们无法做出决定;
如果叛徒达成了其中之一,任何的攻击结果都是注定要失败的,只有完全达成一致的努力才能获得胜利。
比如,可能 9 位将军中有 8 位忠诚的将军和一名叛徒,8 位将军中 4 位选择进攻,4 位选择撤退,叛徒分别给选择进攻的将军发送进攻的信息,给选择撤退的将军发送撤退信息。这样一来,在4 位选择进攻的将军看,共 5 位将军选择进攻,从而发起进攻;而在 4 位选择撤退的将军看,共 5 位将军选择撤退,从而发起撤退,这样各个将军的一致性就遭到了破坏。
并且,叛徒将军可能会伪造其他将军的身份发送信件;
拜占庭将军问题描述的是,在存在信息丢失的不可靠信道上试图通过消息传递的方式达到一致性是不可能的,在系统中除了存在的消息延迟或不可送达故障外,还可能包括消息篡改、节点处理异常等潜在性异常。
在早期的解决方案中,一种是 “拜占庭容错” ,它遵循“少数服从多数”的共识机制,即使出现了错误或伪造的信息,只要有问题的将军数量不到 1/3,仍可以达到“拜占庭容错”,使整个系统便可以正常运作。
为什么是 1/3呢?
其原理是这样的,假设将军总数是 N,其中正直的将军数量是 S,反叛的将军数量是 T, 那么 N=S+T;
为了保证即使反叛的将军都不去投票也能产生{BANNED}最佳终的结果,那么 S 必须要超过半数,这种情况下,S 都做出相同的选择,依然可以达成共识,即 S>T;
如果叛徒给一半支持进攻的将军发送进攻信息,给一半支持撤退的将军发送撤退信息,这种情况要保证也能产生{BANNED}最佳终的投票结果,则 X > S/2 + E;
综合以上关系,可以得到:
N = S + T
X < S
X > S/2 + T
求解以上不等式,可以得到:
(N-T)/2 > T,即 N > 3T
所以要保证正直的将军至少占所有将军总数的 2/3,才有可能达成共识。
拜占庭算法是一种共识算法,确定共识的原则,各个节点通过这个共识原则既可以保证一致性,又能保证基本的分区容错性。
共识是可容错系统中的一个基本问题: 即使面对故障,服务器如何在共享状态上达成一致?
理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取{BANNED}最佳新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,{BANNED}最佳终将目标页面展示到屏幕。
【Raft 算法解决的是简化版的拜占庭将军问题,即在不考虑数据丢失、篡改的情况下的拜占庭将军问题。】
假设现在有 3 位将军 A、B、C,将军中没有叛徒,信使的信息可靠,但有可能被暗杀,此时将军们如何达成进攻的一致性决定?
方案: Raft的方案是,在所有的将军中选出一个大将军,用来做出所有的决定。大将军派信使给其他将军,如果过一段时间没有回复(可能被暗杀)就再派一个信使,直到收到回复。
如果大将军的信使,派出去一个被干掉一个,其他将军们总也收不到大将军的信息,他们如何达成一致性决定?
方案: 每位将军都有一个随机时间的计时器,时间一到,他就把自己当成大将军的候选人,派信使将选举结果给将军 B、C。 如果将军 B、C 还没有把选举大将军结果投给其他人(包括自己)时,他们就会把选举票投给 A。A 将军的信使返回 A 时,A 将军就知道自己收到了足够的票数,成为了新的大将军。
Raft 算法是一种简单易懂的共识算法,它通过首先选举一个 Leader 主节点,然后让Leader 完全负责数据同步,依靠状态机和主从同步的方式,在各个节点之间实现数据的一致性。
通过这种主节点进行数据同步的方式,Raft 将一致性问题拆分成了三个相对独立的子问题:
1. 主节点选取 Leader Election: 启动集群时,或者现有主节点失败时,会启动新的投票,获得大多数选票(N/2+1)的节点会成为新的主节点;
2. 复制日志 Log Replication: 主节点从客户端接收日志信息,再把信息复制到其他从节点上,使得日志信息都能保持数据一致;
3. 安全性: Raft 定义了一系列规范来保证数据安全性。
Raft 算法为节点定义了三种角色: Leader(主节点) 、Follower(从节点) 、Candidate(参与投票竞争的节点) ,节点的角色是可以转换的,在任意的时间,每个服务器一定处于三种状态中的一个。
每个节点上都有一个倒计时器(Election Timeout),随机值在 150ms ~ 300ms 之间,当节点收到选举请求,或收到 Leader 的 Heartbeat 时,就会重置倒计时。
通常情况下,系统中只有一个主节点,用来发起心跳,处理所有的客户端请求,创建日志和同步日志。
除主节点外,其他的节点都是从节点,用于接收主节点的心跳和日志数据,保证其数据状态与主节点一致,以及在 Leader 选举时,投票给 Candidate。
如果有客户端跟Follower 联系,那么 Follower 会把请求重定向给 Leader。
候选人 Candidate是在 Leader 选举过程中的临时角色,由 Follower 转换而来,用于发起投票参与竞选。
Raft 节点状态图:
图1 Raft 节点状态图
启动时,或 Follower 接收不到 Leader 信息时,它就会变成 Candidate 并发起一次选举。获得集群中大多数选票的Candidate 就成为新的 Leader。
Raft 把时间分割成任意长度的任期 Term,用连续的整数标记。
图2 各任期 Term 下的状态演变
每一个任期都会开始一次新的选举,一个或多个 Candidate 会尝试成为 Leader。如果一个 Candidate 赢得了选举,它就会在该任期内担任 Leader,直到任期结束或者服务器宕机。在某些情况下,没有选出 Leader(如选票瓜分等),则会开启下一个任期并立刻开始新的选举。
任期在 Raft 算法中充当逻辑时钟的作用,每一个节点都会存储当前的 Term 号,这一编号在整个集群时期内单调增长,服务器之间通信的时候也会交换当前的 Term 号:
Raft 算法中服务器节点之间通信使用远程过程调用(RPCs),并且基本的一致性算法只需要两种类型的 RPCs。请求投票(RequestVote) RPCs 由候选人在选举期间发起,然后附加条目(AppendEntries)RPCs 由领导者发起,用来复制日志和提供一种心跳机制。如果未及时收到响应,则请求者有责任重试 RPC。
每一个事件是一个 Entry,只有 Leader 可以创建 Entry,结构为
日志是 Raft 的核心概念,是一个由 Entry 构成的数组。只有 Leader 才可以改变其他节点的 Log。Leader 先把 Entry 添加到自己的 Log 数组中,发起共识请求,获得同意后,才会将 Entry 提交给状态机。Follower 只能从 Leader 中获取新日志和当前的 CommitIndex,然后把对应的 Entry 应用到自己的状态机中。
Follower 自增的 term 号并且转换状态为 Candidate。然后他会向所有节点发起 RequestVoteRPC 请求, Candidate 的状态会持续到以下情况发生:
在 Candidate 等待选票的时候,它可能收到其他节点声明其是 Leader 的心跳:
为了避免出现“脑裂”,即同一时刻出现多个 Candidate,导致没有 Candidate 获得大多数选票的状况,Raft 增加了随机选举超时时间的方法。每一个Candidate 在发起选举后,都会随机化一个超时时间( 150-300 毫秒),使得各个服务器分散开来,在大多数情况下只有一个服务器会率先超时,赢得选举。
相关代码实现:
【plain】
func (rf *Raft) RequestVote(request *RequestVoteRequest, response *RequestVoteResponse) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer rf.persist()
defer DPrintf("{Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v,firstLog %v,lastLog %v} before processing requestVoteRequest %v and reply requestVoteResponse %v", rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), request, response)
if request.Term < rf.currentTerm || (request.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != request.CandidateId) {
response.Term, response.VoteGranted = rf.currentTerm, false
return
}
if request.Term > rf.currentTerm {
rf.ChangeState(StateFollower)
rf.currentTerm, rf.votedFor = request.Term, -1 }
if !rf.isLogUpToDate(request.LastLogTerm, request.LastLogIndex) {
response.Term, response.VoteGranted = rf.currentTerm, false
return
}
rf.votedFor = request.CandidateId rf.electionTimer.Reset(RandomizedElectionTimeout())
response.Term, response.VoteGranted = rf.currentTerm, true
}
func (rf *Raft) StartElection() {
request := rf.genRequestVoteRequest()
DPrintf("{Node %v} starts election with RequestVoteRequest %v", rf.me, request)
// use Closure
grantedVotes := 1 rf.votedFor = rf.me
rf.persist()
for peer := range rf.peers {
if peer == rf.me {
continue
}
go func(peer int) {
response := new(RequestVoteResponse)
if rf.sendRequestVote(peer, request, response) {
rf.mu.Lock()
defer rf.mu.Unlock() DPrintf("{Node %v} receives RequestVoteResponse %v from {Node %v} after sending RequestVoteRequest %v in term %v", rf.me, response, peer, request, rf.currentTerm)
if rf.currentTerm == request.Term && rf.state == StateCandidate {
if response.VoteGranted {
grantedVotes += 1 if grantedVotes > len(rf.peers)/2 { DPrintf("{Node %v} receives majority votes in term %v", rf.me, rf.currentTerm)
rf.ChangeState(StateLeader)
rf.BroadcastHeartbeat(true)
}
} else if response.Term > rf.currentTerm { DPrintf("{Node %v} finds a new leader {Node %v} with term %v and steps down in term %v", rf.me, peer, response.Term, rf.currentTerm)
rf.ChangeState(StateFollower)
rf.currentTerm, rf.votedFor = response.Term, -1 rf.persist()
}
}
}
}(peer)
}
}
Raft 通过Leader 向集群中所有 Follower 进行日志同步来保证整个集群数据的{BANNED}最佳终一致性。
只有 Leader 有权限接受客户端的请求并且同步数据给集群中其他节点。每一个客户端的请求都包含一条需要被复制状态机 RSM(Replicated State Mechine)执行的命令,Leader 收到客户端请求后,会生成一个 Entry,包含,再将这个 entry 添加到自己的日志末尾后,向所有的节点广播该 Entry,要求其他服务器复制这条 Entry。
图3 主从节点进行 Entry 复制
如图所示,Logs 日志是一个顺序存储的 Entry 数组,方框内是任期 Term 号。
例如,在 Term3 中,Leader {BANNED}最佳后一个 Entry 的Index 为 7,x 值为 5,收到请求 set x=4时:
图4 日志同步流程
Leader 收到客户端请求 x←4 时,Leader 会生成一条新的 Entry<8, 3, set x=4>,并将该 Entry 添加到自己的 Log 数组{BANNED}最佳后
Leader 通过 AppendEntries RPC 广播该 Entry;
如果 Follower 接受该 Entry,则会将 Entry 添加到其日志后面,同时返回给 Leader 同意。
如果 Leader 收到了多数的成功响应,Leader 认为这个 Entry 是 committed,应用到自己的状态机 RSM 中,并且向客户端返回执行结果。之后,该 commited 信息会随着之后的 AppendEntryRPC 传达到其他节点。
committed 表示被 Leader 创建的 Entry 已经复制到了大多数的服务器上,Leader 会跟踪它记录的{BANNED}最佳大索引值 Index,并在之后的 AppendEntries RPC(包括心跳)中,包含该索引值,以此确保其他服务器同步这个 Entry 已经提交,Follower 接收到该信息后,也会按顺序同步更新到本地的状态机中。
Raft 通过这种日志机制来保证不同服务器上日志的一致性和安全性:
一般情况下,Leader 和 Follower 的日志保持一致,AppendEntries 的一致性检查通常不会失败。然后,Leader Crash 可能会导致数据丢失:
图5 Leader Crash时的数据状况
当{BANNED}最佳上面的 Leader 掌权后,Follower 日志可能有 a~f 几种情况:
1. 日志丢失(a,b);
2. Follower 含有未提交数据(c、d);
3. 日志丢失 + Follower 含有未提交数据(e、f);
场景 f 可能出现的情况为:
如果一台服务器在 Term2 时是 Leader,并且向它的日志中添加了一些数据条目,然后在数据提交前宕机了;接着该 Leader 很快重启后,又称为了任期 3 的 Leader,接着又向它的日志中添加了一些数据,然后在 Term2,Term3 数据条目提交前,又宕机了,之后一直处于宕机状态,直到有新的 Leader 产生。
当遇到这种一致性检查失败的情况时,Leader 通过强制 Follower 复制自己的日志来处理日志的不一致。这就意味着,在 Follower 上的冲突日志会被领导者的日志覆盖。
Leader 找到 Follower 与它日志一致的地方(Index=3),然后删除 Follower 在该位置之后的日志,接着把这之后的日志发送给 Follower:
Leader 给每一个Follower 维护了一个 nextIndex,它表示 Leader 将要发送给该追随者的下一条日志条目的索引。当一个 Leader 开始掌权时,它会将 nextIndex 初始化为它的{BANNED}最佳新的日志条目索引数+1。如果一个 Follower 的日志和 Leader 的不一致,AppendEntries 一致性检查会在下一次 AppendEntries RPC 时返回失败。在失败之后,Leader 会将 nextIndex 递减然后重试 AppendEntries RPC。{BANNED}最佳终 nextIndex 会达到一个 Leader 和 Follower 日志一致的地方。这时,AppendEntries 会返回成功,Follower 中冲突的日志条目都被移除了,并且添加所缺少的上了 Leader 的日志条目。一旦 AppendEntries 返回成功,Follower 和 Leader 的日志就一致了,这样的状态会保持到该任期结束。
相关实现代码:
【plain】
func (rf *Raft) replicateOneRound(peer int) {
rf.mu.RLock()
if rf.state != StateLeader {
rf.mu.RUnlock()
return
}
prevLogIndex := rf.nextIndex[peer] - 1 if prevLogIndex < rf.getFirstLog().Index { // only snapshot can catch up request := rf.genInstallSnapshotRequest()
rf.mu.RUnlock()
response := new(InstallSnapshotResponse)
if rf.sendInstallSnapshot(peer, request, response) {
rf.mu.Lock()
rf.handleInstallSnapshotResponse(peer, request, response)
rf.mu.Unlock()
}
} else { // just entries can catch up request := rf.genAppendEntriesRequest(prevLogIndex)
rf.mu.RUnlock()
response := new(AppendEntriesResponse)
if rf.sendAppendEntries(peer, request, response) {
rf.mu.Lock()
rf.handleAppendEntriesResponse(peer, request, response)
rf.mu.Unlock()
}
}
}
func (rf *Raft) AppendEntries(request *AppendEntriesRequest, response *AppendEntriesResponse) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer rf.persist()
defer DPrintf("{Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v,firstLog %v,lastLog %v} before processing AppendEntriesRequest %v and reply AppendEntriesResponse %v", rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), request, response)
if request.Term < rf.currentTerm {
response.Term, response.Success = rf.currentTerm, false
return
}
if request.Term > rf.currentTerm {
rf.currentTerm, rf.votedFor = request.Term, -1 }
rf.ChangeState(StateFollower)
rf.electionTimer.Reset(RandomizedElectionTimeout())
if request.PrevLogIndex < rf.getFirstLog().Index {
response.Term, response.Success = 0, false DPrintf("{Node %v} receives unexpected AppendEntriesRequest %v from {Node %v} because prevLogIndex %v < firstLogIndex %v", rf.me, request, request.LeaderId, request.PrevLogIndex, rf.getFirstLog().Index)
return
}
if !rf.matchLog(request.PrevLogTerm, request.PrevLogIndex) {
response.Term, response.Success = rf.currentTerm, false
lastIndex := rf.getLastLog().Index
if lastIndex < request.PrevLogIndex {
response.ConflictTerm, response.ConflictIndex = -1, lastIndex+1 } else {
firstIndex := rf.getFirstLog().Index
response.ConflictTerm = rf.logs[request.PrevLogIndex-firstIndex].Term
index := request.PrevLogIndex - 1 for index >= firstIndex && rf.logs[index-firstIndex].Term == response.ConflictTerm {
index--
}
response.ConflictIndex = index
}
return
}
firstIndex := rf.getFirstLog().Index
for index, entry := range request.Entries {
if entry.Index-firstIndex >= len(rf.logs) || rf.logs[entry.Index-firstIndex].Term != entry.Term {
rf.logs = shrinkEntriesArray(append(rf.logs[:entry.Index-firstIndex], request.Entries[index:]...))
break
}
}
rf.advanceCommitIndexForFollower(request.LeaderCommit)
response.Term, response.Success = rf.currentTerm,True
}
Leader 需要保证其存储全部已经提交的日志条目,保证日志条目只能从 Leader 流向 Follower,且 Leader 永远不会覆盖已经存在的日志条目。
每个 Candidate 发送 RequestVoteRPC 时,都会带上{BANNED}最佳后一个 Entry 的信息。所有节点收到投票信息时,会对该 Entry 进行比较,如果发现自己的更新,则拒绝投票给该 Candidate。
理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取{BANNED}最佳新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,{BANNED}最佳终将目标页面展示到屏幕。
早期的共识算法,由拜占庭将军问题的提出者 Leslie Lamport 所发明。谷歌的分布式锁服务 Chubby 就是以 Paxos 算法为基础。
Zookeeper 所使用的一致性算法,在流程上和 Raft 算法比较接近。
区块链技术所使用的共识算法之一,适用于私有链的共识。
理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取{BANNED}最佳新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,{BANNED}最佳终将目标页面展示到屏幕。
Raft 算法是很广泛的强一致性、去中心化和高可用的分布式协议,是一种 leader-based 的共识算法。通过将共识问题拆分成主节点选举和主从日志同步,以及安全流程,来提高分布式系统的数据一致性、可靠性和容错性;首先选举主节点,然后主节点负责接收外部请求、数据复制、提交,保证系统中数据都是一致的。