00001 /* Copyright (c) 2012 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * Copyright (c) 2015 Scale Computing 00004 * 00005 * Permission to use, copy, modify, and distribute this software for any 00006 * purpose with or without fee is hereby granted, provided that the above 00007 * copyright notice and this permission notice appear in all copies. 00008 * 00009 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00010 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00011 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00012 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00013 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00014 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00015 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00016 */ 00017 00018 #include <algorithm> 00019 #include <fcntl.h> 00020 #include <limits> 00021 #include <string.h> 00022 #include <sys/file.h> 00023 #include <sys/stat.h> 00024 #include <time.h> 00025 #include <unistd.h> 00026 00027 #include "build/Protocol/Raft.pb.h" 00028 #include "build/Server/SnapshotMetadata.pb.h" 00029 #include "Core/Buffer.h" 00030 #include "Core/Debug.h" 00031 #include "Core/ProtoBuf.h" 00032 #include "Core/Random.h" 00033 #include "Core/StringUtil.h" 00034 #include "Core/ThreadId.h" 00035 #include "Core/Util.h" 00036 #include "Protocol/Common.h" 00037 #include "RPC/ClientRPC.h" 00038 #include "RPC/ClientSession.h" 00039 #include "RPC/ServerRPC.h" 00040 #include "Server/RaftConsensus.h" 00041 #include "Server/Globals.h" 00042 #include "Storage/LogFactory.h" 00043 00044 namespace LogCabin { 00045 namespace Server { 00046 00047 typedef Storage::Log Log; 00048 00049 namespace RaftConsensusInternal { 00050 00051 bool startThreads = true; 00052 00053 ////////// Server ////////// 00054 00055 Server::Server(uint64_t serverId) 00056 : serverId(serverId) 00057 , addresses() 00058 , haveStateMachineSupportedVersions(false) 00059 , minStateMachineVersion(std::numeric_limits<uint16_t>::max()) 00060 , maxStateMachineVersion(0) 00061 , gcFlag(false) 00062 { 00063 } 00064 00065 Server::~Server() 00066 { 00067 } 00068 00069 std::ostream& 00070 operator<<(std::ostream& os, const Server& server) 00071 { 00072 return server.dumpToStream(os); 00073 } 00074 00075 ////////// LocalServer ////////// 00076 00077 LocalServer::LocalServer(uint64_t serverId, RaftConsensus& consensus) 00078 : Server(serverId) 00079 , consensus(consensus) 00080 , lastSyncedIndex(0) 00081 { 00082 } 00083 00084 LocalServer::~LocalServer() 00085 { 00086 } 00087 00088 void 00089 LocalServer::beginRequestVote() 00090 { 00091 } 00092 00093 void 00094 LocalServer::beginLeadership() 00095 { 00096 lastSyncedIndex = consensus.log->getLastLogIndex(); 00097 } 00098 00099 void 00100 LocalServer::exit() 00101 { 00102 } 00103 00104 uint64_t 00105 LocalServer::getLastAckEpoch() const 00106 { 00107 return consensus.currentEpoch; 00108 } 00109 00110 uint64_t 00111 LocalServer::getMatchIndex() const 00112 { 00113 return lastSyncedIndex; 00114 } 00115 00116 bool 00117 LocalServer::haveVote() const 00118 { 00119 return (consensus.votedFor == serverId); 00120 } 00121 00122 void 00123 LocalServer::interrupt() 00124 { 00125 } 00126 00127 bool 00128 LocalServer::isCaughtUp() const 00129 { 00130 return true; 00131 } 00132 00133 void 00134 LocalServer::scheduleHeartbeat() 00135 { 00136 } 00137 00138 std::ostream& 00139 
LocalServer::dumpToStream(std::ostream& os) const 00140 { 00141 // Nothing interesting to dump. 00142 return os; 00143 } 00144 00145 void 00146 LocalServer::updatePeerStats(Protocol::ServerStats::Raft::Peer& peerStats, 00147 Core::Time::SteadyTimeConverter& time) const 00148 { 00149 switch (consensus.state) { 00150 case RaftConsensus::State::FOLLOWER: 00151 break; 00152 case RaftConsensus::State::CANDIDATE: 00153 break; 00154 case RaftConsensus::State::LEADER: 00155 peerStats.set_last_synced_index(lastSyncedIndex); 00156 break; 00157 } 00158 } 00159 00160 ////////// Peer ////////// 00161 00162 Peer::Peer(uint64_t serverId, RaftConsensus& consensus) 00163 : Server(serverId) 00164 , consensus(consensus) 00165 , eventLoop(consensus.globals.eventLoop) 00166 , exiting(false) 00167 , requestVoteDone(false) 00168 , haveVote_(false) 00169 , suppressBulkData(true) 00170 // It's somewhat important to set nextIndex correctly here, since peers 00171 // that are added to the configuration won't go through beginLeadership() 00172 // on the current leader. I say somewhat important because, if nextIndex 00173 // is set incorrectly, it's self-correcting, so it's just a potential 00174 // performance issue. 00175 , nextIndex(consensus.log->getLastLogIndex() + 1) 00176 , matchIndex(0) 00177 , lastAckEpoch(0) 00178 , nextHeartbeatTime(TimePoint::min()) 00179 , backoffUntil(TimePoint::min()) 00180 , rpcFailuresSinceLastWarning(0) 00181 , lastCatchUpIterationMs(~0UL) 00182 , thisCatchUpIterationStart(Clock::now()) 00183 , thisCatchUpIterationGoalId(~0UL) 00184 , isCaughtUp_(false) 00185 , snapshotFile() 00186 , snapshotFileOffset(0) 00187 , lastSnapshotIndex(0) 00188 , session() 00189 , rpc() 00190 { 00191 } 00192 00193 Peer::~Peer() 00194 { 00195 } 00196 00197 void 00198 Peer::beginRequestVote() 00199 { 00200 requestVoteDone = false; 00201 haveVote_ = false; 00202 } 00203 00204 void 00205 Peer::beginLeadership() 00206 { 00207 nextIndex = consensus.log->getLastLogIndex() + 1; 00208 matchIndex = 0; 00209 suppressBulkData = true; 00210 snapshotFile.reset(); 00211 snapshotFileOffset = 0; 00212 lastSnapshotIndex = 0; 00213 } 00214 00215 void 00216 Peer::exit() 00217 { 00218 NOTICE("Flagging peer %lu to exit", serverId); 00219 exiting = true; 00220 // Usually telling peers to exit is paired with an interruptAll(). That can 00221 // be error-prone, however, when you're removing servers from the 00222 // configuration (if the code removes servers and then calls 00223 // interruptAll(), it won't interrupt() the removed servers). So it's 00224 // better to just interrupt() here as well. 
See 00225 // https://github.com/logcabin/logcabin/issues/183 00226 interrupt(); 00227 } 00228 00229 uint64_t 00230 Peer::getLastAckEpoch() const 00231 { 00232 return lastAckEpoch; 00233 } 00234 00235 uint64_t 00236 Peer::getMatchIndex() const 00237 { 00238 return matchIndex; 00239 } 00240 00241 bool 00242 Peer::haveVote() const 00243 { 00244 return haveVote_; 00245 } 00246 00247 void 00248 Peer::interrupt() 00249 { 00250 rpc.cancel(); 00251 } 00252 00253 bool 00254 Peer::isCaughtUp() const 00255 { 00256 return isCaughtUp_; 00257 } 00258 00259 void 00260 Peer::scheduleHeartbeat() 00261 { 00262 nextHeartbeatTime = Clock::now(); 00263 } 00264 00265 Peer::CallStatus 00266 Peer::callRPC(Protocol::Raft::OpCode opCode, 00267 const google::protobuf::Message& request, 00268 google::protobuf::Message& response, 00269 std::unique_lock<Mutex>& lockGuard) 00270 { 00271 typedef RPC::ClientRPC::Status RPCStatus; 00272 rpc = RPC::ClientRPC(getSession(lockGuard), 00273 Protocol::Common::ServiceId::RAFT_SERVICE, 00274 /* serviceSpecificErrorVersion = */ 0, 00275 opCode, 00276 request); 00277 // release lock for concurrency 00278 Core::MutexUnlock<Mutex> unlockGuard(lockGuard); 00279 switch (rpc.waitForReply(&response, NULL, TimePoint::max())) { 00280 case RPCStatus::OK: 00281 if (rpcFailuresSinceLastWarning > 0) { 00282 WARNING("RPC to server succeeded after %lu failures", 00283 rpcFailuresSinceLastWarning); 00284 rpcFailuresSinceLastWarning = 0; 00285 } 00286 return CallStatus::OK; 00287 case RPCStatus::SERVICE_SPECIFIC_ERROR: 00288 PANIC("unexpected service-specific error"); 00289 case RPCStatus::TIMEOUT: 00290 PANIC("unexpected RPC timeout"); 00291 case RPCStatus::RPC_FAILED: 00292 ++rpcFailuresSinceLastWarning; 00293 if (rpcFailuresSinceLastWarning == 1) { 00294 WARNING("RPC to server failed: %s", 00295 rpc.getErrorMessage().c_str()); 00296 } else if (rpcFailuresSinceLastWarning % 100 == 0) { 00297 WARNING("Last %lu RPCs to server failed. This failure: %s", 00298 rpcFailuresSinceLastWarning, 00299 rpc.getErrorMessage().c_str()); 00300 } 00301 return CallStatus::FAILED; 00302 case RPCStatus::RPC_CANCELED: 00303 return CallStatus::FAILED; 00304 case RPCStatus::INVALID_SERVICE: 00305 PANIC("The server isn't running the RaftService"); 00306 case RPCStatus::INVALID_REQUEST: 00307 return CallStatus::INVALID_REQUEST; 00308 } 00309 PANIC("Unexpected RPC status"); 00310 } 00311 00312 void 00313 Peer::startThread(std::shared_ptr<Peer> self) 00314 { 00315 thisCatchUpIterationStart = Clock::now(); 00316 thisCatchUpIterationGoalId = consensus.log->getLastLogIndex(); 00317 ++consensus.numPeerThreads; 00318 NOTICE("Starting peer thread for server %lu", serverId); 00319 std::thread(&RaftConsensus::peerThreadMain, &consensus, self).detach(); 00320 } 00321 00322 std::shared_ptr<RPC::ClientSession> 00323 Peer::getSession(std::unique_lock<Mutex>& lockGuard) 00324 { 00325 if (!session || !session->getErrorMessage().empty()) { 00326 // Unfortunately, creating a session isn't currently interruptible, so 00327 // we use a timeout to prevent the server from hanging forever if some 00328 // peer thread happens to be creating a session when it's told to exit. 00329 // See https://github.com/logcabin/logcabin/issues/183 for more detail. 
00330 TimePoint timeout = Clock::now() + consensus.ELECTION_TIMEOUT; 00331 // release lock for concurrency 00332 Core::MutexUnlock<Mutex> unlockGuard(lockGuard); 00333 RPC::Address target(addresses, Protocol::Common::DEFAULT_PORT); 00334 target.refresh(timeout); 00335 Client::SessionManager::ServerId peerId(serverId); 00336 session = consensus.sessionManager.createSession( 00337 target, 00338 timeout, 00339 &consensus.globals.clusterUUID, 00340 &peerId); 00341 } 00342 return session; 00343 } 00344 00345 std::ostream& 00346 Peer::dumpToStream(std::ostream& os) const 00347 { 00348 os << "Peer " << serverId << std::endl; 00349 os << "addresses: " << addresses << std::endl; 00350 switch (consensus.state) { 00351 case RaftConsensus::State::FOLLOWER: 00352 break; 00353 case RaftConsensus::State::CANDIDATE: 00354 os << "vote: "; 00355 if (requestVoteDone) { 00356 if (haveVote_) 00357 os << "granted"; 00358 else 00359 os << "not granted"; 00360 } else { 00361 os << "no response"; 00362 } 00363 os << std::endl; 00364 break; 00365 case RaftConsensus::State::LEADER: 00366 os << "suppressBulkData: " << suppressBulkData << std::endl; 00367 os << "nextIndex: " << nextIndex << std::endl; 00368 os << "matchIndex: " << matchIndex << std::endl; 00369 break; 00370 } 00371 return os; 00372 } 00373 00374 void 00375 Peer::updatePeerStats(Protocol::ServerStats::Raft::Peer& peerStats, 00376 Core::Time::SteadyTimeConverter& time) const 00377 { 00378 switch (consensus.state) { 00379 case RaftConsensus::State::FOLLOWER: 00380 break; 00381 case RaftConsensus::State::CANDIDATE: 00382 break; 00383 case RaftConsensus::State::LEADER: 00384 peerStats.set_suppress_bulk_data(suppressBulkData); 00385 peerStats.set_next_index(nextIndex); 00386 peerStats.set_last_agree_index(matchIndex); 00387 peerStats.set_is_caught_up(isCaughtUp_); 00388 peerStats.set_next_heartbeat_at(time.unixNanos(nextHeartbeatTime)); 00389 break; 00390 } 00391 00392 switch (consensus.state) { 00393 case RaftConsensus::State::FOLLOWER: 00394 break; 00395 case RaftConsensus::State::CANDIDATE: // fallthrough 00396 case RaftConsensus::State::LEADER: 00397 peerStats.set_request_vote_done(requestVoteDone); 00398 peerStats.set_have_vote(haveVote_); 00399 peerStats.set_backoff_until(time.unixNanos(backoffUntil)); 00400 break; 00401 } 00402 } 00403 00404 ////////// Configuration::SimpleConfiguration ////////// 00405 00406 Configuration::SimpleConfiguration::SimpleConfiguration() 00407 : servers() 00408 { 00409 } 00410 00411 Configuration::SimpleConfiguration::~SimpleConfiguration() 00412 { 00413 } 00414 00415 bool 00416 Configuration::SimpleConfiguration::all(const Predicate& predicate) const 00417 { 00418 for (auto it = servers.begin(); it != servers.end(); ++it) { 00419 if (!predicate(**it)) 00420 return false; 00421 } 00422 return true; 00423 } 00424 00425 bool 00426 Configuration::SimpleConfiguration::contains(std::shared_ptr<Server> server) 00427 const 00428 { 00429 for (auto it = servers.begin(); it != servers.end(); ++it) { 00430 if (*it == server) 00431 return true; 00432 } 00433 return false; 00434 } 00435 00436 void 00437 Configuration::SimpleConfiguration::forEach(const SideEffect& sideEffect) 00438 { 00439 for (auto it = servers.begin(); it != servers.end(); ++it) 00440 sideEffect(**it); 00441 } 00442 00443 uint64_t 00444 Configuration::SimpleConfiguration::min(const GetValue& getValue) const 00445 { 00446 if (servers.empty()) 00447 return 0; 00448 uint64_t smallest = ~0UL; 00449 for (auto it = servers.begin(); it != servers.end(); ++it) 00450 
smallest = std::min(smallest, getValue(**it)); 00451 return smallest; 00452 } 00453 00454 bool 00455 Configuration::SimpleConfiguration::quorumAll(const Predicate& predicate) const 00456 { 00457 if (servers.empty()) 00458 return true; 00459 uint64_t count = 0; 00460 for (auto it = servers.begin(); it != servers.end(); ++it) 00461 if (predicate(**it)) 00462 ++count; 00463 return (count >= servers.size() / 2 + 1); 00464 } 00465 00466 uint64_t 00467 Configuration::SimpleConfiguration::quorumMin(const GetValue& getValue) const 00468 { 00469 if (servers.empty()) 00470 return 0; 00471 std::vector<uint64_t> values; 00472 for (auto it = servers.begin(); it != servers.end(); ++it) 00473 values.push_back(getValue(**it)); 00474 std::sort(values.begin(), values.end()); 00475 return values.at((values.size() - 1)/ 2); 00476 } 00477 00478 ////////// Configuration ////////// 00479 00480 Configuration::Configuration(uint64_t serverId, RaftConsensus& consensus) 00481 : consensus(consensus) 00482 , knownServers() 00483 , localServer() 00484 , state(State::BLANK) 00485 , id(0) 00486 , description() 00487 , oldServers() 00488 , newServers() 00489 { 00490 localServer.reset(new LocalServer(serverId, consensus)); 00491 knownServers[serverId] = localServer; 00492 } 00493 00494 Configuration::~Configuration() 00495 { 00496 } 00497 00498 void 00499 Configuration::forEach(const SideEffect& sideEffect) 00500 { 00501 for (auto it = knownServers.begin(); it != knownServers.end(); ++it) 00502 sideEffect(*it->second); 00503 } 00504 00505 bool 00506 Configuration::hasVote(std::shared_ptr<Server> server) const 00507 { 00508 if (state == State::TRANSITIONAL) { 00509 return (oldServers.contains(server) || 00510 newServers.contains(server)); 00511 } else { 00512 return oldServers.contains(server); 00513 } 00514 } 00515 00516 std::string 00517 Configuration::lookupAddress(uint64_t serverId) const 00518 { 00519 auto it = knownServers.find(serverId); 00520 if (it != knownServers.end()) 00521 return it->second->addresses; 00522 return ""; 00523 } 00524 00525 bool 00526 Configuration::quorumAll(const Predicate& predicate) const 00527 { 00528 if (state == State::TRANSITIONAL) { 00529 return (oldServers.quorumAll(predicate) && 00530 newServers.quorumAll(predicate)); 00531 } else { 00532 return oldServers.quorumAll(predicate); 00533 } 00534 } 00535 00536 uint64_t 00537 Configuration::quorumMin(const GetValue& getValue) const 00538 { 00539 if (state == State::TRANSITIONAL) { 00540 return std::min(oldServers.quorumMin(getValue), 00541 newServers.quorumMin(getValue)); 00542 } else { 00543 return oldServers.quorumMin(getValue); 00544 } 00545 } 00546 00547 void 00548 Configuration::resetStagingServers() 00549 { 00550 if (state == State::STAGING) { 00551 // staging servers could have changed other servers' addresses, so roll 00552 // back to old description with old addresses 00553 setConfiguration(id, description); 00554 } 00555 } 00556 00557 namespace { 00558 void setGCFlag(Server& server) 00559 { 00560 server.gcFlag = true; 00561 } 00562 } // anonymous namespace 00563 00564 void 00565 Configuration::reset() 00566 { 00567 NOTICE("Resetting to blank configuration"); 00568 state = State::BLANK; 00569 id = 0; 00570 description = {}; 00571 oldServers.servers.clear(); 00572 newServers.servers.clear(); 00573 for (auto it = knownServers.begin(); it != knownServers.end(); ++it) 00574 it->second->exit(); 00575 knownServers.clear(); 00576 knownServers[localServer->serverId] = localServer; 00577 } 00578 00579 void 00580 
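// Note on the quorum helpers above: quorumMin() sorts the servers' values and
// returns the element at position (size - 1) / 2, the largest value that at
// least a majority of servers have reached. For example, with match indexes
// {12, 9, 15, 9, 11} the sorted values are {9, 9, 11, 12, 15}; the element at
// index (5 - 1) / 2 = 2 is 11, and three of the five servers (a quorum) are at
// 11 or higher. Applied to the peers' match indexes (getMatchIndex()), this is
// what lets a leader safely advance its commit index.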
Configuration::setConfiguration( 00581 uint64_t newId, 00582 const Protocol::Raft::Configuration& newDescription) 00583 { 00584 NOTICE("Activating configuration %lu:\n%s", newId, 00585 Core::ProtoBuf::dumpString(newDescription).c_str()); 00586 00587 if (newDescription.next_configuration().servers().size() == 0) 00588 state = State::STABLE; 00589 else 00590 state = State::TRANSITIONAL; 00591 id = newId; 00592 description = newDescription; 00593 oldServers.servers.clear(); 00594 newServers.servers.clear(); 00595 00596 // Build up the list of old servers 00597 for (auto confIt = description.prev_configuration().servers().begin(); 00598 confIt != description.prev_configuration().servers().end(); 00599 ++confIt) { 00600 std::shared_ptr<Server> server = getServer(confIt->server_id()); 00601 server->addresses = confIt->addresses(); 00602 oldServers.servers.push_back(server); 00603 } 00604 00605 // Build up the list of new servers 00606 for (auto confIt = description.next_configuration().servers().begin(); 00607 confIt != description.next_configuration().servers().end(); 00608 ++confIt) { 00609 std::shared_ptr<Server> server = getServer(confIt->server_id()); 00610 server->addresses = confIt->addresses(); 00611 newServers.servers.push_back(server); 00612 } 00613 00614 // Servers not in the current configuration need to be told to exit 00615 setGCFlag(*localServer); 00616 oldServers.forEach(setGCFlag); 00617 newServers.forEach(setGCFlag); 00618 auto it = knownServers.begin(); 00619 while (it != knownServers.end()) { 00620 std::shared_ptr<Server> server = it->second; 00621 if (!server->gcFlag) { 00622 server->exit(); 00623 it = knownServers.erase(it); 00624 } else { 00625 server->gcFlag = false; // clear flag for next time 00626 ++it; 00627 } 00628 } 00629 } 00630 00631 void 00632 Configuration::setStagingServers( 00633 const Protocol::Raft::SimpleConfiguration& stagingServers) 00634 { 00635 assert(state == State::STABLE); 00636 state = State::STAGING; 00637 for (auto it = stagingServers.servers().begin(); 00638 it != stagingServers.servers().end(); 00639 ++it) { 00640 std::shared_ptr<Server> server = getServer(it->server_id()); 00641 server->addresses = it->addresses(); 00642 newServers.servers.push_back(server); 00643 } 00644 } 00645 00646 bool 00647 Configuration::stagingAll(const Predicate& predicate) const 00648 { 00649 if (state == State::STAGING) 00650 return newServers.all(predicate); 00651 else 00652 return true; 00653 } 00654 00655 uint64_t 00656 Configuration::stagingMin(const GetValue& getValue) const 00657 { 00658 if (state == State::STAGING) 00659 return newServers.min(getValue); 00660 else 00661 return 0; 00662 } 00663 00664 void 00665 Configuration::updateServerStats(Protocol::ServerStats& serverStats, 00666 Core::Time::SteadyTimeConverter& time) const 00667 { 00668 for (auto it = knownServers.begin(); 00669 it != knownServers.end(); 00670 ++it) { 00671 Protocol::ServerStats::Raft::Peer& peerStats = 00672 *serverStats.mutable_raft()->add_peer(); 00673 peerStats.set_server_id(it->first); 00674 const ServerRef& peer = it->second; 00675 peerStats.set_addresses(peer->addresses); 00676 peerStats.set_old_member(oldServers.contains(peer)); 00677 peerStats.set_new_member(state == State::TRANSITIONAL && 00678 newServers.contains(peer)); 00679 peerStats.set_staging_member(state == State::STAGING && 00680 newServers.contains(peer)); 00681 peer->updatePeerStats(peerStats, time); 00682 } 00683 } 00684 00685 std::ostream& 00686 operator<<(std::ostream& os, Configuration::State state) 00687 { 00688 
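// Note on setConfiguration() above: a description with a non-empty
// next_configuration puts the Configuration into the TRANSITIONAL (joint)
// state. In that state a server holds a vote if it appears in either the old
// or the new set, while quorumAll() and quorumMin() demand agreement from
// both sets; once a stable description (empty next_configuration) is
// activated, only prev_configuration matters. Servers that end up in neither
// set are told to exit and are garbage-collected out of knownServers.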
typedef Configuration::State State; 00689 switch (state) { 00690 case State::BLANK: 00691 os << "State::BLANK"; 00692 break; 00693 case State::STABLE: 00694 os << "State::STABLE"; 00695 break; 00696 case State::STAGING: 00697 os << "State::STAGING"; 00698 break; 00699 case State::TRANSITIONAL: 00700 os << "State::TRANSITIONAL"; 00701 break; 00702 } 00703 return os; 00704 } 00705 00706 std::ostream& 00707 operator<<(std::ostream& os, const Configuration& configuration) 00708 { 00709 os << "Configuration: {" << std::endl; 00710 os << " state: " << configuration.state << std::endl; 00711 os << " id: " << configuration.id << std::endl; 00712 os << " description: " << std::endl; 00713 os << Core::ProtoBuf::dumpString(configuration.description); 00714 os << "}" << std::endl; 00715 for (auto it = configuration.knownServers.begin(); 00716 it != configuration.knownServers.end(); 00717 ++it) { 00718 os << *it->second; 00719 } 00720 return os; 00721 } 00722 00723 00724 ////////// Configuration private methods ////////// 00725 00726 std::shared_ptr<Server> 00727 Configuration::getServer(uint64_t newServerId) 00728 { 00729 auto it = knownServers.find(newServerId); 00730 if (it != knownServers.end()) { 00731 return it->second; 00732 } else { 00733 std::shared_ptr<Peer> peer(new Peer(newServerId, consensus)); 00734 if (startThreads) 00735 peer->startThread(peer); 00736 knownServers[newServerId] = peer; 00737 return peer; 00738 } 00739 } 00740 00741 ////////// ConfigurationManager ////////// 00742 00743 ConfigurationManager::ConfigurationManager(Configuration& configuration) 00744 : configuration(configuration) 00745 , descriptions() 00746 , snapshot(0, {}) 00747 { 00748 } 00749 00750 ConfigurationManager::~ConfigurationManager() 00751 { 00752 } 00753 00754 void 00755 ConfigurationManager::add( 00756 uint64_t index, 00757 const Protocol::Raft::Configuration& description) 00758 { 00759 descriptions[index] = description; 00760 restoreInvariants(); 00761 } 00762 00763 void 00764 ConfigurationManager::truncatePrefix(uint64_t firstIndexKept) 00765 { 00766 descriptions.erase(descriptions.begin(), 00767 descriptions.lower_bound(firstIndexKept)); 00768 restoreInvariants(); 00769 } 00770 00771 void 00772 ConfigurationManager::truncateSuffix(uint64_t lastIndexKept) 00773 { 00774 descriptions.erase(descriptions.upper_bound(lastIndexKept), 00775 descriptions.end()); 00776 restoreInvariants(); 00777 } 00778 00779 void 00780 ConfigurationManager::setSnapshot( 00781 uint64_t index, 00782 const Protocol::Raft::Configuration& description) 00783 { 00784 assert(index >= snapshot.first); 00785 snapshot = {index, description}; 00786 restoreInvariants(); 00787 } 00788 00789 std::pair<uint64_t, Protocol::Raft::Configuration> 00790 ConfigurationManager::getLatestConfigurationAsOf( 00791 uint64_t lastIncludedIndex) const 00792 { 00793 if (descriptions.empty()) 00794 return {0, {}}; 00795 auto it = descriptions.upper_bound(lastIncludedIndex); 00796 // 'it' is either an element or end() 00797 if (it == descriptions.begin()) 00798 return {0, {}}; 00799 --it; 00800 return *it; 00801 } 00802 00803 ////////// ConfigurationManager private methods ////////// 00804 00805 void 00806 ConfigurationManager::restoreInvariants() 00807 { 00808 if (snapshot.first != 0) 00809 descriptions.insert(snapshot); 00810 if (descriptions.empty()) { 00811 configuration.reset(); 00812 } else { 00813 auto it = descriptions.rbegin(); 00814 if (configuration.id != it->first) 00815 configuration.setConfiguration(it->first, it->second); 00816 } 00817 } 00818 
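// Illustrative sketch (not compiled) of getLatestConfigurationAsOf(), which
// returns the newest known configuration at or below a given log index;
// 'config', 'desc1', and 'desc5' are hypothetical placeholders:
#if 0
ConfigurationManager mgr(config);
mgr.add(1, desc1);                  // configuration entry at log index 1
mgr.add(5, desc5);                  // configuration entry at log index 5
mgr.getLatestConfigurationAsOf(4);  // yields {1, desc1}
mgr.getLatestConfigurationAsOf(7);  // yields {5, desc5}
mgr.getLatestConfigurationAsOf(0);  // yields {0, {}}: nothing known that early
#endif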
00819 ////////// ClusterClock ////////// 00820 00821 ClusterClock::ClusterClock() 00822 : clusterTimeAtEpoch(0) 00823 , localTimeAtEpoch(Core::Time::SteadyClock::now()) 00824 { 00825 } 00826 00827 void 00828 ClusterClock::newEpoch(uint64_t clusterTime) 00829 { 00830 clusterTimeAtEpoch = clusterTime; 00831 localTimeAtEpoch = Core::Time::SteadyClock::now(); 00832 } 00833 00834 uint64_t 00835 ClusterClock::leaderStamp() 00836 { 00837 auto localTime = Core::Time::SteadyClock::now(); 00838 uint64_t nanosSinceEpoch = 00839 Core::Util::downCast<uint64_t>(std::chrono::nanoseconds( 00840 localTime - localTimeAtEpoch).count()); 00841 clusterTimeAtEpoch += nanosSinceEpoch; 00842 localTimeAtEpoch = localTime; 00843 return clusterTimeAtEpoch; 00844 } 00845 00846 uint64_t 00847 ClusterClock::interpolate() const 00848 { 00849 auto localTime = Core::Time::SteadyClock::now(); 00850 uint64_t nanosSinceEpoch = 00851 Core::Util::downCast<uint64_t>(std::chrono::nanoseconds( 00852 localTime - localTimeAtEpoch).count()); 00853 return clusterTimeAtEpoch + nanosSinceEpoch; 00854 } 00855 00856 namespace { 00857 00858 struct StagingProgressing { 00859 StagingProgressing(uint64_t epoch, 00860 Protocol::Client::SetConfiguration::Response& response) 00861 : epoch(epoch) 00862 , response(response) 00863 { 00864 } 00865 bool operator()(Server& server) { 00866 uint64_t serverEpoch = server.getLastAckEpoch(); 00867 if (serverEpoch < epoch) { 00868 auto& s = *response.mutable_configuration_bad()->add_bad_servers(); 00869 s.set_server_id(server.serverId); 00870 s.set_addresses(server.addresses); 00871 return false; 00872 } 00873 return true; 00874 } 00875 const uint64_t epoch; 00876 Protocol::Client::SetConfiguration::Response& response; 00877 }; 00878 00879 struct StateMachineVersionIntersection { 00880 StateMachineVersionIntersection() 00881 : missingCount(0) 00882 , allCount(0) 00883 , minVersion(0) 00884 , maxVersion(std::numeric_limits<uint16_t>::max()) { 00885 } 00886 void operator()(Server& server) { 00887 ++allCount; 00888 if (server.haveStateMachineSupportedVersions) { 00889 minVersion = std::max(server.minStateMachineVersion, 00890 minVersion); 00891 maxVersion = std::min(server.maxStateMachineVersion, 00892 maxVersion); 00893 } else { 00894 ++missingCount; 00895 } 00896 } 00897 uint64_t missingCount; 00898 uint64_t allCount; 00899 uint16_t minVersion; 00900 uint16_t maxVersion; 00901 }; 00902 00903 } // anonymous namespace 00904 00905 } // namespace RaftConsensusInternal 00906 00907 ////////// RaftConsensus::Entry ////////// 00908 00909 RaftConsensus::Entry::Entry() 00910 : index(0) 00911 , type(SKIP) 00912 , command() 00913 , snapshotReader() 00914 , clusterTime(0) 00915 { 00916 } 00917 00918 RaftConsensus::Entry::Entry(Entry&& other) 00919 : index(other.index) 00920 , type(other.type) 00921 , command(std::move(other.command)) 00922 , snapshotReader(std::move(other.snapshotReader)) 00923 , clusterTime(other.clusterTime) 00924 { 00925 } 00926 00927 RaftConsensus::Entry::~Entry() 00928 { 00929 } 00930 00931 ////////// RaftConsensus ////////// 00932 00933 RaftConsensus::RaftConsensus(Globals& globals) 00934 : ELECTION_TIMEOUT( 00935 std::chrono::milliseconds( 00936 globals.config.read<uint64_t>( 00937 "electionTimeoutMilliseconds", 00938 500))) 00939 , HEARTBEAT_PERIOD( 00940 globals.config.keyExists("heartbeatPeriodMilliseconds") 00941 ? 
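// Config keys consulted in this initializer list, with their defaults:
// electionTimeoutMilliseconds (500), heartbeatPeriodMilliseconds (half the
// election timeout if unset), maxLogEntriesPerRequest (5000),
// rpcFailureBackoffMilliseconds (half the election timeout if unset), and
// stateMachineUpdaterBackoffMilliseconds (10000).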
std::chrono::nanoseconds( 00942 std::chrono::milliseconds( 00943 globals.config.read<uint64_t>( 00944 "heartbeatPeriodMilliseconds"))) 00945 : ELECTION_TIMEOUT / 2) 00946 , MAX_LOG_ENTRIES_PER_REQUEST( 00947 globals.config.read<uint64_t>( 00948 "maxLogEntriesPerRequest", 00949 5000)) 00950 , RPC_FAILURE_BACKOFF( 00951 globals.config.keyExists("rpcFailureBackoffMilliseconds") 00952 ? std::chrono::nanoseconds( 00953 std::chrono::milliseconds( 00954 globals.config.read<uint64_t>( 00955 "rpcFailureBackoffMilliseconds"))) 00956 : (ELECTION_TIMEOUT / 2)) 00957 , STATE_MACHINE_UPDATER_BACKOFF( 00958 std::chrono::milliseconds( 00959 globals.config.read<uint64_t>( 00960 "stateMachineUpdaterBackoffMilliseconds", 00961 10000))) 00962 , SOFT_RPC_SIZE_LIMIT(Protocol::Common::MAX_MESSAGE_LENGTH - 1024) 00963 , serverId(0) 00964 , serverAddresses() 00965 , globals(globals) 00966 , storageLayout() 00967 , sessionManager(globals.eventLoop, 00968 globals.config) 00969 , mutex() 00970 , stateChanged() 00971 , exiting(false) 00972 , numPeerThreads(0) 00973 , log() 00974 , logSyncQueued(false) 00975 , leaderDiskThreadWorking(false) 00976 , configuration() 00977 , configurationManager() 00978 , currentTerm(0) 00979 , state(State::FOLLOWER) 00980 , lastSnapshotIndex(0) 00981 , lastSnapshotTerm(0) 00982 , lastSnapshotClusterTime(0) 00983 , lastSnapshotBytes(0) 00984 , snapshotReader() 00985 , snapshotWriter() 00986 , commitIndex(0) 00987 , leaderId(0) 00988 , votedFor(0) 00989 , currentEpoch(0) 00990 , clusterClock() 00991 , startElectionAt(TimePoint::max()) 00992 , withholdVotesUntil(TimePoint::min()) 00993 , numEntriesTruncated(0) 00994 , leaderDiskThread() 00995 , timerThread() 00996 , stateMachineUpdaterThread() 00997 , stepDownThread() 00998 , invariants(*this) 00999 { 01000 } 01001 01002 RaftConsensus::~RaftConsensus() 01003 { 01004 if (!exiting) 01005 exit(); 01006 if (leaderDiskThread.joinable()) 01007 leaderDiskThread.join(); 01008 if (timerThread.joinable()) 01009 timerThread.join(); 01010 if (stateMachineUpdaterThread.joinable()) 01011 stateMachineUpdaterThread.join(); 01012 if (stepDownThread.joinable()) 01013 stepDownThread.join(); 01014 NOTICE("Joined with disk and timer threads"); 01015 std::unique_lock<Mutex> lockGuard(mutex); 01016 if (numPeerThreads > 0) { 01017 NOTICE("Waiting for %u peer threads to exit", numPeerThreads); 01018 while (numPeerThreads > 0) 01019 stateChanged.wait(lockGuard); 01020 } 01021 NOTICE("Peer threads have exited"); 01022 // issue any outstanding disk flushes 01023 if (logSyncQueued) { 01024 std::unique_ptr<Log::Sync> sync = log->takeSync(); 01025 sync->wait(); 01026 log->syncComplete(std::move(sync)); 01027 } 01028 NOTICE("Completed disk writes"); 01029 } 01030 01031 void 01032 RaftConsensus::init() 01033 { 01034 std::lock_guard<Mutex> lockGuard(mutex); 01035 #if DEBUG 01036 if (globals.config.read<bool>("raftDebug", false)) { 01037 mutex.callback = std::bind(&Invariants::checkAll, &invariants); 01038 } 01039 #endif 01040 01041 NOTICE("My server ID is %lu", serverId); 01042 01043 if (storageLayout.topDir.fd == -1) { 01044 if (globals.config.read("use-temporary-storage", false)) 01045 storageLayout.initTemporary(serverId); // unit tests 01046 else 01047 storageLayout.init(globals.config, serverId); 01048 } 01049 01050 configuration.reset(new Configuration(serverId, *this)); 01051 configurationManager.reset(new ConfigurationManager(*configuration)); 01052 01053 NOTICE("Reading the log"); 01054 if (!log) { // some unit tests pre-set the log; don't overwrite it 01055 log = 
Storage::LogFactory::makeLog(globals.config, storageLayout); 01056 } 01057 for (uint64_t index = log->getLogStartIndex(); 01058 index <= log->getLastLogIndex(); 01059 ++index) { 01060 const Log::Entry& entry = log->getEntry(index); 01061 if (entry.type() == Protocol::Raft::EntryType::UNKNOWN) { 01062 PANIC("Don't understand the entry type for index %lu (term %lu) " 01063 "found on disk", 01064 index, entry.term()); 01065 } 01066 if (entry.type() == Protocol::Raft::EntryType::CONFIGURATION) { 01067 configurationManager->add(index, entry.configuration()); 01068 } 01069 } 01070 01071 // Restore cluster time epoch from last log entry, if any 01072 if (log->getLastLogIndex() >= log->getLogStartIndex()) { 01073 clusterClock.newEpoch( 01074 log->getEntry(log->getLastLogIndex()).cluster_time()); 01075 } 01076 01077 NOTICE("The log contains indexes %lu through %lu (inclusive)", 01078 log->getLogStartIndex(), log->getLastLogIndex()); 01079 01080 if (log->metadata.has_current_term()) 01081 currentTerm = log->metadata.current_term(); 01082 if (log->metadata.has_voted_for()) 01083 votedFor = log->metadata.voted_for(); 01084 updateLogMetadata(); 01085 01086 // Read snapshot after reading log, since readSnapshot() will get rid of 01087 // conflicting log entries 01088 readSnapshot(); 01089 01090 // Clean up incomplete snapshots left by prior runs. This could be done 01091 // earlier, but maybe it's nicer to make sure we can get to this point 01092 // without PANICing before deleting these files. 01093 Storage::SnapshotFile::discardPartialSnapshots(storageLayout); 01094 01095 if (configuration->id == 0) 01096 NOTICE("No configuration, waiting to receive one."); 01097 01098 stepDown(currentTerm); 01099 if (RaftConsensusInternal::startThreads) { 01100 leaderDiskThread = std::thread( 01101 &RaftConsensus::leaderDiskThreadMain, this); 01102 timerThread = std::thread( 01103 &RaftConsensus::timerThreadMain, this); 01104 if (globals.config.read<bool>("disableStateMachineUpdates", false)) { 01105 NOTICE("Not starting state machine updater thread (state machine " 01106 "updates are disabled in config)"); 01107 } else { 01108 stateMachineUpdaterThread = std::thread( 01109 &RaftConsensus::stateMachineUpdaterThreadMain, this); 01110 } 01111 stepDownThread = std::thread( 01112 &RaftConsensus::stepDownThreadMain, this); 01113 } 01114 // log->path = ""; // hack to disable disk 01115 stateChanged.notify_all(); 01116 printElectionState(); 01117 } 01118 01119 void 01120 RaftConsensus::exit() 01121 { 01122 NOTICE("Shutting down"); 01123 std::lock_guard<Mutex> lockGuard(mutex); 01124 exiting = true; 01125 if (configuration) 01126 configuration->forEach(&Server::exit); 01127 interruptAll(); 01128 } 01129 01130 void 01131 RaftConsensus::bootstrapConfiguration() 01132 { 01133 std::lock_guard<Mutex> lockGuard(mutex); 01134 01135 if (currentTerm != 0 || 01136 log->getLogStartIndex() != 1 || 01137 log->getLastLogIndex() != 0 || 01138 lastSnapshotIndex != 0) { 01139 PANIC("Refusing to bootstrap configuration: it looks like a log or " 01140 "snapshot already exists."); 01141 } 01142 stepDown(1); // satisfies invariants assertions 01143 01144 // Append the configuration entry to the log 01145 Log::Entry entry; 01146 entry.set_term(1); 01147 entry.set_type(Protocol::Raft::EntryType::CONFIGURATION); 01148 entry.set_cluster_time(0); 01149 Protocol::Raft::Configuration& configuration = 01150 *entry.mutable_configuration(); 01151 Protocol::Raft::Server& server = 01152 *configuration.mutable_prev_configuration()->add_servers(); 01153 
server.set_server_id(serverId); 01154 server.set_addresses(serverAddresses); 01155 append({&entry}); 01156 } 01157 01158 RaftConsensus::ClientResult 01159 RaftConsensus::getConfiguration( 01160 Protocol::Raft::SimpleConfiguration& currentConfiguration, 01161 uint64_t& id) const 01162 { 01163 std::unique_lock<Mutex> lockGuard(mutex); 01164 if (!upToDateLeader(lockGuard)) 01165 return ClientResult::NOT_LEADER; 01166 if (configuration->state != Configuration::State::STABLE || 01167 commitIndex < configuration->id) { 01168 return ClientResult::RETRY; 01169 } 01170 currentConfiguration = configuration->description.prev_configuration(); 01171 id = configuration->id; 01172 return ClientResult::SUCCESS; 01173 } 01174 01175 std::pair<RaftConsensus::ClientResult, uint64_t> 01176 RaftConsensus::getLastCommitIndex() const 01177 { 01178 std::unique_lock<Mutex> lockGuard(mutex); 01179 if (!upToDateLeader(lockGuard)) 01180 return {ClientResult::NOT_LEADER, 0}; 01181 else 01182 return {ClientResult::SUCCESS, commitIndex}; 01183 } 01184 01185 std::string 01186 RaftConsensus::getLeaderHint() const 01187 { 01188 std::lock_guard<Mutex> lockGuard(mutex); 01189 return configuration->lookupAddress(leaderId); 01190 } 01191 01192 RaftConsensus::Entry 01193 RaftConsensus::getNextEntry(uint64_t lastIndex) const 01194 { 01195 std::unique_lock<Mutex> lockGuard(mutex); 01196 uint64_t nextIndex = lastIndex + 1; 01197 while (true) { 01198 if (exiting) 01199 throw Core::Util::ThreadInterruptedException(); 01200 if (commitIndex >= nextIndex) { 01201 RaftConsensus::Entry entry; 01202 01203 // Make the state machine load a snapshot if we don't have the next 01204 // entry it needs in the log. 01205 if (log->getLogStartIndex() > nextIndex) { 01206 entry.type = Entry::SNAPSHOT; 01207 // For well-behaved state machines, we expect 'snapshotReader' 01208 // to contain a SnapshotFile::Reader that we can return 01209 // directly to the state machine. In the case that a State 01210 // Machine asks for the snapshot again, we have to build a new 01211 // SnapshotFile::Reader again. 01212 entry.snapshotReader = std::move(snapshotReader); 01213 if (!entry.snapshotReader) { 01214 WARNING("State machine asked for same snapshot twice; " 01215 "this shouldn't happen in normal operation. " 01216 "Having to re-read it from disk."); 01217 // readSnapshot() shouldn't have any side effects since the 01218 // snapshot should have already been read, so const_cast 01219 // should be ok (though ugly). 
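// Broader usage note (assumed caller, not part of this file): a state machine
// typically drives getNextEntry() in an apply loop along these lines:
//     while (true) {
//         RaftConsensus::Entry e = consensus.getNextEntry(lastApplied);
//         if (e.type == RaftConsensus::Entry::SNAPSHOT)
//             loadSnapshot(*e.snapshotReader);   // hypothetical helper
//         else if (e.type == RaftConsensus::Entry::DATA)
//             apply(e.command);                  // hypothetical helper
//         lastApplied = e.index;
//     }
// getNextEntry() blocks until an entry beyond 'lastApplied' is committed or
// the server exits (throwing ThreadInterruptedException). The const_cast on
// the next line is the re-read fallback described just above.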
01220 const_cast<RaftConsensus*>(this)->readSnapshot(); 01221 entry.snapshotReader = std::move(snapshotReader); 01222 } 01223 entry.index = lastSnapshotIndex; 01224 entry.clusterTime = lastSnapshotClusterTime; 01225 } else { 01226 // not a snapshot 01227 const Log::Entry& logEntry = log->getEntry(nextIndex); 01228 entry.index = nextIndex; 01229 if (logEntry.type() == Protocol::Raft::EntryType::DATA) { 01230 entry.type = Entry::DATA; 01231 const std::string& s = logEntry.data(); 01232 entry.command = Core::Buffer( 01233 memcpy(new char[s.length()], s.data(), s.length()), 01234 s.length(), 01235 Core::Buffer::deleteArrayFn<char>); 01236 } else { 01237 entry.type = Entry::SKIP; 01238 } 01239 entry.clusterTime = logEntry.cluster_time(); 01240 } 01241 return entry; 01242 } 01243 stateChanged.wait(lockGuard); 01244 } 01245 } 01246 01247 SnapshotStats::SnapshotStats 01248 RaftConsensus::getSnapshotStats() const 01249 { 01250 std::lock_guard<Mutex> lockGuard(mutex); 01251 01252 SnapshotStats::SnapshotStats s; 01253 s.set_last_snapshot_index(lastSnapshotIndex); 01254 s.set_last_snapshot_bytes(lastSnapshotBytes); 01255 s.set_log_start_index(log->getLogStartIndex()); 01256 s.set_last_log_index(log->getLastLogIndex()); 01257 s.set_log_bytes(log->getSizeBytes()); 01258 s.set_is_leader(state == State::LEADER); 01259 return s; 01260 } 01261 01262 void 01263 RaftConsensus::handleAppendEntries( 01264 const Protocol::Raft::AppendEntries::Request& request, 01265 Protocol::Raft::AppendEntries::Response& response) 01266 { 01267 std::lock_guard<Mutex> lockGuard(mutex); 01268 assert(!exiting); 01269 01270 // Set response to a rejection. We'll overwrite these later if we end up 01271 // accepting the request. 01272 response.set_term(currentTerm); 01273 response.set_success(false); 01274 response.set_last_log_index(log->getLastLogIndex()); 01275 01276 // Piggy-back server capabilities. 01277 { 01278 auto& cap = *response.mutable_server_capabilities(); 01279 auto& s = *configuration->localServer; 01280 if (s.haveStateMachineSupportedVersions) { 01281 cap.set_min_supported_state_machine_version( 01282 s.minStateMachineVersion); 01283 cap.set_max_supported_state_machine_version( 01284 s.maxStateMachineVersion); 01285 } 01286 } 01287 01288 // If the caller's term is stale, just return our term to it. 01289 if (request.term() < currentTerm) { 01290 VERBOSE("Caller(%lu) is stale. Our term is %lu, theirs is %lu", 01291 request.server_id(), currentTerm, request.term()); 01292 return; // response was set to a rejection above 01293 } 01294 if (request.term() > currentTerm) { 01295 NOTICE("Received AppendEntries request from server %lu in term %lu " 01296 "(this server's term was %lu)", 01297 request.server_id(), request.term(), currentTerm); 01298 // We're about to bump our term in the stepDown below: update 01299 // 'response' accordingly. 01300 response.set_term(request.term()); 01301 } 01302 // This request is a sign of life from the current leader. Update 01303 // our term and convert to follower if necessary; reset the 01304 // election timer. set it here in case request we exit the 01305 // function early, we will set it again after the disk write. 01306 stepDown(request.term()); 01307 setElectionTimer(); 01308 withholdVotesUntil = Clock::now() + ELECTION_TIMEOUT; 01309 01310 // Record the leader ID as a hint for clients. 
01311 if (leaderId == 0) { 01312 leaderId = request.server_id(); 01313 NOTICE("All hail leader %lu for term %lu", leaderId, currentTerm); 01314 printElectionState(); 01315 } else { 01316 assert(leaderId == request.server_id()); 01317 } 01318 01319 // For an entry to fit into our log, it must not leave a gap. 01320 if (request.prev_log_index() > log->getLastLogIndex()) { 01321 VERBOSE("Rejecting AppendEntries RPC: would leave gap"); 01322 return; // response was set to a rejection above 01323 } 01324 // It must also agree with the previous entry in the log (and, inductively 01325 // all prior entries). 01326 // Always match on index 0, and always match on any discarded indexes: 01327 // since we know those were committed, the leader must agree with them. 01328 // We could truncate the log here, but there's no real advantage to doing 01329 // that. 01330 if (request.prev_log_index() >= log->getLogStartIndex() && 01331 log->getEntry(request.prev_log_index()).term() != 01332 request.prev_log_term()) { 01333 VERBOSE("Rejecting AppendEntries RPC: terms don't agree"); 01334 return; // response was set to a rejection above 01335 } 01336 01337 // If we got this far, we're accepting the request. 01338 response.set_success(true); 01339 01340 // This needs to be able to handle duplicated RPC requests. We compare the 01341 // entries' terms to know if we need to do the operation; otherwise, 01342 // reapplying requests can result in data loss. 01343 // 01344 // The first problem this solves is that an old AppendEntries request may be 01345 // duplicated and received after a newer request, which could cause 01346 // undesirable data loss. For example, suppose the leader appends entry 4 01347 // and then entry 5, but the follower receives 4, then 5, then 4 again. 01348 // Without this extra guard, the follower would truncate 5 out of its 01349 // log. 01350 // 01351 // The second problem is more subtle: if the same request is duplicated but 01352 // the leader processes an earlier response, it will assume the 01353 // acknowledged data is safe. However, there is a window of vulnerability 01354 // on the follower's disk between the truncate and append operations (which 01355 // are not done atomically) when the follower processes the later request. 01356 uint64_t index = request.prev_log_index(); 01357 for (auto it = request.entries().begin(); 01358 it != request.entries().end(); 01359 ++it) { 01360 ++index; 01361 const Protocol::Raft::Entry& entry = *it; 01362 if (entry.has_index()) { 01363 // This precaution was added after #160: "Packing entries into 01364 // AppendEntries requests is broken (critical)". 01365 assert(entry.index() == index); 01366 } 01367 if (index < log->getLogStartIndex()) { 01368 // We already snapshotted and discarded this index, so presumably 01369 // we've received a committed entry we once already had. 01370 continue; 01371 } 01372 if (log->getLastLogIndex() >= index) { 01373 if (log->getEntry(index).term() == entry.term()) 01374 continue; 01375 // should never truncate committed entries: 01376 assert(commitIndex < index); 01377 uint64_t lastIndexKept = index - 1; 01378 uint64_t numTruncating = log->getLastLogIndex() - lastIndexKept; 01379 NOTICE("Truncating %lu entries after %lu from the log", 01380 numTruncating, 01381 lastIndexKept); 01382 numEntriesTruncated += numTruncating; 01383 log->truncateSuffix(lastIndexKept); 01384 configurationManager->truncateSuffix(lastIndexKept); 01385 } 01386 01387 // Append this and all following entries. 
01388 std::vector<const Protocol::Raft::Entry*> entries; 01389 do { 01390 const Protocol::Raft::Entry& entry = *it; 01391 if (entry.type() == Protocol::Raft::EntryType::UNKNOWN) { 01392 PANIC("Leader %lu is trying to send us an unknown log entry " 01393 "type for index %lu (term %lu). It shouldn't do that, " 01394 "and there's not a good way forward. There's some hope " 01395 "that if this server reboots, it'll come back up with a " 01396 "newer version of the code that understands the entry.", 01397 index, 01398 entry.term(), 01399 leaderId); 01400 } 01401 entries.push_back(&entry); 01402 ++it; 01403 ++index; 01404 } while (it != request.entries().end()); 01405 append(entries); 01406 clusterClock.newEpoch(entries.back()->cluster_time()); 01407 break; 01408 } 01409 response.set_last_log_index(log->getLastLogIndex()); 01410 01411 // Set our committed ID from the request's. In rare cases, this would make 01412 // our committed ID decrease. For example, this could happen with a new 01413 // leader who has not yet replicated one of its own entries. While that'd 01414 // be perfectly safe, guarding against it with an if statement lets us 01415 // make stronger assertions. 01416 if (commitIndex < request.commit_index()) { 01417 commitIndex = request.commit_index(); 01418 assert(commitIndex <= log->getLastLogIndex()); 01419 stateChanged.notify_all(); 01420 VERBOSE("New commitIndex: %lu", commitIndex); 01421 } 01422 01423 // reset election timer to avoid punishing the leader for our own 01424 // long disk writes 01425 setElectionTimer(); 01426 withholdVotesUntil = Clock::now() + ELECTION_TIMEOUT; 01427 } 01428 01429 void 01430 RaftConsensus::handleInstallSnapshot( 01431 const Protocol::Raft::InstallSnapshot::Request& request, 01432 Protocol::Raft::InstallSnapshot::Response& response) 01433 { 01434 std::lock_guard<Mutex> lockGuard(mutex); 01435 assert(!exiting); 01436 01437 response.set_term(currentTerm); 01438 01439 // If the caller's term is stale, just return our term to it. 01440 if (request.term() < currentTerm) { 01441 VERBOSE("Caller(%lu) is stale. Our term is %lu, theirs is %lu", 01442 request.server_id(), currentTerm, request.term()); 01443 return; 01444 } 01445 if (request.term() > currentTerm) { 01446 NOTICE("Received InstallSnapshot request from server %lu in " 01447 "term %lu (this server's term was %lu)", 01448 request.server_id(), request.term(), currentTerm); 01449 // We're about to bump our term in the stepDown below: update 01450 // 'response' accordingly. 01451 response.set_term(request.term()); 01452 } 01453 // This request is a sign of life from the current leader. Update our term 01454 // and convert to follower if necessary; reset the election timer. 01455 stepDown(request.term()); 01456 setElectionTimer(); 01457 withholdVotesUntil = Clock::now() + ELECTION_TIMEOUT; 01458 01459 // Record the leader ID as a hint for clients. 
01460 if (leaderId == 0) { 01461 leaderId = request.server_id(); 01462 NOTICE("All hail leader %lu for term %lu", leaderId, currentTerm); 01463 printElectionState(); 01464 } else { 01465 assert(leaderId == request.server_id()); 01466 } 01467 01468 if (!snapshotWriter) { 01469 snapshotWriter.reset( 01470 new Storage::SnapshotFile::Writer(storageLayout)); 01471 } 01472 response.set_bytes_stored(snapshotWriter->getBytesWritten()); 01473 01474 if (request.byte_offset() < snapshotWriter->getBytesWritten()) { 01475 WARNING("Ignoring stale snapshot chunk for byte offset %lu when the " 01476 "next byte needed is %lu", 01477 request.byte_offset(), 01478 snapshotWriter->getBytesWritten()); 01479 return; 01480 } 01481 if (request.byte_offset() > snapshotWriter->getBytesWritten()) { 01482 WARNING("Leader tried to send snapshot chunk at byte offset %lu but " 01483 "the next byte needed is %lu. Discarding the chunk.", 01484 request.byte_offset(), 01485 snapshotWriter->getBytesWritten()); 01486 if (!request.has_version() || request.version() < 2) { 01487 // For compatibility with InstallSnapshot version 1 leader: such a 01488 // leader assumes the InstallSnapshot RPC succeeded if the terms 01489 // match (it ignores the 'bytes_stored' field). InstallSnapshot 01490 // hasn't succeeded here, so we can't respond ok. 01491 WARNING("Incrementing our term (to %lu) to force the leader " 01492 "(of %lu) to step down and forget about the partial " 01493 "snapshot it's sending", 01494 currentTerm + 1, 01495 currentTerm); 01496 stepDown(currentTerm + 1); 01497 // stepDown() changed currentTerm to currentTerm + 1 01498 response.set_term(currentTerm); 01499 } 01500 return; 01501 } 01502 snapshotWriter->writeRaw(request.data().data(), request.data().length()); 01503 response.set_bytes_stored(snapshotWriter->getBytesWritten()); 01504 01505 if (request.done()) { 01506 if (request.last_snapshot_index() < lastSnapshotIndex) { 01507 WARNING("The leader sent us a snapshot, but it's stale: it only " 01508 "covers up through index %lu and we already have one " 01509 "through %lu. A well-behaved leader shouldn't do that. " 01510 "Discarding the snapshot.", 01511 request.last_snapshot_index(), 01512 lastSnapshotIndex); 01513 snapshotWriter->discard(); 01514 snapshotWriter.reset(); 01515 return; 01516 } 01517 NOTICE("Loading in new snapshot from leader"); 01518 snapshotWriter->save(); 01519 snapshotWriter.reset(); 01520 readSnapshot(); 01521 stateChanged.notify_all(); 01522 } 01523 } 01524 01525 void 01526 RaftConsensus::handleRequestVote( 01527 const Protocol::Raft::RequestVote::Request& request, 01528 Protocol::Raft::RequestVote::Response& response) 01529 { 01530 std::lock_guard<Mutex> lockGuard(mutex); 01531 assert(!exiting); 01532 01533 // If the caller has a less complete log, we can't give it our vote. 01534 uint64_t lastLogIndex = log->getLastLogIndex(); 01535 uint64_t lastLogTerm = getLastLogTerm(); 01536 bool logIsOk = (request.last_log_term() > lastLogTerm || 01537 (request.last_log_term() == lastLogTerm && 01538 request.last_log_index() >= lastLogIndex)); 01539 01540 if (withholdVotesUntil > Clock::now()) { 01541 NOTICE("Rejecting RequestVote for term %lu from server %lu, since " 01542 "this server (which is in term %lu) recently heard from a " 01543 "leader (%lu). 
Should server %lu be shut down?", 01544 request.term(), request.server_id(), currentTerm, 01545 leaderId, request.server_id()); 01546 response.set_term(currentTerm); 01547 response.set_granted(false); 01548 response.set_log_ok(logIsOk); 01549 return; 01550 } 01551 01552 if (request.term() > currentTerm) { 01553 NOTICE("Received RequestVote request from server %lu in term %lu " 01554 "(this server's term was %lu)", 01555 request.server_id(), request.term(), currentTerm); 01556 stepDown(request.term()); 01557 } 01558 01559 // At this point, if leaderId != 0, we could tell the caller to step down. 01560 // However, this is just an optimization that does not affect correctness 01561 // or really even efficiency, so it's not worth the trouble. 01562 01563 if (request.term() == currentTerm) { 01564 if (logIsOk && votedFor == 0) { 01565 // Give caller our vote 01566 NOTICE("Voting for %lu in term %lu", 01567 request.server_id(), currentTerm); 01568 stepDown(currentTerm); 01569 setElectionTimer(); 01570 votedFor = request.server_id(); 01571 updateLogMetadata(); 01572 printElectionState(); 01573 } 01574 } 01575 01576 // Fill in response. 01577 response.set_term(currentTerm); 01578 // don't strictly need the first condition 01579 response.set_granted(request.term() == currentTerm && 01580 votedFor == request.server_id()); 01581 response.set_log_ok(logIsOk); 01582 } 01583 01584 std::pair<RaftConsensus::ClientResult, uint64_t> 01585 RaftConsensus::replicate(const Core::Buffer& operation) 01586 { 01587 std::unique_lock<Mutex> lockGuard(mutex); 01588 Log::Entry entry; 01589 entry.set_type(Protocol::Raft::EntryType::DATA); 01590 entry.set_data(operation.getData(), operation.getLength()); 01591 return replicateEntry(entry, lockGuard); 01592 } 01593 01594 RaftConsensus::ClientResult 01595 RaftConsensus::setConfiguration( 01596 const Protocol::Client::SetConfiguration::Request& request, 01597 Protocol::Client::SetConfiguration::Response& response) 01598 { 01599 std::unique_lock<Mutex> lockGuard(mutex); 01600 01601 if (exiting || state != State::LEADER) { 01602 // caller fills out response 01603 return ClientResult::NOT_LEADER; 01604 } 01605 if (configuration->id != request.old_id()) { 01606 // configurations has changed in the meantime 01607 response.mutable_configuration_changed()->set_error( 01608 Core::StringUtil::format( 01609 "The current configuration has ID %lu (no longer %lu) " 01610 "and it's %s", 01611 configuration->id, 01612 request.old_id(), 01613 Core::StringUtil::toString(configuration->state).c_str())); 01614 return ClientResult::FAIL; 01615 } 01616 if (configuration->state != Configuration::State::STABLE) { 01617 response.mutable_configuration_changed()->set_error( 01618 Core::StringUtil::format( 01619 "The current configuration (%lu) is not stable (it's %s)", 01620 configuration->id, 01621 Core::StringUtil::toString(configuration->state).c_str())); 01622 return ClientResult::FAIL; 01623 } 01624 01625 NOTICE("Attempting to change the configuration from %lu", 01626 configuration->id); 01627 01628 // Set the staging servers in the configuration. 
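// The change as a whole proceeds in phases: first the requested servers are
// added as staging members (just below) and the leader waits for them to
// catch up, aborting if any of them stops making progress within an election
// timeout; then a transitional configuration covering both the old and new
// sets is appended and committed; finally the call waits for the stable
// configuration that follows it to commit before returning SUCCESS.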
01629 Protocol::Raft::SimpleConfiguration nextConfiguration; 01630 for (auto it = request.new_servers().begin(); 01631 it != request.new_servers().end(); 01632 ++it) { 01633 NOTICE("Adding server %lu at %s to staging servers", 01634 it->server_id(), it->addresses().c_str()); 01635 Protocol::Raft::Server* s = nextConfiguration.add_servers(); 01636 s->set_server_id(it->server_id()); 01637 s->set_addresses(it->addresses()); 01638 } 01639 configuration->setStagingServers(nextConfiguration); 01640 stateChanged.notify_all(); 01641 01642 // Wait for new servers to be caught up. This will abort if not every 01643 // server makes progress in a ELECTION_TIMEOUT period. 01644 uint64_t term = currentTerm; 01645 ++currentEpoch; 01646 uint64_t epoch = currentEpoch; 01647 TimePoint checkProgressAt = Clock::now() + ELECTION_TIMEOUT; 01648 while (true) { 01649 if (exiting || term != currentTerm) { 01650 NOTICE("Lost leadership, aborting configuration change"); 01651 // caller will fill in response 01652 return ClientResult::NOT_LEADER; 01653 } 01654 if (configuration->stagingAll(&Server::isCaughtUp)) { 01655 NOTICE("Done catching up servers"); 01656 break; 01657 } 01658 if (Clock::now() >= checkProgressAt) { 01659 using RaftConsensusInternal::StagingProgressing; 01660 StagingProgressing progressing(epoch, response); 01661 if (!configuration->stagingAll(progressing)) { 01662 NOTICE("Failed to catch up new servers, aborting " 01663 "configuration change"); 01664 configuration->resetStagingServers(); 01665 stateChanged.notify_all(); 01666 // progressing filled in response 01667 return ClientResult::FAIL; 01668 } else { 01669 ++currentEpoch; 01670 epoch = currentEpoch; 01671 checkProgressAt = Clock::now() + ELECTION_TIMEOUT; 01672 } 01673 } 01674 stateChanged.wait_until(lockGuard, checkProgressAt); 01675 } 01676 01677 // Write and commit transitional configuration 01678 NOTICE("Writing transitional configuration entry"); 01679 Protocol::Raft::Configuration newConfiguration; 01680 *newConfiguration.mutable_prev_configuration() = 01681 configuration->description.prev_configuration(); 01682 *newConfiguration.mutable_next_configuration() = nextConfiguration; 01683 Log::Entry entry; 01684 entry.set_type(Protocol::Raft::EntryType::CONFIGURATION); 01685 *entry.mutable_configuration() = newConfiguration; 01686 std::pair<ClientResult, uint64_t> result = 01687 replicateEntry(entry, lockGuard); 01688 if (result.first != ClientResult::SUCCESS) { 01689 NOTICE("Failed to commit transitional configuration entry, aborting " 01690 "configuration change (%s)", 01691 Core::StringUtil::toString(result.first).c_str()); 01692 if (result.first == ClientResult::NOT_LEADER) { 01693 // caller will fill in response 01694 } else { 01695 response.mutable_configuration_changed()->set_error( 01696 Core::StringUtil::format( 01697 "Couldn't successfully replicate the transitional " 01698 "configuration (%s)", 01699 Core::StringUtil::toString(result.first).c_str())); 01700 } 01701 return result.first; 01702 } 01703 uint64_t transitionalId = result.second; 01704 01705 // Wait until the configuration that removes the old servers has been 01706 // committed. This is the first configuration with ID greater than 01707 // transitionalId. 01708 NOTICE("Waiting for stable configuration to commit"); 01709 while (true) { 01710 // Check this first: if the new configuration excludes us so we've 01711 // stepped down upon committing it, we still want to return success. 
01712 if (configuration->id > transitionalId && 01713 commitIndex >= configuration->id) { 01714 response.mutable_ok(); 01715 NOTICE("Stable configuration committed. Configuration change " 01716 "completed successfully"); 01717 return ClientResult::SUCCESS; 01718 } 01719 if (exiting || term != currentTerm) { 01720 NOTICE("Lost leadership"); 01721 // caller fills in response 01722 return ClientResult::NOT_LEADER; 01723 } 01724 stateChanged.wait(lockGuard); 01725 } 01726 } 01727 01728 void 01729 RaftConsensus::setSupportedStateMachineVersions(uint16_t minSupported, 01730 uint16_t maxSupported) 01731 { 01732 std::lock_guard<Mutex> lockGuard(mutex); 01733 auto& s = *configuration->localServer; 01734 if (!s.haveStateMachineSupportedVersions || 01735 s.minStateMachineVersion != minSupported || 01736 s.maxStateMachineVersion != maxSupported) { 01737 01738 s.haveStateMachineSupportedVersions = true; 01739 s.minStateMachineVersion = minSupported; 01740 s.maxStateMachineVersion = maxSupported; 01741 stateChanged.notify_all(); 01742 } 01743 } 01744 01745 std::unique_ptr<Storage::SnapshotFile::Writer> 01746 RaftConsensus::beginSnapshot(uint64_t lastIncludedIndex) 01747 { 01748 std::lock_guard<Mutex> lockGuard(mutex); 01749 01750 NOTICE("Creating new snapshot through log index %lu (inclusive)", 01751 lastIncludedIndex); 01752 std::unique_ptr<Storage::SnapshotFile::Writer> writer( 01753 new Storage::SnapshotFile::Writer(storageLayout)); 01754 01755 // Only committed entries may be snapshotted. 01756 // (This check relies on commitIndex monotonically increasing.) 01757 if (lastIncludedIndex > commitIndex) { 01758 PANIC("Attempted to snapshot uncommitted entries (%lu requested but " 01759 "%lu is last committed entry)", lastIncludedIndex, commitIndex); 01760 } 01761 01762 // Format version of snapshot file is 1. 01763 uint8_t version = 1; 01764 writer->writeRaw(&version, sizeof(version)); 01765 01766 // set header fields 01767 SnapshotMetadata::Header header; 01768 header.set_last_included_index(lastIncludedIndex); 01769 // Set last_included_term and last_cluster_time: 01770 if (lastIncludedIndex >= log->getLogStartIndex() && 01771 lastIncludedIndex <= log->getLastLogIndex()) { 01772 const Log::Entry& entry = log->getEntry(lastIncludedIndex); 01773 header.set_last_included_term(entry.term()); 01774 header.set_last_cluster_time(entry.cluster_time()); 01775 } else if (lastIncludedIndex == 0) { 01776 WARNING("Taking a snapshot covering no log entries"); 01777 header.set_last_included_term(0); 01778 header.set_last_cluster_time(0); 01779 } else if (lastIncludedIndex == lastSnapshotIndex) { 01780 WARNING("Taking a snapshot where we already have one, covering " 01781 "entries 1 through %lu (inclusive)", lastIncludedIndex); 01782 header.set_last_included_term(lastSnapshotTerm); 01783 header.set_last_cluster_time(lastSnapshotClusterTime); 01784 } else { 01785 WARNING("We've already discarded the entries that the state machine " 01786 "wants to snapshot. This can happen in rare cases if the " 01787 "leader already sent us a newer snapshot. We'll go ahead and " 01788 "compute the snapshot, but it'll be discarded later in " 01789 "snapshotDone(). Setting the last included term in the " 01790 "snapshot header to 0 (a bogus value)."); 01791 // If this turns out to be common, we should return NULL instead and 01792 // change the state machines to deal with that. 
01793 header.set_last_included_term(0); 01794 header.set_last_cluster_time(0); 01795 } 01796 01797 // Copy the configuration as of lastIncludedIndex to the header. 01798 std::pair<uint64_t, Protocol::Raft::Configuration> c = 01799 configurationManager->getLatestConfigurationAsOf(lastIncludedIndex); 01800 if (c.first == 0) { 01801 WARNING("Taking snapshot with no configuration. " 01802 "This should have been the first thing in the log."); 01803 } else { 01804 header.set_configuration_index(c.first); 01805 *header.mutable_configuration() = c.second; 01806 } 01807 01808 // write header to file 01809 writer->writeMessage(header); 01810 return writer; 01811 } 01812 01813 void 01814 RaftConsensus::snapshotDone( 01815 uint64_t lastIncludedIndex, 01816 std::unique_ptr<Storage::SnapshotFile::Writer> writer) 01817 { 01818 std::lock_guard<Mutex> lockGuard(mutex); 01819 if (lastIncludedIndex <= lastSnapshotIndex) { 01820 NOTICE("Discarding snapshot through %lu since we already have one " 01821 "(presumably from another server) through %lu", 01822 lastIncludedIndex, lastSnapshotIndex); 01823 writer->discard(); 01824 return; 01825 } 01826 01827 // log->getEntry(lastIncludedIndex) is safe: 01828 // If the log prefix for this snapshot was truncated, that means we have a 01829 // newer snapshot (handled above). 01830 assert(lastIncludedIndex >= log->getLogStartIndex()); 01831 // We never truncate committed entries from the end of our log, and 01832 // beginSnapshot() made sure that lastIncludedIndex covers only committed 01833 // entries. 01834 assert(lastIncludedIndex <= log->getLastLogIndex()); 01835 01836 lastSnapshotBytes = writer->save(); 01837 lastSnapshotIndex = lastIncludedIndex; 01838 const Log::Entry& lastEntry = log->getEntry(lastIncludedIndex); 01839 lastSnapshotTerm = lastEntry.term(); 01840 lastSnapshotClusterTime = lastEntry.cluster_time(); 01841 01842 // It's easier to grab this configuration out of the manager again than to 01843 // carry it around after writing the header. 01844 std::pair<uint64_t, Protocol::Raft::Configuration> c = 01845 configurationManager->getLatestConfigurationAsOf(lastIncludedIndex); 01846 if (c.first == 0) { 01847 WARNING("Could not find the latest configuration as of index %lu " 01848 "(inclusive). This shouldn't happen if the snapshot was " 01849 "created with a configuration, as they should be.", 01850 lastIncludedIndex); 01851 } else { 01852 configurationManager->setSnapshot(c.first, c.second); 01853 } 01854 01855 NOTICE("Completed snapshot through log index %lu (inclusive)", 01856 lastSnapshotIndex); 01857 01858 // It may be beneficial to defer discarding entries if some followers are 01859 // a little bit slow, to avoid having to send them a snapshot when a few 01860 // entries would do the trick. Best to avoid premature optimization though. 
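    // discardUnneededEntries() (defined further below) drops the log prefix
    // now covered by this snapshot; roughly, its effect is (a sketch):
    //
    //     if (log->getLogStartIndex() <= lastSnapshotIndex) {
    //         log->truncatePrefix(lastSnapshotIndex + 1);
    //         configurationManager->truncatePrefix(lastSnapshotIndex + 1);
    //     }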
01861 discardUnneededEntries(); 01862 } 01863 01864 void 01865 RaftConsensus::updateServerStats(Protocol::ServerStats& serverStats) const 01866 { 01867 std::lock_guard<Mutex> lockGuard(mutex); 01868 Core::Time::SteadyTimeConverter time; 01869 serverStats.clear_raft(); 01870 Protocol::ServerStats::Raft& raftStats = *serverStats.mutable_raft(); 01871 01872 raftStats.set_current_term(currentTerm); 01873 switch (state) { 01874 case State::FOLLOWER: 01875 raftStats.set_state(Protocol::ServerStats::Raft::FOLLOWER); 01876 break; 01877 case State::CANDIDATE: 01878 raftStats.set_state(Protocol::ServerStats::Raft::CANDIDATE); 01879 break; 01880 case State::LEADER: 01881 raftStats.set_state(Protocol::ServerStats::Raft::LEADER); 01882 break; 01883 } 01884 raftStats.set_commit_index(commitIndex); 01885 raftStats.set_last_log_index(log->getLastLogIndex()); 01886 raftStats.set_leader_id(leaderId); 01887 raftStats.set_voted_for(votedFor); 01888 raftStats.set_start_election_at(time.unixNanos(startElectionAt)); 01889 raftStats.set_withhold_votes_until(time.unixNanos(withholdVotesUntil)); 01890 raftStats.set_cluster_time_epoch(clusterClock.clusterTimeAtEpoch); 01891 raftStats.set_cluster_time(clusterClock.interpolate()); 01892 01893 raftStats.set_last_snapshot_index(lastSnapshotIndex); 01894 raftStats.set_last_snapshot_term(lastSnapshotTerm); 01895 raftStats.set_last_snapshot_cluster_time(lastSnapshotClusterTime); 01896 raftStats.set_last_snapshot_bytes(lastSnapshotBytes); 01897 raftStats.set_num_entries_truncated(numEntriesTruncated); 01898 raftStats.set_log_start_index(log->getLogStartIndex()); 01899 raftStats.set_log_bytes(log->getSizeBytes()); 01900 configuration->updateServerStats(serverStats, time); 01901 log->updateServerStats(serverStats); 01902 } 01903 01904 std::ostream& 01905 operator<<(std::ostream& os, const RaftConsensus& raft) 01906 { 01907 std::lock_guard<RaftConsensus::Mutex> lockGuard(raft.mutex); 01908 typedef RaftConsensus::State State; 01909 os << "server id: " << raft.serverId << std::endl; 01910 os << "term: " << raft.currentTerm << std::endl; 01911 os << "state: " << raft.state << std::endl; 01912 os << "leader: " << raft.leaderId << std::endl; 01913 os << "lastSnapshotIndex: " << raft.lastSnapshotIndex << std::endl; 01914 os << "lastSnapshotTerm: " << raft.lastSnapshotTerm << std::endl; 01915 os << "lastSnapshotClusterTime: " << raft.lastSnapshotClusterTime 01916 << std::endl; 01917 os << "commitIndex: " << raft.commitIndex << std::endl; 01918 switch (raft.state) { 01919 case State::FOLLOWER: 01920 os << "vote: "; 01921 if (raft.votedFor == 0) 01922 os << "available"; 01923 else 01924 os << "given to " << raft.votedFor; 01925 os << std::endl; 01926 break; 01927 case State::CANDIDATE: 01928 break; 01929 case State::LEADER: 01930 break; 01931 } 01932 os << *raft.log; 01933 os << *raft.configuration; 01934 return os; 01935 } 01936 01937 01938 //// RaftConsensus private methods that MUST acquire the lock 01939 01940 void 01941 RaftConsensus::stateMachineUpdaterThreadMain() 01942 { 01943 // This implementation might create many spurious entries, since this 01944 // process will append a state machine version if it hasn't appended that 01945 // same version before during this boot. That should be fine for most use 01946 // cases. If the state machine's num_redundant_advance_version_entries 01947 // server stat gets to be large, this may need to be revisited. 
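    // The version chosen by this loop comes from intersecting every server's
    // supported range. Roughly, StateMachineVersionIntersection accumulates
    // (a sketch, not its literal implementation):
    //
    //     minVersion = max over servers of minStateMachineVersion;
    //     maxVersion = min over servers of maxStateMachineVersion;
    //
    // If minVersion > maxVersion, there is no common version and the loop
    // backs off; otherwise it advances the state machine to maxVersion, the
    // newest version that every server supports.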
01948 std::unique_lock<Mutex> lockGuard(mutex); 01949 Core::ThreadId::setName("StateMachineUpdater"); 01950 uint64_t lastVersionCommitted = 0; 01951 TimePoint backoffUntil = TimePoint::min(); 01952 while (!exiting) { 01953 TimePoint now = Clock::now(); 01954 if (backoffUntil <= now && state == State::LEADER) { 01955 using RaftConsensusInternal::StateMachineVersionIntersection; 01956 StateMachineVersionIntersection s; 01957 configuration->forEach(std::ref(s)); 01958 if (s.missingCount == 0) { 01959 if (s.minVersion > s.maxVersion) { 01960 ERROR("The state machines on the %lu servers do not " 01961 "currently support a common version " 01962 "(max of mins=%u, min of maxes=%u). Will wait to " 01963 "change the state machine version for at least " 01964 "another backoff period", 01965 s.allCount, 01966 s.minVersion, 01967 s.maxVersion); 01968 backoffUntil = now + STATE_MACHINE_UPDATER_BACKOFF; 01969 } else { // s.maxVersion is the one we want 01970 if (s.maxVersion > lastVersionCommitted) { 01971 NOTICE("Appending log entry to advance state machine " 01972 "version to %u (it may be set to %u already, " 01973 "but it's hard to check that and not much " 01974 "overhead to just do it again)", 01975 s.maxVersion, 01976 s.maxVersion); 01977 Log::Entry entry; 01978 entry.set_term(currentTerm); 01979 entry.set_type(Protocol::Raft::EntryType::DATA); 01980 entry.set_cluster_time(clusterClock.leaderStamp()); 01981 Protocol::Client::StateMachineCommand::Request command; 01982 command.mutable_advance_version()-> 01983 set_requested_version(s.maxVersion); 01984 Core::Buffer cmdBuf; 01985 Core::ProtoBuf::serialize(command, cmdBuf); 01986 entry.set_data(cmdBuf.getData(), cmdBuf.getLength()); 01987 01988 std::pair<ClientResult, uint64_t> result = 01989 replicateEntry(entry, lockGuard); 01990 if (result.first == ClientResult::SUCCESS) { 01991 lastVersionCommitted = s.maxVersion; 01992 } else { 01993 using Core::StringUtil::toString; 01994 WARNING("Failed to commit entry to advance state " 01995 "machine version to version %u (%s). " 01996 "Will retry later after backoff period", 01997 s.maxVersion, 01998 toString(result.first).c_str()); 01999 backoffUntil = now + STATE_MACHINE_UPDATER_BACKOFF; 02000 } 02001 continue; 02002 } else { 02003 // We're in good shape, go back to sleep. 02004 } 02005 } 02006 } else { // missing info from at least one server 02007 // Do nothing until we have info from everyone else 02008 // (stateChanged will be notified). The backoff is here just to 02009 // avoid spamming the NOTICE message. 02010 NOTICE("Waiting to receive state machine supported version " 02011 "information from all peers (missing %lu of %lu)", 02012 s.missingCount, s.allCount); 02013 backoffUntil = now + STATE_MACHINE_UPDATER_BACKOFF; 02014 } 02015 } 02016 if (backoffUntil <= now) 02017 stateChanged.wait(lockGuard); 02018 else 02019 stateChanged.wait_until(lockGuard, backoffUntil); 02020 } 02021 NOTICE("Exiting"); 02022 } 02023 02024 void 02025 RaftConsensus::leaderDiskThreadMain() 02026 { 02027 std::unique_lock<Mutex> lockGuard(mutex); 02028 Core::ThreadId::setName("LeaderDisk"); 02029 // Each iteration of this loop syncs the log to disk once or sleeps until 02030 // that is necessary. 
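    // The loop below uses a hand-off pattern: take the pending Log::Sync
    // under the Raft lock, release the lock while waiting on the disk, then
    // re-check (state, term) before acting on the result. A rough sketch:
    //
    //     sync = log->takeSync();          // under the lock
    //     unlock; sync->wait(); relock;    // fsync without the lock held
    //     if (still leader in the same term)
    //         lastSyncedIndex = sync->lastIndex, then advanceCommitIndex();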
02031 while (!exiting) { 02032 if (state == State::LEADER && logSyncQueued) { 02033 uint64_t term = currentTerm; 02034 std::unique_ptr<Log::Sync> sync = log->takeSync(); 02035 logSyncQueued = false; 02036 leaderDiskThreadWorking = true; 02037 { 02038 Core::MutexUnlock<Mutex> unlockGuard(lockGuard); 02039 sync->wait(); 02040 // Mark this false before re-acquiring RaftConsensus lock, 02041 // since stepDown() polls on this to go false while holding the 02042 // lock. 02043 leaderDiskThreadWorking = false; 02044 } 02045 if (state == State::LEADER && currentTerm == term) { 02046 configuration->localServer->lastSyncedIndex = sync->lastIndex; 02047 advanceCommitIndex(); 02048 } 02049 log->syncComplete(std::move(sync)); 02050 continue; 02051 } 02052 stateChanged.wait(lockGuard); 02053 } 02054 } 02055 02056 void 02057 RaftConsensus::timerThreadMain() 02058 { 02059 std::unique_lock<Mutex> lockGuard(mutex); 02060 Core::ThreadId::setName("startNewElection"); 02061 while (!exiting) { 02062 if (Clock::now() >= startElectionAt) 02063 startNewElection(); 02064 stateChanged.wait_until(lockGuard, startElectionAt); 02065 } 02066 } 02067 02068 void 02069 RaftConsensus::peerThreadMain(std::shared_ptr<Peer> peer) 02070 { 02071 std::unique_lock<Mutex> lockGuard(mutex); 02072 Core::ThreadId::setName( 02073 Core::StringUtil::format("Peer(%lu)", peer->serverId)); 02074 NOTICE("Peer thread for server %lu started", peer->serverId); 02075 02076 // Each iteration of this loop issues a new RPC or sleeps on the condition 02077 // variable. 02078 while (!peer->exiting) { 02079 TimePoint now = Clock::now(); 02080 TimePoint waitUntil = TimePoint::min(); 02081 02082 if (peer->backoffUntil > now) { 02083 waitUntil = peer->backoffUntil; 02084 } else { 02085 switch (state) { 02086 // Followers don't issue RPCs. 02087 case State::FOLLOWER: 02088 waitUntil = TimePoint::max(); 02089 break; 02090 02091 // Candidates request votes. 02092 case State::CANDIDATE: 02093 if (!peer->requestVoteDone) 02094 requestVote(lockGuard, *peer); 02095 else 02096 waitUntil = TimePoint::max(); 02097 break; 02098 02099 // Leaders replicate entries and periodically send heartbeats. 02100 case State::LEADER: 02101 if (peer->getMatchIndex() < log->getLastLogIndex() || 02102 peer->nextHeartbeatTime < now) { 02103 // appendEntries delegates to installSnapshot if we 02104 // need to send a snapshot instead 02105 appendEntries(lockGuard, *peer); 02106 } else { 02107 waitUntil = peer->nextHeartbeatTime; 02108 } 02109 break; 02110 } 02111 } 02112 02113 stateChanged.wait_until(lockGuard, waitUntil); 02114 } 02115 02116 // must return immediately after this 02117 --numPeerThreads; 02118 stateChanged.notify_all(); 02119 NOTICE("Peer thread for server %lu exiting", peer->serverId); 02120 } 02121 02122 void 02123 RaftConsensus::stepDownThreadMain() 02124 { 02125 std::unique_lock<Mutex> lockGuard(mutex); 02126 Core::ThreadId::setName("stepDown"); 02127 while (true) { 02128 // Wait until this server is the leader and is not the only server in 02129 // the cluster. 02130 while (true) { 02131 if (exiting) 02132 return; 02133 if (state == State::LEADER) { 02134 // If this local server forms a quorum (it is the only server 02135 // in the configuration), we need to sleep. Without this guard, 02136 // this method would not relinquish the CPU. 
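            // Sketch of the overall rule this thread implements, using
            // epochs as a cheap round counter for leadership checks:
            //
            //     epoch = ++currentEpoch;            // start a new round
            //     deadline = now + ELECTION_TIMEOUT;
            //     quorumMin(lastAckEpoch) >= epoch before the deadline
            //         -> a quorum was contacted this round; keep leading
            //     otherwise
            //         -> stepDown(currentTerm + 1)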
02137 ++currentEpoch; 02138 if (configuration->quorumMin(&Server::getLastAckEpoch) < 02139 currentEpoch) { 02140 break; 02141 } 02142 } 02143 stateChanged.wait(lockGuard); 02144 } 02145 // Now, if an election timeout goes by without confirming leadership, 02146 // step down. The election timeout is a reasonable amount of time, 02147 // since it's about when other servers will start elections and bump 02148 // the term. 02149 TimePoint stepDownAt = Clock::now() + ELECTION_TIMEOUT; 02150 uint64_t term = currentTerm; 02151 uint64_t epoch = currentEpoch; // currentEpoch was incremented above 02152 while (true) { 02153 if (exiting) 02154 return; 02155 if (currentTerm > term) 02156 break; 02157 if (configuration->quorumMin(&Server::getLastAckEpoch) >= epoch) 02158 break; 02159 if (Clock::now() >= stepDownAt) { 02160 NOTICE("No broadcast for a timeout, stepping down from leader " 02161 "of term %lu (converting to follower in term %lu)", 02162 currentTerm, currentTerm + 1); 02163 stepDown(currentTerm + 1); 02164 break; 02165 } 02166 stateChanged.wait_until(lockGuard, stepDownAt); 02167 } 02168 } 02169 } 02170 02171 //// RaftConsensus private methods that MUST NOT acquire the lock 02172 02173 void 02174 RaftConsensus::advanceCommitIndex() 02175 { 02176 if (state != State::LEADER) { 02177 // getMatchIndex is undefined unless we're leader 02178 WARNING("advanceCommitIndex called as %s", 02179 Core::StringUtil::toString(state).c_str()); 02180 return; 02181 } 02182 02183 // calculate the largest entry ID stored on a quorum of servers 02184 uint64_t newCommitIndex = 02185 configuration->quorumMin(&Server::getMatchIndex); 02186 if (commitIndex >= newCommitIndex) 02187 return; 02188 // If we have discarded the entry, it's because we already knew it was 02189 // committed. 02190 assert(newCommitIndex >= log->getLogStartIndex()); 02191 // At least one of these entries must also be from the current term to 02192 // guarantee that no server without them can be elected. 02193 if (log->getEntry(newCommitIndex).term() != currentTerm) 02194 return; 02195 commitIndex = newCommitIndex; 02196 VERBOSE("New commitIndex: %lu", commitIndex); 02197 assert(commitIndex <= log->getLastLogIndex()); 02198 stateChanged.notify_all(); 02199 02200 if (state == State::LEADER && commitIndex >= configuration->id) { 02201 // Upon committing a configuration that excludes itself, the leader 02202 // steps down. 02203 if (!configuration->hasVote(configuration->localServer)) { 02204 NOTICE("Newly committed configuration does not include self. " 02205 "Stepping down as leader"); 02206 stepDown(currentTerm + 1); 02207 return; 02208 } 02209 02210 // Upon committing a reconfiguration (Cold,new) entry, the leader 02211 // creates the next configuration (Cnew) entry. 
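        // Sketch of the step that follows: once Cold,new commits, the leader
        // appends Cnew, whose prev_configuration is simply the
        // next_configuration half of the transitional entry:
        //
        //     Cold,new committed  =>  append Cnew with
        //         Cnew.prev_configuration = Cold,new.next_configuration
        //
        // For the quorum rule used earlier in this function, a worked example
        // (hypothetical values, five servers): matchIndex = {12, 15, 15, 17, 20}
        // gives quorumMin(&Server::getMatchIndex) == 15, the highest index
        // stored on at least three of the five servers.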
02212 if (configuration->state == Configuration::State::TRANSITIONAL) { 02213 Log::Entry entry; 02214 entry.set_term(currentTerm); 02215 entry.set_type(Protocol::Raft::EntryType::CONFIGURATION); 02216 entry.set_cluster_time(clusterClock.leaderStamp()); 02217 *entry.mutable_configuration()->mutable_prev_configuration() = 02218 configuration->description.next_configuration(); 02219 append({&entry}); 02220 return; 02221 } 02222 } 02223 } 02224 02225 void 02226 RaftConsensus::append(const std::vector<const Log::Entry*>& entries) 02227 { 02228 for (auto it = entries.begin(); it != entries.end(); ++it) 02229 assert((*it)->term() != 0); 02230 std::pair<uint64_t, uint64_t> range = log->append(entries); 02231 if (state == State::LEADER) { // defer log sync 02232 logSyncQueued = true; 02233 } else { // sync log now 02234 std::unique_ptr<Log::Sync> sync = log->takeSync(); 02235 sync->wait(); 02236 log->syncComplete(std::move(sync)); 02237 } 02238 uint64_t index = range.first; 02239 for (auto it = entries.begin(); it != entries.end(); ++it) { 02240 const Log::Entry& entry = **it; 02241 if (entry.type() == Protocol::Raft::EntryType::CONFIGURATION) 02242 configurationManager->add(index, entry.configuration()); 02243 ++index; 02244 } 02245 stateChanged.notify_all(); 02246 } 02247 02248 void 02249 RaftConsensus::appendEntries(std::unique_lock<Mutex>& lockGuard, 02250 Peer& peer) 02251 { 02252 uint64_t lastLogIndex = log->getLastLogIndex(); 02253 uint64_t prevLogIndex = peer.nextIndex - 1; 02254 assert(prevLogIndex <= lastLogIndex); 02255 02256 // Don't have needed entry: send a snapshot instead. 02257 if (peer.nextIndex < log->getLogStartIndex()) { 02258 installSnapshot(lockGuard, peer); 02259 return; 02260 } 02261 02262 // Find prevLogTerm or fall back to sending a snapshot. 02263 uint64_t prevLogTerm; 02264 if (prevLogIndex >= log->getLogStartIndex()) { 02265 prevLogTerm = log->getEntry(prevLogIndex).term(); 02266 } else if (prevLogIndex == 0) { 02267 prevLogTerm = 0; 02268 } else if (prevLogIndex == lastSnapshotIndex) { 02269 prevLogTerm = lastSnapshotTerm; 02270 } else { 02271 // Don't have needed entry for prevLogTerm: send snapshot instead. 
02272 installSnapshot(lockGuard, peer); 02273 return; 02274 } 02275 02276 // Build up request 02277 Protocol::Raft::AppendEntries::Request request; 02278 request.set_server_id(serverId); 02279 request.set_term(currentTerm); 02280 request.set_prev_log_term(prevLogTerm); 02281 request.set_prev_log_index(prevLogIndex); 02282 uint64_t numEntries = 0; 02283 if (!peer.suppressBulkData) 02284 numEntries = packEntries(peer.nextIndex, request); 02285 request.set_commit_index(std::min(commitIndex, prevLogIndex + numEntries)); 02286 02287 // Execute RPC 02288 Protocol::Raft::AppendEntries::Response response; 02289 TimePoint start = Clock::now(); 02290 uint64_t epoch = currentEpoch; 02291 Peer::CallStatus status = peer.callRPC( 02292 Protocol::Raft::OpCode::APPEND_ENTRIES, 02293 request, response, 02294 lockGuard); 02295 switch (status) { 02296 case Peer::CallStatus::OK: 02297 break; 02298 case Peer::CallStatus::FAILED: 02299 peer.suppressBulkData = true; 02300 peer.backoffUntil = start + RPC_FAILURE_BACKOFF; 02301 return; 02302 case Peer::CallStatus::INVALID_REQUEST: 02303 PANIC("The server's RaftService doesn't support the AppendEntries " 02304 "RPC or claims the request is malformed"); 02305 } 02306 02307 // Process response 02308 02309 if (currentTerm != request.term() || peer.exiting) { 02310 // we don't care about result of RPC 02311 return; 02312 } 02313 // Since we were leader in this term before, we must still be leader in 02314 // this term. 02315 assert(state == State::LEADER); 02316 if (response.term() > currentTerm) { 02317 NOTICE("Received AppendEntries response from server %lu in term %lu " 02318 "(this server's term was %lu)", 02319 peer.serverId, response.term(), currentTerm); 02320 stepDown(response.term()); 02321 } else { 02322 assert(response.term() == currentTerm); 02323 peer.lastAckEpoch = epoch; 02324 stateChanged.notify_all(); 02325 peer.nextHeartbeatTime = start + HEARTBEAT_PERIOD; 02326 if (response.success()) { 02327 if (peer.matchIndex > prevLogIndex + numEntries) { 02328 // Revisit this warning if we pipeline AppendEntries RPCs for 02329 // performance. 02330 WARNING("matchIndex should monotonically increase within a " 02331 "term, since servers don't forget entries. But it " 02332 "didn't."); 02333 } else { 02334 peer.matchIndex = prevLogIndex + numEntries; 02335 advanceCommitIndex(); 02336 } 02337 peer.nextIndex = peer.matchIndex + 1; 02338 peer.suppressBulkData = false; 02339 02340 if (!peer.isCaughtUp_ && 02341 peer.thisCatchUpIterationGoalId <= peer.matchIndex) { 02342 Clock::duration duration = 02343 Clock::now() - peer.thisCatchUpIterationStart; 02344 uint64_t thisCatchUpIterationMs = 02345 uint64_t(std::chrono::duration_cast< 02346 std::chrono::milliseconds>(duration).count()); 02347 if (labs(int64_t(peer.lastCatchUpIterationMs - 02348 thisCatchUpIterationMs)) * 1000L * 1000L < 02349 std::chrono::nanoseconds(ELECTION_TIMEOUT).count()) { 02350 peer.isCaughtUp_ = true; 02351 stateChanged.notify_all(); 02352 } else { 02353 peer.lastCatchUpIterationMs = thisCatchUpIterationMs; 02354 peer.thisCatchUpIterationStart = Clock::now(); 02355 peer.thisCatchUpIterationGoalId = log->getLastLogIndex(); 02356 } 02357 } 02358 } else { 02359 if (peer.nextIndex > 1) 02360 --peer.nextIndex; 02361 // A server that hasn't been around for a while might have a much 02362 // shorter log than ours. 
The AppendEntries reply contains the
02363             // index of its last log entry, and there's no reason for us to
02364             // set nextIndex to be more than 1 past that (that would leave a
02365             // gap, so it will always be rejected).
02366             if (response.has_last_log_index() &&
02367                 peer.nextIndex > response.last_log_index() + 1) {
02368                 peer.nextIndex = response.last_log_index() + 1;
02369             }
02370         }
02371     }
02372     if (response.has_server_capabilities()) {
02373         auto& cap = response.server_capabilities();
02374         if (cap.has_min_supported_state_machine_version() &&
02375             cap.has_max_supported_state_machine_version()) {
02376             peer.haveStateMachineSupportedVersions = true;
02377             peer.minStateMachineVersion = Core::Util::downCast<uint16_t>(
02378                 cap.min_supported_state_machine_version());
02379             peer.maxStateMachineVersion = Core::Util::downCast<uint16_t>(
02380                 cap.max_supported_state_machine_version());
02381             stateChanged.notify_all();
02382         }
02383     }
02384 }
02385 
02386 void
02387 RaftConsensus::installSnapshot(std::unique_lock<Mutex>& lockGuard,
02388                                Peer& peer)
02389 {
02390     // Build up request
02391     Protocol::Raft::InstallSnapshot::Request request;
02392     request.set_server_id(serverId);
02393     request.set_term(currentTerm);
02394     request.set_version(2);
02395 
02396     // Open the latest snapshot if we haven't already. Stash a copy of the
02397     // lastSnapshotIndex that goes along with the file, since it's possible
02398     // that this will change while we're transferring chunks.
02399     if (!peer.snapshotFile) {
02400         namespace FS = Storage::FilesystemUtil;
02401         peer.snapshotFile.reset(new FS::FileContents(
02402             FS::openFile(storageLayout.snapshotDir, "snapshot", O_RDONLY)));
02403         peer.snapshotFileOffset = 0;
02404         peer.lastSnapshotIndex = lastSnapshotIndex;
02405         NOTICE("Beginning to send snapshot of %lu bytes up through index %lu "
02406                "to follower",
02407                peer.snapshotFile->getFileLength(),
02408                lastSnapshotIndex);
02409     }
02410     request.set_last_snapshot_index(peer.lastSnapshotIndex);
02411     request.set_byte_offset(peer.snapshotFileOffset);
02412     uint64_t numDataBytes = 0;
02413     if (!peer.suppressBulkData) {
02414         // The amount of data we can send is bounded by the remaining bytes in
02415         // the file and the maximum length for RPCs.
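        // Worked example with hypothetical numbers: for a 2.5 MB snapshot
        // file, a 1 MB SOFT_RPC_SIZE_LIMIT, and snapshotFileOffset already at
        // 2 MB, the min() below sends only the remaining 0.5 MB, and
        // request.done() is set because offset + numDataBytes reaches the
        // file length.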
02416 numDataBytes = std::min( 02417 peer.snapshotFile->getFileLength() - peer.snapshotFileOffset, 02418 SOFT_RPC_SIZE_LIMIT); 02419 } 02420 request.set_data(peer.snapshotFile->get<char>(peer.snapshotFileOffset, 02421 numDataBytes), 02422 numDataBytes); 02423 request.set_done(peer.snapshotFileOffset + numDataBytes == 02424 peer.snapshotFile->getFileLength()); 02425 02426 // Execute RPC 02427 Protocol::Raft::InstallSnapshot::Response response; 02428 TimePoint start = Clock::now(); 02429 uint64_t epoch = currentEpoch; 02430 Peer::CallStatus status = peer.callRPC( 02431 Protocol::Raft::OpCode::INSTALL_SNAPSHOT, 02432 request, response, 02433 lockGuard); 02434 switch (status) { 02435 case Peer::CallStatus::OK: 02436 break; 02437 case Peer::CallStatus::FAILED: 02438 peer.suppressBulkData = true; 02439 peer.backoffUntil = start + RPC_FAILURE_BACKOFF; 02440 return; 02441 case Peer::CallStatus::INVALID_REQUEST: 02442 PANIC("The server's RaftService doesn't support the " 02443 "InstallSnapshot RPC or claims the request is malformed"); 02444 } 02445 02446 // Process response 02447 02448 if (currentTerm != request.term() || peer.exiting) { 02449 // we don't care about result of RPC 02450 return; 02451 } 02452 // Since we were leader in this term before, we must still be leader in 02453 // this term. 02454 assert(state == State::LEADER); 02455 if (response.term() > currentTerm) { 02456 NOTICE("Received InstallSnapshot response from server %lu in " 02457 "term %lu (this server's term was %lu)", 02458 peer.serverId, response.term(), currentTerm); 02459 stepDown(response.term()); 02460 } else { 02461 assert(response.term() == currentTerm); 02462 peer.lastAckEpoch = epoch; 02463 stateChanged.notify_all(); 02464 peer.nextHeartbeatTime = start + HEARTBEAT_PERIOD; 02465 peer.suppressBulkData = false; 02466 if (response.has_bytes_stored()) { 02467 // Normal path (since InstallSnapshot version 2). 02468 peer.snapshotFileOffset = response.bytes_stored(); 02469 } else { 02470 // This is the old path for InstallSnapshot version 1 followers 02471 // only. The leader would just assume the snapshot chunk was always 02472 // appended to the file if the terms matched. 02473 peer.snapshotFileOffset += numDataBytes; 02474 } 02475 if (peer.snapshotFileOffset == peer.snapshotFile->getFileLength()) { 02476 NOTICE("Done sending snapshot through index %lu to follower", 02477 peer.lastSnapshotIndex); 02478 peer.matchIndex = peer.lastSnapshotIndex; 02479 peer.nextIndex = peer.lastSnapshotIndex + 1; 02480 // These entries are already committed if they're in a snapshot, so 02481 // the commitIndex shouldn't advance, but let's just follow the 02482 // simple rule that bumping matchIndex should always be 02483 // followed by a call to advanceCommitIndex(): 02484 advanceCommitIndex(); 02485 peer.snapshotFile.reset(); 02486 peer.snapshotFileOffset = 0; 02487 peer.lastSnapshotIndex = 0; 02488 } 02489 } 02490 } 02491 02492 void 02493 RaftConsensus::becomeLeader() 02494 { 02495 assert(state == State::CANDIDATE); 02496 NOTICE("Now leader for term %lu (appending no-op at index %lu)", 02497 currentTerm, 02498 log->getLastLogIndex() + 1); 02499 state = State::LEADER; 02500 leaderId = serverId; 02501 printElectionState(); 02502 startElectionAt = TimePoint::max(); 02503 withholdVotesUntil = TimePoint::max(); 02504 02505 // Our local cluster time clock has been ticking ever since we got the last 02506 // log entry/snapshot. 
Set the clock back to when that happened, since we
02507     // don't really want to count that time (the cluster probably had no leader
02508     // for most of it).
02509     clusterClock.newEpoch(clusterClock.clusterTimeAtEpoch);
02510 
02511     // The ordering is pretty important here: First set nextIndex and
02512     // matchIndex for ourselves and each follower, then append the no-op.
02513     // Otherwise we'll set our localServer's last agree index too high.
02514     configuration->forEach(&Server::beginLeadership);
02515 
02516     // Append a new entry so that commitment is not delayed indefinitely.
02517     // Otherwise, if the leader never gets anything to append, it will never
02518     // return to read-only operations (it can't prove that its committed index
02519     // is up-to-date).
02520     Log::Entry entry;
02521     entry.set_term(currentTerm);
02522     entry.set_type(Protocol::Raft::EntryType::NOOP);
02523     entry.set_cluster_time(clusterClock.leaderStamp());
02524     append({&entry});
02525 
02526     // Outstanding RequestVote RPCs are no longer needed.
02527     interruptAll();
02528 }
02529 
02530 void
02531 RaftConsensus::discardUnneededEntries()
02532 {
02533     if (log->getLogStartIndex() <= lastSnapshotIndex) {
02534         NOTICE("Removing log entries through %lu (inclusive) since "
02535                "they're no longer needed", lastSnapshotIndex);
02536         log->truncatePrefix(lastSnapshotIndex + 1);
02537         configurationManager->truncatePrefix(lastSnapshotIndex + 1);
02538         stateChanged.notify_all();
02539         if (state == State::LEADER) { // defer log sync
02540             logSyncQueued = true;
02541         } else { // sync log now
02542             std::unique_ptr<Log::Sync> sync = log->takeSync();
02543             sync->wait();
02544             log->syncComplete(std::move(sync));
02545         }
02546     }
02547 }
02548 
02549 uint64_t
02550 RaftConsensus::getLastLogTerm() const
02551 {
02552     uint64_t lastLogIndex = log->getLastLogIndex();
02553     if (lastLogIndex >= log->getLogStartIndex()) {
02554         return log->getEntry(lastLogIndex).term();
02555     } else {
02556         assert(lastLogIndex == lastSnapshotIndex); // potentially 0
02557         return lastSnapshotTerm;
02558     }
02559 }
02560 
02561 void
02562 RaftConsensus::interruptAll()
02563 {
02564     stateChanged.notify_all();
02565     // A configuration is sometimes missing for unit tests.
02566     if (configuration)
02567         configuration->forEach(&Server::interrupt);
02568 }
02569 
02570 uint64_t
02571 RaftConsensus::packEntries(
02572     uint64_t nextIndex,
02573     Protocol::Raft::AppendEntries::Request& request) const
02574 {
02575     // Add as many entries as will fit comfortably in the request. It's
02576     // easiest to add one entry at a time until the RPC gets too big, then back
02577     // the last one out.
02578 
02579     // Calculating the size of the request ProtoBuf is a bit expensive, so this
02580     // estimates high, then if it reaches the size limit, corrects the estimate
02581     // and keeps going. This is a dumb algorithm but does well enough. It gets
02582     // the number of calls to request.ByteSize() down to about 15 even with
02583     // extremely small entries (10 bytes of payload data in each of 50,000
02584     // entries filling to a 1MB max).
02585 
02586     // Processing 19000 entries here with 10 bytes of data each (total request
02587     // size of 1MB) still takes about 42 milliseconds on an overloaded laptop
02588     // when compiling in DEBUG mode. That's a bit slow, in case someone has
02589     // aggressive election timeouts. As a result, the total number of entries
02590     // in a request is now limited to MAX_LOG_ENTRIES_PER_REQUEST=5000, which
02591     // amortizes RPC overhead well enough anyhow.
This limit will only kick in 02592 // when the entry size drops below 200 bytes, since 1M/5K=200. 02593 02594 using Core::Util::downCast; 02595 uint64_t lastIndex = std::min(log->getLastLogIndex(), 02596 nextIndex + MAX_LOG_ENTRIES_PER_REQUEST - 1); 02597 google::protobuf::RepeatedPtrField<Protocol::Raft::Entry>& requestEntries = 02598 *request.mutable_entries(); 02599 02600 uint64_t numEntries = 0; 02601 uint64_t currentSize = downCast<uint64_t>(request.ByteSize()); 02602 02603 for (uint64_t index = nextIndex; index <= lastIndex; ++index) { 02604 const Log::Entry& entry = log->getEntry(index); 02605 *requestEntries.Add() = entry; 02606 02607 // Each member of a repeated message field is encoded with a tag 02608 // and a length. We conservatively assume the tag and length will 02609 // be up to 10 bytes each (2^64), though in practice the tag is 02610 // probably one byte and the length is probably two. 02611 currentSize += uint64_t(entry.ByteSize()) + 20; 02612 02613 if (currentSize >= SOFT_RPC_SIZE_LIMIT) { 02614 // The message might be too big: calculate more exact but more 02615 // expensive size. 02616 uint64_t actualSize = downCast<uint64_t>(request.ByteSize()); 02617 assert(currentSize >= actualSize); 02618 currentSize = actualSize; 02619 if (currentSize >= SOFT_RPC_SIZE_LIMIT && numEntries > 0) { 02620 // This entry doesn't fit and we've already got some 02621 // entries to send: discard this one and stop adding more. 02622 requestEntries.RemoveLast(); 02623 break; 02624 } 02625 } 02626 // This entry fit, so we'll send it. 02627 ++numEntries; 02628 } 02629 02630 assert(numEntries == uint64_t(requestEntries.size())); 02631 return numEntries; 02632 } 02633 02634 void 02635 RaftConsensus::readSnapshot() 02636 { 02637 std::unique_ptr<Storage::SnapshotFile::Reader> reader; 02638 if (storageLayout.serverDir.fd != -1) { 02639 try { 02640 reader.reset(new Storage::SnapshotFile::Reader(storageLayout)); 02641 } catch (const std::runtime_error& e) { // file not found 02642 NOTICE("%s", e.what()); 02643 } 02644 } 02645 if (reader) { 02646 // Check that this snapshot uses format version 1 02647 uint8_t version = 0; 02648 uint64_t bytesRead = reader->readRaw(&version, sizeof(version)); 02649 if (bytesRead < 1) { 02650 PANIC("Found completely empty snapshot file (it doesn't even " 02651 "have a version field)"); 02652 } else { 02653 if (version != 1) { 02654 PANIC("Snapshot format version read was %u, but this code can " 02655 "only read version 1", 02656 version); 02657 } 02658 } 02659 02660 // load header contents 02661 SnapshotMetadata::Header header; 02662 std::string error = reader->readMessage(header); 02663 if (!error.empty()) { 02664 PANIC("Couldn't read snapshot header: %s", error.c_str()); 02665 } 02666 if (header.last_included_index() < lastSnapshotIndex) { 02667 PANIC("Trying to load a snapshot that is more stale than one this " 02668 "server loaded earlier. 
The earlier snapshot covers through " 02669 "log index %lu (inclusive); this one covers through log " 02670 "index %lu (inclusive)", 02671 lastSnapshotIndex, 02672 header.last_included_index()); 02673 02674 } 02675 lastSnapshotIndex = header.last_included_index(); 02676 lastSnapshotTerm = header.last_included_term(); 02677 lastSnapshotClusterTime = header.last_cluster_time(); 02678 lastSnapshotBytes = reader->getSizeBytes(); 02679 commitIndex = std::max(lastSnapshotIndex, commitIndex); 02680 02681 NOTICE("Reading snapshot which covers log entries 1 through %lu " 02682 "(inclusive)", lastSnapshotIndex); 02683 02684 // We should keep log entries if they might be needed for a quorum. So: 02685 // 1. Discard log if it is shorter than the snapshot. 02686 // 2. Discard log if its lastSnapshotIndex entry disagrees with the 02687 // lastSnapshotTerm. 02688 if (log->getLastLogIndex() < lastSnapshotIndex || 02689 (log->getLogStartIndex() <= lastSnapshotIndex && 02690 log->getEntry(lastSnapshotIndex).term() != lastSnapshotTerm)) { 02691 // The NOTICE message can be confusing if the log is empty, so 02692 // don't print it in that case. We still want to shift the log 02693 // start index, though. 02694 if (log->getLogStartIndex() <= log->getLastLogIndex()) { 02695 NOTICE("Discarding the entire log, since it's not known to be " 02696 "consistent with the snapshot that is being read"); 02697 } 02698 // Discard the entire log, setting the log start to point to the 02699 // right place. 02700 log->truncatePrefix(lastSnapshotIndex + 1); 02701 log->truncateSuffix(lastSnapshotIndex); 02702 configurationManager->truncatePrefix(lastSnapshotIndex + 1); 02703 configurationManager->truncateSuffix(lastSnapshotIndex); 02704 // Clean up resources. 02705 if (state == State::LEADER) { // defer log sync 02706 logSyncQueued = true; 02707 } else { // sync log now 02708 std::unique_ptr<Log::Sync> sync = log->takeSync(); 02709 sync->wait(); 02710 log->syncComplete(std::move(sync)); 02711 } 02712 clusterClock.newEpoch(lastSnapshotClusterTime); 02713 } 02714 02715 discardUnneededEntries(); 02716 02717 if (header.has_configuration_index() && header.has_configuration()) { 02718 configurationManager->setSnapshot(header.configuration_index(), 02719 header.configuration()); 02720 } else { 02721 WARNING("No configuration. This is unexpected, since any snapshot " 02722 "should contain a configuration (they're the first thing " 02723 "found in any log)."); 02724 } 02725 02726 stateChanged.notify_all(); 02727 } 02728 if (log->getLogStartIndex() > lastSnapshotIndex + 1) { 02729 PANIC("The newest snapshot on this server covers up through log index " 02730 "%lu (inclusive), but its log starts at index %lu. This " 02731 "should never happen and indicates a corrupt disk state. 
If you " 02732 "want this server to participate in your cluster, you should " 02733 "back up all of its state, delete it, and add the server back " 02734 "as a new cluster member using the reconfiguration mechanism.", 02735 lastSnapshotIndex, log->getLogStartIndex()); 02736 } 02737 02738 snapshotReader = std::move(reader); 02739 } 02740 02741 std::pair<RaftConsensus::ClientResult, uint64_t> 02742 RaftConsensus::replicateEntry(Log::Entry& entry, 02743 std::unique_lock<Mutex>& lockGuard) 02744 { 02745 if (state == State::LEADER) { 02746 entry.set_term(currentTerm); 02747 entry.set_cluster_time(clusterClock.leaderStamp()); 02748 append({&entry}); 02749 uint64_t index = log->getLastLogIndex(); 02750 while (!exiting && currentTerm == entry.term()) { 02751 if (commitIndex >= index) { 02752 VERBOSE("replicate succeeded"); 02753 return {ClientResult::SUCCESS, index}; 02754 } 02755 stateChanged.wait(lockGuard); 02756 } 02757 } 02758 return {ClientResult::NOT_LEADER, 0}; 02759 } 02760 02761 void 02762 RaftConsensus::requestVote(std::unique_lock<Mutex>& lockGuard, Peer& peer) 02763 { 02764 Protocol::Raft::RequestVote::Request request; 02765 request.set_server_id(serverId); 02766 request.set_term(currentTerm); 02767 request.set_last_log_term(getLastLogTerm()); 02768 request.set_last_log_index(log->getLastLogIndex()); 02769 02770 Protocol::Raft::RequestVote::Response response; 02771 VERBOSE("requestVote start"); 02772 TimePoint start = Clock::now(); 02773 uint64_t epoch = currentEpoch; 02774 Peer::CallStatus status = peer.callRPC( 02775 Protocol::Raft::OpCode::REQUEST_VOTE, 02776 request, response, 02777 lockGuard); 02778 VERBOSE("requestVote done"); 02779 switch (status) { 02780 case Peer::CallStatus::OK: 02781 break; 02782 case Peer::CallStatus::FAILED: 02783 peer.suppressBulkData = true; 02784 peer.backoffUntil = start + RPC_FAILURE_BACKOFF; 02785 return; 02786 case Peer::CallStatus::INVALID_REQUEST: 02787 PANIC("The server's RaftService doesn't support the RequestVote " 02788 "RPC or claims the request is malformed"); 02789 } 02790 02791 if (currentTerm != request.term() || state != State::CANDIDATE || 02792 peer.exiting) { 02793 VERBOSE("ignore RPC result"); 02794 // we don't care about result of RPC 02795 return; 02796 } 02797 02798 if (response.term() > currentTerm) { 02799 NOTICE("Received RequestVote response from server %lu in " 02800 "term %lu (this server's term was %lu)", 02801 peer.serverId, response.term(), currentTerm); 02802 stepDown(response.term()); 02803 } else { 02804 peer.requestVoteDone = true; 02805 peer.lastAckEpoch = epoch; 02806 stateChanged.notify_all(); 02807 02808 if (response.granted()) { 02809 peer.haveVote_ = true; 02810 NOTICE("Got vote from server %lu for term %lu", 02811 peer.serverId, currentTerm); 02812 if (configuration->quorumAll(&Server::haveVote)) 02813 becomeLeader(); 02814 } else { 02815 NOTICE("Vote denied by server %lu for term %lu", 02816 peer.serverId, currentTerm); 02817 } 02818 } 02819 } 02820 02821 void 02822 RaftConsensus::setElectionTimer() 02823 { 02824 std::chrono::nanoseconds duration( 02825 Core::Random::randomRange( 02826 uint64_t(std::chrono::nanoseconds(ELECTION_TIMEOUT).count()), 02827 uint64_t(std::chrono::nanoseconds(ELECTION_TIMEOUT).count()) * 2)); 02828 VERBOSE("Will become candidate in %s", 02829 Core::StringUtil::toString(duration).c_str()); 02830 startElectionAt = Clock::now() + duration; 02831 stateChanged.notify_all(); 02832 } 02833 02834 void 02835 RaftConsensus::printElectionState() const 02836 { 02837 const char* s = NULL; 02838 
switch (state) { 02839 case State::FOLLOWER: 02840 s = "FOLLOWER, "; 02841 break; 02842 case State::CANDIDATE: 02843 s = "CANDIDATE,"; 02844 break; 02845 case State::LEADER: 02846 s = "LEADER, "; 02847 break; 02848 } 02849 NOTICE("server=%lu, term=%lu, state=%s leader=%lu, vote=%lu", 02850 serverId, 02851 currentTerm, 02852 s, 02853 leaderId, 02854 votedFor); 02855 } 02856 02857 void 02858 RaftConsensus::startNewElection() 02859 { 02860 if (configuration->id == 0) { 02861 // Don't have a configuration: go back to sleep. 02862 setElectionTimer(); 02863 return; 02864 } 02865 02866 if (commitIndex >= configuration->id && 02867 !configuration->hasVote(configuration->localServer)) { 02868 // we are not in the latest configuration, do not start an election 02869 setElectionTimer(); 02870 return; 02871 } 02872 02873 if (leaderId > 0) { 02874 NOTICE("Running for election in term %lu " 02875 "(haven't heard from leader %lu lately)", 02876 currentTerm + 1, 02877 leaderId); 02878 } else if (state == State::CANDIDATE) { 02879 NOTICE("Running for election in term %lu " 02880 "(previous candidacy for term %lu timed out)", 02881 currentTerm + 1, 02882 currentTerm); 02883 } else { 02884 NOTICE("Running for election in term %lu", 02885 currentTerm + 1); 02886 } 02887 ++currentTerm; 02888 state = State::CANDIDATE; 02889 leaderId = 0; 02890 votedFor = serverId; 02891 printElectionState(); 02892 setElectionTimer(); 02893 configuration->forEach(&Server::beginRequestVote); 02894 if (snapshotWriter) { 02895 snapshotWriter->discard(); 02896 snapshotWriter.reset(); 02897 } 02898 updateLogMetadata(); 02899 interruptAll(); 02900 02901 // if we're the only server, this election is already done 02902 if (configuration->quorumAll(&Server::haveVote)) 02903 becomeLeader(); 02904 } 02905 02906 void 02907 RaftConsensus::stepDown(uint64_t newTerm) 02908 { 02909 assert(currentTerm <= newTerm); 02910 if (currentTerm < newTerm) { 02911 VERBOSE("stepDown(%lu)", newTerm); 02912 currentTerm = newTerm; 02913 leaderId = 0; 02914 votedFor = 0; 02915 updateLogMetadata(); 02916 configuration->resetStagingServers(); 02917 if (snapshotWriter) { 02918 snapshotWriter->discard(); 02919 snapshotWriter.reset(); 02920 } 02921 state = State::FOLLOWER; 02922 printElectionState(); 02923 } else { 02924 if (state != State::FOLLOWER) { 02925 state = State::FOLLOWER; 02926 printElectionState(); 02927 } 02928 } 02929 if (startElectionAt == TimePoint::max()) // was leader 02930 setElectionTimer(); 02931 if (withholdVotesUntil == TimePoint::max()) // was leader 02932 withholdVotesUntil = TimePoint::min(); 02933 interruptAll(); 02934 02935 // If the leader disk thread is currently writing to disk, wait for it to 02936 // finish. We poll here because we don't want to release the lock (this 02937 // server would then believe its writes have been flushed when they 02938 // haven't). 02939 while (leaderDiskThreadWorking) 02940 usleep(500); 02941 02942 // If a recent append has been queued, empty it here. Do this after waiting 02943 // for leaderDiskThread to preserve FIFO ordering of Log::Sync objects. 02944 // Don't bother updating the localServer's lastSyncedIndex, since it 02945 // doesn't matter for non-leaders. 
02946 if (logSyncQueued) { 02947 std::unique_ptr<Log::Sync> sync = log->takeSync(); 02948 sync->wait(); 02949 log->syncComplete(std::move(sync)); 02950 logSyncQueued = false; 02951 } 02952 } 02953 02954 void 02955 RaftConsensus::updateLogMetadata() 02956 { 02957 log->metadata.set_current_term(currentTerm); 02958 log->metadata.set_voted_for(votedFor); 02959 VERBOSE("updateMetadata start"); 02960 log->updateMetadata(); 02961 VERBOSE("updateMetadata end"); 02962 } 02963 02964 bool 02965 RaftConsensus::upToDateLeader(std::unique_lock<Mutex>& lockGuard) const 02966 { 02967 ++currentEpoch; 02968 uint64_t epoch = currentEpoch; 02969 // schedule a heartbeat now so that this returns quickly 02970 configuration->forEach(&Server::scheduleHeartbeat); 02971 stateChanged.notify_all(); 02972 while (true) { 02973 if (exiting || state != State::LEADER) 02974 return false; 02975 if (configuration->quorumMin(&Server::getLastAckEpoch) >= epoch) { 02976 // So we know we're the current leader, but do we have an 02977 // up-to-date commitIndex yet? What we'd like to check is whether 02978 // the entry's term at commitIndex matches our currentTerm, but 02979 // snapshots mean that we may not have the entry in our log. Since 02980 // commitIndex >= lastSnapshotIndex, we split into two cases: 02981 uint64_t commitTerm; 02982 if (commitIndex == lastSnapshotIndex) { 02983 commitTerm = lastSnapshotTerm; 02984 } else { 02985 assert(commitIndex > lastSnapshotIndex); 02986 assert(commitIndex >= log->getLogStartIndex()); 02987 assert(commitIndex <= log->getLastLogIndex()); 02988 commitTerm = log->getEntry(commitIndex).term(); 02989 } 02990 if (commitTerm == currentTerm) 02991 return true; 02992 } 02993 stateChanged.wait(lockGuard); 02994 } 02995 } 02996 02997 std::ostream& 02998 operator<<(std::ostream& os, RaftConsensus::ClientResult clientResult) 02999 { 03000 typedef RaftConsensus::ClientResult ClientResult; 03001 switch (clientResult) { 03002 case ClientResult::SUCCESS: 03003 os << "ClientResult::SUCCESS"; 03004 break; 03005 case ClientResult::FAIL: 03006 os << "ClientResult::FAIL"; 03007 break; 03008 case ClientResult::RETRY: 03009 os << "ClientResult::RETRY"; 03010 break; 03011 case ClientResult::NOT_LEADER: 03012 os << "ClientResult::NOT_LEADER"; 03013 break; 03014 } 03015 return os; 03016 } 03017 03018 std::ostream& 03019 operator<<(std::ostream& os, RaftConsensus::State state) 03020 { 03021 typedef RaftConsensus::State State; 03022 switch (state) { 03023 case State::FOLLOWER: 03024 os << "State::FOLLOWER"; 03025 break; 03026 case State::CANDIDATE: 03027 os << "State::CANDIDATE"; 03028 break; 03029 case State::LEADER: 03030 os << "State::LEADER"; 03031 break; 03032 } 03033 return os; 03034 } 03035 03036 } // namespace LogCabin::Server 03037 } // namespace LogCabin