LogCabin
Server/RaftConsensus.cc
00001 /* Copyright (c) 2012 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  * Copyright (c) 2015 Scale Computing
00004  *
00005  * Permission to use, copy, modify, and distribute this software for any
00006  * purpose with or without fee is hereby granted, provided that the above
00007  * copyright notice and this permission notice appear in all copies.
00008  *
00009  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00010  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00011  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00012  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00013  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00014  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00015  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00016  */
00017 
00018 #include <algorithm>
00019 #include <fcntl.h>
00020 #include <limits>
00021 #include <string.h>
00022 #include <sys/file.h>
00023 #include <sys/stat.h>
00024 #include <time.h>
00025 #include <unistd.h>
00026 
00027 #include "build/Protocol/Raft.pb.h"
00028 #include "build/Server/SnapshotMetadata.pb.h"
00029 #include "Core/Buffer.h"
00030 #include "Core/Debug.h"
00031 #include "Core/ProtoBuf.h"
00032 #include "Core/Random.h"
00033 #include "Core/StringUtil.h"
00034 #include "Core/ThreadId.h"
00035 #include "Core/Util.h"
00036 #include "Protocol/Common.h"
00037 #include "RPC/ClientRPC.h"
00038 #include "RPC/ClientSession.h"
00039 #include "RPC/ServerRPC.h"
00040 #include "Server/RaftConsensus.h"
00041 #include "Server/Globals.h"
00042 #include "Storage/LogFactory.h"
00043 
00044 namespace LogCabin {
00045 namespace Server {
00046 
00047 typedef Storage::Log Log;
00048 
00049 namespace RaftConsensusInternal {
00050 
00051 bool startThreads = true;
00052 
00053 ////////// Server //////////
00054 
00055 Server::Server(uint64_t serverId)
00056     : serverId(serverId)
00057     , addresses()
00058     , haveStateMachineSupportedVersions(false)
00059     , minStateMachineVersion(std::numeric_limits<uint16_t>::max())
00060     , maxStateMachineVersion(0)
00061     , gcFlag(false)
00062 {
00063 }
00064 
00065 Server::~Server()
00066 {
00067 }
00068 
00069 std::ostream&
00070 operator<<(std::ostream& os, const Server& server)
00071 {
00072     return server.dumpToStream(os);
00073 }
00074 
00075 ////////// LocalServer //////////
00076 
00077 LocalServer::LocalServer(uint64_t serverId, RaftConsensus& consensus)
00078     : Server(serverId)
00079     , consensus(consensus)
00080     , lastSyncedIndex(0)
00081 {
00082 }
00083 
00084 LocalServer::~LocalServer()
00085 {
00086 }
00087 
00088 void
00089 LocalServer::beginRequestVote()
00090 {
00091 }
00092 
00093 void
00094 LocalServer::beginLeadership()
00095 {
00096     lastSyncedIndex = consensus.log->getLastLogIndex();
00097 }
00098 
00099 void
00100 LocalServer::exit()
00101 {
00102 }
00103 
00104 uint64_t
00105 LocalServer::getLastAckEpoch() const
00106 {
00107     return consensus.currentEpoch;
00108 }
00109 
00110 uint64_t
00111 LocalServer::getMatchIndex() const
00112 {
00113     return lastSyncedIndex;
00114 }
00115 
00116 bool
00117 LocalServer::haveVote() const
00118 {
00119     return (consensus.votedFor == serverId);
00120 }
00121 
00122 void
00123 LocalServer::interrupt()
00124 {
00125 }
00126 
00127 bool
00128 LocalServer::isCaughtUp() const
00129 {
00130     return true;
00131 }
00132 
00133 void
00134 LocalServer::scheduleHeartbeat()
00135 {
00136 }
00137 
00138 std::ostream&
00139 LocalServer::dumpToStream(std::ostream& os) const
00140 {
00141     // Nothing interesting to dump.
00142     return os;
00143 }
00144 
00145 void
00146 LocalServer::updatePeerStats(Protocol::ServerStats::Raft::Peer& peerStats,
00147                              Core::Time::SteadyTimeConverter& time) const
00148 {
00149     switch (consensus.state) {
00150         case RaftConsensus::State::FOLLOWER:
00151             break;
00152         case RaftConsensus::State::CANDIDATE:
00153             break;
00154         case RaftConsensus::State::LEADER:
00155             peerStats.set_last_synced_index(lastSyncedIndex);
00156             break;
00157     }
00158 }
00159 
00160 ////////// Peer //////////
00161 
00162 Peer::Peer(uint64_t serverId, RaftConsensus& consensus)
00163     : Server(serverId)
00164     , consensus(consensus)
00165     , eventLoop(consensus.globals.eventLoop)
00166     , exiting(false)
00167     , requestVoteDone(false)
00168     , haveVote_(false)
00169     , suppressBulkData(true)
00170       // It's somewhat important to set nextIndex correctly here, since peers
00171       // that are added to the configuration won't go through beginLeadership()
00172       // on the current leader. I say somewhat important because, if nextIndex
00173       // is set incorrectly, it's self-correcting, so it's just a potential
00174       // performance issue.
00175     , nextIndex(consensus.log->getLastLogIndex() + 1)
00176     , matchIndex(0)
00177     , lastAckEpoch(0)
00178     , nextHeartbeatTime(TimePoint::min())
00179     , backoffUntil(TimePoint::min())
00180     , rpcFailuresSinceLastWarning(0)
00181     , lastCatchUpIterationMs(~0UL)
00182     , thisCatchUpIterationStart(Clock::now())
00183     , thisCatchUpIterationGoalId(~0UL)
00184     , isCaughtUp_(false)
00185     , snapshotFile()
00186     , snapshotFileOffset(0)
00187     , lastSnapshotIndex(0)
00188     , session()
00189     , rpc()
00190 {
00191 }
00192 
00193 Peer::~Peer()
00194 {
00195 }
00196 
00197 void
00198 Peer::beginRequestVote()
00199 {
00200     requestVoteDone = false;
00201     haveVote_ = false;
00202 }
00203 
00204 void
00205 Peer::beginLeadership()
00206 {
00207     nextIndex = consensus.log->getLastLogIndex() + 1;
00208     matchIndex = 0;
00209     suppressBulkData = true;
00210     snapshotFile.reset();
00211     snapshotFileOffset = 0;
00212     lastSnapshotIndex = 0;
00213 }
00214 
00215 void
00216 Peer::exit()
00217 {
00218     NOTICE("Flagging peer %lu to exit", serverId);
00219     exiting = true;
00220     // Usually telling peers to exit is paired with an interruptAll(). That can
00221     // be error-prone, however, when you're removing servers from the
00222     // configuration (if the code removes servers and then calls
00223     // interruptAll(), it won't interrupt() the removed servers). So it's
00224     // better to just interrupt() here as well. See
00225     // https://github.com/logcabin/logcabin/issues/183
00226     interrupt();
00227 }
00228 
00229 uint64_t
00230 Peer::getLastAckEpoch() const
00231 {
00232     return lastAckEpoch;
00233 }
00234 
00235 uint64_t
00236 Peer::getMatchIndex() const
00237 {
00238     return matchIndex;
00239 }
00240 
00241 bool
00242 Peer::haveVote() const
00243 {
00244     return haveVote_;
00245 }
00246 
00247 void
00248 Peer::interrupt()
00249 {
00250     rpc.cancel();
00251 }
00252 
00253 bool
00254 Peer::isCaughtUp() const
00255 {
00256     return isCaughtUp_;
00257 }
00258 
00259 void
00260 Peer::scheduleHeartbeat()
00261 {
00262     nextHeartbeatTime = Clock::now();
00263 }
00264 
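// Send one RPC of the given opCode to this peer, releasing the Raft lock
// while waiting for the reply. The outcome maps onto CallStatus: OK on
// success, FAILED if the transport failed or the RPC was canceled (the
// caller will back off or retry), INVALID_REQUEST if the peer rejected the
// request format. The remaining statuses indicate bugs and PANIC.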
00265 Peer::CallStatus
00266 Peer::callRPC(Protocol::Raft::OpCode opCode,
00267               const google::protobuf::Message& request,
00268               google::protobuf::Message& response,
00269               std::unique_lock<Mutex>& lockGuard)
00270 {
00271     typedef RPC::ClientRPC::Status RPCStatus;
00272     rpc = RPC::ClientRPC(getSession(lockGuard),
00273                          Protocol::Common::ServiceId::RAFT_SERVICE,
00274                          /* serviceSpecificErrorVersion = */ 0,
00275                          opCode,
00276                          request);
00277     // release lock for concurrency
00278     Core::MutexUnlock<Mutex> unlockGuard(lockGuard);
00279     switch (rpc.waitForReply(&response, NULL, TimePoint::max())) {
00280         case RPCStatus::OK:
00281             if (rpcFailuresSinceLastWarning > 0) {
00282                 WARNING("RPC to server succeeded after %lu failures",
00283                         rpcFailuresSinceLastWarning);
00284                 rpcFailuresSinceLastWarning = 0;
00285             }
00286             return CallStatus::OK;
00287         case RPCStatus::SERVICE_SPECIFIC_ERROR:
00288             PANIC("unexpected service-specific error");
00289         case RPCStatus::TIMEOUT:
00290             PANIC("unexpected RPC timeout");
00291         case RPCStatus::RPC_FAILED:
00292             ++rpcFailuresSinceLastWarning;
00293             if (rpcFailuresSinceLastWarning == 1) {
00294                 WARNING("RPC to server failed: %s",
00295                         rpc.getErrorMessage().c_str());
00296             } else if (rpcFailuresSinceLastWarning % 100 == 0) {
00297                 WARNING("Last %lu RPCs to server failed. This failure: %s",
00298                         rpcFailuresSinceLastWarning,
00299                         rpc.getErrorMessage().c_str());
00300             }
00301             return CallStatus::FAILED;
00302         case RPCStatus::RPC_CANCELED:
00303             return CallStatus::FAILED;
00304         case RPCStatus::INVALID_SERVICE:
00305             PANIC("The server isn't running the RaftService");
00306         case RPCStatus::INVALID_REQUEST:
00307             return CallStatus::INVALID_REQUEST;
00308     }
00309     PANIC("Unexpected RPC status");
00310 }
00311 
00312 void
00313 Peer::startThread(std::shared_ptr<Peer> self)
00314 {
00315     thisCatchUpIterationStart = Clock::now();
00316     thisCatchUpIterationGoalId = consensus.log->getLastLogIndex();
00317     ++consensus.numPeerThreads;
00318     NOTICE("Starting peer thread for server %lu", serverId);
00319     std::thread(&RaftConsensus::peerThreadMain, &consensus, self).detach();
00320 }
00321 
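// Return the cached session to this peer, creating a new one (with the
// Raft lock released and an election-timeout deadline) if there is no
// session yet or the existing one has reported an error.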
00322 std::shared_ptr<RPC::ClientSession>
00323 Peer::getSession(std::unique_lock<Mutex>& lockGuard)
00324 {
00325     if (!session || !session->getErrorMessage().empty()) {
00326         // Unfortunately, creating a session isn't currently interruptible, so
00327         // we use a timeout to prevent the server from hanging forever if some
00328         // peer thread happens to be creating a session when it's told to exit.
00329         // See https://github.com/logcabin/logcabin/issues/183 for more detail.
00330         TimePoint timeout = Clock::now() + consensus.ELECTION_TIMEOUT;
00331         // release lock for concurrency
00332         Core::MutexUnlock<Mutex> unlockGuard(lockGuard);
00333         RPC::Address target(addresses, Protocol::Common::DEFAULT_PORT);
00334         target.refresh(timeout);
00335         Client::SessionManager::ServerId peerId(serverId);
00336         session = consensus.sessionManager.createSession(
00337             target,
00338             timeout,
00339             &consensus.globals.clusterUUID,
00340             &peerId);
00341     }
00342     return session;
00343 }
00344 
00345 std::ostream&
00346 Peer::dumpToStream(std::ostream& os) const
00347 {
00348     os << "Peer " << serverId << std::endl;
00349     os << "addresses: " << addresses << std::endl;
00350     switch (consensus.state) {
00351         case RaftConsensus::State::FOLLOWER:
00352             break;
00353         case RaftConsensus::State::CANDIDATE:
00354             os << "vote: ";
00355             if (requestVoteDone) {
00356                 if (haveVote_)
00357                     os << "granted";
00358                 else
00359                     os << "not granted";
00360             } else {
00361                 os << "no response";
00362             }
00363             os << std::endl;
00364             break;
00365         case RaftConsensus::State::LEADER:
00366             os << "suppressBulkData: " << suppressBulkData << std::endl;
00367             os << "nextIndex: " << nextIndex << std::endl;
00368             os << "matchIndex: " << matchIndex << std::endl;
00369             break;
00370     }
00371     return os;
00372 }
00373 
00374 void
00375 Peer::updatePeerStats(Protocol::ServerStats::Raft::Peer& peerStats,
00376                       Core::Time::SteadyTimeConverter& time) const
00377 {
00378     switch (consensus.state) {
00379         case RaftConsensus::State::FOLLOWER:
00380             break;
00381         case RaftConsensus::State::CANDIDATE:
00382             break;
00383         case RaftConsensus::State::LEADER:
00384             peerStats.set_suppress_bulk_data(suppressBulkData);
00385             peerStats.set_next_index(nextIndex);
00386             peerStats.set_last_agree_index(matchIndex);
00387             peerStats.set_is_caught_up(isCaughtUp_);
00388             peerStats.set_next_heartbeat_at(time.unixNanos(nextHeartbeatTime));
00389             break;
00390     }
00391 
00392     switch (consensus.state) {
00393         case RaftConsensus::State::FOLLOWER:
00394             break;
00395         case RaftConsensus::State::CANDIDATE: // fallthrough
00396         case RaftConsensus::State::LEADER:
00397             peerStats.set_request_vote_done(requestVoteDone);
00398             peerStats.set_have_vote(haveVote_);
00399             peerStats.set_backoff_until(time.unixNanos(backoffUntil));
00400             break;
00401     }
00402 }
00403 
00404 ////////// Configuration::SimpleConfiguration //////////
00405 
00406 Configuration::SimpleConfiguration::SimpleConfiguration()
00407     : servers()
00408 {
00409 }
00410 
00411 Configuration::SimpleConfiguration::~SimpleConfiguration()
00412 {
00413 }
00414 
00415 bool
00416 Configuration::SimpleConfiguration::all(const Predicate& predicate) const
00417 {
00418     for (auto it = servers.begin(); it != servers.end(); ++it) {
00419         if (!predicate(**it))
00420             return false;
00421     }
00422     return true;
00423 }
00424 
00425 bool
00426 Configuration::SimpleConfiguration::contains(std::shared_ptr<Server> server)
00427                                                                           const
00428 {
00429     for (auto it = servers.begin(); it != servers.end(); ++it) {
00430         if (*it == server)
00431             return true;
00432     }
00433     return false;
00434 }
00435 
00436 void
00437 Configuration::SimpleConfiguration::forEach(const SideEffect& sideEffect)
00438 {
00439     for (auto it = servers.begin(); it != servers.end(); ++it)
00440         sideEffect(**it);
00441 }
00442 
00443 uint64_t
00444 Configuration::SimpleConfiguration::min(const GetValue& getValue) const
00445 {
00446     if (servers.empty())
00447         return 0;
00448     uint64_t smallest = ~0UL;
00449     for (auto it = servers.begin(); it != servers.end(); ++it)
00450         smallest = std::min(smallest, getValue(**it));
00451     return smallest;
00452 }
00453 
00454 bool
00455 Configuration::SimpleConfiguration::quorumAll(const Predicate& predicate) const
00456 {
00457     if (servers.empty())
00458         return true;
00459     uint64_t count = 0;
00460     for (auto it = servers.begin(); it != servers.end(); ++it)
00461         if (predicate(**it))
00462             ++count;
00463     return (count >= servers.size() / 2 + 1);
00464 }
00465 
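// With the values sorted in ascending order, the element at index
// (size - 1) / 2 is the largest value that at least a majority of these
// servers have reached; applied to matchIndex, for example, this yields
// the highest log index known to be stored on a quorum.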
00466 uint64_t
00467 Configuration::SimpleConfiguration::quorumMin(const GetValue& getValue) const
00468 {
00469     if (servers.empty())
00470         return 0;
00471     std::vector<uint64_t> values;
00472     for (auto it = servers.begin(); it != servers.end(); ++it)
00473         values.push_back(getValue(**it));
00474     std::sort(values.begin(), values.end());
00475     return values.at((values.size() - 1)/ 2);
00476 }
00477 
00478 ////////// Configuration //////////
00479 
00480 Configuration::Configuration(uint64_t serverId, RaftConsensus& consensus)
00481     : consensus(consensus)
00482     , knownServers()
00483     , localServer()
00484     , state(State::BLANK)
00485     , id(0)
00486     , description()
00487     , oldServers()
00488     , newServers()
00489 {
00490     localServer.reset(new LocalServer(serverId, consensus));
00491     knownServers[serverId] = localServer;
00492 }
00493 
00494 Configuration::~Configuration()
00495 {
00496 }
00497 
00498 void
00499 Configuration::forEach(const SideEffect& sideEffect)
00500 {
00501     for (auto it = knownServers.begin(); it != knownServers.end(); ++it)
00502         sideEffect(*it->second);
00503 }
00504 
00505 bool
00506 Configuration::hasVote(std::shared_ptr<Server> server) const
00507 {
00508     if (state == State::TRANSITIONAL) {
00509         return (oldServers.contains(server) ||
00510                 newServers.contains(server));
00511     } else {
00512         return oldServers.contains(server);
00513     }
00514 }
00515 
00516 std::string
00517 Configuration::lookupAddress(uint64_t serverId) const
00518 {
00519     auto it = knownServers.find(serverId);
00520     if (it != knownServers.end())
00521         return it->second->addresses;
00522     return "";
00523 }
00524 
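// Joint consensus: while a transitional configuration is in effect,
// agreement requires a quorum of both the old and the new servers;
// otherwise only the current (old) configuration is consulted.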
00525 bool
00526 Configuration::quorumAll(const Predicate& predicate) const
00527 {
00528     if (state == State::TRANSITIONAL) {
00529         return (oldServers.quorumAll(predicate) &&
00530                 newServers.quorumAll(predicate));
00531     } else {
00532         return oldServers.quorumAll(predicate);
00533     }
00534 }
00535 
00536 uint64_t
00537 Configuration::quorumMin(const GetValue& getValue) const
00538 {
00539     if (state == State::TRANSITIONAL) {
00540         return std::min(oldServers.quorumMin(getValue),
00541                         newServers.quorumMin(getValue));
00542     } else {
00543         return oldServers.quorumMin(getValue);
00544     }
00545 }
00546 
00547 void
00548 Configuration::resetStagingServers()
00549 {
00550     if (state == State::STAGING) {
00551         // staging servers could have changed other servers' addresses, so roll
00552         // back to old description with old addresses
00553         setConfiguration(id, description);
00554     }
00555 }
00556 
00557 namespace {
00558 void setGCFlag(Server& server)
00559 {
00560     server.gcFlag = true;
00561 }
00562 } // anonymous namespace
00563 
00564 void
00565 Configuration::reset()
00566 {
00567     NOTICE("Resetting to blank configuration");
00568     state = State::BLANK;
00569     id = 0;
00570     description = {};
00571     oldServers.servers.clear();
00572     newServers.servers.clear();
00573     for (auto it = knownServers.begin(); it != knownServers.end(); ++it)
00574         it->second->exit();
00575     knownServers.clear();
00576     knownServers[localServer->serverId] = localServer;
00577 }
00578 
00579 void
00580 Configuration::setConfiguration(
00581         uint64_t newId,
00582         const Protocol::Raft::Configuration& newDescription)
00583 {
00584     NOTICE("Activating configuration %lu:\n%s", newId,
00585            Core::ProtoBuf::dumpString(newDescription).c_str());
00586 
00587     if (newDescription.next_configuration().servers().size() == 0)
00588         state = State::STABLE;
00589     else
00590         state = State::TRANSITIONAL;
00591     id = newId;
00592     description = newDescription;
00593     oldServers.servers.clear();
00594     newServers.servers.clear();
00595 
00596     // Build up the list of old servers
00597     for (auto confIt = description.prev_configuration().servers().begin();
00598          confIt != description.prev_configuration().servers().end();
00599          ++confIt) {
00600         std::shared_ptr<Server> server = getServer(confIt->server_id());
00601         server->addresses = confIt->addresses();
00602         oldServers.servers.push_back(server);
00603     }
00604 
00605     // Build up the list of new servers
00606     for (auto confIt = description.next_configuration().servers().begin();
00607          confIt != description.next_configuration().servers().end();
00608          ++confIt) {
00609         std::shared_ptr<Server> server = getServer(confIt->server_id());
00610         server->addresses = confIt->addresses();
00611         newServers.servers.push_back(server);
00612     }
00613 
00614     // Servers not in the current configuration need to be told to exit
00615     setGCFlag(*localServer);
00616     oldServers.forEach(setGCFlag);
00617     newServers.forEach(setGCFlag);
00618     auto it = knownServers.begin();
00619     while (it != knownServers.end()) {
00620         std::shared_ptr<Server> server = it->second;
00621         if (!server->gcFlag) {
00622             server->exit();
00623             it = knownServers.erase(it);
00624         } else {
00625             server->gcFlag = false; // clear flag for next time
00626             ++it;
00627         }
00628     }
00629 }
00630 
00631 void
00632 Configuration::setStagingServers(
00633         const Protocol::Raft::SimpleConfiguration& stagingServers)
00634 {
00635     assert(state == State::STABLE);
00636     state = State::STAGING;
00637     for (auto it = stagingServers.servers().begin();
00638          it != stagingServers.servers().end();
00639          ++it) {
00640         std::shared_ptr<Server> server = getServer(it->server_id());
00641         server->addresses = it->addresses();
00642         newServers.servers.push_back(server);
00643     }
00644 }
00645 
00646 bool
00647 Configuration::stagingAll(const Predicate& predicate) const
00648 {
00649     if (state == State::STAGING)
00650         return newServers.all(predicate);
00651     else
00652         return true;
00653 }
00654 
00655 uint64_t
00656 Configuration::stagingMin(const GetValue& getValue) const
00657 {
00658     if (state == State::STAGING)
00659         return newServers.min(getValue);
00660     else
00661         return 0;
00662 }
00663 
00664 void
00665 Configuration::updateServerStats(Protocol::ServerStats& serverStats,
00666                                  Core::Time::SteadyTimeConverter& time) const
00667 {
00668     for (auto it = knownServers.begin();
00669          it != knownServers.end();
00670          ++it) {
00671         Protocol::ServerStats::Raft::Peer& peerStats =
00672             *serverStats.mutable_raft()->add_peer();
00673         peerStats.set_server_id(it->first);
00674         const ServerRef& peer = it->second;
00675         peerStats.set_addresses(peer->addresses);
00676         peerStats.set_old_member(oldServers.contains(peer));
00677         peerStats.set_new_member(state == State::TRANSITIONAL &&
00678                                  newServers.contains(peer));
00679         peerStats.set_staging_member(state == State::STAGING &&
00680                                      newServers.contains(peer));
00681         peer->updatePeerStats(peerStats, time);
00682     }
00683 }
00684 
00685 std::ostream&
00686 operator<<(std::ostream& os, Configuration::State state)
00687 {
00688     typedef Configuration::State State;
00689     switch (state) {
00690         case State::BLANK:
00691             os << "State::BLANK";
00692             break;
00693         case State::STABLE:
00694             os << "State::STABLE";
00695             break;
00696         case State::STAGING:
00697             os << "State::STAGING";
00698             break;
00699         case State::TRANSITIONAL:
00700             os << "State::TRANSITIONAL";
00701             break;
00702     }
00703     return os;
00704 }
00705 
00706 std::ostream&
00707 operator<<(std::ostream& os, const Configuration& configuration)
00708 {
00709     os << "Configuration: {" << std::endl;
00710     os << "  state: " << configuration.state << std::endl;
00711     os << "  id: " << configuration.id << std::endl;
00712     os << "  description: " << std::endl;
00713     os << Core::ProtoBuf::dumpString(configuration.description);
00714     os << "}" << std::endl;
00715     for (auto it = configuration.knownServers.begin();
00716          it != configuration.knownServers.end();
00717          ++it) {
00718         os << *it->second;
00719     }
00720     return os;
00721 }
00722 
00723 
00724 ////////// Configuration private methods //////////
00725 
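// Return the Server object for the given ID, creating a new Peer and
// starting its peer thread (unless startThreads is cleared for unit
// tests) the first time that ID is seen.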
00726 std::shared_ptr<Server>
00727 Configuration::getServer(uint64_t newServerId)
00728 {
00729     auto it = knownServers.find(newServerId);
00730     if (it != knownServers.end()) {
00731         return it->second;
00732     } else {
00733         std::shared_ptr<Peer> peer(new Peer(newServerId, consensus));
00734         if (startThreads)
00735             peer->startThread(peer);
00736         knownServers[newServerId] = peer;
00737         return peer;
00738     }
00739 }
00740 
00741 ////////// ConfigurationManager //////////
00742 
00743 ConfigurationManager::ConfigurationManager(Configuration& configuration)
00744     : configuration(configuration)
00745     , descriptions()
00746     , snapshot(0, {})
00747 {
00748 }
00749 
00750 ConfigurationManager::~ConfigurationManager()
00751 {
00752 }
00753 
00754 void
00755 ConfigurationManager::add(
00756     uint64_t index,
00757     const Protocol::Raft::Configuration& description)
00758 {
00759     descriptions[index] = description;
00760     restoreInvariants();
00761 }
00762 
00763 void
00764 ConfigurationManager::truncatePrefix(uint64_t firstIndexKept)
00765 {
00766     descriptions.erase(descriptions.begin(),
00767                        descriptions.lower_bound(firstIndexKept));
00768     restoreInvariants();
00769 }
00770 
00771 void
00772 ConfigurationManager::truncateSuffix(uint64_t lastIndexKept)
00773 {
00774     descriptions.erase(descriptions.upper_bound(lastIndexKept),
00775                        descriptions.end());
00776     restoreInvariants();
00777 }
00778 
00779 void
00780 ConfigurationManager::setSnapshot(
00781     uint64_t index,
00782     const Protocol::Raft::Configuration& description)
00783 {
00784     assert(index >= snapshot.first);
00785     snapshot = {index, description};
00786     restoreInvariants();
00787 }
00788 
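// Return the configuration with the largest index that is less than or
// equal to lastIncludedIndex, or {0, {}} if there is no such entry.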
00789 std::pair<uint64_t, Protocol::Raft::Configuration>
00790 ConfigurationManager::getLatestConfigurationAsOf(
00791                                         uint64_t lastIncludedIndex) const
00792 {
00793     if (descriptions.empty())
00794         return {0, {}};
00795     auto it = descriptions.upper_bound(lastIncludedIndex);
00796     // 'it' is either an element or end()
00797     if (it == descriptions.begin())
00798         return {0, {}};
00799     --it;
00800     return *it;
00801 }
00802 
00803 ////////// ConfigurationManager private methods //////////
00804 
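// Called after every mutation: keeps the snapshot's configuration present
// in 'descriptions' and makes 'configuration' reflect the description
// with the largest index, resetting it to blank if none remain.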
00805 void
00806 ConfigurationManager::restoreInvariants()
00807 {
00808     if (snapshot.first != 0)
00809         descriptions.insert(snapshot);
00810     if (descriptions.empty()) {
00811         configuration.reset();
00812     } else {
00813         auto it = descriptions.rbegin();
00814         if (configuration.id != it->first)
00815             configuration.setConfiguration(it->first, it->second);
00816     }
00817 }
00818 
00819 ////////// ClusterClock //////////
00820 
00821 ClusterClock::ClusterClock()
00822     : clusterTimeAtEpoch(0)
00823     , localTimeAtEpoch(Core::Time::SteadyClock::now())
00824 {
00825 }
00826 
00827 void
00828 ClusterClock::newEpoch(uint64_t clusterTime)
00829 {
00830     clusterTimeAtEpoch = clusterTime;
00831     localTimeAtEpoch = Core::Time::SteadyClock::now();
00832 }
00833 
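// leaderStamp() advances the cluster clock by the local time elapsed
// since the last epoch and starts a new epoch at the resulting value;
// interpolate() below computes the same value without modifying state.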
00834 uint64_t
00835 ClusterClock::leaderStamp()
00836 {
00837     auto localTime = Core::Time::SteadyClock::now();
00838     uint64_t nanosSinceEpoch =
00839         Core::Util::downCast<uint64_t>(std::chrono::nanoseconds(
00840             localTime - localTimeAtEpoch).count());
00841     clusterTimeAtEpoch += nanosSinceEpoch;
00842     localTimeAtEpoch = localTime;
00843     return clusterTimeAtEpoch;
00844 }
00845 
00846 uint64_t
00847 ClusterClock::interpolate() const
00848 {
00849     auto localTime = Core::Time::SteadyClock::now();
00850     uint64_t nanosSinceEpoch =
00851         Core::Util::downCast<uint64_t>(std::chrono::nanoseconds(
00852             localTime - localTimeAtEpoch).count());
00853     return clusterTimeAtEpoch + nanosSinceEpoch;
00854 }
00855 
00856 namespace {
00857 
00858 struct StagingProgressing {
00859     StagingProgressing(uint64_t epoch,
00860                        Protocol::Client::SetConfiguration::Response& response)
00861         : epoch(epoch)
00862         , response(response)
00863     {
00864     }
00865     bool operator()(Server& server) {
00866         uint64_t serverEpoch = server.getLastAckEpoch();
00867         if (serverEpoch < epoch) {
00868             auto& s = *response.mutable_configuration_bad()->add_bad_servers();
00869             s.set_server_id(server.serverId);
00870             s.set_addresses(server.addresses);
00871             return false;
00872         }
00873         return true;
00874     }
00875     const uint64_t epoch;
00876     Protocol::Client::SetConfiguration::Response& response;
00877 };
00878 
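// Intersects the ranges of state machine versions supported across all
// servers (tracking the largest minimum and smallest maximum) and counts
// how many servers have not yet reported their supported versions.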
00879 struct StateMachineVersionIntersection {
00880     StateMachineVersionIntersection()
00881         : missingCount(0)
00882         , allCount(0)
00883         , minVersion(0)
00884         , maxVersion(std::numeric_limits<uint16_t>::max()) {
00885     }
00886     void operator()(Server& server) {
00887         ++allCount;
00888         if (server.haveStateMachineSupportedVersions) {
00889             minVersion = std::max(server.minStateMachineVersion,
00890                                   minVersion);
00891             maxVersion = std::min(server.maxStateMachineVersion,
00892                                   maxVersion);
00893         } else {
00894             ++missingCount;
00895         }
00896     }
00897     uint64_t missingCount;
00898     uint64_t allCount;
00899     uint16_t minVersion;
00900     uint16_t maxVersion;
00901 };
00902 
00903 } // anonymous namespace
00904 
00905 } // namespace RaftConsensusInternal
00906 
00907 ////////// RaftConsensus::Entry //////////
00908 
00909 RaftConsensus::Entry::Entry()
00910     : index(0)
00911     , type(SKIP)
00912     , command()
00913     , snapshotReader()
00914     , clusterTime(0)
00915 {
00916 }
00917 
00918 RaftConsensus::Entry::Entry(Entry&& other)
00919     : index(other.index)
00920     , type(other.type)
00921     , command(std::move(other.command))
00922     , snapshotReader(std::move(other.snapshotReader))
00923     , clusterTime(other.clusterTime)
00924 {
00925 }
00926 
00927 RaftConsensus::Entry::~Entry()
00928 {
00929 }
00930 
00931 ////////// RaftConsensus //////////
00932 
00933 RaftConsensus::RaftConsensus(Globals& globals)
00934     : ELECTION_TIMEOUT(
00935         std::chrono::milliseconds(
00936             globals.config.read<uint64_t>(
00937                 "electionTimeoutMilliseconds",
00938                 500)))
00939     , HEARTBEAT_PERIOD(
00940         globals.config.keyExists("heartbeatPeriodMilliseconds")
00941             ? std::chrono::nanoseconds(
00942                 std::chrono::milliseconds(
00943                     globals.config.read<uint64_t>(
00944                         "heartbeatPeriodMilliseconds")))
00945             : ELECTION_TIMEOUT / 2)
00946     , MAX_LOG_ENTRIES_PER_REQUEST(
00947         globals.config.read<uint64_t>(
00948             "maxLogEntriesPerRequest",
00949             5000))
00950     , RPC_FAILURE_BACKOFF(
00951         globals.config.keyExists("rpcFailureBackoffMilliseconds")
00952             ? std::chrono::nanoseconds(
00953                 std::chrono::milliseconds(
00954                     globals.config.read<uint64_t>(
00955                         "rpcFailureBackoffMilliseconds")))
00956             : (ELECTION_TIMEOUT / 2))
00957     , STATE_MACHINE_UPDATER_BACKOFF(
00958         std::chrono::milliseconds(
00959             globals.config.read<uint64_t>(
00960                 "stateMachineUpdaterBackoffMilliseconds",
00961                 10000)))
00962     , SOFT_RPC_SIZE_LIMIT(Protocol::Common::MAX_MESSAGE_LENGTH - 1024)
00963     , serverId(0)
00964     , serverAddresses()
00965     , globals(globals)
00966     , storageLayout()
00967     , sessionManager(globals.eventLoop,
00968                      globals.config)
00969     , mutex()
00970     , stateChanged()
00971     , exiting(false)
00972     , numPeerThreads(0)
00973     , log()
00974     , logSyncQueued(false)
00975     , leaderDiskThreadWorking(false)
00976     , configuration()
00977     , configurationManager()
00978     , currentTerm(0)
00979     , state(State::FOLLOWER)
00980     , lastSnapshotIndex(0)
00981     , lastSnapshotTerm(0)
00982     , lastSnapshotClusterTime(0)
00983     , lastSnapshotBytes(0)
00984     , snapshotReader()
00985     , snapshotWriter()
00986     , commitIndex(0)
00987     , leaderId(0)
00988     , votedFor(0)
00989     , currentEpoch(0)
00990     , clusterClock()
00991     , startElectionAt(TimePoint::max())
00992     , withholdVotesUntil(TimePoint::min())
00993     , numEntriesTruncated(0)
00994     , leaderDiskThread()
00995     , timerThread()
00996     , stateMachineUpdaterThread()
00997     , stepDownThread()
00998     , invariants(*this)
00999 {
01000 }
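// The timing constants above come from the server's config file. A sketch
// of the relevant keys in the usual key = value format, with the defaults
// derived above (heartbeat and RPC backoff default to half the election
// timeout):
//
//     electionTimeoutMilliseconds = 500
//     heartbeatPeriodMilliseconds = 250
//     maxLogEntriesPerRequest = 5000
//     rpcFailureBackoffMilliseconds = 250
//     stateMachineUpdaterBackoffMilliseconds = 10000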
01001 
01002 RaftConsensus::~RaftConsensus()
01003 {
01004     if (!exiting)
01005         exit();
01006     if (leaderDiskThread.joinable())
01007         leaderDiskThread.join();
01008     if (timerThread.joinable())
01009         timerThread.join();
01010     if (stateMachineUpdaterThread.joinable())
01011         stateMachineUpdaterThread.join();
01012     if (stepDownThread.joinable())
01013         stepDownThread.join();
01014     NOTICE("Joined with disk and timer threads");
01015     std::unique_lock<Mutex> lockGuard(mutex);
01016     if (numPeerThreads > 0) {
01017         NOTICE("Waiting for %u peer threads to exit", numPeerThreads);
01018         while (numPeerThreads > 0)
01019             stateChanged.wait(lockGuard);
01020     }
01021     NOTICE("Peer threads have exited");
01022     // issue any outstanding disk flushes
01023     if (logSyncQueued) {
01024         std::unique_ptr<Log::Sync> sync = log->takeSync();
01025         sync->wait();
01026         log->syncComplete(std::move(sync));
01027     }
01028     NOTICE("Completed disk writes");
01029 }
01030 
01031 void
01032 RaftConsensus::init()
01033 {
01034     std::lock_guard<Mutex> lockGuard(mutex);
01035 #if DEBUG
01036     if (globals.config.read<bool>("raftDebug", false)) {
01037         mutex.callback = std::bind(&Invariants::checkAll, &invariants);
01038     }
01039 #endif
01040 
01041     NOTICE("My server ID is %lu", serverId);
01042 
01043     if (storageLayout.topDir.fd == -1) {
01044         if (globals.config.read("use-temporary-storage", false))
01045             storageLayout.initTemporary(serverId); // unit tests
01046         else
01047             storageLayout.init(globals.config, serverId);
01048     }
01049 
01050     configuration.reset(new Configuration(serverId, *this));
01051     configurationManager.reset(new ConfigurationManager(*configuration));
01052 
01053     NOTICE("Reading the log");
01054     if (!log) { // some unit tests pre-set the log; don't overwrite it
01055         log = Storage::LogFactory::makeLog(globals.config, storageLayout);
01056     }
01057     for (uint64_t index = log->getLogStartIndex();
01058          index <= log->getLastLogIndex();
01059          ++index) {
01060         const Log::Entry& entry = log->getEntry(index);
01061         if (entry.type() == Protocol::Raft::EntryType::UNKNOWN) {
01062             PANIC("Don't understand the entry type for index %lu (term %lu) "
01063                   "found on disk",
01064                   index, entry.term());
01065         }
01066         if (entry.type() == Protocol::Raft::EntryType::CONFIGURATION) {
01067             configurationManager->add(index, entry.configuration());
01068         }
01069     }
01070 
01071     // Restore cluster time epoch from last log entry, if any
01072     if (log->getLastLogIndex() >= log->getLogStartIndex()) {
01073         clusterClock.newEpoch(
01074             log->getEntry(log->getLastLogIndex()).cluster_time());
01075     }
01076 
01077     NOTICE("The log contains indexes %lu through %lu (inclusive)",
01078            log->getLogStartIndex(), log->getLastLogIndex());
01079 
01080     if (log->metadata.has_current_term())
01081         currentTerm = log->metadata.current_term();
01082     if (log->metadata.has_voted_for())
01083         votedFor = log->metadata.voted_for();
01084     updateLogMetadata();
01085 
01086     // Read snapshot after reading log, since readSnapshot() will get rid of
01087     // conflicting log entries
01088     readSnapshot();
01089 
01090     // Clean up incomplete snapshots left by prior runs. This could be done
01091     // earlier, but maybe it's nicer to make sure we can get to this point
01092     // without PANICing before deleting these files.
01093     Storage::SnapshotFile::discardPartialSnapshots(storageLayout);
01094 
01095     if (configuration->id == 0)
01096         NOTICE("No configuration, waiting to receive one.");
01097 
01098     stepDown(currentTerm);
01099     if (RaftConsensusInternal::startThreads) {
01100         leaderDiskThread = std::thread(
01101             &RaftConsensus::leaderDiskThreadMain, this);
01102         timerThread = std::thread(
01103             &RaftConsensus::timerThreadMain, this);
01104         if (globals.config.read<bool>("disableStateMachineUpdates", false)) {
01105             NOTICE("Not starting state machine updater thread (state machine "
01106                    "updates are disabled in config)");
01107         } else {
01108             stateMachineUpdaterThread = std::thread(
01109                 &RaftConsensus::stateMachineUpdaterThreadMain, this);
01110         }
01111         stepDownThread = std::thread(
01112             &RaftConsensus::stepDownThreadMain, this);
01113     }
01114     // log->path = ""; // hack to disable disk
01115     stateChanged.notify_all();
01116     printElectionState();
01117 }
01118 
01119 void
01120 RaftConsensus::exit()
01121 {
01122     NOTICE("Shutting down");
01123     std::lock_guard<Mutex> lockGuard(mutex);
01124     exiting = true;
01125     if (configuration)
01126         configuration->forEach(&Server::exit);
01127     interruptAll();
01128 }
01129 
01130 void
01131 RaftConsensus::bootstrapConfiguration()
01132 {
01133     std::lock_guard<Mutex> lockGuard(mutex);
01134 
01135     if (currentTerm != 0 ||
01136         log->getLogStartIndex() != 1 ||
01137         log->getLastLogIndex() != 0 ||
01138         lastSnapshotIndex != 0) {
01139         PANIC("Refusing to bootstrap configuration: it looks like a log or "
01140               "snapshot already exists.");
01141     }
01142     stepDown(1); // satisfies invariants assertions
01143 
01144     // Append the configuration entry to the log
01145     Log::Entry entry;
01146     entry.set_term(1);
01147     entry.set_type(Protocol::Raft::EntryType::CONFIGURATION);
01148     entry.set_cluster_time(0);
01149     Protocol::Raft::Configuration& configuration =
01150         *entry.mutable_configuration();
01151     Protocol::Raft::Server& server =
01152         *configuration.mutable_prev_configuration()->add_servers();
01153     server.set_server_id(serverId);
01154     server.set_addresses(serverAddresses);
01155     append({&entry});
01156 }
01157 
01158 RaftConsensus::ClientResult
01159 RaftConsensus::getConfiguration(
01160         Protocol::Raft::SimpleConfiguration& currentConfiguration,
01161         uint64_t& id) const
01162 {
01163     std::unique_lock<Mutex> lockGuard(mutex);
01164     if (!upToDateLeader(lockGuard))
01165         return ClientResult::NOT_LEADER;
01166     if (configuration->state != Configuration::State::STABLE ||
01167         commitIndex < configuration->id) {
01168         return ClientResult::RETRY;
01169     }
01170     currentConfiguration = configuration->description.prev_configuration();
01171     id = configuration->id;
01172     return ClientResult::SUCCESS;
01173 }
01174 
01175 std::pair<RaftConsensus::ClientResult, uint64_t>
01176 RaftConsensus::getLastCommitIndex() const
01177 {
01178     std::unique_lock<Mutex> lockGuard(mutex);
01179     if (!upToDateLeader(lockGuard))
01180         return {ClientResult::NOT_LEADER, 0};
01181     else
01182         return {ClientResult::SUCCESS, commitIndex};
01183 }
01184 
01185 std::string
01186 RaftConsensus::getLeaderHint() const
01187 {
01188     std::lock_guard<Mutex> lockGuard(mutex);
01189     return configuration->lookupAddress(leaderId);
01190 }
01191 
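// Block until the entry following lastIndex is committed, then hand it to
// the state machine: either as a single log entry, or as a snapshot when
// the log no longer contains the next index it needs.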
01192 RaftConsensus::Entry
01193 RaftConsensus::getNextEntry(uint64_t lastIndex) const
01194 {
01195     std::unique_lock<Mutex> lockGuard(mutex);
01196     uint64_t nextIndex = lastIndex + 1;
01197     while (true) {
01198         if (exiting)
01199             throw Core::Util::ThreadInterruptedException();
01200         if (commitIndex >= nextIndex) {
01201             RaftConsensus::Entry entry;
01202 
01203             // Make the state machine load a snapshot if we don't have the next
01204             // entry it needs in the log.
01205             if (log->getLogStartIndex() > nextIndex) {
01206                 entry.type = Entry::SNAPSHOT;
01207                 // For well-behaved state machines, we expect 'snapshotReader'
01208                 // to contain a SnapshotFile::Reader that we can return
01209                 // directly to the state machine. If the state machine asks
01210                 // for the same snapshot again, we have to build a new
01211                 // SnapshotFile::Reader and re-read it from disk.
01212                 entry.snapshotReader = std::move(snapshotReader);
01213                 if (!entry.snapshotReader) {
01214                     WARNING("State machine asked for same snapshot twice; "
01215                             "this shouldn't happen in normal operation. "
01216                             "Having to re-read it from disk.");
01217                     // readSnapshot() shouldn't have any side effects since the
01218                     // snapshot should have already been read, so const_cast
01219                     // should be ok (though ugly).
01220                     const_cast<RaftConsensus*>(this)->readSnapshot();
01221                     entry.snapshotReader = std::move(snapshotReader);
01222                 }
01223                 entry.index = lastSnapshotIndex;
01224                 entry.clusterTime = lastSnapshotClusterTime;
01225             } else {
01226                 // not a snapshot
01227                 const Log::Entry& logEntry = log->getEntry(nextIndex);
01228                 entry.index = nextIndex;
01229                 if (logEntry.type() == Protocol::Raft::EntryType::DATA) {
01230                     entry.type = Entry::DATA;
01231                     const std::string& s = logEntry.data();
01232                     entry.command = Core::Buffer(
01233                         memcpy(new char[s.length()], s.data(), s.length()),
01234                         s.length(),
01235                         Core::Buffer::deleteArrayFn<char>);
01236                 } else {
01237                     entry.type = Entry::SKIP;
01238                 }
01239                 entry.clusterTime = logEntry.cluster_time();
01240             }
01241             return entry;
01242         }
01243         stateChanged.wait(lockGuard);
01244     }
01245 }
01246 
01247 SnapshotStats::SnapshotStats
01248 RaftConsensus::getSnapshotStats() const
01249 {
01250     std::lock_guard<Mutex> lockGuard(mutex);
01251 
01252     SnapshotStats::SnapshotStats s;
01253     s.set_last_snapshot_index(lastSnapshotIndex);
01254     s.set_last_snapshot_bytes(lastSnapshotBytes);
01255     s.set_log_start_index(log->getLogStartIndex());
01256     s.set_last_log_index(log->getLastLogIndex());
01257     s.set_log_bytes(log->getSizeBytes());
01258     s.set_is_leader(state == State::LEADER);
01259     return s;
01260 }
01261 
01262 void
01263 RaftConsensus::handleAppendEntries(
01264                     const Protocol::Raft::AppendEntries::Request& request,
01265                     Protocol::Raft::AppendEntries::Response& response)
01266 {
01267     std::lock_guard<Mutex> lockGuard(mutex);
01268     assert(!exiting);
01269 
01270     // Set response to a rejection. We'll overwrite these later if we end up
01271     // accepting the request.
01272     response.set_term(currentTerm);
01273     response.set_success(false);
01274     response.set_last_log_index(log->getLastLogIndex());
01275 
01276     // Piggy-back server capabilities.
01277     {
01278         auto& cap = *response.mutable_server_capabilities();
01279         auto& s = *configuration->localServer;
01280         if (s.haveStateMachineSupportedVersions) {
01281             cap.set_min_supported_state_machine_version(
01282                     s.minStateMachineVersion);
01283             cap.set_max_supported_state_machine_version(
01284                     s.maxStateMachineVersion);
01285         }
01286     }
01287 
01288     // If the caller's term is stale, just return our term to it.
01289     if (request.term() < currentTerm) {
01290         VERBOSE("Caller(%lu) is stale. Our term is %lu, theirs is %lu",
01291                  request.server_id(), currentTerm, request.term());
01292         return; // response was set to a rejection above
01293     }
01294     if (request.term() > currentTerm) {
01295         NOTICE("Received AppendEntries request from server %lu in term %lu "
01296                "(this server's term was %lu)",
01297                 request.server_id(), request.term(), currentTerm);
01298         // We're about to bump our term in the stepDown below: update
01299         // 'response' accordingly.
01300         response.set_term(request.term());
01301     }
01302     // This request is a sign of life from the current leader. Update
01303     // our term and convert to follower if necessary; reset the
01304     // election timer. set it here in case request we exit the
01305     // function early, we will set it again after the disk write.
01306     stepDown(request.term());
01307     setElectionTimer();
01308     withholdVotesUntil = Clock::now() + ELECTION_TIMEOUT;
01309 
01310     // Record the leader ID as a hint for clients.
01311     if (leaderId == 0) {
01312         leaderId = request.server_id();
01313         NOTICE("All hail leader %lu for term %lu", leaderId, currentTerm);
01314         printElectionState();
01315     } else {
01316         assert(leaderId == request.server_id());
01317     }
01318 
01319     // For an entry to fit into our log, it must not leave a gap.
01320     if (request.prev_log_index() > log->getLastLogIndex()) {
01321         VERBOSE("Rejecting AppendEntries RPC: would leave gap");
01322         return; // response was set to a rejection above
01323     }
01324     // It must also agree with the previous entry in the log (and, inductively
01325     // all prior entries).
01326     // Always match on index 0, and always match on any discarded indexes:
01327     // since we know those were committed, the leader must agree with them.
01328     // We could truncate the log here, but there's no real advantage to doing
01329     // that.
01330     if (request.prev_log_index() >= log->getLogStartIndex() &&
01331         log->getEntry(request.prev_log_index()).term() !=
01332             request.prev_log_term()) {
01333         VERBOSE("Rejecting AppendEntries RPC: terms don't agree");
01334         return; // response was set to a rejection above
01335     }
01336 
01337     // If we got this far, we're accepting the request.
01338     response.set_success(true);
01339 
01340     // This needs to be able to handle duplicated RPC requests. We compare the
01341     // entries' terms to know if we need to do the operation; otherwise,
01342     // reapplying requests can result in data loss.
01343     //
01344     // The first problem this solves is that an old AppendEntries request may be
01345     // duplicated and received after a newer request, which could cause
01346     // undesirable data loss. For example, suppose the leader appends entry 4
01347     // and then entry 5, but the follower receives 4, then 5, then 4 again.
01348     // Without this extra guard, the follower would truncate 5 out of its
01349     // log.
01350     //
01351     // The second problem is more subtle: if the same request is duplicated but
01352     // the leader processes an earlier response, it will assume the
01353     // acknowledged data is safe. However, there is a window of vulnerability
01354     // on the follower's disk between the truncate and append operations (which
01355     // are not done atomically) when the follower processes the later request.
01356     uint64_t index = request.prev_log_index();
01357     for (auto it = request.entries().begin();
01358          it != request.entries().end();
01359          ++it) {
01360         ++index;
01361         const Protocol::Raft::Entry& entry = *it;
01362         if (entry.has_index()) {
01363             // This precaution was added after #160: "Packing entries into
01364             // AppendEntries requests is broken (critical)".
01365             assert(entry.index() == index);
01366         }
01367         if (index < log->getLogStartIndex()) {
01368             // We already snapshotted and discarded this index, so presumably
01369             // we've received a committed entry we once already had.
01370             continue;
01371         }
01372         if (log->getLastLogIndex() >= index) {
01373             if (log->getEntry(index).term() == entry.term())
01374                 continue;
01375             // should never truncate committed entries:
01376             assert(commitIndex < index);
01377             uint64_t lastIndexKept = index - 1;
01378             uint64_t numTruncating = log->getLastLogIndex() - lastIndexKept;
01379             NOTICE("Truncating %lu entries after %lu from the log",
01380                    numTruncating,
01381                    lastIndexKept);
01382             numEntriesTruncated += numTruncating;
01383             log->truncateSuffix(lastIndexKept);
01384             configurationManager->truncateSuffix(lastIndexKept);
01385         }
01386 
01387         // Append this and all following entries.
01388         std::vector<const Protocol::Raft::Entry*> entries;
01389         do {
01390             const Protocol::Raft::Entry& entry = *it;
01391             if (entry.type() == Protocol::Raft::EntryType::UNKNOWN) {
01392                 PANIC("Leader %lu is trying to send us an unknown log entry "
01393                       "type for index %lu (term %lu). It shouldn't do that, "
01394                       "and there's not a good way forward. There's some hope "
01395                       "that if this server reboots, it'll come back up with a "
01396                       "newer version of the code that understands the entry.",
01397                       leaderId,
01398                       index,
01399                       entry.term());
01400             }
01401             entries.push_back(&entry);
01402             ++it;
01403             ++index;
01404         } while (it != request.entries().end());
01405         append(entries);
01406         clusterClock.newEpoch(entries.back()->cluster_time());
01407         break;
01408     }
01409     response.set_last_log_index(log->getLastLogIndex());
01410 
01411     // Set our committed ID from the request's. In rare cases, this would make
01412     // our committed ID decrease. For example, this could happen with a new
01413     // leader who has not yet replicated one of its own entries. While that'd
01414     // be perfectly safe, guarding against it with an if statement lets us
01415     // make stronger assertions.
01416     if (commitIndex < request.commit_index()) {
01417         commitIndex = request.commit_index();
01418         assert(commitIndex <= log->getLastLogIndex());
01419         stateChanged.notify_all();
01420         VERBOSE("New commitIndex: %lu", commitIndex);
01421     }
01422 
01423     // reset election timer to avoid punishing the leader for our own
01424     // long disk writes
01425     setElectionTimer();
01426     withholdVotesUntil = Clock::now() + ELECTION_TIMEOUT;
01427 }
01428 
01429 void
01430 RaftConsensus::handleInstallSnapshot(
01431         const Protocol::Raft::InstallSnapshot::Request& request,
01432         Protocol::Raft::InstallSnapshot::Response& response)
01433 {
01434     std::lock_guard<Mutex> lockGuard(mutex);
01435     assert(!exiting);
01436 
01437     response.set_term(currentTerm);
01438 
01439     // If the caller's term is stale, just return our term to it.
01440     if (request.term() < currentTerm) {
01441         VERBOSE("Caller(%lu) is stale. Our term is %lu, theirs is %lu",
01442                  request.server_id(), currentTerm, request.term());
01443         return;
01444     }
01445     if (request.term() > currentTerm) {
01446         NOTICE("Received InstallSnapshot request from server %lu in "
01447                "term %lu (this server's term was %lu)",
01448                 request.server_id(), request.term(), currentTerm);
01449         // We're about to bump our term in the stepDown below: update
01450         // 'response' accordingly.
01451         response.set_term(request.term());
01452     }
01453     // This request is a sign of life from the current leader. Update our term
01454     // and convert to follower if necessary; reset the election timer.
01455     stepDown(request.term());
01456     setElectionTimer();
01457     withholdVotesUntil = Clock::now() + ELECTION_TIMEOUT;
01458 
01459     // Record the leader ID as a hint for clients.
01460     if (leaderId == 0) {
01461         leaderId = request.server_id();
01462         NOTICE("All hail leader %lu for term %lu", leaderId, currentTerm);
01463         printElectionState();
01464     } else {
01465         assert(leaderId == request.server_id());
01466     }
01467 
01468     if (!snapshotWriter) {
01469         snapshotWriter.reset(
01470             new Storage::SnapshotFile::Writer(storageLayout));
01471     }
01472     response.set_bytes_stored(snapshotWriter->getBytesWritten());
01473 
01474     if (request.byte_offset() < snapshotWriter->getBytesWritten()) {
01475         WARNING("Ignoring stale snapshot chunk for byte offset %lu when the "
01476                 "next byte needed is %lu",
01477                 request.byte_offset(),
01478                 snapshotWriter->getBytesWritten());
01479         return;
01480     }
01481     if (request.byte_offset() > snapshotWriter->getBytesWritten()) {
01482         WARNING("Leader tried to send snapshot chunk at byte offset %lu but "
01483                 "the next byte needed is %lu. Discarding the chunk.",
01484                 request.byte_offset(),
01485                 snapshotWriter->getBytesWritten());
01486         if (!request.has_version() || request.version() < 2) {
01487             // For compatibility with an InstallSnapshot version 1 leader: such a
01488             // leader assumes the InstallSnapshot RPC succeeded if the terms
01489             // match (it ignores the 'bytes_stored' field). InstallSnapshot
01490             // hasn't succeeded here, so we can't respond ok.
01491             WARNING("Incrementing our term (to %lu) to force the leader "
01492                     "(of %lu) to step down and forget about the partial "
01493                     "snapshot it's sending",
01494                     currentTerm + 1,
01495                     currentTerm);
01496             stepDown(currentTerm + 1);
01497             // stepDown() changed currentTerm to currentTerm + 1
01498             response.set_term(currentTerm);
01499         }
01500         return;
01501     }
01502     snapshotWriter->writeRaw(request.data().data(), request.data().length());
01503     response.set_bytes_stored(snapshotWriter->getBytesWritten());
01504 
01505     if (request.done()) {
01506         if (request.last_snapshot_index() < lastSnapshotIndex) {
01507             WARNING("The leader sent us a snapshot, but it's stale: it only "
01508                     "covers up through index %lu and we already have one "
01509                     "through %lu. A well-behaved leader shouldn't do that. "
01510                     "Discarding the snapshot.",
01511                     request.last_snapshot_index(),
01512                     lastSnapshotIndex);
01513             snapshotWriter->discard();
01514             snapshotWriter.reset();
01515             return;
01516         }
01517         NOTICE("Loading in new snapshot from leader");
01518         snapshotWriter->save();
01519         snapshotWriter.reset();
01520         readSnapshot();
01521         stateChanged.notify_all();
01522     }
01523 }
01524 
01525 void
01526 RaftConsensus::handleRequestVote(
01527                     const Protocol::Raft::RequestVote::Request& request,
01528                     Protocol::Raft::RequestVote::Response& response)
01529 {
01530     std::lock_guard<Mutex> lockGuard(mutex);
01531     assert(!exiting);
01532 
01533     // If the caller has a less complete log, we can't give it our vote.
01534     uint64_t lastLogIndex = log->getLastLogIndex();
01535     uint64_t lastLogTerm = getLastLogTerm();
01536     bool logIsOk = (request.last_log_term() > lastLogTerm ||
01537                     (request.last_log_term() == lastLogTerm &&
01538                      request.last_log_index() >= lastLogIndex));
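    // For example, a caller whose last entry is (term 5, index 20) has a log
    // at least as complete as ours if our last entry is (term 5, index 18) or
    // (term 4, index 30), but not if ours is (term 5, index 25) or
    // (term 6, index 3).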
01539 
01540     if (withholdVotesUntil > Clock::now()) {
01541         NOTICE("Rejecting RequestVote for term %lu from server %lu, since "
01542                "this server (which is in term %lu) recently heard from a "
01543                "leader (%lu). Should server %lu be shut down?",
01544                request.term(), request.server_id(), currentTerm,
01545                leaderId, request.server_id());
01546         response.set_term(currentTerm);
01547         response.set_granted(false);
01548         response.set_log_ok(logIsOk);
01549         return;
01550     }
01551 
01552     if (request.term() > currentTerm) {
01553         NOTICE("Received RequestVote request from server %lu in term %lu "
01554                "(this server's term was %lu)",
01555                 request.server_id(), request.term(), currentTerm);
01556         stepDown(request.term());
01557     }
01558 
01559     // At this point, if leaderId != 0, we could tell the caller to step down.
01560     // However, this is just an optimization that does not affect correctness
01561     // or really even efficiency, so it's not worth the trouble.
01562 
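    // Grant at most one vote per term (votedFor must still be clear), and only
    // to a candidate whose log is at least as complete as ours. Granting a
    // vote also resets the election timer, and updateLogMetadata() presumably
    // persists votedFor so the vote survives a restart.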
01563     if (request.term() == currentTerm) {
01564         if (logIsOk && votedFor == 0) {
01565             // Give caller our vote
01566             NOTICE("Voting for %lu in term %lu",
01567                    request.server_id(), currentTerm);
01568             stepDown(currentTerm);
01569             setElectionTimer();
01570             votedFor = request.server_id();
01571             updateLogMetadata();
01572             printElectionState();
01573         }
01574     }
01575 
01576     // Fill in response.
01577     response.set_term(currentTerm);
01578     // don't strictly need the first condition
01579     response.set_granted(request.term() == currentTerm &&
01580                          votedFor == request.server_id());
01581     response.set_log_ok(logIsOk);
01582 }
01583 
01584 std::pair<RaftConsensus::ClientResult, uint64_t>
01585 RaftConsensus::replicate(const Core::Buffer& operation)
01586 {
01587     std::unique_lock<Mutex> lockGuard(mutex);
01588     Log::Entry entry;
01589     entry.set_type(Protocol::Raft::EntryType::DATA);
01590     entry.set_data(operation.getData(), operation.getLength());
01591     return replicateEntry(entry, lockGuard);
01592 }
01593 
01594 RaftConsensus::ClientResult
01595 RaftConsensus::setConfiguration(
01596         const Protocol::Client::SetConfiguration::Request& request,
01597         Protocol::Client::SetConfiguration::Response& response)
01598 {
01599     std::unique_lock<Mutex> lockGuard(mutex);
01600 
01601     if (exiting || state != State::LEADER) {
01602         // caller fills out response
01603         return ClientResult::NOT_LEADER;
01604     }
01605     if (configuration->id != request.old_id()) {
01606         // configuration has changed in the meantime
01607         response.mutable_configuration_changed()->set_error(
01608             Core::StringUtil::format(
01609                 "The current configuration has ID %lu (no longer %lu) "
01610                 "and it's %s",
01611                 configuration->id,
01612                 request.old_id(),
01613                 Core::StringUtil::toString(configuration->state).c_str()));
01614         return ClientResult::FAIL;
01615     }
01616     if (configuration->state != Configuration::State::STABLE) {
01617         response.mutable_configuration_changed()->set_error(
01618             Core::StringUtil::format(
01619                 "The current configuration (%lu) is not stable (it's %s)",
01620                 configuration->id,
01621                 Core::StringUtil::toString(configuration->state).c_str()));
01622         return ClientResult::FAIL;
01623     }
01624 
01625     NOTICE("Attempting to change the configuration from %lu",
01626            configuration->id);
01627 
01628     // Set the staging servers in the configuration.
01629     Protocol::Raft::SimpleConfiguration nextConfiguration;
01630     for (auto it = request.new_servers().begin();
01631          it != request.new_servers().end();
01632          ++it) {
01633         NOTICE("Adding server %lu at %s to staging servers",
01634                it->server_id(), it->addresses().c_str());
01635         Protocol::Raft::Server* s = nextConfiguration.add_servers();
01636         s->set_server_id(it->server_id());
01637         s->set_addresses(it->addresses());
01638     }
01639     configuration->setStagingServers(nextConfiguration);
01640     stateChanged.notify_all();
01641 
01642     // Wait for new servers to be caught up. This will abort if not every
01643     // server makes progress in an ELECTION_TIMEOUT period.
01644     uint64_t term = currentTerm;
01645     ++currentEpoch;
01646     uint64_t epoch = currentEpoch;
01647     TimePoint checkProgressAt = Clock::now() + ELECTION_TIMEOUT;
01648     while (true) {
01649         if (exiting || term != currentTerm) {
01650             NOTICE("Lost leadership, aborting configuration change");
01651             // caller will fill in response
01652             return ClientResult::NOT_LEADER;
01653         }
01654         if (configuration->stagingAll(&Server::isCaughtUp)) {
01655             NOTICE("Done catching up servers");
01656             break;
01657         }
01658         if (Clock::now() >= checkProgressAt) {
01659             using RaftConsensusInternal::StagingProgressing;
01660             StagingProgressing progressing(epoch, response);
01661             if (!configuration->stagingAll(progressing)) {
01662                 NOTICE("Failed to catch up new servers, aborting "
01663                        "configuration change");
01664                 configuration->resetStagingServers();
01665                 stateChanged.notify_all();
01666                 // progressing filled in response
01667                 return ClientResult::FAIL;
01668             } else {
01669                 ++currentEpoch;
01670                 epoch = currentEpoch;
01671                 checkProgressAt = Clock::now() + ELECTION_TIMEOUT;
01672             }
01673         }
01674         stateChanged.wait_until(lockGuard, checkProgressAt);
01675     }
01676 
01677     // Write and commit transitional configuration
01678     NOTICE("Writing transitional configuration entry");
01679     Protocol::Raft::Configuration newConfiguration;
01680     *newConfiguration.mutable_prev_configuration() =
01681         configuration->description.prev_configuration();
01682     *newConfiguration.mutable_next_configuration() = nextConfiguration;
01683     Log::Entry entry;
01684     entry.set_type(Protocol::Raft::EntryType::CONFIGURATION);
01685     *entry.mutable_configuration() = newConfiguration;
01686     std::pair<ClientResult, uint64_t> result =
01687         replicateEntry(entry, lockGuard);
01688     if (result.first != ClientResult::SUCCESS) {
01689         NOTICE("Failed to commit transitional configuration entry, aborting "
01690                "configuration change (%s)",
01691                Core::StringUtil::toString(result.first).c_str());
01692         if (result.first == ClientResult::NOT_LEADER) {
01693             // caller will fill in response
01694         } else {
01695             response.mutable_configuration_changed()->set_error(
01696                 Core::StringUtil::format(
01697                     "Couldn't successfully replicate the transitional "
01698                     "configuration (%s)",
01699                     Core::StringUtil::toString(result.first).c_str()));
01700         }
01701         return result.first;
01702     }
01703     uint64_t transitionalId = result.second;
01704 
01705     // Wait until the configuration that removes the old servers has been
01706     // committed. This is the first configuration with ID greater than
01707     // transitionalId.
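    // (advanceCommitIndex() appends that stable configuration automatically
    // once the transitional entry commits.)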
01708     NOTICE("Waiting for stable configuration to commit");
01709     while (true) {
01710         // Check this first: if the new configuration excludes us so we've
01711         // stepped down upon committing it, we still want to return success.
01712         if (configuration->id > transitionalId &&
01713             commitIndex >= configuration->id) {
01714             response.mutable_ok();
01715             NOTICE("Stable configuration committed. Configuration change "
01716                    "completed successfully");
01717             return ClientResult::SUCCESS;
01718         }
01719         if (exiting || term != currentTerm) {
01720             NOTICE("Lost leadership");
01721             // caller fills in response
01722             return ClientResult::NOT_LEADER;
01723         }
01724         stateChanged.wait(lockGuard);
01725     }
01726 }
01727 
01728 void
01729 RaftConsensus::setSupportedStateMachineVersions(uint16_t minSupported,
01730                                                 uint16_t maxSupported)
01731 {
01732     std::lock_guard<Mutex> lockGuard(mutex);
01733     auto& s = *configuration->localServer;
01734     if (!s.haveStateMachineSupportedVersions ||
01735         s.minStateMachineVersion != minSupported ||
01736         s.maxStateMachineVersion != maxSupported) {
01737 
01738         s.haveStateMachineSupportedVersions = true;
01739         s.minStateMachineVersion = minSupported;
01740         s.maxStateMachineVersion = maxSupported;
01741         stateChanged.notify_all();
01742     }
01743 }
01744 
01745 std::unique_ptr<Storage::SnapshotFile::Writer>
01746 RaftConsensus::beginSnapshot(uint64_t lastIncludedIndex)
01747 {
01748     std::lock_guard<Mutex> lockGuard(mutex);
01749 
01750     NOTICE("Creating new snapshot through log index %lu (inclusive)",
01751            lastIncludedIndex);
01752     std::unique_ptr<Storage::SnapshotFile::Writer> writer(
01753                 new Storage::SnapshotFile::Writer(storageLayout));
01754 
01755     // Only committed entries may be snapshotted.
01756     // (This check relies on commitIndex monotonically increasing.)
01757     if (lastIncludedIndex > commitIndex) {
01758         PANIC("Attempted to snapshot uncommitted entries (%lu requested but "
01759               "%lu is last committed entry)", lastIncludedIndex, commitIndex);
01760     }
01761 
01762     // Format version of snapshot file is 1.
01763     uint8_t version = 1;
01764     writer->writeRaw(&version, sizeof(version));
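    // At this point the file holds just the 1-byte format version. The
    // metadata header is written next, and the caller (presumably the state
    // machine) then streams the snapshot contents through the returned writer
    // before handing it back via snapshotDone().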
01765 
01766     // set header fields
01767     SnapshotMetadata::Header header;
01768     header.set_last_included_index(lastIncludedIndex);
01769     // Set last_included_term and last_cluster_time:
01770     if (lastIncludedIndex >= log->getLogStartIndex() &&
01771         lastIncludedIndex <= log->getLastLogIndex()) {
01772         const Log::Entry& entry = log->getEntry(lastIncludedIndex);
01773         header.set_last_included_term(entry.term());
01774         header.set_last_cluster_time(entry.cluster_time());
01775     } else if (lastIncludedIndex == 0) {
01776         WARNING("Taking a snapshot covering no log entries");
01777         header.set_last_included_term(0);
01778         header.set_last_cluster_time(0);
01779     } else if (lastIncludedIndex == lastSnapshotIndex) {
01780         WARNING("Taking a snapshot where we already have one, covering "
01781                 "entries 1 through %lu (inclusive)", lastIncludedIndex);
01782         header.set_last_included_term(lastSnapshotTerm);
01783         header.set_last_cluster_time(lastSnapshotClusterTime);
01784     } else {
01785         WARNING("We've already discarded the entries that the state machine "
01786                 "wants to snapshot. This can happen in rare cases if the "
01787                 "leader already sent us a newer snapshot. We'll go ahead and "
01788                 "compute the snapshot, but it'll be discarded later in "
01789                 "snapshotDone(). Setting the last included term in the "
01790                 "snapshot header to 0 (a bogus value).");
01791         // If this turns out to be common, we should return NULL instead and
01792         // change the state machines to deal with that.
01793         header.set_last_included_term(0);
01794         header.set_last_cluster_time(0);
01795     }
01796 
01797     // Copy the configuration as of lastIncludedIndex to the header.
01798     std::pair<uint64_t, Protocol::Raft::Configuration> c =
01799         configurationManager->getLatestConfigurationAsOf(lastIncludedIndex);
01800     if (c.first == 0) {
01801         WARNING("Taking snapshot with no configuration. "
01802                 "This should have been the first thing in the log.");
01803     } else {
01804         header.set_configuration_index(c.first);
01805         *header.mutable_configuration() = c.second;
01806     }
01807 
01808     // write header to file
01809     writer->writeMessage(header);
01810     return writer;
01811 }
01812 
01813 void
01814 RaftConsensus::snapshotDone(
01815         uint64_t lastIncludedIndex,
01816         std::unique_ptr<Storage::SnapshotFile::Writer> writer)
01817 {
01818     std::lock_guard<Mutex> lockGuard(mutex);
01819     if (lastIncludedIndex <= lastSnapshotIndex) {
01820         NOTICE("Discarding snapshot through %lu since we already have one "
01821                "(presumably from another server) through %lu",
01822                lastIncludedIndex, lastSnapshotIndex);
01823         writer->discard();
01824         return;
01825     }
01826 
01827     // log->getEntry(lastIncludedIndex) is safe:
01828     // If the log prefix for this snapshot was truncated, that means we have a
01829     // newer snapshot (handled above).
01830     assert(lastIncludedIndex >= log->getLogStartIndex());
01831     // We never truncate committed entries from the end of our log, and
01832     // beginSnapshot() made sure that lastIncludedIndex covers only committed
01833     // entries.
01834     assert(lastIncludedIndex <= log->getLastLogIndex());
01835 
01836     lastSnapshotBytes = writer->save();
01837     lastSnapshotIndex = lastIncludedIndex;
01838     const Log::Entry& lastEntry = log->getEntry(lastIncludedIndex);
01839     lastSnapshotTerm = lastEntry.term();
01840     lastSnapshotClusterTime = lastEntry.cluster_time();
01841 
01842     // It's easier to grab this configuration out of the manager again than to
01843     // carry it around after writing the header.
01844     std::pair<uint64_t, Protocol::Raft::Configuration> c =
01845         configurationManager->getLatestConfigurationAsOf(lastIncludedIndex);
01846     if (c.first == 0) {
01847         WARNING("Could not find the latest configuration as of index %lu "
01848                 "(inclusive). This shouldn't happen if the snapshot was "
01849                 "created with a configuration, as they should be.",
01850                 lastIncludedIndex);
01851     } else {
01852         configurationManager->setSnapshot(c.first, c.second);
01853     }
01854 
01855     NOTICE("Completed snapshot through log index %lu (inclusive)",
01856            lastSnapshotIndex);
01857 
01858     // It may be beneficial to defer discarding entries if some followers are
01859     // a little bit slow, to avoid having to send them a snapshot when a few
01860     // entries would do the trick. Best to avoid premature optimization though.
01861     discardUnneededEntries();
01862 }
01863 
01864 void
01865 RaftConsensus::updateServerStats(Protocol::ServerStats& serverStats) const
01866 {
01867     std::lock_guard<Mutex> lockGuard(mutex);
01868     Core::Time::SteadyTimeConverter time;
01869     serverStats.clear_raft();
01870     Protocol::ServerStats::Raft& raftStats = *serverStats.mutable_raft();
01871 
01872     raftStats.set_current_term(currentTerm);
01873     switch (state) {
01874         case State::FOLLOWER:
01875             raftStats.set_state(Protocol::ServerStats::Raft::FOLLOWER);
01876             break;
01877         case State::CANDIDATE:
01878             raftStats.set_state(Protocol::ServerStats::Raft::CANDIDATE);
01879             break;
01880         case State::LEADER:
01881             raftStats.set_state(Protocol::ServerStats::Raft::LEADER);
01882             break;
01883     }
01884     raftStats.set_commit_index(commitIndex);
01885     raftStats.set_last_log_index(log->getLastLogIndex());
01886     raftStats.set_leader_id(leaderId);
01887     raftStats.set_voted_for(votedFor);
01888     raftStats.set_start_election_at(time.unixNanos(startElectionAt));
01889     raftStats.set_withhold_votes_until(time.unixNanos(withholdVotesUntil));
01890     raftStats.set_cluster_time_epoch(clusterClock.clusterTimeAtEpoch);
01891     raftStats.set_cluster_time(clusterClock.interpolate());
01892 
01893     raftStats.set_last_snapshot_index(lastSnapshotIndex);
01894     raftStats.set_last_snapshot_term(lastSnapshotTerm);
01895     raftStats.set_last_snapshot_cluster_time(lastSnapshotClusterTime);
01896     raftStats.set_last_snapshot_bytes(lastSnapshotBytes);
01897     raftStats.set_num_entries_truncated(numEntriesTruncated);
01898     raftStats.set_log_start_index(log->getLogStartIndex());
01899     raftStats.set_log_bytes(log->getSizeBytes());
01900     configuration->updateServerStats(serverStats, time);
01901     log->updateServerStats(serverStats);
01902 }
01903 
01904 std::ostream&
01905 operator<<(std::ostream& os, const RaftConsensus& raft)
01906 {
01907     std::lock_guard<RaftConsensus::Mutex> lockGuard(raft.mutex);
01908     typedef RaftConsensus::State State;
01909     os << "server id: " << raft.serverId << std::endl;
01910     os << "term: " << raft.currentTerm << std::endl;
01911     os << "state: " << raft.state << std::endl;
01912     os << "leader: " << raft.leaderId << std::endl;
01913     os << "lastSnapshotIndex: " << raft.lastSnapshotIndex << std::endl;
01914     os << "lastSnapshotTerm: " << raft.lastSnapshotTerm << std::endl;
01915     os << "lastSnapshotClusterTime: " << raft.lastSnapshotClusterTime
01916        << std::endl;
01917     os << "commitIndex: " << raft.commitIndex << std::endl;
01918     switch (raft.state) {
01919         case State::FOLLOWER:
01920             os << "vote: ";
01921             if (raft.votedFor == 0)
01922                 os << "available";
01923             else
01924                 os << "given to " << raft.votedFor;
01925             os << std::endl;
01926             break;
01927         case State::CANDIDATE:
01928             break;
01929         case State::LEADER:
01930             break;
01931     }
01932     os << *raft.log;
01933     os << *raft.configuration;
01934     return os;
01935 }
01936 
01937 
01938 //// RaftConsensus private methods that MUST acquire the lock
01939 
01940 void
01941 RaftConsensus::stateMachineUpdaterThreadMain()
01942 {
01943     // This implementation might create many spurious entries, since this
01944     // process will append a state machine version if it hasn't appended that
01945     // same version before during this boot. That should be fine for most use
01946     // cases. If the state machine's num_redundant_advance_version_entries
01947     // server stat gets to be large, this may need to be revisited.
01948     std::unique_lock<Mutex> lockGuard(mutex);
01949     Core::ThreadId::setName("StateMachineUpdater");
01950     uint64_t lastVersionCommitted = 0;
01951     TimePoint backoffUntil = TimePoint::min();
01952     while (!exiting) {
01953         TimePoint now = Clock::now();
01954         if (backoffUntil <= now && state == State::LEADER) {
01955             using RaftConsensusInternal::StateMachineVersionIntersection;
01956             StateMachineVersionIntersection s;
01957             configuration->forEach(std::ref(s));
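            // For example, if three servers report supported version ranges
            // [1,3], [2,4], and [2,3], then s.minVersion (the max of the mins)
            // is 2 and s.maxVersion (the min of the maxes) is 3, so version 3
            // is the newest version that every state machine supports.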
01958             if (s.missingCount == 0) {
01959                 if (s.minVersion > s.maxVersion) {
01960                     ERROR("The state machines on the %lu servers do not "
01961                           "currently support a common version "
01962                           "(max of mins=%u, min of maxes=%u). Will wait to "
01963                           "change the state machine version for at least "
01964                           "another backoff period",
01965                           s.allCount,
01966                           s.minVersion,
01967                           s.maxVersion);
01968                     backoffUntil = now + STATE_MACHINE_UPDATER_BACKOFF;
01969                 } else { // s.maxVersion is the one we want
01970                     if (s.maxVersion > lastVersionCommitted) {
01971                         NOTICE("Appending log entry to advance state machine "
01972                                "version to %u (it may be set to %u already, "
01973                                "but it's hard to check that and not much "
01974                                "overhead to just do it again)",
01975                                s.maxVersion,
01976                                s.maxVersion);
01977                         Log::Entry entry;
01978                         entry.set_term(currentTerm);
01979                         entry.set_type(Protocol::Raft::EntryType::DATA);
01980                         entry.set_cluster_time(clusterClock.leaderStamp());
01981                         Protocol::Client::StateMachineCommand::Request command;
01982                         command.mutable_advance_version()->
01983                             set_requested_version(s.maxVersion);
01984                         Core::Buffer cmdBuf;
01985                         Core::ProtoBuf::serialize(command, cmdBuf);
01986                         entry.set_data(cmdBuf.getData(), cmdBuf.getLength());
01987 
01988                         std::pair<ClientResult, uint64_t> result =
01989                             replicateEntry(entry, lockGuard);
01990                         if (result.first == ClientResult::SUCCESS) {
01991                             lastVersionCommitted = s.maxVersion;
01992                         } else {
01993                             using Core::StringUtil::toString;
01994                             WARNING("Failed to commit entry to advance state "
01995                                     "machine version to version %u (%s). "
01996                                     "Will retry later after backoff period",
01997                                     s.maxVersion,
01998                                     toString(result.first).c_str());
01999                             backoffUntil = now + STATE_MACHINE_UPDATER_BACKOFF;
02000                         }
02001                         continue;
02002                     } else {
02003                         // We're in good shape, go back to sleep.
02004                     }
02005                 }
02006             } else { // missing info from at least one server
02007                 // Do nothing until we have info from everyone else
02008                 // (stateChanged will be notified). The backoff is here just to
02009                 // avoid spamming the NOTICE message.
02010                 NOTICE("Waiting to receive state machine supported version "
02011                        "information from all peers (missing %lu of %lu)",
02012                        s.missingCount, s.allCount);
02013                 backoffUntil = now + STATE_MACHINE_UPDATER_BACKOFF;
02014             }
02015         }
02016         if (backoffUntil <= now)
02017             stateChanged.wait(lockGuard);
02018         else
02019             stateChanged.wait_until(lockGuard, backoffUntil);
02020     }
02021     NOTICE("Exiting");
02022 }
02023 
02024 void
02025 RaftConsensus::leaderDiskThreadMain()
02026 {
02027     std::unique_lock<Mutex> lockGuard(mutex);
02028     Core::ThreadId::setName("LeaderDisk");
02029     // Each iteration of this loop syncs the log to disk once or sleeps until
02030     // that is necessary.
02031     while (!exiting) {
02032         if (state == State::LEADER && logSyncQueued) {
02033             uint64_t term = currentTerm;
02034             std::unique_ptr<Log::Sync> sync = log->takeSync();
02035             logSyncQueued = false;
02036             leaderDiskThreadWorking = true;
02037             {
02038                 Core::MutexUnlock<Mutex> unlockGuard(lockGuard);
02039                 sync->wait();
02040                 // Mark this false before re-acquiring RaftConsensus lock,
02041                 // since stepDown() polls on this to go false while holding the
02042                 // lock.
02043                 leaderDiskThreadWorking = false;
02044             }
02045             if (state == State::LEADER && currentTerm == term) {
02046                 configuration->localServer->lastSyncedIndex = sync->lastIndex;
02047                 advanceCommitIndex();
02048             }
02049             log->syncComplete(std::move(sync));
02050             continue;
02051         }
02052         stateChanged.wait(lockGuard);
02053     }
02054 }
02055 
02056 void
02057 RaftConsensus::timerThreadMain()
02058 {
02059     std::unique_lock<Mutex> lockGuard(mutex);
02060     Core::ThreadId::setName("startNewElection");
02061     while (!exiting) {
02062         if (Clock::now() >= startElectionAt)
02063             startNewElection();
02064         stateChanged.wait_until(lockGuard, startElectionAt);
02065     }
02066 }
02067 
02068 void
02069 RaftConsensus::peerThreadMain(std::shared_ptr<Peer> peer)
02070 {
02071     std::unique_lock<Mutex> lockGuard(mutex);
02072     Core::ThreadId::setName(
02073         Core::StringUtil::format("Peer(%lu)", peer->serverId));
02074     NOTICE("Peer thread for server %lu started", peer->serverId);
02075 
02076     // Each iteration of this loop issues a new RPC or sleeps on the condition
02077     // variable.
02078     while (!peer->exiting) {
02079         TimePoint now = Clock::now();
02080         TimePoint waitUntil = TimePoint::min();
02081 
02082         if (peer->backoffUntil > now) {
02083             waitUntil = peer->backoffUntil;
02084         } else {
02085             switch (state) {
02086                 // Followers don't issue RPCs.
02087                 case State::FOLLOWER:
02088                     waitUntil = TimePoint::max();
02089                     break;
02090 
02091                 // Candidates request votes.
02092                 case State::CANDIDATE:
02093                     if (!peer->requestVoteDone)
02094                         requestVote(lockGuard, *peer);
02095                     else
02096                         waitUntil = TimePoint::max();
02097                     break;
02098 
02099                 // Leaders replicate entries and periodically send heartbeats.
02100                 case State::LEADER:
02101                     if (peer->getMatchIndex() < log->getLastLogIndex() ||
02102                         peer->nextHeartbeatTime < now) {
02103                         // appendEntries delegates to installSnapshot if we
02104                         // need to send a snapshot instead
02105                         appendEntries(lockGuard, *peer);
02106                     } else {
02107                         waitUntil = peer->nextHeartbeatTime;
02108                     }
02109                     break;
02110             }
02111         }
02112 
02113         stateChanged.wait_until(lockGuard, waitUntil);
02114     }
02115 
02116     // must return immediately after this
02117     --numPeerThreads;
02118     stateChanged.notify_all();
02119     NOTICE("Peer thread for server %lu exiting", peer->serverId);
02120 }
02121 
02122 void
02123 RaftConsensus::stepDownThreadMain()
02124 {
02125     std::unique_lock<Mutex> lockGuard(mutex);
02126     Core::ThreadId::setName("stepDown");
02127     while (true) {
02128         // Wait until this server is the leader and is not the only server in
02129         // the cluster.
02130         while (true) {
02131             if (exiting)
02132                 return;
02133             if (state == State::LEADER) {
02134                 // If this local server forms a quorum (it is the only server
02135                 // in the configuration), we need to sleep. Without this guard,
02136                 // this method would not relinquish the CPU.
02137                 ++currentEpoch;
02138                 if (configuration->quorumMin(&Server::getLastAckEpoch) <
02139                     currentEpoch) {
02140                     break;
02141                 }
02142             }
02143             stateChanged.wait(lockGuard);
02144         }
02145         // Now, if an election timeout goes by without confirming leadership,
02146         // step down. The election timeout is a reasonable amount of time,
02147         // since it's about when other servers will start elections and bump
02148         // the term.
02149         TimePoint stepDownAt = Clock::now() + ELECTION_TIMEOUT;
02150         uint64_t term = currentTerm;
02151         uint64_t epoch = currentEpoch; // currentEpoch was incremented above
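        // Peers record the epoch that was current when a successful
        // AppendEntries or InstallSnapshot RPC started (peer.lastAckEpoch), so
        // a quorumMin of at least 'epoch' means a majority has responded to
        // this server since the timer above was started.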
02152         while (true) {
02153             if (exiting)
02154                 return;
02155             if (currentTerm > term)
02156                 break;
02157             if (configuration->quorumMin(&Server::getLastAckEpoch) >= epoch)
02158                 break;
02159             if (Clock::now() >= stepDownAt) {
02160                 NOTICE("No broadcast for a timeout, stepping down from leader "
02161                        "of term %lu (converting to follower in term %lu)",
02162                        currentTerm, currentTerm + 1);
02163                 stepDown(currentTerm + 1);
02164                 break;
02165             }
02166             stateChanged.wait_until(lockGuard, stepDownAt);
02167         }
02168     }
02169 }
02170 
02171 //// RaftConsensus private methods that MUST NOT acquire the lock
02172 
02173 void
02174 RaftConsensus::advanceCommitIndex()
02175 {
02176     if (state != State::LEADER) {
02177         // getMatchIndex is undefined unless we're leader
02178         WARNING("advanceCommitIndex called as %s",
02179                 Core::StringUtil::toString(state).c_str());
02180         return;
02181     }
02182 
02183     // calculate the largest entry ID stored on a quorum of servers
02184     uint64_t newCommitIndex =
02185         configuration->quorumMin(&Server::getMatchIndex);
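    // For example, with matchIndex values {15, 12, 10, 7, 5} across five
    // servers, a majority (15, 12, 10) has reached index 10, so quorumMin
    // returns 10.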
02186     if (commitIndex >= newCommitIndex)
02187         return;
02188     // If we have discarded the entry, it's because we already knew it was
02189     // committed.
02190     assert(newCommitIndex >= log->getLogStartIndex());
02191     // At least one of these entries must also be from the current term to
02192     // guarantee that no server without them can be elected.
02193     if (log->getEntry(newCommitIndex).term() != currentTerm)
02194         return;
02195     commitIndex = newCommitIndex;
02196     VERBOSE("New commitIndex: %lu", commitIndex);
02197     assert(commitIndex <= log->getLastLogIndex());
02198     stateChanged.notify_all();
02199 
02200     if (state == State::LEADER && commitIndex >= configuration->id) {
02201         // Upon committing a configuration that excludes itself, the leader
02202         // steps down.
02203         if (!configuration->hasVote(configuration->localServer)) {
02204             NOTICE("Newly committed configuration does not include self. "
02205                    "Stepping down as leader");
02206             stepDown(currentTerm + 1);
02207             return;
02208         }
02209 
02210         // Upon committing a reconfiguration (Cold,new) entry, the leader
02211         // creates the next configuration (Cnew) entry.
02212         if (configuration->state == Configuration::State::TRANSITIONAL) {
02213             Log::Entry entry;
02214             entry.set_term(currentTerm);
02215             entry.set_type(Protocol::Raft::EntryType::CONFIGURATION);
02216             entry.set_cluster_time(clusterClock.leaderStamp());
02217             *entry.mutable_configuration()->mutable_prev_configuration() =
02218                 configuration->description.next_configuration();
02219             append({&entry});
02220             return;
02221         }
02222     }
02223 }
02224 
02225 void
02226 RaftConsensus::append(const std::vector<const Log::Entry*>& entries)
02227 {
02228     for (auto it = entries.begin(); it != entries.end(); ++it)
02229         assert((*it)->term() != 0);
02230     std::pair<uint64_t, uint64_t> range = log->append(entries);
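    // As leader, defer the fsync to leaderDiskThreadMain() so replication can
    // proceed in parallel with the disk write; the local server's matchIndex
    // only advances once that sync completes. Everyone else syncs inline,
    // presumably so their acknowledgment implies the entries are durable.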
02231     if (state == State::LEADER) { // defer log sync
02232         logSyncQueued = true;
02233     } else { // sync log now
02234         std::unique_ptr<Log::Sync> sync = log->takeSync();
02235         sync->wait();
02236         log->syncComplete(std::move(sync));
02237     }
02238     uint64_t index = range.first;
02239     for (auto it = entries.begin(); it != entries.end(); ++it) {
02240         const Log::Entry& entry = **it;
02241         if (entry.type() == Protocol::Raft::EntryType::CONFIGURATION)
02242             configurationManager->add(index, entry.configuration());
02243         ++index;
02244     }
02245     stateChanged.notify_all();
02246 }
02247 
02248 void
02249 RaftConsensus::appendEntries(std::unique_lock<Mutex>& lockGuard,
02250                              Peer& peer)
02251 {
02252     uint64_t lastLogIndex = log->getLastLogIndex();
02253     uint64_t prevLogIndex = peer.nextIndex - 1;
02254     assert(prevLogIndex <= lastLogIndex);
02255 
02256     // Don't have needed entry: send a snapshot instead.
02257     if (peer.nextIndex < log->getLogStartIndex()) {
02258         installSnapshot(lockGuard, peer);
02259         return;
02260     }
02261 
02262     // Find prevLogTerm or fall back to sending a snapshot.
02263     uint64_t prevLogTerm;
02264     if (prevLogIndex >= log->getLogStartIndex()) {
02265         prevLogTerm = log->getEntry(prevLogIndex).term();
02266     } else if (prevLogIndex == 0) {
02267         prevLogTerm = 0;
02268     } else if (prevLogIndex == lastSnapshotIndex) {
02269         prevLogTerm = lastSnapshotTerm;
02270     } else {
02271         // Don't have needed entry for prevLogTerm: send snapshot instead.
02272         installSnapshot(lockGuard, peer);
02273         return;
02274     }
02275 
02276     // Build up request
02277     Protocol::Raft::AppendEntries::Request request;
02278     request.set_server_id(serverId);
02279     request.set_term(currentTerm);
02280     request.set_prev_log_term(prevLogTerm);
02281     request.set_prev_log_index(prevLogIndex);
02282     uint64_t numEntries = 0;
02283     if (!peer.suppressBulkData)
02284         numEntries = packEntries(peer.nextIndex, request);
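    // Cap the advertised commit index at the last entry included in this
    // request, presumably so the follower never learns of a commit index
    // beyond entries it is known to hold.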
02285     request.set_commit_index(std::min(commitIndex, prevLogIndex + numEntries));
02286 
02287     // Execute RPC
02288     Protocol::Raft::AppendEntries::Response response;
02289     TimePoint start = Clock::now();
02290     uint64_t epoch = currentEpoch;
02291     Peer::CallStatus status = peer.callRPC(
02292                 Protocol::Raft::OpCode::APPEND_ENTRIES,
02293                 request, response,
02294                 lockGuard);
02295     switch (status) {
02296         case Peer::CallStatus::OK:
02297             break;
02298         case Peer::CallStatus::FAILED:
02299             peer.suppressBulkData = true;
02300             peer.backoffUntil = start + RPC_FAILURE_BACKOFF;
02301             return;
02302         case Peer::CallStatus::INVALID_REQUEST:
02303             PANIC("The server's RaftService doesn't support the AppendEntries "
02304                   "RPC or claims the request is malformed");
02305     }
02306 
02307     // Process response
02308 
02309     if (currentTerm != request.term() || peer.exiting) {
02310         // we don't care about result of RPC
02311         return;
02312     }
02313     // Since we were leader in this term before, we must still be leader in
02314     // this term.
02315     assert(state == State::LEADER);
02316     if (response.term() > currentTerm) {
02317         NOTICE("Received AppendEntries response from server %lu in term %lu "
02318                "(this server's term was %lu)",
02319                 peer.serverId, response.term(), currentTerm);
02320         stepDown(response.term());
02321     } else {
02322         assert(response.term() == currentTerm);
02323         peer.lastAckEpoch = epoch;
02324         stateChanged.notify_all();
02325         peer.nextHeartbeatTime = start + HEARTBEAT_PERIOD;
02326         if (response.success()) {
02327             if (peer.matchIndex > prevLogIndex + numEntries) {
02328                 // Revisit this warning if we pipeline AppendEntries RPCs for
02329                 // performance.
02330                 WARNING("matchIndex should monotonically increase within a "
02331                         "term, since servers don't forget entries. But it "
02332                         "didn't.");
02333             } else {
02334                 peer.matchIndex = prevLogIndex + numEntries;
02335                 advanceCommitIndex();
02336             }
02337             peer.nextIndex = peer.matchIndex + 1;
02338             peer.suppressBulkData = false;
02339 
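            // Catch-up heuristic for new (staging) servers: each round targets
            // the last log index as of the round's start; once two consecutive
            // rounds take about the same time (within one election timeout),
            // the server is keeping pace and is marked caught up.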
02340             if (!peer.isCaughtUp_ &&
02341                 peer.thisCatchUpIterationGoalId <= peer.matchIndex) {
02342                 Clock::duration duration =
02343                     Clock::now() - peer.thisCatchUpIterationStart;
02344                 uint64_t thisCatchUpIterationMs =
02345                     uint64_t(std::chrono::duration_cast<
02346                                  std::chrono::milliseconds>(duration).count());
02347                 if (labs(int64_t(peer.lastCatchUpIterationMs -
02348                                  thisCatchUpIterationMs)) * 1000L * 1000L <
02349                     std::chrono::nanoseconds(ELECTION_TIMEOUT).count()) {
02350                     peer.isCaughtUp_ = true;
02351                     stateChanged.notify_all();
02352                 } else {
02353                     peer.lastCatchUpIterationMs = thisCatchUpIterationMs;
02354                     peer.thisCatchUpIterationStart = Clock::now();
02355                     peer.thisCatchUpIterationGoalId = log->getLastLogIndex();
02356                 }
02357             }
02358         } else {
02359             if (peer.nextIndex > 1)
02360                 --peer.nextIndex;
02361             // A server that hasn't been around for a while might have a much
02362             // shorter log than ours. The AppendEntries reply contains the
02363             // index of its last log entry, and there's no reason for us to
02364             // set nextIndex to be more than 1 past that (that would leave a
02365             // gap, so it will always be rejected).
02366             if (response.has_last_log_index() &&
02367                 peer.nextIndex > response.last_log_index() + 1) {
02368                 peer.nextIndex = response.last_log_index() + 1;
02369             }
02370         }
02371     }
02372     if (response.has_server_capabilities()) {
02373         auto& cap = response.server_capabilities();
02374         if (cap.has_min_supported_state_machine_version() &&
02375             cap.has_max_supported_state_machine_version()) {
02376             peer.haveStateMachineSupportedVersions = true;
02377             peer.minStateMachineVersion = Core::Util::downCast<uint16_t>(
02378                 cap.min_supported_state_machine_version());
02379             peer.maxStateMachineVersion = Core::Util::downCast<uint16_t>(
02380                 cap.max_supported_state_machine_version());
02381             stateChanged.notify_all();
02382         }
02383     }
02384 }
02385 
02386 void
02387 RaftConsensus::installSnapshot(std::unique_lock<Mutex>& lockGuard,
02388                                Peer& peer)
02389 {
02390     // Build up request
02391     Protocol::Raft::InstallSnapshot::Request request;
02392     request.set_server_id(serverId);
02393     request.set_term(currentTerm);
02394     request.set_version(2);
02395 
02396     // Open the latest snapshot if we haven't already. Stash a copy of the
02397     // lastSnapshotIndex that goes along with the file, since it's possible
02398     // that this will change while we're transferring chunks.
02399     if (!peer.snapshotFile) {
02400         namespace FS = Storage::FilesystemUtil;
02401         peer.snapshotFile.reset(new FS::FileContents(
02402             FS::openFile(storageLayout.snapshotDir, "snapshot", O_RDONLY)));
02403         peer.snapshotFileOffset = 0;
02404         peer.lastSnapshotIndex = lastSnapshotIndex;
02405         NOTICE("Beginning to send snapshot of %lu bytes up through index %lu "
02406                "to follower",
02407                peer.snapshotFile->getFileLength(),
02408                lastSnapshotIndex);
02409     }
02410     request.set_last_snapshot_index(peer.lastSnapshotIndex);
02411     request.set_byte_offset(peer.snapshotFileOffset);
02412     uint64_t numDataBytes = 0;
02413     if (!peer.suppressBulkData) {
02414         // The amount of data we can send is bounded by the remaining bytes in
02415         // the file and the maximum length for RPCs.
02416         numDataBytes = std::min(
02417             peer.snapshotFile->getFileLength() - peer.snapshotFileOffset,
02418             SOFT_RPC_SIZE_LIMIT);
02419     }
02420     request.set_data(peer.snapshotFile->get<char>(peer.snapshotFileOffset,
02421                                                   numDataBytes),
02422                      numDataBytes);
02423     request.set_done(peer.snapshotFileOffset + numDataBytes ==
02424                      peer.snapshotFile->getFileLength());
02425 
02426     // Execute RPC
02427     Protocol::Raft::InstallSnapshot::Response response;
02428     TimePoint start = Clock::now();
02429     uint64_t epoch = currentEpoch;
02430     Peer::CallStatus status = peer.callRPC(
02431                 Protocol::Raft::OpCode::INSTALL_SNAPSHOT,
02432                 request, response,
02433                 lockGuard);
02434     switch (status) {
02435         case Peer::CallStatus::OK:
02436             break;
02437         case Peer::CallStatus::FAILED:
02438             peer.suppressBulkData = true;
02439             peer.backoffUntil = start + RPC_FAILURE_BACKOFF;
02440             return;
02441         case Peer::CallStatus::INVALID_REQUEST:
02442             PANIC("The server's RaftService doesn't support the "
02443                   "InstallSnapshot RPC or claims the request is malformed");
02444     }
02445 
02446     // Process response
02447 
02448     if (currentTerm != request.term() || peer.exiting) {
02449         // we don't care about result of RPC
02450         return;
02451     }
02452     // Since we were leader in this term before, we must still be leader in
02453     // this term.
02454     assert(state == State::LEADER);
02455     if (response.term() > currentTerm) {
02456         NOTICE("Received InstallSnapshot response from server %lu in "
02457                "term %lu (this server's term was %lu)",
02458                 peer.serverId, response.term(), currentTerm);
02459         stepDown(response.term());
02460     } else {
02461         assert(response.term() == currentTerm);
02462         peer.lastAckEpoch = epoch;
02463         stateChanged.notify_all();
02464         peer.nextHeartbeatTime = start + HEARTBEAT_PERIOD;
02465         peer.suppressBulkData = false;
02466         if (response.has_bytes_stored()) {
02467             // Normal path (since InstallSnapshot version 2).
02468             peer.snapshotFileOffset = response.bytes_stored();
02469         } else {
02470             // This is the old path for InstallSnapshot version 1 followers
02471             // only. The leader would just assume the snapshot chunk was always
02472             // appended to the file if the terms matched.
02473             peer.snapshotFileOffset += numDataBytes;
02474         }
02475         if (peer.snapshotFileOffset == peer.snapshotFile->getFileLength()) {
02476             NOTICE("Done sending snapshot through index %lu to follower",
02477                    peer.lastSnapshotIndex);
02478             peer.matchIndex = peer.lastSnapshotIndex;
02479             peer.nextIndex = peer.lastSnapshotIndex + 1;
02480             // These entries are already committed if they're in a snapshot, so
02481             // the commitIndex shouldn't advance, but let's just follow the
02482             // simple rule that bumping matchIndex should always be
02483             // followed by a call to advanceCommitIndex():
02484             advanceCommitIndex();
02485             peer.snapshotFile.reset();
02486             peer.snapshotFileOffset = 0;
02487             peer.lastSnapshotIndex = 0;
02488         }
02489     }
02490 }
02491 
02492 void
02493 RaftConsensus::becomeLeader()
02494 {
02495     assert(state == State::CANDIDATE);
02496     NOTICE("Now leader for term %lu (appending no-op at index %lu)",
02497            currentTerm,
02498            log->getLastLogIndex() + 1);
02499     state = State::LEADER;
02500     leaderId = serverId;
02501     printElectionState();
02502     startElectionAt = TimePoint::max();
02503     withholdVotesUntil = TimePoint::max();
02504 
02505     // Our local cluster time clock has been ticking ever since we got the last
02506     // log entry/snapshot. Set the clock back to when that happened, since we
02507     // don't really want to count that time (the cluster probably had no leader
02508     // for most of it).
02509     clusterClock.newEpoch(clusterClock.clusterTimeAtEpoch);
02510 
02511     // The ordering is pretty important here: First set nextIndex and
02512     // matchIndex for ourselves and each follower, then append the no op.
02513     // Otherwise we'll set our localServer's last agree index too high.
02514     configuration->forEach(&Server::beginLeadership);
02515 
02516     // Append a new entry so that commitment is not delayed indefinitely.
02517     // Otherwise, if the leader never gets anything to append, it will never
02518     // return to read-only operations (it can't prove that its committed index
02519     // is up-to-date).
02520     Log::Entry entry;
02521     entry.set_term(currentTerm);
02522     entry.set_type(Protocol::Raft::EntryType::NOOP);
02523     entry.set_cluster_time(clusterClock.leaderStamp());
02524     append({&entry});
02525 
02526     // Outstanding RequestVote RPCs are no longer needed.
02527     interruptAll();
02528 }
02529 
02530 void
02531 RaftConsensus::discardUnneededEntries()
02532 {
02533     if (log->getLogStartIndex() <= lastSnapshotIndex) {
02534         NOTICE("Removing log entries through %lu (inclusive) since "
02535                "they're no longer needed", lastSnapshotIndex);
02536         log->truncatePrefix(lastSnapshotIndex + 1);
02537         configurationManager->truncatePrefix(lastSnapshotIndex + 1);
02538         stateChanged.notify_all();
02539         if (state == State::LEADER) { // defer log sync
02540             logSyncQueued = true;
02541         } else { // sync log now
02542             std::unique_ptr<Log::Sync> sync = log->takeSync();
02543             sync->wait();
02544             log->syncComplete(std::move(sync));
02545         }
02546     }
02547 }
02548 
02549 uint64_t
02550 RaftConsensus::getLastLogTerm() const
02551 {
02552     uint64_t lastLogIndex = log->getLastLogIndex();
02553     if (lastLogIndex >= log->getLogStartIndex()) {
02554         return log->getEntry(lastLogIndex).term();
02555     } else {
02556         assert(lastLogIndex == lastSnapshotIndex); // potentially 0
02557         return lastSnapshotTerm;
02558     }
02559 }
02560 
02561 void
02562 RaftConsensus::interruptAll()
02563 {
02564     stateChanged.notify_all();
02565     // A configuration is sometimes missing for unit tests.
02566     if (configuration)
02567         configuration->forEach(&Server::interrupt);
02568 }
02569 
02570 uint64_t
02571 RaftConsensus::packEntries(
02572         uint64_t nextIndex,
02573         Protocol::Raft::AppendEntries::Request& request) const
02574 {
02575     // Add as many entries as will fit comfortably in the request. It's
02576     // easiest to add one entry at a time until the RPC gets too big, then back
02577     // the last one out.
02578 
02579     // Calculating the size of the request ProtoBuf is a bit expensive, so this
02580     // estimates high, then if it reaches the size limit, corrects the estimate
02581     // and keeps going. This is a dumb algorithm but does well enough. It gets
02582     // the number of calls to request.ByteSize() down to about 15 even with
02583     // extremely small entries (10 bytes of payload data in each of 50,000
02584     // entries filling to a 1MB max).
02585 
02586     // Processing 19000 entries here with 10 bytes of data each (total request
02587     // size of 1MB) still takes about 42 milliseconds on an overloaded laptop
02588     // when compiling in DEBUG mode. That's a bit slow, in case someone has
02589     // aggressive election timeouts. As a result, the total number of entries
02590     // in a request is now limited to MAX_LOG_ENTRIES_PER_REQUEST=5000, which
02591     // amortizes RPC overhead well enough anyhow. This limit will only kick in
02592     // when the entry size drops below 200 bytes, since 1M/5K=200.
02593 
02594     using Core::Util::downCast;
02595     uint64_t lastIndex = std::min(log->getLastLogIndex(),
02596                                   nextIndex + MAX_LOG_ENTRIES_PER_REQUEST - 1);
02597     google::protobuf::RepeatedPtrField<Protocol::Raft::Entry>& requestEntries =
02598         *request.mutable_entries();
02599 
02600     uint64_t numEntries = 0;
02601     uint64_t currentSize = downCast<uint64_t>(request.ByteSize());
02602 
02603     for (uint64_t index = nextIndex; index <= lastIndex; ++index) {
02604         const Log::Entry& entry = log->getEntry(index);
02605         *requestEntries.Add() = entry;
02606 
02607         // Each member of a repeated message field is encoded with a tag
02608         // and a length. We conservatively assume the tag and length will
02609         // be up to 10 bytes each (2^64), though in practice the tag is
02610         // probably one byte and the length is probably two.
02611         currentSize += uint64_t(entry.ByteSize()) + 20;
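        // For example, a 150-byte entry is counted as 170 bytes toward the
        // limit here.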
02612 
02613         if (currentSize >= SOFT_RPC_SIZE_LIMIT) {
02614             // The message might be too big: calculate more exact but more
02615             // expensive size.
02616             uint64_t actualSize = downCast<uint64_t>(request.ByteSize());
02617             assert(currentSize >= actualSize);
02618             currentSize = actualSize;
02619             if (currentSize >= SOFT_RPC_SIZE_LIMIT && numEntries > 0) {
02620                 // This entry doesn't fit and we've already got some
02621                 // entries to send: discard this one and stop adding more.
02622                 requestEntries.RemoveLast();
02623                 break;
02624             }
02625         }
02626         // This entry fit, so we'll send it.
02627         ++numEntries;
02628     }
02629 
02630     assert(numEntries == uint64_t(requestEntries.size()));
02631     return numEntries;
02632 }
02633 
02634 void
02635 RaftConsensus::readSnapshot()
02636 {
02637     std::unique_ptr<Storage::SnapshotFile::Reader> reader;
02638     if (storageLayout.serverDir.fd != -1) {
02639         try {
02640             reader.reset(new Storage::SnapshotFile::Reader(storageLayout));
02641         } catch (const std::runtime_error& e) { // file not found
02642             NOTICE("%s", e.what());
02643         }
02644     }
02645     if (reader) {
02646         // Check that this snapshot uses format version 1
02647         uint8_t version = 0;
02648         uint64_t bytesRead = reader->readRaw(&version, sizeof(version));
02649         if (bytesRead < 1) {
02650             PANIC("Found completely empty snapshot file (it doesn't even "
02651                   "have a version field)");
02652         } else {
02653             if (version != 1) {
02654                 PANIC("Snapshot format version read was %u, but this code can "
02655                       "only read version 1",
02656                       version);
02657             }
02658         }
02659 
02660         // load header contents
02661         SnapshotMetadata::Header header;
02662         std::string error = reader->readMessage(header);
02663         if (!error.empty()) {
02664             PANIC("Couldn't read snapshot header: %s", error.c_str());
02665         }
02666         if (header.last_included_index() < lastSnapshotIndex) {
02667             PANIC("Trying to load a snapshot that is more stale than one this "
02668                   "server loaded earlier. The earlier snapshot covers through "
02669                   "log index %lu (inclusive); this one covers through log "
02670                   "index %lu (inclusive)",
02671                   lastSnapshotIndex,
02672                   header.last_included_index());
02673 
02674         }
02675         lastSnapshotIndex = header.last_included_index();
02676         lastSnapshotTerm = header.last_included_term();
02677         lastSnapshotClusterTime = header.last_cluster_time();
02678         lastSnapshotBytes = reader->getSizeBytes();
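        // A snapshot covers only committed entries, so our commit index is at
        // least lastSnapshotIndex.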
02679         commitIndex = std::max(lastSnapshotIndex, commitIndex);
02680 
02681         NOTICE("Reading snapshot which covers log entries 1 through %lu "
02682                "(inclusive)", lastSnapshotIndex);
02683 
02684         // We should keep log entries if they might be needed for a quorum. So:
02685         // 1. Discard log if it is shorter than the snapshot.
02686         // 2. Discard log if its lastSnapshotIndex entry disagrees with the
02687         //    lastSnapshotTerm.
02688         if (log->getLastLogIndex() < lastSnapshotIndex ||
02689             (log->getLogStartIndex() <= lastSnapshotIndex &&
02690              log->getEntry(lastSnapshotIndex).term() != lastSnapshotTerm)) {
02691             // The NOTICE message can be confusing if the log is empty, so
02692             // don't print it in that case. We still want to shift the log
02693             // start index, though.
02694             if (log->getLogStartIndex() <= log->getLastLogIndex()) {
02695                 NOTICE("Discarding the entire log, since it's not known to be "
02696                        "consistent with the snapshot that is being read");
02697             }
02698             // Discard the entire log, setting the log start to point to the
02699             // right place.
02700             log->truncatePrefix(lastSnapshotIndex + 1);
02701             log->truncateSuffix(lastSnapshotIndex);
02702             configurationManager->truncatePrefix(lastSnapshotIndex + 1);
02703             configurationManager->truncateSuffix(lastSnapshotIndex);
02704             // Clean up resources.
02705             if (state == State::LEADER) { // defer log sync
02706                 logSyncQueued = true;
02707             } else { // sync log now
02708                 std::unique_ptr<Log::Sync> sync = log->takeSync();
02709                 sync->wait();
02710                 log->syncComplete(std::move(sync));
02711             }
02712             clusterClock.newEpoch(lastSnapshotClusterTime);
02713         }
02714 
02715         discardUnneededEntries();
02716 
02717         if (header.has_configuration_index() && header.has_configuration()) {
02718             configurationManager->setSnapshot(header.configuration_index(),
02719                                               header.configuration());
02720         } else {
02721             WARNING("No configuration. This is unexpected, since any snapshot "
02722                     "should contain a configuration (they're the first thing "
02723                     "found in any log).");
02724         }
02725 
02726         stateChanged.notify_all();
02727     }
02728     if (log->getLogStartIndex() > lastSnapshotIndex + 1) {
02729         PANIC("The newest snapshot on this server covers up through log index "
02730               "%lu (inclusive), but its log starts at index %lu. This "
02731               "should never happen and indicates a corrupt disk state. If you "
02732               "want this server to participate in your cluster, you should "
02733               "back up all of its state, delete it, and add the server back "
02734               "as a new cluster member using the reconfiguration mechanism.",
02735               lastSnapshotIndex, log->getLogStartIndex());
02736     }
02737 
02738     snapshotReader = std::move(reader);
02739 }
02740 
02741 std::pair<RaftConsensus::ClientResult, uint64_t>
02742 RaftConsensus::replicateEntry(Log::Entry& entry,
02743                               std::unique_lock<Mutex>& lockGuard)
02744 {
02745     if (state == State::LEADER) {
02746         entry.set_term(currentTerm);
02747         entry.set_cluster_time(clusterClock.leaderStamp());
02748         append({&entry});
02749         uint64_t index = log->getLastLogIndex();
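              // Wait until the entry is committed, or until this server is no
              // longer leader for the term in which the entry was appended.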
02750         while (!exiting && currentTerm == entry.term()) {
02751             if (commitIndex >= index) {
02752                 VERBOSE("replicate succeeded");
02753                 return {ClientResult::SUCCESS, index};
02754             }
02755             stateChanged.wait(lockGuard);
02756         }
02757     }
02758     return {ClientResult::NOT_LEADER, 0};
02759 }
02760 
02761 void
02762 RaftConsensus::requestVote(std::unique_lock<Mutex>& lockGuard, Peer& peer)
02763 {
02764     Protocol::Raft::RequestVote::Request request;
02765     request.set_server_id(serverId);
02766     request.set_term(currentTerm);
02767     request.set_last_log_term(getLastLogTerm());
02768     request.set_last_log_index(log->getLastLogIndex());
02769 
02770     Protocol::Raft::RequestVote::Response response;
02771     VERBOSE("requestVote start");
02772     TimePoint start = Clock::now();
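          // Record the epoch at which this RPC is sent so that a successful reply
          // can advance this peer's lastAckEpoch.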
02773     uint64_t epoch = currentEpoch;
02774     Peer::CallStatus status = peer.callRPC(
02775                 Protocol::Raft::OpCode::REQUEST_VOTE,
02776                 request, response,
02777                 lockGuard);
02778     VERBOSE("requestVote done");
02779     switch (status) {
02780         case Peer::CallStatus::OK:
02781             break;
02782         case Peer::CallStatus::FAILED:
02783             peer.suppressBulkData = true;
02784             peer.backoffUntil = start + RPC_FAILURE_BACKOFF;
02785             return;
02786         case Peer::CallStatus::INVALID_REQUEST:
02787             PANIC("The server's RaftService doesn't support the RequestVote "
02788                   "RPC or claims the request is malformed");
02789     }
02790 
02791     if (currentTerm != request.term() || state != State::CANDIDATE ||
02792         peer.exiting) {
02793         VERBOSE("ignore RPC result");
02794         // The term or our candidacy changed while the RPC was in flight; its result no longer matters.
02795         return;
02796     }
02797 
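          // A response carrying a higher term means this candidacy has been
          // superseded; revert to follower in that term.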
02798     if (response.term() > currentTerm) {
02799         NOTICE("Received RequestVote response from server %lu in "
02800                "term %lu (this server's term was %lu)",
02801                 peer.serverId, response.term(), currentTerm);
02802         stepDown(response.term());
02803     } else {
02804         peer.requestVoteDone = true;
02805         peer.lastAckEpoch = epoch;
02806         stateChanged.notify_all();
02807 
02808         if (response.granted()) {
02809             peer.haveVote_ = true;
02810             NOTICE("Got vote from server %lu for term %lu",
02811                    peer.serverId, currentTerm);
02812             if (configuration->quorumAll(&Server::haveVote))
02813                 becomeLeader();
02814         } else {
02815             NOTICE("Vote denied by server %lu for term %lu",
02816                    peer.serverId, currentTerm);
02817         }
02818     }
02819 }
02820 
02821 void
02822 RaftConsensus::setElectionTimer()
02823 {
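          // Pick a timeout uniformly at random between ELECTION_TIMEOUT and twice
          // that value; randomized timeouts make split votes unlikely.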
02824     std::chrono::nanoseconds duration(
02825         Core::Random::randomRange(
02826             uint64_t(std::chrono::nanoseconds(ELECTION_TIMEOUT).count()),
02827             uint64_t(std::chrono::nanoseconds(ELECTION_TIMEOUT).count()) * 2));
02828     VERBOSE("Will become candidate in %s",
02829             Core::StringUtil::toString(duration).c_str());
02830     startElectionAt = Clock::now() + duration;
02831     stateChanged.notify_all();
02832 }
02833 
02834 void
02835 RaftConsensus::printElectionState() const
02836 {
02837     const char* s = NULL;
02838     switch (state) {
02839         case State::FOLLOWER:
02840             s = "FOLLOWER, ";
02841             break;
02842         case State::CANDIDATE:
02843             s = "CANDIDATE,";
02844             break;
02845         case State::LEADER:
02846             s = "LEADER,   ";
02847             break;
02848     }
02849     NOTICE("server=%lu, term=%lu, state=%s leader=%lu, vote=%lu",
02850            serverId,
02851            currentTerm,
02852            s,
02853            leaderId,
02854            votedFor);
02855 }
02856 
02857 void
02858 RaftConsensus::startNewElection()
02859 {
02860     if (configuration->id == 0) {
02861         // Don't have a configuration: go back to sleep.
02862         setElectionTimer();
02863         return;
02864     }
02865 
02866     if (commitIndex >= configuration->id &&
02867         !configuration->hasVote(configuration->localServer)) {
02868         // The latest configuration is committed and we're not a voting member of it; don't start an election.
02869         setElectionTimer();
02870         return;
02871     }
02872 
02873     if (leaderId > 0) {
02874         NOTICE("Running for election in term %lu "
02875                "(haven't heard from leader %lu lately)",
02876                currentTerm + 1,
02877                leaderId);
02878     } else if (state == State::CANDIDATE) {
02879         NOTICE("Running for election in term %lu "
02880                "(previous candidacy for term %lu timed out)",
02881                currentTerm + 1,
02882                currentTerm);
02883     } else {
02884         NOTICE("Running for election in term %lu",
02885                currentTerm + 1);
02886     }
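          // Become a candidate: advance the term, vote for ourselves, persist that
          // vote, restart the election timer, and solicit votes from the other
          // servers.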
02887     ++currentTerm;
02888     state = State::CANDIDATE;
02889     leaderId = 0;
02890     votedFor = serverId;
02891     printElectionState();
02892     setElectionTimer();
02893     configuration->forEach(&Server::beginRequestVote);
02894     if (snapshotWriter) {
02895         snapshotWriter->discard();
02896         snapshotWriter.reset();
02897     }
02898     updateLogMetadata();
02899     interruptAll();
02900 
02901     // if we're the only server, this election is already done
02902     if (configuration->quorumAll(&Server::haveVote))
02903         becomeLeader();
02904 }
02905 
02906 void
02907 RaftConsensus::stepDown(uint64_t newTerm)
02908 {
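          // newTerm is at least the current term. On a strictly newer term, forget
          // the old leader and vote, persist the new term, and return to follower;
          // otherwise just make sure we're a follower.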
02909     assert(currentTerm <= newTerm);
02910     if (currentTerm < newTerm) {
02911         VERBOSE("stepDown(%lu)", newTerm);
02912         currentTerm = newTerm;
02913         leaderId = 0;
02914         votedFor = 0;
02915         updateLogMetadata();
02916         configuration->resetStagingServers();
02917         if (snapshotWriter) {
02918             snapshotWriter->discard();
02919             snapshotWriter.reset();
02920         }
02921         state = State::FOLLOWER;
02922         printElectionState();
02923     } else {
02924         if (state != State::FOLLOWER) {
02925             state = State::FOLLOWER;
02926             printElectionState();
02927         }
02928     }
02929     if (startElectionAt == TimePoint::max()) // was leader
02930         setElectionTimer();
02931     if (withholdVotesUntil == TimePoint::max()) // was leader
02932         withholdVotesUntil = TimePoint::min();
02933     interruptAll();
02934 
02935     // If the leader disk thread is currently writing to disk, wait for it to
02936     // finish. We poll here because we don't want to release the lock (this
02937     // server would then believe its writes have been flushed when they
02938     // haven't).
02939     while (leaderDiskThreadWorking)
02940         usleep(500);
02941 
02942     // If a log sync has been queued (deferred while we were leader), complete
02943     // it now. Do this after waiting for the leader disk thread to preserve
02944     // FIFO ordering of Log::Sync objects. Don't bother updating the
02945     // localServer's lastSyncedIndex, since it doesn't matter for non-leaders.
02946     if (logSyncQueued) {
02947         std::unique_ptr<Log::Sync> sync = log->takeSync();
02948         sync->wait();
02949         log->syncComplete(std::move(sync));
02950         logSyncQueued = false;
02951     }
02952 }
02953 
02954 void
02955 RaftConsensus::updateLogMetadata()
02956 {
02957     log->metadata.set_current_term(currentTerm);
02958     log->metadata.set_voted_for(votedFor);
02959     VERBOSE("updateMetadata start");
02960     log->updateMetadata();
02961     VERBOSE("updateMetadata end");
02962 }
02963 
02964 bool
02965 RaftConsensus::upToDateLeader(std::unique_lock<Mutex>& lockGuard) const
02966 {
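          // Confirm that this server is still the leader by advancing the epoch and
          // waiting until a quorum has acknowledged an RPC sent during this epoch,
          // then make sure an entry from the current term has been committed.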
02967     ++currentEpoch;
02968     uint64_t epoch = currentEpoch;
02969     // schedule a heartbeat now so that this returns quickly
02970     configuration->forEach(&Server::scheduleHeartbeat);
02971     stateChanged.notify_all();
02972     while (true) {
02973         if (exiting || state != State::LEADER)
02974             return false;
02975         if (configuration->quorumMin(&Server::getLastAckEpoch) >= epoch) {
02976             // So we know we're the current leader, but do we have an
02977             // up-to-date commitIndex yet? What we'd like to check is whether
02978             // the entry's term at commitIndex matches our currentTerm, but
02979             // snapshots mean that we may not have the entry in our log. Since
02980             // commitIndex >= lastSnapshotIndex, we split into two cases:
02981             uint64_t commitTerm;
02982             if (commitIndex == lastSnapshotIndex) {
02983                 commitTerm = lastSnapshotTerm;
02984             } else {
02985                 assert(commitIndex > lastSnapshotIndex);
02986                 assert(commitIndex >= log->getLogStartIndex());
02987                 assert(commitIndex <= log->getLastLogIndex());
02988                 commitTerm = log->getEntry(commitIndex).term();
02989             }
02990             if (commitTerm == currentTerm)
02991                 return true;
02992         }
02993         stateChanged.wait(lockGuard);
02994     }
02995 }
02996 
02997 std::ostream&
02998 operator<<(std::ostream& os, RaftConsensus::ClientResult clientResult)
02999 {
03000     typedef RaftConsensus::ClientResult ClientResult;
03001     switch (clientResult) {
03002         case ClientResult::SUCCESS:
03003             os << "ClientResult::SUCCESS";
03004             break;
03005         case ClientResult::FAIL:
03006             os << "ClientResult::FAIL";
03007             break;
03008         case ClientResult::RETRY:
03009             os << "ClientResult::RETRY";
03010             break;
03011         case ClientResult::NOT_LEADER:
03012             os << "ClientResult::NOT_LEADER";
03013             break;
03014     }
03015     return os;
03016 }
03017 
03018 std::ostream&
03019 operator<<(std::ostream& os, RaftConsensus::State state)
03020 {
03021     typedef RaftConsensus::State State;
03022     switch (state) {
03023         case State::FOLLOWER:
03024             os << "State::FOLLOWER";
03025             break;
03026         case State::CANDIDATE:
03027             os << "State::CANDIDATE";
03028             break;
03029         case State::LEADER:
03030             os << "State::LEADER";
03031             break;
03032     }
03033     return os;
03034 }
03035 
03036 } // namespace LogCabin::Server
03037 } // namespace LogCabin