// LogCabin
00001 /* Copyright (c) 2012 Stanford University 00002 * 00003 * Permission to use, copy, modify, and distribute this software for any 00004 * purpose with or without fee is hereby granted, provided that the above 00005 * copyright notice and this permission notice appear in all copies. 00006 * 00007 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00008 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00009 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00010 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00011 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00012 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00013 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00014 */ 00015 00016 #include "Core/Debug.h" 00017 #include "Core/ProtoBuf.h" 00018 #include "Server/RaftConsensus.h" 00019 00020 namespace LogCabin { 00021 namespace Server { 00022 namespace RaftConsensusInternal { 00023 00024 #define expect(expr) do { \ 00025 if (!(expr)) { \ 00026 WARNING("`%s' is false", #expr); \ 00027 ++errors; \ 00028 } \ 00029 } while (0) 00030 00031 struct Invariants::ConsensusSnapshot { 00032 explicit ConsensusSnapshot(const RaftConsensus& consensus) 00033 : stateChangedCount(consensus.stateChanged.notificationCount) 00034 , exiting(consensus.exiting) 00035 , numPeerThreads(consensus.numPeerThreads) 00036 , lastLogIndex(consensus.log->getLastLogIndex()) 00037 , lastLogTerm(0) 00038 , configurationId(consensus.configuration->id) 00039 , configurationState(consensus.configuration->state) 00040 , currentTerm(consensus.currentTerm) 00041 , state(consensus.state) 00042 , commitIndex(consensus.commitIndex) 00043 , leaderId(consensus.leaderId) 00044 , votedFor(consensus.votedFor) 00045 , currentEpoch(consensus.currentEpoch) 00046 , startElectionAt(consensus.startElectionAt) 00047 { 00048 if (consensus.log->getLastLogIndex() 
>= 00049 consensus.log->getLogStartIndex()) { 00050 lastLogTerm = consensus.log->getEntry( 00051 consensus.log->getLastLogIndex()).term(); 00052 } 00053 00054 } 00055 00056 uint64_t stateChangedCount; 00057 bool exiting; 00058 uint32_t numPeerThreads; 00059 uint64_t lastLogIndex; 00060 uint64_t lastLogTerm; 00061 uint64_t configurationId; 00062 Configuration::State configurationState; 00063 uint64_t currentTerm; 00064 RaftConsensus::State state; 00065 uint64_t commitIndex; 00066 uint64_t leaderId; 00067 uint64_t votedFor; 00068 uint64_t currentEpoch; 00069 TimePoint startElectionAt; 00070 }; 00071 00072 00073 Invariants::Invariants(RaftConsensus& consensus) 00074 : consensus(consensus) 00075 , errors(0) 00076 , previous() 00077 { 00078 } 00079 00080 Invariants::~Invariants() 00081 { 00082 } 00083 00084 void 00085 Invariants::checkAll() 00086 { 00087 checkBasic(); 00088 checkDelta(); 00089 checkPeerBasic(); 00090 checkPeerDelta(); 00091 } 00092 00093 void 00094 Invariants::checkBasic() 00095 { 00096 // Log terms and cluster times monotonically increase 00097 uint64_t lastTerm = 0; 00098 uint64_t lastClusterTime = 0; 00099 for (uint64_t index = consensus.log->getLogStartIndex(); 00100 index <= consensus.log->getLastLogIndex(); 00101 ++index) { 00102 const Storage::Log::Entry& entry = consensus.log->getEntry(index); 00103 expect(entry.term() >= lastTerm); 00104 expect(entry.cluster_time() >= lastClusterTime); 00105 lastTerm = entry.term(); 00106 lastClusterTime = entry.cluster_time(); 00107 } 00108 // The terms in the log do not exceed currentTerm 00109 expect(lastTerm <= consensus.currentTerm); 00110 00111 if (consensus.log->getLogStartIndex() <= 00112 consensus.log->getLastLogIndex()) { 00113 expect(lastClusterTime == 00114 consensus.clusterClock.clusterTimeAtEpoch); 00115 } else { 00116 expect(consensus.lastSnapshotClusterTime == 00117 consensus.clusterClock.clusterTimeAtEpoch); 00118 } 00119 00120 // The current configuration should be the last one found in the 
log 00121 bool found = false; 00122 for (uint64_t index = consensus.log->getLastLogIndex(); 00123 index >= consensus.log->getLogStartIndex(); 00124 --index) { 00125 const Storage::Log::Entry& entry = consensus.log->getEntry(index); 00126 if (entry.type() == Protocol::Raft::EntryType::CONFIGURATION) { 00127 expect(consensus.configuration->id == index); 00128 expect(consensus.configuration->state != 00129 Configuration::State::BLANK); 00130 found = true; 00131 break; 00132 } 00133 } 00134 if (!found) { 00135 if (consensus.log->getLogStartIndex() == 1) { 00136 expect(consensus.configuration->id == 0); 00137 expect(consensus.configuration->state == 00138 Configuration::State::BLANK); 00139 } else { 00140 expect(consensus.configuration->id <= consensus.lastSnapshotIndex); 00141 } 00142 } 00143 00144 // Every configuration present in the log should also be present in the 00145 // configurationDescriptions map. 00146 for (uint64_t index = consensus.log->getLogStartIndex(); 00147 index <= consensus.log->getLastLogIndex(); 00148 ++index) { 00149 const Storage::Log::Entry& entry = consensus.log->getEntry(index); 00150 if (entry.type() == Protocol::Raft::EntryType::CONFIGURATION) { 00151 auto it = consensus.configurationManager-> 00152 descriptions.find(index); 00153 expect(it != consensus.configurationManager->descriptions.end()); 00154 if (it != consensus.configurationManager->descriptions.end()) 00155 expect(it->second == entry.configuration()); 00156 } 00157 } 00158 // The configuration descriptions map shouldn't have anything past the 00159 // snapshot and the log. 00160 expect(consensus.configurationManager->descriptions.upper_bound( 00161 std::max(consensus.log->getLastLogIndex(), 00162 consensus.lastSnapshotIndex)) == 00163 consensus.configurationManager->descriptions.end()); 00164 00165 // Servers with blank configurations should remain passive. Since the first 00166 // entry in every log is a configuration, they should also have empty logs. 
00167 if (consensus.configuration->state == Configuration::State::BLANK) { 00168 expect(consensus.state == RaftConsensus::State::FOLLOWER); 00169 expect(consensus.log->getLastLogIndex() == 0); 00170 } 00171 00172 // The last snapshot covers a committed range. 00173 expect(consensus.commitIndex >= consensus.lastSnapshotIndex); 00174 00175 // The commitIndex doesn't exceed the length of the log/snapshot. 00176 expect(consensus.commitIndex <= consensus.log->getLastLogIndex()); 00177 00178 // The last log index points at least through the end of the last snapshot. 00179 expect(consensus.log->getLastLogIndex() >= consensus.lastSnapshotIndex); 00180 00181 // lastLogIndex is either just below the log start (for empty logs) or 00182 // larger (for non-empty logs) 00183 assert(consensus.log->getLastLogIndex() >= 00184 consensus.log->getLogStartIndex() - 1); 00185 00186 // advanceCommitIndex is called everywhere it needs to be. 00187 if (consensus.state == RaftConsensus::State::LEADER) { 00188 uint64_t majorityEntry = 00189 consensus.configuration->quorumMin(&Server::getMatchIndex); 00190 expect(consensus.commitIndex >= majorityEntry || 00191 majorityEntry < consensus.log->getLogStartIndex() || 00192 consensus.log->getEntry(majorityEntry).term() != 00193 consensus.currentTerm); 00194 } 00195 00196 // A leader always points its leaderId at itself. 00197 if (consensus.state == RaftConsensus::State::LEADER) 00198 expect(consensus.leaderId == consensus.serverId); 00199 00200 // A leader always voted for itself. (Candidates can vote for others when 00201 // they abort an election.) 00202 if (consensus.state == RaftConsensus::State::LEADER) { 00203 expect(consensus.votedFor == consensus.serverId); 00204 } 00205 00206 // A follower and candidate always has a timer set; a leader has it at 00207 // TimePoint::max(). 
00208 if (consensus.state == RaftConsensus::State::LEADER) { 00209 expect(consensus.startElectionAt == TimePoint::max()); 00210 } else { 00211 expect(consensus.startElectionAt > TimePoint::min()); 00212 expect(consensus.startElectionAt <= 00213 Clock::now() + consensus.ELECTION_TIMEOUT * 2); 00214 } 00215 00216 // Log metadata is updated when the term or vote changes. 00217 expect(consensus.log->metadata.current_term() == consensus.currentTerm); 00218 expect(consensus.log->metadata.voted_for() == consensus.votedFor); 00219 } 00220 00221 void 00222 Invariants::checkDelta() 00223 { 00224 if (!previous) { 00225 previous.reset(new ConsensusSnapshot(consensus)); 00226 return; 00227 } 00228 std::unique_ptr<ConsensusSnapshot> current( 00229 new ConsensusSnapshot(consensus)); 00230 // Within a term, ... 00231 if (previous->currentTerm == current->currentTerm) { 00232 // the leader is set at most once. 00233 if (previous->leaderId != 0) 00234 expect(previous->leaderId == current->leaderId); 00235 // the vote is set at most once. 00236 if (previous->votedFor != 0) 00237 expect(previous->votedFor == current->votedFor); 00238 // a leader stays a leader. 00239 if (previous->state == RaftConsensus::State::LEADER) 00240 expect(current->state == RaftConsensus::State::LEADER); 00241 } 00242 00243 // Once exiting is set, it doesn't get unset. 00244 if (previous->exiting) 00245 expect(current->exiting); 00246 00247 // These variables monotonically increase. 
00248 expect(previous->currentTerm <= current->currentTerm); 00249 expect(previous->commitIndex <= current->commitIndex); 00250 expect(previous->currentEpoch <= current->currentEpoch); 00251 00252 // Change requires condition variable notification: 00253 if (previous->stateChangedCount == current->stateChangedCount) { 00254 expect(previous->currentTerm == current->currentTerm); 00255 expect(previous->state == current->state); 00256 expect(previous->lastLogIndex == current->lastLogIndex); 00257 expect(previous->lastLogTerm == current->lastLogTerm); 00258 expect(previous->commitIndex == current->commitIndex); 00259 expect(previous->exiting == current->exiting); 00260 expect(previous->numPeerThreads <= current->numPeerThreads); 00261 expect(previous->configurationId == current->configurationId); 00262 expect(previous->configurationState == current->configurationState); 00263 expect(previous->startElectionAt == current->startElectionAt); 00264 // TODO(ongaro): 00265 // an acknowledgement from a peer is received. 00266 // a server goes from not caught up to caught up. 00267 } 00268 00269 previous = std::move(current); 00270 } 00271 00272 void 00273 Invariants::checkPeerBasic() 00274 { 00275 for (auto it = consensus.configuration->knownServers.begin(); 00276 it != consensus.configuration->knownServers.end(); 00277 ++it) { 00278 Peer* peer = dynamic_cast<Peer*>(it->second.get()); // NOLINT 00279 if (peer == NULL) 00280 continue; 00281 if (consensus.exiting) 00282 expect(peer->exiting); 00283 if (!peer->requestVoteDone) { 00284 expect(!peer->haveVote_); 00285 } 00286 expect(peer->matchIndex <= consensus.log->getLastLogIndex()); 00287 expect(peer->lastAckEpoch <= consensus.currentEpoch); 00288 expect(peer->nextHeartbeatTime <= 00289 Clock::now() + consensus.HEARTBEAT_PERIOD); 00290 expect(peer->backoffUntil <= 00291 Clock::now() + consensus.RPC_FAILURE_BACKOFF); 00292 00293 // TODO(ongaro): anything about catchup? 
00294 } 00295 } 00296 00297 void 00298 Invariants::checkPeerDelta() 00299 { 00300 // TODO(ongaro): add checks 00301 } 00302 00303 } // namespace RaftConsensusInternal 00304 00305 } // namespace LogCabin::Server 00306 } // namespace LogCabin