LogCabin
Server/RaftConsensusInvariants.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  *
00003  * Permission to use, copy, modify, and distribute this software for any
00004  * purpose with or without fee is hereby granted, provided that the above
00005  * copyright notice and this permission notice appear in all copies.
00006  *
00007  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00008  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00009  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00010  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00011  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00012  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00013  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00014  */
00015 
00016 #include "Core/Debug.h"
00017 #include "Core/ProtoBuf.h"
00018 #include "Server/RaftConsensus.h"
00019 
00020 namespace LogCabin {
00021 namespace Server {
00022 namespace RaftConsensusInternal {
00023 
/**
 * Evaluate an invariant. If 'expr' is false, log a warning containing the
 * text of the expression and increment the 'errors' counter that is in scope
 * at the point of use. Execution continues in either case.
 */
#define expect(expr) \
    do { \
        if (!(expr)) { \
            WARNING("`%s' is false", #expr); \
            ++errors; \
        } \
    } while (0)
00030 
00031 struct Invariants::ConsensusSnapshot {
00032     explicit ConsensusSnapshot(const RaftConsensus& consensus)
00033         : stateChangedCount(consensus.stateChanged.notificationCount)
00034         , exiting(consensus.exiting)
00035         , numPeerThreads(consensus.numPeerThreads)
00036         , lastLogIndex(consensus.log->getLastLogIndex())
00037         , lastLogTerm(0)
00038         , configurationId(consensus.configuration->id)
00039         , configurationState(consensus.configuration->state)
00040         , currentTerm(consensus.currentTerm)
00041         , state(consensus.state)
00042         , commitIndex(consensus.commitIndex)
00043         , leaderId(consensus.leaderId)
00044         , votedFor(consensus.votedFor)
00045         , currentEpoch(consensus.currentEpoch)
00046         , startElectionAt(consensus.startElectionAt)
00047     {
00048         if (consensus.log->getLastLogIndex() >=
00049             consensus.log->getLogStartIndex()) {
00050             lastLogTerm = consensus.log->getEntry(
00051                                     consensus.log->getLastLogIndex()).term();
00052         }
00053 
00054     }
00055 
00056     uint64_t stateChangedCount;
00057     bool exiting;
00058     uint32_t numPeerThreads;
00059     uint64_t lastLogIndex;
00060     uint64_t lastLogTerm;
00061     uint64_t configurationId;
00062     Configuration::State configurationState;
00063     uint64_t currentTerm;
00064     RaftConsensus::State state;
00065     uint64_t commitIndex;
00066     uint64_t leaderId;
00067     uint64_t votedFor;
00068     uint64_t currentEpoch;
00069     TimePoint startElectionAt;
00070 };
00071 
00072 
00073 Invariants::Invariants(RaftConsensus& consensus)
00074     : consensus(consensus)
00075     , errors(0)
00076     , previous()
00077 {
00078 }
00079 
00080 Invariants::~Invariants()
00081 {
00082 }
00083 
00084 void
00085 Invariants::checkAll()
00086 {
00087     checkBasic();
00088     checkDelta();
00089     checkPeerBasic();
00090     checkPeerDelta();
00091 }
00092 
00093 void
00094 Invariants::checkBasic()
00095 {
00096     // Log terms and cluster times monotonically increase
00097     uint64_t lastTerm = 0;
00098     uint64_t lastClusterTime = 0;
00099     for (uint64_t index = consensus.log->getLogStartIndex();
00100          index <= consensus.log->getLastLogIndex();
00101          ++index) {
00102         const Storage::Log::Entry& entry = consensus.log->getEntry(index);
00103         expect(entry.term() >= lastTerm);
00104         expect(entry.cluster_time() >= lastClusterTime);
00105         lastTerm = entry.term();
00106         lastClusterTime = entry.cluster_time();
00107     }
00108     // The terms in the log do not exceed currentTerm
00109     expect(lastTerm <= consensus.currentTerm);
00110 
00111     if (consensus.log->getLogStartIndex() <=
00112         consensus.log->getLastLogIndex()) {
00113         expect(lastClusterTime ==
00114                consensus.clusterClock.clusterTimeAtEpoch);
00115     } else {
00116         expect(consensus.lastSnapshotClusterTime ==
00117                consensus.clusterClock.clusterTimeAtEpoch);
00118     }
00119 
00120     // The current configuration should be the last one found in the log
00121     bool found = false;
00122     for (uint64_t index = consensus.log->getLastLogIndex();
00123          index >= consensus.log->getLogStartIndex();
00124          --index) {
00125         const Storage::Log::Entry& entry = consensus.log->getEntry(index);
00126         if (entry.type() == Protocol::Raft::EntryType::CONFIGURATION) {
00127             expect(consensus.configuration->id == index);
00128             expect(consensus.configuration->state !=
00129                    Configuration::State::BLANK);
00130             found = true;
00131             break;
00132         }
00133     }
00134     if (!found) {
00135         if (consensus.log->getLogStartIndex() == 1) {
00136             expect(consensus.configuration->id == 0);
00137             expect(consensus.configuration->state ==
00138                    Configuration::State::BLANK);
00139         } else {
00140             expect(consensus.configuration->id <= consensus.lastSnapshotIndex);
00141         }
00142     }
00143 
00144     // Every configuration present in the log should also be present in the
00145     // configurationDescriptions map.
00146     for (uint64_t index = consensus.log->getLogStartIndex();
00147          index <= consensus.log->getLastLogIndex();
00148          ++index) {
00149         const Storage::Log::Entry& entry = consensus.log->getEntry(index);
00150         if (entry.type() == Protocol::Raft::EntryType::CONFIGURATION) {
00151             auto it = consensus.configurationManager->
00152                                         descriptions.find(index);
00153             expect(it != consensus.configurationManager->descriptions.end());
00154             if (it != consensus.configurationManager->descriptions.end())
00155                 expect(it->second == entry.configuration());
00156         }
00157     }
00158     // The configuration descriptions map shouldn't have anything past the
00159     // snapshot and the log.
00160     expect(consensus.configurationManager->descriptions.upper_bound(
00161                 std::max(consensus.log->getLastLogIndex(),
00162                          consensus.lastSnapshotIndex)) ==
00163            consensus.configurationManager->descriptions.end());
00164 
00165     // Servers with blank configurations should remain passive. Since the first
00166     // entry in every log is a configuration, they should also have empty logs.
00167     if (consensus.configuration->state == Configuration::State::BLANK) {
00168         expect(consensus.state == RaftConsensus::State::FOLLOWER);
00169         expect(consensus.log->getLastLogIndex() == 0);
00170     }
00171 
00172     // The last snapshot covers a committed range.
00173     expect(consensus.commitIndex >= consensus.lastSnapshotIndex);
00174 
00175     // The commitIndex doesn't exceed the length of the log/snapshot.
00176     expect(consensus.commitIndex <= consensus.log->getLastLogIndex());
00177 
00178     // The last log index points at least through the end of the last snapshot.
00179     expect(consensus.log->getLastLogIndex() >= consensus.lastSnapshotIndex);
00180 
00181     // lastLogIndex is either just below the log start (for empty logs) or
00182     // larger (for non-empty logs)
00183     assert(consensus.log->getLastLogIndex() >=
00184            consensus.log->getLogStartIndex() - 1);
00185 
00186     // advanceCommitIndex is called everywhere it needs to be.
00187     if (consensus.state == RaftConsensus::State::LEADER) {
00188         uint64_t majorityEntry =
00189             consensus.configuration->quorumMin(&Server::getMatchIndex);
00190         expect(consensus.commitIndex >= majorityEntry ||
00191                majorityEntry < consensus.log->getLogStartIndex() ||
00192                consensus.log->getEntry(majorityEntry).term() !=
00193                     consensus.currentTerm);
00194     }
00195 
00196     // A leader always points its leaderId at itself.
00197     if (consensus.state == RaftConsensus::State::LEADER)
00198         expect(consensus.leaderId == consensus.serverId);
00199 
00200     // A leader always voted for itself. (Candidates can vote for others when
00201     // they abort an election.)
00202     if (consensus.state == RaftConsensus::State::LEADER) {
00203         expect(consensus.votedFor == consensus.serverId);
00204     }
00205 
00206     // A follower and candidate always has a timer set; a leader has it at
00207     // TimePoint::max().
00208     if (consensus.state == RaftConsensus::State::LEADER) {
00209         expect(consensus.startElectionAt == TimePoint::max());
00210     } else {
00211         expect(consensus.startElectionAt > TimePoint::min());
00212         expect(consensus.startElectionAt <=
00213                Clock::now() + consensus.ELECTION_TIMEOUT * 2);
00214     }
00215 
00216     // Log metadata is updated when the term or vote changes.
00217     expect(consensus.log->metadata.current_term() == consensus.currentTerm);
00218     expect(consensus.log->metadata.voted_for() == consensus.votedFor);
00219 }
00220 
00221 void
00222 Invariants::checkDelta()
00223 {
00224     if (!previous) {
00225         previous.reset(new ConsensusSnapshot(consensus));
00226         return;
00227     }
00228     std::unique_ptr<ConsensusSnapshot> current(
00229                                         new ConsensusSnapshot(consensus));
00230     // Within a term, ...
00231     if (previous->currentTerm == current->currentTerm) {
00232         // the leader is set at most once.
00233         if (previous->leaderId != 0)
00234             expect(previous->leaderId == current->leaderId);
00235         // the vote is set at most once.
00236         if (previous->votedFor != 0)
00237             expect(previous->votedFor == current->votedFor);
00238         // a leader stays a leader.
00239         if (previous->state == RaftConsensus::State::LEADER)
00240             expect(current->state == RaftConsensus::State::LEADER);
00241     }
00242 
00243     // Once exiting is set, it doesn't get unset.
00244     if (previous->exiting)
00245         expect(current->exiting);
00246 
00247     // These variables monotonically increase.
00248     expect(previous->currentTerm <= current->currentTerm);
00249     expect(previous->commitIndex <= current->commitIndex);
00250     expect(previous->currentEpoch <= current->currentEpoch);
00251 
00252     // Change requires condition variable notification:
00253     if (previous->stateChangedCount == current->stateChangedCount) {
00254         expect(previous->currentTerm == current->currentTerm);
00255         expect(previous->state == current->state);
00256         expect(previous->lastLogIndex == current->lastLogIndex);
00257         expect(previous->lastLogTerm == current->lastLogTerm);
00258         expect(previous->commitIndex == current->commitIndex);
00259         expect(previous->exiting == current->exiting);
00260         expect(previous->numPeerThreads <= current->numPeerThreads);
00261         expect(previous->configurationId == current->configurationId);
00262         expect(previous->configurationState == current->configurationState);
00263         expect(previous->startElectionAt == current->startElectionAt);
00264      // TODO(ongaro):
00265      // an acknowledgement from a peer is received.
00266      // a server goes from not caught up to caught up.
00267     }
00268 
00269     previous = std::move(current);
00270 }
00271 
00272 void
00273 Invariants::checkPeerBasic()
00274 {
00275     for (auto it = consensus.configuration->knownServers.begin();
00276          it != consensus.configuration->knownServers.end();
00277          ++it) {
00278         Peer* peer = dynamic_cast<Peer*>(it->second.get()); // NOLINT
00279         if (peer == NULL)
00280             continue;
00281         if (consensus.exiting)
00282             expect(peer->exiting);
00283         if (!peer->requestVoteDone) {
00284             expect(!peer->haveVote_);
00285         }
00286         expect(peer->matchIndex <= consensus.log->getLastLogIndex());
00287         expect(peer->lastAckEpoch <= consensus.currentEpoch);
00288         expect(peer->nextHeartbeatTime <=
00289                Clock::now() + consensus.HEARTBEAT_PERIOD);
00290         expect(peer->backoffUntil <=
00291                Clock::now() + consensus.RPC_FAILURE_BACKOFF);
00292 
00293         // TODO(ongaro): anything about catchup?
00294     }
00295 }
00296 
00297 void
00298 Invariants::checkPeerDelta()
00299 {
00300     // TODO(ongaro): add checks
00301 }
00302 
00303 } // namespace RaftConsensusInternal
00304 
00305 } // namespace LogCabin::Server
00306 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines