From: GitBox
To: notifications@zookeeper.apache.org
Reply-To: dev@zookeeper.apache.org
Subject: [GitHub] [zookeeper] Vanlightly commented on a change in pull request #1690: ZOOKEEPER-3615: Provide formal specification and verification using TLA+ for Zab
Date: Thu, 09 Sep 2021 15:27:07 -0000

Vanlightly commented on a change in pull request #1690:
URL: https://github.com/apache/zookeeper/pull/1690#discussion_r705456407

##########
File path: zookeeper-specifications/zab-1.0/ZabWithFLE.tla
##########
@@ -0,0 +1,1020 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *) + +----------------------------- MODULE ZabWithFLE ----------------------------- +(* This is the formal specification for the Zab consensus algorithm, + which means Zookeeper Atomic Broadcast.*) + +(* Reference: + FLE: FastLeaderElection.java, Vote.java, QuorumPeer.java in https://github.com/apache/zookeeper. + ZAB: QuorumPeer.java, Learner.java, Follower.java, LearnerHandler.java, Leader.java + in https://github.com/apache/zookeeper. + https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zab1.0. 
+ *) +EXTENDS FastLeaderElection + +----------------------------------------------------------------------------- +(* Defined in FastLeaderElection.tla: +\* The set of server identifiers +CONSTANT Server + +\* Server states +CONSTANTS LOOKING, FOLLOWING, LEADING + +\* Message types +CONSTANTS NOTIFICATION + +\* Timeout signal +CONSTANT NONE +*) +\* The set of requests that can go into history +CONSTANT Value + +\* Zab states +CONSTANTS ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST + +\* Message types +CONSTANTS FOLLOWERINFO, LEADERINFO, ACKEPOCH, NEWLEADER, ACKLD, UPTODATE, PROPOSAL, ACK, COMMIT +(* Additional message types used for recovery in synchronization(TRUNC/DIFF/SNAP) are not needed + since we abstract this part.(see action RECOVERYSYNC) *) + +----------------------------------------------------------------------------- +(* Defined in FastLeaderElection.tla: Quorums, NullPoint *) +\* Return the maximum value from the set S +Maximum(S) == IF S = {} THEN -1 + ELSE CHOOSE n \in S: \A m \in S: n >= m + +\* Return the minimum value from the set S +Minimum(S) == IF S = {} THEN -1 + ELSE CHOOSE n \in S: \A m \in S: n <= m + +MAXEPOCH == 10 + +----------------------------------------------------------------------------- +(* Defined in FastLeaderElection.tla: + serverVars: <>, + electionVars: <>, + leaderVars: <>, + electionMsgs, + idTable *) +\* The current phase of server(ELECTION,DISCOVERY,SYNCHRONIZATION,BROADCAST) +VARIABLE zabState + +\* The epoch number of the last NEWEPOCH(LEADERINFO) packet accepted +\* namely f.p in paper, and currentEpoch in Zab.tla. +VARIABLE acceptedEpoch + +\* The history of servers as the sequence of transactions. +VARIABLE history + +\* commitIndex[i]: The maximum index of transactions that have been saved in a quorum of servers +\* in the perspective of server i.(increases monotonically before restarting) +VARIABLE commitIndex + +(* These transactions whose index \le commitIndex[i] can be applied to state machine immediately. 
+ So if we have a variable applyIndex, we can suppose that applyIndex[i] = commitIndex[i] when verifying properties. + But in phase SYNC, follower will apply all queued proposals to state machine when receiving NEWLEADER. + But follower only serves traffic after receiving UPTODATE, so sequential consistency is not violated. + + So when we verify properties, we still suppose applyIndex[i] = commitIndex[i], because this is an engineering detail.*) + +\* learners[i]: The set of servers which leader i think are connected wich i. +VARIABLE learners + +\* The messages representing requests and responses sent from one server to another. +\* msgs[i][j] means the input buffer of server j from server i. +VARIABLE msgs + +\* The set of followers who has successfully sent CEPOCH(FOLLOWERINFO) to leader.(equals to connectingFollowers in code) +VARIABLE cepochRecv + +\* The set of followers who has successfully sent ACK-E to leader.(equals to electingFollowers in code) +VARIABLE ackeRecv + +\* The set of followers who has successfully sent ACK-LD to leader in leader.(equals to newLeaderProposal in code) +VARIABLE ackldRecv + +\* The set of servers which leader i broadcasts PROPOSAL and COMMIT to.(equals to forwardingFollowers in code) +VARIABLE forwarding + +\* ackIndex[i][j]: The latest index that leader i has received from follower j via ACK. +VARIABLE ackIndex + +\* currentCounter[i]: The count of transactions that clients request leader i. +VARIABLE currentCounter + +\* sendCounter[i]: The count of transactions that leader i has broadcast in PROPOSAL. +VARIABLE sendCounter + +\* committedIndex[i]: The maximum index of trasactions that leader i has broadcast in COMMIT. +VARIABLE committedIndex + +\* committedCounter[i][j]: The latest counter of transaction that leader i has confirmed that follower j has committed. +VARIABLE committedCounter + +\* initialHistory[i]: The initial history if leader i in epoch acceptedEpoch[i]. 
+VARIABLE initialHistory + +\* the maximum epoch in CEPOCH the prospective leader received from followers. +VARIABLE tempMaxEpoch + +\* cepochSent[i] = TRUE means follower i has sent CEPOCH(FOLLOWERINFO) to leader. +VARIABLE cepochSent + +\* leaderAddr[i]: The leader id of follower i. We use leaderAddr to express whether follower i has connected or lost connection. +VARIABLE leaderAddr + +\* synced[i] = TRUE: follower i has completed sync with leader. +VARIABLE synced + +\* The set of leaders in every epoch, only used in verifying properties. +VARIABLE epochLeader + +\* The set of all broadcast messages, only used in verifying properties. +VARIABLE proposalMsgsLog + +\* A variable used to check whether there are conditions contrary to the facts. +VARIABLE inherentViolated + +serverVarsZ == <> \* 7 variables + +electionVarsZ == electionVars \* 6 variables + +leaderVarsZ == <> \* 11 variables + +tempVarsZ == <> \* 2 variables + +followerVarsZ == <> \* 3 variables + +verifyVarsZ == <> \* 3 variables + +msgVarsZ == <> \* 2 variables + +vars == <> \* 35 variables +----------------------------------------------------------------------------- +\* Add a message to msgs - add a message m to msgs[i][j]. +Send(i, j, m) == msgs' = [msgs EXCEPT ![i][j] = Append(msgs[i][j], m)] + +\* Remove a message from msgs - discard head of msgs[i][j]. +Discard(i, j) == msgs' = IF msgs[i][j] /= << >> THEN [msgs EXCEPT ![i][j] = Tail(msgs[i][j])] + ELSE msgs + +\* Leader broadcasts a message(PROPOSAL/COMMIT) to all other servers in forwardingFollowers. 
+Broadcast(i, m) == msgs' = [msgs EXCEPT ![i] = [v \in Server |-> IF /\ v \in forwarding[i] + /\ v /= i + /\ \/ /\ m.mtype = PROPOSAL + /\ ackIndex[i][v] < Len(initialHistory[i]) + m.mproposal.counter + \/ /\ m.mtype = COMMIT + /\ committedCounter[i][v] < m.mzxid[2] + THEN Append(msgs[i][v], m) + ELSE msgs[i][v]]] + +BroadcastLEADERINFO(i, m) == msgs' = [msgs EXCEPT ![i] = [v \in Server |-> IF /\ v \in cepochRecv[i] + /\ v \in learners[i] + /\ v /= i THEN Append(msgs[i][v], m) + ELSE msgs[i][v]]] + +BroadcastUPTODATE(i, m) == msgs' = [msgs EXCEPT ![i] = [v \in Server |-> IF /\ v \in ackldRecv[i] + /\ v \in learners[i] + /\ v /= i THEN Append(msgs[i][v], m) + ELSE msgs[i][v]]] + +\* Combination of Send and Discard - discard head of msgs[j][i] and add m into msgs[i][j]. +Reply(i, j, m) == msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]), + ![i][j] = Append(msgs[i][j], m)] + +\* shuffle the input buffer from server j(i) in server i(j). +Clean(i, j) == msgs' = [msgs EXCEPT ![j][i] = << >>, ![i][j] = << >>] + +----------------------------------------------------------------------------- +PZxidEqual(p, z) == p.epoch = z[1] /\ p.counter = z[2] + +TransactionEqual(t1, t2) == /\ t1.epoch = t2.epoch + /\ t1.counter = t2.counter + +TransactionPrecede(t1, t2) == \/ t1.epoch < t2.epoch + \/ /\ t1.epoch = t2.epoch + /\ t1.counter < t2.counter + +----------------------------------------------------------------------------- +\* Define initial values for all variables +InitServerVarsZ == /\ InitServerVars + /\ zabState = [s \in Server |-> ELECTION] + /\ acceptedEpoch = [s \in Server |-> 0] + /\ history = [s \in Server |-> << >>] + /\ commitIndex = [s \in Server |-> 0] + +InitLeaderVarsZ == /\ InitLeaderVars + /\ learners = [s \in Server |-> {}] + /\ cepochRecv = [s \in Server |-> {}] + /\ ackeRecv = [s \in Server |-> {}] + /\ ackldRecv = [s \in Server |-> {}] + /\ ackIndex = [s \in Server |-> [v \in Server |-> 0]] + /\ currentCounter = [s \in Server |-> 0] + /\ sendCounter = [s \in 
Server |-> 0] + /\ committedIndex = [s \in Server |-> 0] + /\ committedCounter = [s \in Server |-> [v \in Server |-> 0]] + /\ forwarding = [s \in Server |-> {}] + +InitElectionVarsZ == InitElectionVars + +InitTempVarsZ == /\ initialHistory = [s \in Server |-> << >>] + /\ tempMaxEpoch = [s \in Server |-> 0] + +InitFollowerVarsZ == /\ cepochSent = [s \in Server |-> FALSE] + /\ leaderAddr = [s \in Server |-> NullPoint] + /\ synced = [s \in Server |-> FALSE] + +InitVerifyVarsZ == /\ proposalMsgsLog = {} + /\ epochLeader = [i \in 1..MAXEPOCH |-> {}] + /\ inherentViolated = FALSE + +InitMsgVarsZ == /\ msgs = [s \in Server |-> [v \in Server |-> << >>]] + /\ electionMsgs = [s \in Server |-> [v \in Server |-> << >>]] + +InitZ == /\ InitServerVarsZ + /\ InitLeaderVarsZ + /\ InitElectionVarsZ + /\ InitTempVarsZ + /\ InitFollowerVarsZ + /\ InitVerifyVarsZ + /\ InitMsgVarsZ + /\ idTable = InitializeIdTable(Server) + +----------------------------------------------------------------------------- +ZabTurnToLeading(i) == + /\ zabState' = [zabState EXCEPT ![i] = DISCOVERY] + /\ learners' = [learners EXCEPT ![i] = {i}] + /\ cepochRecv' = [cepochRecv EXCEPT ![i] = {i}] + /\ ackeRecv' = [ackeRecv EXCEPT ![i] = {i}] + /\ ackldRecv' = [ackldRecv EXCEPT ![i] = {i}] + /\ forwarding' = [forwarding EXCEPT ![i] = {}] + /\ ackIndex' = [ackIndex EXCEPT ![i] = [v \in Server |-> IF v = i THEN Len(history[i]) + ELSE 0]] + /\ currentCounter' = [currentCounter EXCEPT ![i] = 0] + /\ sendCounter' = [sendCounter EXCEPT ![i] = 0] + /\ commitIndex' = [commitIndex EXCEPT ![i] = 0] + /\ committedIndex' = [committedIndex EXCEPT ![i] = 0] + /\ committedCounter' = [committedCounter EXCEPT ![i] = [v \in Server |-> IF v = i THEN Len(history[i]) + ELSE 0]] + /\ initialHistory' = [initialHistory EXCEPT ![i] = history[i]] + /\ tempMaxEpoch' = [tempMaxEpoch EXCEPT ![i] = acceptedEpoch[i]] + +ZabTurnToFollowing(i) == + /\ zabState' = [zabState EXCEPT ![i] = DISCOVERY] + /\ cepochSent' = [cepochSent EXCEPT ![i] = 
FALSE] + /\ synced' = [synced EXCEPT ![i] = FALSE] + /\ commitIndex' = [commitIndex EXCEPT ![i] = 0] + + +(* Fast Leader Election *) +FLEReceiveNotmsg(i, j) == + /\ ReceiveNotmsg(i, j) + /\ UNCHANGED <> + +FLENotmsgTimeout(i) == + /\ NotmsgTimeout(i) + /\ UNCHANGED <> + +FLEHandleNotmsg(i) == + /\ HandleNotmsg(i) + /\ LET newState == state'[i] + IN + \/ /\ newState = LEADING + /\ ZabTurnToLeading(i) + /\ UNCHANGED <> + \/ /\ newState = FOLLOWING + /\ ZabTurnToFollowing(i) + /\ UNCHANGED <> + \/ /\ newState = LOOKING + /\ UNCHANGED <> + /\ UNCHANGED <> + +\* On the premise that ReceiveVotes.HasQuorums = TRUE, corresponding to logic in line 1050-1055 in LFE.java. +FLEWaitNewNotmsg(i) == + /\ WaitNewNotmsg(i) + /\ UNCHANGED <> + +\* On the premise that ReceiveVotes.HasQuorums = TRUE, corresponding to logic in line 1061-1066 in LFE.java. +FLEWaitNewNotmsgEnd(i) == + /\ WaitNewNotmsgEnd(i) + /\ LET newState == state'[i] + IN + \/ /\ newState = LEADING + /\ ZabTurnToLeading(i) + /\ UNCHANGED <> + \/ /\ newState = FOLLOWING + /\ ZabTurnToFollowing(i) + /\ UNCHANGED <> + \/ /\ newState = LOOKING + /\ PrintT("New state is LOOKING in FLEWaitNewNotmsgEnd, which should not happen.") + /\ UNCHANGED <> + /\ UNCHANGED <> + +----------------------------------------------------------------------------- +(* A sub-action describing how a server transitions from LEADING/FOLLOWING to LOOKING. 
+ Initially I call it 'ZabTimeoutZ', but it will be called not only when timeout, + but also when finding a low epoch from leader.*) +FollowerShutdown(i) == + /\ ZabTimeout(i) + /\ zabState' = [zabState EXCEPT ![i] = ELECTION] + /\ leaderAddr' = [leaderAddr EXCEPT ![i] = NullPoint] + +LeaderShutdown(i) == + /\ ZabTimeout(i) + /\ zabState' = [zabState EXCEPT ![i] = ELECTION] + /\ leaderAddr' = [s \in Server |-> IF s \in learners[i] THEN NullPoint ELSE leaderAddr[s]] + /\ learners' = [learners EXCEPT ![i] = {}] + /\ forwarding' = [forwarding EXCEPT ![i] = {}] + /\ msgs' = [s \in Server |-> [v \in Server |-> IF v \in learners[i] \/ s \in learners[i] THEN << >> ELSE msgs[s][v]]] + +FollowerTimout(i) == + /\ state[i] = FOLLOWING + /\ leaderAddr[i] = NullPoint + /\ FollowerShutdown(i) + /\ msgs' = [s \in Server |-> [v \in Server |-> IF v = i THEN << >> ELSE msgs[s][v]]] + /\ UNCHANGED <> + +LeaderTimeout(i) == + /\ state[i] = LEADING + /\ learners[i] \notin Quorums + /\ LeaderShutdown(i) + /\ UNCHANGED <> + +----------------------------------------------------------------------------- +(* Establish connection between leader i and follower j. + It means i creates a learnerHandler for communicating with j, and j finds i's address.*) +EstablishConnection(i, j) == + /\ state[i] = LEADING /\ state[j] = FOLLOWING + /\ j \notin learners[i] /\ leaderAddr[j] = NullPoint + /\ currentVote[j].proposedLeader = i + /\ learners' = [learners EXCEPT ![i] = learners[i] \union {j}] \* Leader: 'addLearnerHandler(peer)' + /\ leaderAddr' = [leaderAddr EXCEPT ![j] = i] \* Follower: 'connectToLeader(addr, hostname)' + /\ UNCHANGED <> + +(* The leader i finds timeout and TCP connection between i and j closes.*) +Timeout(i, j) == + /\ state[i] = LEADING /\ state[j] = FOLLOWING + /\ j \in learners[i] /\ leaderAddr[j] = i + (* The action of leader i.(corresponding to function 'removeLearnerHandler(peer)'.) 
*) + /\ learners' = [learners EXCEPT ![i] = learners[i] \ {j}] + /\ forwarding' = [forwarding EXCEPT ![i] = IF j \in forwarding[i] THEN forwarding[i] \ {j} ELSE forwarding[i]] + /\ cepochRecv' = [cepochRecv EXCEPT ![i] = IF j \in cepochRecv[i] THEN cepochRecv[i] \ {j} ELSE cepochRecv[i]] + /\ ackeRecv' = [ackeRecv EXCEPT ![i] = IF j \in ackeRecv[i] THEN ackeRecv[i] \ {j} ELSE ackeRecv[i]] + /\ ackldRecv' = [ackldRecv EXCEPT ![i] = IF j \in ackldRecv[i] THEN ackldRecv[i] \ {j} ELSE ackldRecv[i]] + /\ ackIndex' = [ackIndex EXCEPT ![i][j] = 0] + /\ committedCounter' = [committedCounter EXCEPT ![i][j] = 0] + (* The action of follower j. *) + /\ FollowerShutdown(j) + (* Clean input buffer.*) + /\ Clean(i, j) + /\ UNCHANGED <> +----------------------------------------------------------------------------- +\* In phase f11, follower sends f.p to leader via FOLLOWERINFO(CEPOCH). +FollowerSendFOLLOWERINFO(i) == + /\ state[i] = FOLLOWING + /\ zabState[i] = DISCOVERY + /\ leaderAddr[i] /= NullPoint + /\ \lnot cepochSent[i] + /\ Send(i, leaderAddr[i], [mtype |-> FOLLOWERINFO, + mepoch |-> acceptedEpoch[i]]) + /\ cepochSent' = [cepochSent EXCEPT ![i] = TRUE] + /\ UNCHANGED <> + +(* In phase l11, leader waits for receiving FOLLOWERINFO from a quorum, + and then chooses a new epoch e' as its own epoch and broadcasts LEADERINFO. *) +LeaderHandleFOLLOWERINFO(i, j) == + /\ state[i] = LEADING + /\ msgs[j][i] /= << >> + /\ msgs[j][i][1].mtype = FOLLOWERINFO + /\ LET msg == msgs[j][i][1] + IN \/ /\ NullPoint \notin cepochRecv[i] \* 1. has not broadcast LEADERINFO - modify tempMaxEpoch + /\ LET newEpoch == Maximum({tempMaxEpoch[i], msg.mepoch}) + IN tempMaxEpoch' = [tempMaxEpoch EXCEPT ![i] = newEpoch] + /\ Discard(j, i) + \/ /\ NullPoint \in cepochRecv[i] \* 2. 
has broadcast LEADERINFO - no need to handle the msg, just send LEADERINFO to corresponding server + /\ Reply(i, j, [mtype |-> LEADERINFO, + mepoch |-> acceptedEpoch[i]]) + /\ UNCHANGED tempMaxEpoch + /\ cepochRecv' = [cepochRecv EXCEPT ![i] = IF j \in cepochRecv[i] THEN cepochRecv[i] + ELSE cepochRecv[i] \union {j}] + /\ UNCHANGED <> + +LeaderDiscovery1(i) == + /\ state[i] = LEADING + /\ zabState[i] = DISCOVERY + /\ cepochRecv[i] \in Quorums + /\ acceptedEpoch' = [acceptedEpoch EXCEPT ![i] = tempMaxEpoch[i] + 1] + /\ cepochRecv' = [cepochRecv EXCEPT ![i] = cepochRecv[i] \union {NullPoint}] + /\ BroadcastLEADERINFO(i, [mtype |-> LEADERINFO, + mepoch |-> acceptedEpoch'[i]]) + /\ UNCHANGED <> + +(* In phase f12, follower receives NEWEPOCH. If e' > f.p, then follower sends ACK-E back, + and ACK-E contains f.a and lastZxid to let leader judge whether it is the latest. + After handling NEWEPOCH, follower's zabState turns to SYNCHRONIZATION. *) +FollowerHandleLEADERINFO(i, j) == + /\ state[i] = FOLLOWING + /\ msgs[j][i] /= << >> + /\ msgs[j][i][1].mtype = LEADERINFO + /\ LET msg == msgs[j][i][1] + infoOk == j = leaderAddr[i] + epochOk == /\ infoOk + /\ msg.mepoch >= acceptedEpoch[i] + correct == /\ epochOk + /\ zabState[i] = DISCOVERY + IN /\ infoOk + /\ \/ /\ epochOk \* 1. Normal case + /\ \/ /\ correct + /\ acceptedEpoch' = [acceptedEpoch EXCEPT ![i] = msg.mepoch] + /\ Reply(i, j, [mtype |-> ACKEPOCH, + mepoch |-> msg.mepoch, + mlastEpoch |-> currentEpoch[i], + mlastZxid |-> lastZxid[i]]) + /\ cepochSent' = [cepochSent EXCEPT ![i] = TRUE] + /\ UNCHANGED inherentViolated + \/ /\ ~correct + /\ PrintT("Exception: Condition correct is false in FollowerHandleLEADERINFO(" \o ToString(i) \o ", " \o ToString(j) \o ").") + /\ inherentViolated' = TRUE + /\ Discard(j, i) + /\ UNCHANGED <> + /\ zabState' = [zabState EXCEPT ![i] = IF zabState[i] = DISCOVERY THEN SYNCHRONIZATION + ELSE zabState[i]] + /\ UNCHANGED <> + \/ /\ ~epochOk \* 2. 
Abnormal case - go back to election + /\ FollowerShutdown(i) + /\ Clean(i, j) + /\ UNCHANGED <> + /\ UNCHANGED <> + +\* Abstraction of actions making follower synced with leader before leader sending NEWLEADER. +subRECOVERYSYNC(i, j) == + LET canSync == /\ state[i] = LEADING /\ zabState[i] /= DISCOVERY /\ j \in learners[i] /\ j \in ackeRecv[i] + /\ state[j] = FOLLOWING /\ zabState[j] = SYNCHRONIZATION /\ leaderAddr[j] = i /\ synced[j] = FALSE + IN + \/ /\ canSync + /\ history' = [history EXCEPT ![j] = history[i]] + /\ lastZxid' = [lastZxid EXCEPT ![j] = lastZxid[i]] + /\ UpdateProposal(j, leaderAddr[j], lastZxid'[j], currentEpoch[j]) + /\ commitIndex' = [commitIndex EXCEPT ![j] = commitIndex[i]] + /\ synced' = [synced EXCEPT ![j] = TRUE] + /\ forwarding' = [forwarding EXCEPT ![i] = forwarding[i] \union {j}] \* j will receive PROPOSAL and COMMIT + /\ ackIndex' = [ackIndex EXCEPT ![i][j] = Len(history[i])] + /\ committedCounter' = [committedCounter EXCEPT ![i][j] = Maximum({commitIndex[i] - Len(initialHistory[i]), 0})] + /\ LET ms == [msource|->i, mtype|->"RECOVERYSYNC", mepoch|->acceptedEpoch[i], mproposals|->history[i]] + IN proposalMsgsLog' = IF ms \in proposalMsgsLog THEN proposalMsgsLog + ELSE proposalMsgsLog \union {ms} + /\ Reply(i, j, [mtype |-> NEWLEADER, + mepoch |-> acceptedEpoch[i], + mlastZxid |-> lastZxid[i]]) + \/ /\ ~canSync + /\ Discard(j, i) + /\ UNCHANGED <> + +(* In phase l12, leader waits for receiving ACKEPOPCH from a quorum, + and check whether it has the latest history and epoch from them. + If so, leader's zabState turns to SYNCHRONIZATION. 
*) +LeaderHandleACKEPOCH(i, j) == + /\ state[i] = LEADING + /\ msgs[j][i] /= << >> + /\ msgs[j][i][1].mtype = ACKEPOCH + /\ LET msg == msgs[j][i][1] + infoOk == /\ j \in learners[i] + /\ acceptedEpoch[i] = msg.mepoch + logOk == /\ infoOk \* logOk = TRUE means leader is more up-to-date than follower j + /\ \/ currentEpoch[i] > msg.mlastEpoch + \/ /\ currentEpoch[i] = msg.mlastEpoch + /\ \/ lastZxid[i][1] > msg.mlastZxid[1] + \/ /\ lastZxid[i][1] = msg.mlastZxid[1] + /\ lastZxid[i][2] >= msg.mlastZxid[2] + replyOk == /\ infoOk + /\ NullPoint \in ackeRecv[i] + IN /\ infoOk + /\ \/ /\ replyOk + /\ subRECOVERYSYNC(i, j) + /\ ackeRecv' = [ackeRecv EXCEPT ![i] = IF j \notin ackeRecv[i] THEN ackeRecv[i] \union {j} + ELSE ackeRecv[i]] + /\ UNCHANGED <> + \/ /\ ~replyOk + /\ \/ /\ logOk + /\ ackeRecv' = [ackeRecv EXCEPT ![i] = IF j \notin ackeRecv[i] THEN ackeRecv[i] \union {j} + ELSE ackeRecv[i]] + /\ Discard(j, i) + /\ UNCHANGED <> + \/ /\ ~logOk \* go back to election + /\ LeaderShutdown(i) + /\ UNCHANGED ackeRecv + /\ UNCHANGED <> + /\ UNCHANGED <> + +LeaderDiscovery2(i) == + /\ state[i] = LEADING + /\ zabState[i] = DISCOVERY + /\ ackeRecv[i] \in Quorums + /\ zabState' = [zabState EXCEPT ![i] = SYNCHRONIZATION] + /\ currentEpoch' = [currentEpoch EXCEPT ![i] = acceptedEpoch[i]] + /\ initialHistory' = [initialHistory EXCEPT ![i] = history[i]] + /\ ackeRecv' = [ackeRecv EXCEPT ![i] = ackeRecv[i] \union {NullPoint}] + /\ ackIndex' = [ackIndex EXCEPT ![i][i] = Len(history[i])] + /\ UpdateProposal(i, i, lastZxid[i], currentEpoch'[i]) + /\ LET epoch == acceptedEpoch[i] + IN epochLeader' = [epochLeader EXCEPT ![epoch] = epochLeader[epoch] \union {i}] + /\ UNCHANGED <> + +(* Note: Set cepochRecv, ackeRecv, ackldRecv to {NullPoint} in corresponding three actions to + make sure that the prospective leader will not broadcast NEWEPOCH/NEWLEADER/COMMITLD twice.*) +----------------------------------------------------------------------------- +RECOVERYSYNC(i, j) == Review comment: 
Consider in the future replacing this magic atomic synchronization with DIFF based message passing. The current form could hide a defect. It would increase the state space a lot though, so other state space reduction might be worth doing first.
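
To make the suggestion concrete, here is a rough sketch of what DIFF-based synchronization might look like in this spec. It is not part of the PR and is only an illustration under several assumptions: `DIFF` is a hypothetical new message type that would have to be added to the `CONSTANTS`, `mproposals` and `IndexOfZxid` are invented names, history entries are assumed to be records with `epoch` and `counter` fields (as `PZxidEqual` suggests), and the follower's log is assumed to be a prefix of the leader's (otherwise TRUNC or SNAP would be needed). `UNCHANGED` frame conditions and the leader-side bookkeeping (`forwarding`, `ackIndex`, `synced`, and so on) are left out.

```tla
\* Hypothetical sketch, not part of the PR. DIFF, mproposals and
\* IndexOfZxid are assumed names; UNCHANGED clauses are omitted.

\* Index in leader i's history of the transaction with zxid z,
\* or 0 if there is none (for example, when the follower's log is empty).
IndexOfZxid(i, z) ==
    LET matches == {k \in 1..Len(history[i]) : PZxidEqual(history[i][k], z)}
    IN  Maximum(matches \union {0})

\* Leader i answers follower j's ACKEPOCH with only the suffix of its
\* history that j is missing, instead of copying history[i] atomically.
LeaderSendDIFF(i, j) ==
    /\ state[i] = LEADING
    /\ msgs[j][i] /= << >>
    /\ msgs[j][i][1].mtype = ACKEPOCH
    /\ LET msg  == msgs[j][i][1]
           from == IndexOfZxid(i, msg.mlastZxid)
       IN  Reply(i, j, [mtype      |-> DIFF,
                        mepoch     |-> acceptedEpoch[i],
                        mproposals |-> SubSeq(history[i], from + 1, Len(history[i]))])

\* Follower i appends the received suffix to its own log.
\* Only correct under the prefix assumption stated above.
FollowerHandleDIFF(i, j) ==
    /\ state[i] = FOLLOWING
    /\ zabState[i] = SYNCHRONIZATION
    /\ j = leaderAddr[i]
    /\ msgs[j][i] /= << >>
    /\ msgs[j][i][1].mtype = DIFF
    /\ history' = [history EXCEPT ![i] = history[i] \o msgs[j][i][1].mproposals]
    /\ Discard(j, i)
```

Splitting the step into a send and a receive action is what lets the model checker interleave timeouts and message loss between them, which is where a defect hidden by the atomic version would show up. It is also what enlarges the state space.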