LogCabin
|
/* Copyright (c) 2012-2014 Stanford University
 * Copyright (c) 2015 Diego Ongaro
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <chrono>
#include <cstdint>
#include <deque>
#include <functional>
#include <iosfwd>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include "build/Protocol/Client.pb.h"
#include "build/Protocol/Raft.pb.h"
#include "build/Protocol/ServerStats.pb.h"
#include "build/Server/SnapshotStats.pb.h"
#include "Client/SessionManager.h"
#include "Core/CompatAtomic.h"
#include "Core/ConditionVariable.h"
#include "Core/Mutex.h"
#include "Core/Time.h"
#include "RPC/ClientRPC.h"
#include "Storage/Layout.h"
#include "Storage/Log.h"
#include "Storage/SnapshotFile.h"

#ifndef LOGCABIN_SERVER_RAFTCONSENSUS_H
#define LOGCABIN_SERVER_RAFTCONSENSUS_H

namespace LogCabin {

// forward declarations
namespace Event {
class Loop;
}
namespace RPC {
class ClientSession;
}

namespace Server {

// forward declaration
class Globals;

// forward declaration
class RaftConsensus;

namespace RaftConsensusInternal { 00060 00061 00062 class Invariants { 00063 public: 00064 explicit Invariants(RaftConsensus&); 00065 ~Invariants(); 00066 void checkAll(); 00067 private: 00068 void checkBasic(); 00069 void checkPeerBasic(); 00070 void checkDelta(); 00071 void checkPeerDelta(); 00072 00073 const RaftConsensus& consensus; 00074 uint64_t errors; 00075 struct ConsensusSnapshot; 00076 std::unique_ptr<ConsensusSnapshot> previous; 00077 }; 00078 00079 00080 /** 00081 * True if this should actually spawn threads, false otherwise. 00082 * Normally set to true, but many unit tests set this to false. 00083 */ 00084 extern bool startThreads; 00085 00086 /** 00087 * Reads the current time. This will refer to the best clock available on our 00088 * system, which may or may not be monotonic. 00089 */ 00090 typedef LogCabin::Core::Time::SteadyClock Clock; 00091 00092 /** 00093 * Some point in time relative to the Clock's epoch. 00094 */ 00095 typedef Clock::time_point TimePoint; 00096 00097 typedef Core::Mutex Mutex; 00098 00099 /** 00100 * A base class for known servers in the cluster, including this process (see 00101 * LocalServer) and others (see Peer). This tracks various bits of state for 00102 * each server, which is used when we are a candidate or leader. This class 00103 * does not do any internal locking; it should be accessed only while holding 00104 * the RaftConsensus lock. 00105 */ 00106 class Server { 00107 public: 00108 /** 00109 * Constructor. 00110 */ 00111 explicit Server(uint64_t serverId); 00112 /** 00113 * Destructor. 00114 */ 00115 virtual ~Server(); 00116 /** 00117 * Begin requesting the Server's vote in the current election. Return 00118 * immediately. The condition variable in RaftConsensus will be notified 00119 * separately. 00120 */ 00121 virtual void beginRequestVote() = 0; 00122 /** 00123 * Begin replicating to the Server in the current term. Return 00124 * immediately. 
The condition variable in RaftConsensus will be notified 00125 * separately. 00126 */ 00127 virtual void beginLeadership() = 0; 00128 /** 00129 * Inform any threads belonging to this Server to exit. Return immediately. 00130 * The condition variable in RaftConsensus will be notified separately. 00131 */ 00132 virtual void exit() = 0; 00133 /** 00134 * Return the latest time this Server acknowledged our current term. 00135 */ 00136 virtual uint64_t getLastAckEpoch() const = 0; 00137 /** 00138 * Return the largest entry ID for which this Server is known to share the 00139 * same entries up to and including this entry with our log. 00140 * This is used for advancing the leader's commitIndex. 00141 * Monotonically increases within a term. 00142 * 00143 * \warning 00144 * Only valid when we're leader. 00145 */ 00146 virtual uint64_t getMatchIndex() const = 0; 00147 /** 00148 * Return true if this Server has awarded us its vote for this term. 00149 */ 00150 virtual bool haveVote() const = 0; 00151 /** 00152 * Cancel any outstanding RPCs to this Server. 00153 * The condition variable in RaftConsensus will be notified separately. 00154 */ 00155 virtual void interrupt() = 0; 00156 /** 00157 * Return true once this Server is ready to be added to the cluster. This 00158 * means it has received enough of our log to where it is not expected to 00159 * cause an availability problem when added to the cluster configuration. 00160 * Should monotonically change from false to true. 00161 */ 00162 virtual bool isCaughtUp() const = 0; 00163 /** 00164 * Make the next heartbeat RPC happen soon. Return immediately. 00165 * The condition variable in RaftConsensus will be notified separately. 00166 */ 00167 virtual void scheduleHeartbeat() = 0; 00168 /** 00169 * Write this Server's state into the given structure. Used for 00170 * diagnostics. 
00171 */ 00172 virtual void 00173 updatePeerStats(Protocol::ServerStats::Raft::Peer& peerStats, 00174 Core::Time::SteadyTimeConverter& time) const = 0; 00175 00176 /** 00177 * Print out a Server for debugging purposes. 00178 */ 00179 friend std::ostream& operator<<(std::ostream& os, const Server& server); 00180 00181 /** 00182 * Virtual method for operator<<. 00183 */ 00184 virtual std::ostream& dumpToStream(std::ostream& os) const = 0; 00185 00186 /** 00187 * The ID of this server. 00188 */ 00189 const uint64_t serverId; 00190 /** 00191 * The network addresses at which this server may be available 00192 * (comma-delimited) 00193 */ 00194 std::string addresses; 00195 00196 /** 00197 * If true, minStateMachineVersion and maxStateMachineVersion are set 00198 * (although they may be stale). 00199 */ 00200 bool haveStateMachineSupportedVersions; 00201 /** 00202 * If haveStateMachineSupportedVersions is true, the smallest version of 00203 * the state machine commands/behavior that the server can support. 00204 */ 00205 uint16_t minStateMachineVersion; 00206 /** 00207 * If haveStateMachineSupportedVersions is true, the largest version of 00208 * the state machine commands/behavior that the server can support. 00209 */ 00210 uint16_t maxStateMachineVersion; 00211 00212 /** 00213 * Used internally by Configuration for garbage collection. 00214 */ 00215 bool gcFlag; 00216 }; 00217 00218 00219 /** 00220 * A type of Server for the local process. There will only be one instance of 00221 * this class. Most of these methods don't do much, but they are needed to 00222 * satisfy the Server interface. 
00223 */ 00224 class LocalServer : public Server { 00225 public: 00226 LocalServer(uint64_t serverId, RaftConsensus& consensus); 00227 ~LocalServer(); 00228 void exit(); 00229 void beginRequestVote(); 00230 void beginLeadership(); 00231 uint64_t getMatchIndex() const; 00232 bool haveVote() const; 00233 uint64_t getLastAckEpoch() const; 00234 void interrupt(); 00235 bool isCaughtUp() const; 00236 void scheduleHeartbeat(); 00237 std::ostream& dumpToStream(std::ostream& os) const; 00238 void updatePeerStats(Protocol::ServerStats::Raft::Peer& peerStats, 00239 Core::Time::SteadyTimeConverter& time) const; 00240 RaftConsensus& consensus; 00241 /** 00242 * The index of the last log entry that has been flushed to disk. 00243 * Valid for leaders only. Returned by getMatchIndex() and used to 00244 * advance the leader's commitIndex. 00245 */ 00246 uint64_t lastSyncedIndex; 00247 }; 00248 00249 /** 00250 * Represents another server in the cluster. One of these exists for each other 00251 * server. In addition to tracking state for each other server, this class 00252 * provides a thread that executes RaftConsensus::peerThreadMain(). 00253 * 00254 * This class has no internal locking: in general, the RaftConsensus lock 00255 * should be held when accessing this class, but there are some exceptions 00256 * noted below. 00257 */ 00258 class Peer : public Server { 00259 public: 00260 /** 00261 * Constructor. 00262 */ 00263 Peer(uint64_t serverId, RaftConsensus& consensus); 00264 00265 /** 00266 * Destructor. 00267 */ 00268 ~Peer(); 00269 00270 // Methods implemented from Server interface. 00271 void beginRequestVote(); 00272 void beginLeadership(); 00273 void exit(); 00274 uint64_t getLastAckEpoch() const; 00275 uint64_t getMatchIndex() const; 00276 bool haveVote() const; 00277 bool isCaughtUp() const; 00278 void interrupt(); 00279 void scheduleHeartbeat(); 00280 00281 /** 00282 * Returned by callRPC(). 
00283 */ 00284 enum class CallStatus { 00285 /** 00286 * The RPC succeeded and the response was filled in. 00287 */ 00288 OK, 00289 /** 00290 * No reply was received from the server. Maybe the connection was 00291 * dropped; maybe the RPC was canceled. 00292 */ 00293 FAILED, 00294 /** 00295 * The server does not support this RPC or didn't like the arguments 00296 * given. 00297 */ 00298 INVALID_REQUEST, 00299 }; 00300 00301 /** 00302 * Execute a remote procedure call on the server's RaftService. As this 00303 * operation might take a while, it should be called without RaftConsensus 00304 * lock. 00305 * \param[in] opCode 00306 * The RPC opcode to execute (see Protocol::Raft::OpCode). 00307 * \param[in] request 00308 * The request that was received from the other server. 00309 * \param[out] response 00310 * Where the reply should be placed, if status is OK. 00311 * \param[in] lockGuard 00312 * The Raft lock, which is released internally to allow for I/O 00313 * concurrency. 00314 * \return 00315 * See CallStatus. 00316 */ 00317 CallStatus 00318 callRPC(Protocol::Raft::OpCode opCode, 00319 const google::protobuf::Message& request, 00320 google::protobuf::Message& response, 00321 std::unique_lock<Mutex>& lockGuard); 00322 00323 /** 00324 * Launch this Peer's thread, which should run 00325 * RaftConsensus::peerThreadMain. 00326 * \param self 00327 * A shared_ptr to this object, which the detached thread uses to make 00328 * sure this object doesn't go away. 00329 */ 00330 void startThread(std::shared_ptr<Peer> self); 00331 std::ostream& dumpToStream(std::ostream& os) const; 00332 void updatePeerStats(Protocol::ServerStats::Raft::Peer& peerStats, 00333 Core::Time::SteadyTimeConverter& time) const; 00334 00335 private: 00336 00337 /** 00338 * Get the current session for this server. (This is cached in the #session 00339 * member for efficiency.) As this operation might take a while, it should 00340 * be called without RaftConsensus lock. 
00341 */ 00342 std::shared_ptr<RPC::ClientSession> 00343 getSession(std::unique_lock<Mutex>& lockGuard); 00344 00345 public: 00346 00347 /** 00348 * Used in startThread. 00349 * TODO(ongaro): reconsider 00350 */ 00351 RaftConsensus& consensus; 00352 00353 /** 00354 * A reference to the server's event loop, needed to construct new 00355 * sessions. 00356 */ 00357 Event::Loop& eventLoop; 00358 00359 /** 00360 * Set to true when thread should exit. 00361 */ 00362 bool exiting; 00363 00364 /** 00365 * Set to true if the server has responded to our RequestVote request in 00366 * the current term, false otherwise. 00367 */ 00368 bool requestVoteDone; 00369 00370 /** 00371 * See #haveVote(). 00372 */ 00373 bool haveVote_; 00374 00375 /** 00376 * Indicates that the leader and the follower aren't necessarily 00377 * synchronized. The leader should not send large amounts of data (with 00378 * many log entries or large chunks of a snapshot file) to the follower 00379 * while this flag is true. For example, the follower might have been 00380 * disconnected, or the leader might not know where the follower's log 00381 * diverges from its own. It's better to sync up using small RPCs like 00382 * heartbeats, then begin/resume sending bulk data after receiving an 00383 * acknowledgment. 00384 * 00385 * Only used when leader. 00386 */ 00387 bool suppressBulkData; 00388 00389 /** 00390 * The index of the next entry to send to the follower. Only used when 00391 * leader. Minimum value of 1. 00392 */ 00393 uint64_t nextIndex; 00394 00395 /** 00396 * See #getMatchIndex(). 00397 */ 00398 uint64_t matchIndex; 00399 00400 /** 00401 * See #getLastAckEpoch(). 00402 */ 00403 uint64_t lastAckEpoch; 00404 00405 /** 00406 * When the next heartbeat should be sent to the follower. 00407 * Only valid while we're leader. The leader sends heartbeats periodically 00408 * if it has no new data to send, to stop the follower from starting a new 00409 * election. 
00410 * \invariant 00411 * This is never more than HEARTBEAT_PERIOD in the future, since 00412 * new leaders don't reset it. 00413 */ 00414 TimePoint nextHeartbeatTime; 00415 00416 /** 00417 * The minimum time at which the next RPC should be sent. 00418 * Only valid while we're a candidate or leader. This is set when an RPC 00419 * fails so as to not overwhelm the network with retries (some RPCs fail 00420 * without timing out, for example if the remote kernel rejects the 00421 * connection). 00422 */ 00423 TimePoint backoffUntil; 00424 00425 /** 00426 * Counts RPC failures to issue fewer warnings. 00427 * Accessed only from callRPC() without holding the lock. 00428 */ 00429 uint64_t rpcFailuresSinceLastWarning; 00430 00431 // Used for isCaughtUp. TODO(ongaro): doc precisely 00432 uint64_t lastCatchUpIterationMs; 00433 TimePoint thisCatchUpIterationStart; 00434 uint64_t thisCatchUpIterationGoalId; 00435 00436 /** 00437 * See #isCaughtUp(). 00438 */ 00439 bool isCaughtUp_; 00440 00441 /** 00442 * A snapshot file to be sent to the follower, or NULL. 00443 * TODO(ongaro): It'd be better to destroy this as soon as this server 00444 * steps down, but peers don't have a hook for that right now. 00445 */ 00446 std::unique_ptr<Storage::FilesystemUtil::FileContents> snapshotFile; 00447 /** 00448 * The number of bytes of 'snapshotFile' that have been acknowledged by the 00449 * follower already. Send starting here next time. 00450 */ 00451 uint64_t snapshotFileOffset; 00452 /** 00453 * The last log index that 'snapshotFile' corresponds to. This is used to 00454 * set the follower's #nextIndex accordingly after we're done sending it 00455 * the snapshot. 00456 */ 00457 uint64_t lastSnapshotIndex; 00458 00459 private: 00460 00461 /** 00462 * Caches the result of getSession(). 00463 */ 00464 std::shared_ptr<RPC::ClientSession> session; 00465 00466 /** 00467 * callRPC() places its RPC here so that interrupt() may cancel it. 
00468 * Setting this member and canceling the RPC must be done while holding the 00469 * Raft lock; waiting on the RPC is done without holding that lock. 00470 */ 00471 RPC::ClientRPC rpc; 00472 00473 // Peer is not copyable. 00474 Peer(const Peer&) = delete; 00475 Peer& operator=(const Peer&) = delete; 00476 }; 00477 00478 /** 00479 * A configuration defines the servers that are part of the cluster. This class 00480 * does not do any internal locking; it should be accessed only while holding 00481 * the RaftConsensus lock. 00482 */ 00483 class Configuration { 00484 public: 00485 typedef std::shared_ptr<Server> ServerRef; 00486 typedef std::function<bool(Server&)> Predicate; 00487 typedef std::function<uint64_t(Server&)> GetValue; 00488 typedef std::function<void(Server&)> SideEffect; 00489 00490 private: 00491 /** 00492 * A list of servers in which a simple majority constitutes a quorum. 00493 */ 00494 struct SimpleConfiguration { 00495 SimpleConfiguration(); 00496 ~SimpleConfiguration(); 00497 bool all(const Predicate& predicate) const; 00498 bool contains(ServerRef server) const; 00499 void forEach(const SideEffect& sideEffect); 00500 uint64_t min(const GetValue& getValue) const; 00501 bool quorumAll(const Predicate& predicate) const; 00502 uint64_t quorumMin(const GetValue& getValue) const; 00503 std::vector<ServerRef> servers; 00504 }; 00505 00506 public: 00507 /** 00508 * See #state. 00509 */ 00510 enum class State { 00511 /** 00512 * The configuration specifies no servers. Servers that are new to the 00513 * cluster and have empty logs start in this state. 00514 */ 00515 BLANK, 00516 /** 00517 * The configuration specifies a single list of servers: a quorum 00518 * requires any majority of this list. 00519 */ 00520 STABLE, 00521 /** 00522 * The configuration specifies two lists of servers: a quorum requires 00523 * any majority of the first list, but the servers in the second list 00524 * also receive log entries. 
00525 */ 00526 STAGING, 00527 /** 00528 * The configuration specifies two lists of servers: a quorum requires 00529 * any majority of the first list and any majority of the second. 00530 */ 00531 TRANSITIONAL, 00532 }; 00533 00534 /** 00535 * Constructor. 00536 */ 00537 Configuration(uint64_t serverId, RaftConsensus& consensus); 00538 00539 /** 00540 * Destructor. 00541 */ 00542 ~Configuration(); 00543 00544 /** 00545 * Apply a function to every known server, including the local, old, new, 00546 * and staging servers. The function will only be called once for each 00547 * server, even if a server exists in more than one of these categories. 00548 */ 00549 void forEach(const SideEffect& sideEffect); 00550 00551 /** 00552 * Return true if the given server may be part of a quorum, false 00553 * otherwise. 00554 */ 00555 bool hasVote(ServerRef server) const; 00556 00557 /** 00558 * Lookup the network addresses for a particular server 00559 * (comma-delimited). 00560 * Returns empty string if not found. 00561 */ 00562 std::string lookupAddress(uint64_t serverId) const; 00563 00564 /** 00565 * Return true if there exists a quorum for which every server satisfies 00566 * the predicate, false otherwise. 00567 */ 00568 bool quorumAll(const Predicate& predicate) const; 00569 00570 /** 00571 * Return the smallest value of any server in the quorum of servers that 00572 * have the largest values. 00573 * \return 00574 * Largest value for which every server in a quorum has a value 00575 * greater than or equal to this one. 0 if the configuration is BLANK. 00576 */ 00577 uint64_t quorumMin(const GetValue& getValue) const; 00578 00579 /** 00580 * Remove the staging servers, if any. Return to the configuration state 00581 * prior to a preceding call to setStagingServers. 00582 */ 00583 void resetStagingServers(); 00584 00585 /** 00586 * Set the state of this object as if it had just been constructed. 00587 */ 00588 void reset(); 00589 00590 /** 00591 * Set the configuration. 
Any existing staging servers are dropped. 00592 * \param newId 00593 * The log entry ID of the configuration. 00594 * \param newDescription 00595 * The IDs and addresses of the servers in the configuration. If any 00596 * newServers are listed in the description, it is considered 00597 * TRANSITIONAL; otherwise, it is STABLE. 00598 */ 00599 void setConfiguration( 00600 uint64_t newId, 00601 const Protocol::Raft::Configuration& newDescription); 00602 00603 /** 00604 * Add servers that are to mirror the log but that may not have a vote 00605 * (listeners). This can only be called on a STABLE configuration and makes 00606 * it STAGING. 00607 * TODO(ongaro): that might be a sign of a poor interface. descriptions 00608 * should probably have three sets, as john said. 00609 */ 00610 void setStagingServers( 00611 const Protocol::Raft::SimpleConfiguration& stagingServers); 00612 00613 /** 00614 * Return true if every server in the staging set satisfies the predicate, 00615 * false otherwise. 00616 */ 00617 bool stagingAll(const Predicate& predicate) const; 00618 00619 /** 00620 * Return the smallest value of any server in the staging set. 00621 * \return 00622 * Minimum value on any server in the staging set, or 0 if the staging 00623 * set is empty. 00624 */ 00625 uint64_t stagingMin(const GetValue& getValue) const; 00626 00627 /** 00628 * Write the configuration servers' state into the given structure. Used 00629 * for diagnostics. 00630 */ 00631 void updateServerStats(Protocol::ServerStats& serverStats, 00632 Core::Time::SteadyTimeConverter& time) const; 00633 00634 /** 00635 * Print out a State for debugging purposes. 00636 */ 00637 friend std::ostream& operator<<(std::ostream& os, State state); 00638 00639 /** 00640 * Print out a Configuration for debugging purposes. 
00641 */ 00642 friend std::ostream& operator<<(std::ostream& os, 00643 const Configuration& configuration); 00644 00645 private: 00646 /** 00647 * If no server by the given ID is known, construct a new one. 00648 * \return 00649 * Return the existing or new server. 00650 * TODO(ongaro): this name and signature is misleading 00651 */ 00652 ServerRef getServer(uint64_t newServerId); 00653 00654 /** 00655 * Used for constructing Server instances. 00656 */ 00657 RaftConsensus& consensus; 00658 00659 /** 00660 * A map from server ID to Server of every server, including the local, 00661 * previous, new, and staging servers. 00662 */ 00663 std::unordered_map<uint64_t, ServerRef> knownServers; 00664 00665 public: 00666 /** 00667 * This server. 00668 */ 00669 std::shared_ptr<LocalServer> localServer; 00670 00671 /** 00672 * Specifies the meaning of #oldServers and #newServers. 00673 */ 00674 State state; 00675 00676 /** 00677 * The ID of the current configuration. This is the same as the entry ID in 00678 * which this configuration's description is written to the log. 00679 */ 00680 uint64_t id; 00681 00682 /** 00683 * A description of the current configuration. 00684 */ 00685 Protocol::Raft::Configuration description; 00686 00687 private: 00688 /** 00689 * A majority of these servers are necessary for a quorum under 00690 * STABLE, STAGING, and TRANSITIONAL configurations. (Under TRANSITIONAL, a 00691 * majority of newServers is also needed.) 00692 */ 00693 SimpleConfiguration oldServers; 00694 00695 /** 00696 * A majority of these servers are necessary for a quorum under 00697 * TRANSITIONAL configurations. Under STAGING configurations, these servers 00698 * receive log entries but do not participate in elections. 00699 */ 00700 SimpleConfiguration newServers; 00701 00702 friend class Invariants; 00703 }; 00704 00705 /** 00706 * Ensures the current configuration reflects the latest state of the log and 00707 * snapshot. 
00708 */ 00709 class ConfigurationManager { 00710 public: 00711 /** 00712 * Constructor. 00713 * \param configuration 00714 * The configuration that this object is in charge of setting. 00715 */ 00716 explicit ConfigurationManager(Configuration& configuration); 00717 00718 /** 00719 * Destructor. 00720 */ 00721 ~ConfigurationManager(); 00722 00723 /** 00724 * Called when a new configuration is added to the log. 00725 * \param index 00726 * The log index of this configuration (equivalently, its ID). 00727 * \param description 00728 * The serializable representation of the configuration. 00729 */ 00730 void add(uint64_t index, 00731 const Protocol::Raft::Configuration& description); 00732 /** 00733 * Called when a log prefix is truncated (after saving a snapshot that 00734 * covers this prefix). 00735 * \param firstIndexKept 00736 * The log entries in range [1, firstIndexKept) are being discarded. 00737 */ 00738 void truncatePrefix(uint64_t firstIndexKept); 00739 /** 00740 * Called when a log suffix is truncated (when the leader doesn't agree 00741 * with this server's log). 00742 * \param lastIndexKept 00743 * The log entries in range (lastIndexKept, infinity) are being 00744 * discarded. 00745 */ 00746 void truncateSuffix(uint64_t lastIndexKept); 00747 /** 00748 * Called when a new snapshot is saved. 00749 * Only the latest such configuration is kept. 00750 * \param index 00751 * The log index of this configuration (equivalently, its ID). 00752 * \param description 00753 * The serializable representation of the configuration. 00754 */ 00755 void setSnapshot(uint64_t index, 00756 const Protocol::Raft::Configuration& description); 00757 00758 /** 00759 * Return the configuration as of a particular log index. 00760 * This is useful when taking snapshots. 00761 * \param lastIncludedIndex 00762 * Configurations greater than this index will be ignored. 
00763 * \return 00764 * The index and description of the configuration with the largest 00765 * index in the range [1, lastIncludedIndex]. 00766 */ 00767 std::pair<uint64_t, Protocol::Raft::Configuration> 00768 getLatestConfigurationAsOf(uint64_t lastIncludedIndex) const; 00769 00770 private: 00771 00772 /** 00773 * Helper function called after changing this object's state. 00774 * - Make sure the snapshot configuration is in the descriptions map. 00775 * - Set configuration to the configuration with the largest index in the 00776 * descriptions map, or reset it the map is empty. 00777 */ 00778 void restoreInvariants(); 00779 00780 /** 00781 * Defines the servers that are part of the cluster. See Configuration. 00782 */ 00783 Configuration& configuration; 00784 00785 /** 00786 * This contains all the cluster configurations found in the log, plus one 00787 * additional configuration from the latest snapshot. 00788 * 00789 * It is used to find the right configuration when taking a snapshot and 00790 * truncating the end of the log. It must be kept consistent with the log 00791 * when it is loaded, as the log grows, as it gets truncated from the 00792 * beginning for snapshots, and as it gets truncated from the end upon 00793 * conflicts with the leader. 00794 * 00795 * The key is the entry ID where the configuration belongs in the log; the 00796 * value is the serializable form of the configuration. 00797 */ 00798 std::map<uint64_t, Protocol::Raft::Configuration> descriptions; 00799 00800 /** 00801 * This reflects the configuration found in this server's latest snapshot, 00802 * or {0, {}} if this server has no snapshot. 00803 */ 00804 std::pair<uint64_t, Protocol::Raft::Configuration> snapshot; 00805 00806 friend class Invariants; 00807 }; 00808 00809 /** 00810 * This is the rough equivalent of a SteadyClock that can be shared across the 00811 * network with other Raft servers. 
The cluster time approximately tracks how 00812 * long the cluster has been available with a working leader. 00813 * 00814 * Cluster time is measured in nanoseconds and progresses at about the same 00815 * rate as a normal clock when the cluster is operational. While there's a 00816 * stable leader, the nanoseconds increase according to that leader's 00817 * SteadyClock. When a new leader takes over, it starts ticking from cluster 00818 * time value it finds in its last entry/snapshot, so some cluster time may be 00819 * unaccounted for between the last leader replicating its final entry and then 00820 * losing leadership. 00821 * 00822 * The StateMachine uses cluster time to expire client sessions. Cluster times 00823 * in committed log entries monotonically increase, so the state machine will 00824 * see cluster times monotonically increase. 00825 * 00826 * Before cluster time, client sessions were expired based on the SystemClock. 00827 * That meant that if the SystemClock jumped forwards drastically, all clients 00828 * would expire. That's undesirable, so cluster time was introduced in #90 00829 * ("make client session timeout use monotonic clock") to address this. 00830 */ 00831 class ClusterClock { 00832 public: 00833 /// Default constructor. 00834 ClusterClock(); 00835 00836 /** 00837 * Reset to the given cluster time, assuming it's the current time right 00838 * now. This is used, for example, when a follower gets a new log entry 00839 * (which includes a cluster time) from the leader. 00840 */ 00841 void newEpoch(uint64_t clusterTime); 00842 00843 /** 00844 * Called by leaders to generate a new cluster time for a new log entry. 00845 * This is equivalent to the following but slightly more efficient: 00846 * uint64_t now = c.interpolate(); 00847 * c.newEpoch(now); 00848 * return now; 00849 */ 00850 uint64_t leaderStamp(); 00851 00852 /** 00853 * Return the best approximation of the current cluster time, assuming 00854 * there's been a leader all along. 
00855 */ 00856 uint64_t interpolate() const; 00857 00858 /** 00859 * Invariant: this is equal to the cluster time in: 00860 * - the last log entry, if any, or 00861 * - the last snapshot, if any, or 00862 * - 0. 00863 */ 00864 uint64_t clusterTimeAtEpoch; 00865 00866 /** 00867 * The local SteadyClock time when clusterTimeAtEpoch was set. 00868 */ 00869 Core::Time::SteadyClock::time_point localTimeAtEpoch; 00870 }; 00871 00872 } // namespace RaftConsensusInternal 00873 00874 /** 00875 * An implementation of the Raft consensus algorithm. The algorithm is 00876 * described at https://raftconsensus.github.io 00877 * . In brief, Raft divides time into terms and elects a leader at the 00878 * beginning of each term. This election mechanism guarantees that the emerging 00879 * leader has at least all committed log entries. Once a candidate has received 00880 * votes from a quorum, it replicates its own log entries in order to the 00881 * followers. The leader is the only machine that serves client requests. 00882 */ 00883 class RaftConsensus { 00884 public: 00885 typedef RaftConsensusInternal::Invariants Invariants; 00886 typedef RaftConsensusInternal::Server Server; 00887 typedef RaftConsensusInternal::LocalServer LocalServer; 00888 typedef RaftConsensusInternal::Peer Peer; 00889 typedef RaftConsensusInternal::Configuration Configuration; 00890 typedef RaftConsensusInternal::ConfigurationManager ConfigurationManager; 00891 typedef RaftConsensusInternal::ClusterClock ClusterClock; 00892 typedef RaftConsensusInternal::Mutex Mutex; 00893 typedef RaftConsensusInternal::Clock Clock; 00894 typedef RaftConsensusInternal::TimePoint TimePoint; 00895 00896 /** 00897 * This is returned by getNextEntry(). 00898 */ 00899 struct Entry { 00900 /// Default constructor. 00901 Entry(); 00902 /// Move constructor. 00903 Entry(Entry&& other); 00904 /// Destructor. 00905 ~Entry(); 00906 00907 /** 00908 * The Raft log index for this entry (or the last one a snapshot 00909 * covers).
Pass this as the lastIndex argument to the next call to 00910 * getNextEntry(). 00911 */ 00912 uint64_t index; 00913 00914 /** 00915 * The type of the entry. 00916 */ 00917 enum { 00918 /** 00919 * This is a normal entry containing a client request for the state 00920 * machine. The 'command' field contains that request, and 00921 * 'snapshotReader' is not set. 00922 */ 00923 DATA, 00924 /** 00925 * This is a snapshot: the state machine should clear its state and 00926 * load in the snapshot. The 'command' field is not set, and the 00927 * 'snapshotReader' should be used to read the snapshot contents 00928 * from. 00929 */ 00930 SNAPSHOT, 00931 /** 00932 * Some entries should be ignored by the state machine (they are 00933 * consumed internally by the consensus module). For client service 00934 * threads to know when a state machine is up-to-date, it's easiest 00935 * for the state machine to get empty entries back for these, and 00936 * simply call back into getNextEntry() again with the next ID. 00937 * Entries of type 'SKIP' will have neither 'command' nor 00938 * 'snapshotReader' set. 00939 */ 00940 SKIP, 00941 } type; 00942 00943 /** 00944 * The client request for entries of type 'DATA'. 00945 */ 00946 Core::Buffer command; 00947 00948 /** 00949 * A handle to the snapshot file for entries of type 'SNAPSHOT'. 00950 */ 00951 std::unique_ptr<Storage::SnapshotFile::Reader> snapshotReader; 00952 00953 /** 00954 * Cluster time when leader created entry/snapshot. This is valid for 00955 * entries of all types. 00956 */ 00957 uint64_t clusterTime; 00958 00959 // copy and assign not allowed 00960 Entry(const Entry&) = delete; 00961 Entry& operator=(const Entry&) = delete; 00962 }; 00963 /** Status codes returned by client-facing operations such as getConfiguration(), getLastCommitIndex(), replicate(), and setConfiguration(). */ 00964 enum class ClientResult { 00965 /** 00966 * Request completed successfully.
00967 */ 00968 SUCCESS, 00969 /** 00970 * Returned by setConfiguration() if the configuration could not be 00971 * set because the previous configuration was unsuitable or because the 00972 * new servers could not be caught up. 00973 */ 00974 FAIL, 00975 /** 00976 * Returned by getConfiguration() if the configuration is not stable or 00977 * is not committed. The client should wait and retry later. 00978 */ 00979 RETRY, 00980 /** 00981 * Cannot process the request because this server is not leader or 00982 * temporarily lost its leadership. 00983 */ 00984 NOT_LEADER, 00985 }; 00986 00987 /** 00988 * Constructor. 00989 * \param globals 00990 * Handle to LogCabin's top-level objects. 00991 */ 00992 explicit RaftConsensus(Globals& globals); 00993 00994 /** 00995 * Destructor. 00996 */ 00997 ~RaftConsensus(); 00998 00999 /// Initialize. Must be called before any other method. 01000 void init(); 01001 01002 /// Signal the consensus module to exit (shut down threads, etc). 01003 void exit(); 01004 01005 /** 01006 * Initialize the log with a configuration consisting of just this server. 01007 * This should be called just once the very first time the very first 01008 * server in your cluster is started. 01009 * PANICs if any log entries or snapshots already exist. 01010 */ 01011 void bootstrapConfiguration(); 01012 01013 /** 01014 * Get the current leader's active, committed, simple cluster 01015 * configuration. 01016 */ 01017 ClientResult getConfiguration( 01018 Protocol::Raft::SimpleConfiguration& configuration, 01019 uint64_t& id) const; 01020 01021 /** 01022 * Return the most recent entry ID that has been externalized by the 01023 * replicated log. This is used to provide non-stale reads to the state 01024 * machine. 01025 */ 01026 std::pair<ClientResult, uint64_t> getLastCommitIndex() const; 01027 01028 /** 01029 * Return the network address for a recent leader, if known, 01030 * or empty string otherwise.
01031 */ 01032 std::string getLeaderHint() const; 01033 01034 /** 01035 * This returns the entry following lastIndex in the replicated log. Some 01036 * entries may be used internally by the consensus module. These will have 01037 * Entry.type set to SKIP. The reason these are exposed to the state 01038 * machine is that the state machine waits to be caught up to the latest 01039 * committed entry in the replicated log sometimes, but if that entry 01040 * was for internal use, it would otherwise never reach the state 01041 * machine. 01042 * \throw Core::Util::ThreadInterruptedException 01043 * Thread should exit. 01044 */ 01045 Entry getNextEntry(uint64_t lastIndex) const; 01046 01047 /** 01048 * Return statistics that may be useful in deciding when to snapshot. 01049 */ 01050 SnapshotStats::SnapshotStats getSnapshotStats() const; 01051 01052 /** 01053 * Process an AppendEntries RPC from another server. Called by RaftService. 01054 * \param[in] request 01055 * The request that was received from the other server. 01056 * \param[out] response 01057 * Where the reply should be placed. 01058 */ 01059 void handleAppendEntries( 01060 const Protocol::Raft::AppendEntries::Request& request, 01061 Protocol::Raft::AppendEntries::Response& response); 01062 01063 /** 01064 * Process an InstallSnapshot RPC from another server. Called by 01065 * RaftService. 01066 * \param[in] request 01067 * The request that was received from the other server. 01068 * \param[out] response 01069 * Where the reply should be placed. 01070 */ 01071 void handleInstallSnapshot( 01072 const Protocol::Raft::InstallSnapshot::Request& request, 01073 Protocol::Raft::InstallSnapshot::Response& response); 01074 01075 /** 01076 * Process a RequestVote RPC from another server. Called by RaftService. 01077 * \param[in] request 01078 * The request that was received from the other server. 01079 * \param[out] response 01080 * Where the reply should be placed.
01081 */ 01082 void handleRequestVote(const Protocol::Raft::RequestVote::Request& request, 01083 Protocol::Raft::RequestVote::Response& response); 01084 01085 /** 01086 * Submit an operation to the replicated log. 01087 * \param operation 01088 * If the cluster accepts this operation, then it will be added to the 01089 * log and the state machine will eventually apply it. 01090 * \return 01091 * First component is status code. If SUCCESS, second component is the 01092 * log index at which the entry has been committed to the replicated 01093 * log. 01094 */ 01095 std::pair<ClientResult, uint64_t> replicate(const Core::Buffer& operation); 01096 01097 /** 01098 * Change the cluster's configuration. 01099 * Returns successfully once operation completed and old servers are no 01100 * longer needed. * \param request * The requested configuration change. * \param response * Filled in with the outcome whenever the returned code is not * NOT_LEADER. 01101 * \return 01102 * NOT_LEADER, or other code with response filled in. 01103 */ 01104 ClientResult 01105 setConfiguration( 01106 const Protocol::Client::SetConfiguration::Request& request, 01107 Protocol::Client::SetConfiguration::Response& response); 01108 01109 /** 01110 * Register which versions of client commands/behavior the local state 01111 * machine supports. Invoked just once on boot (though calling this 01112 * multiple times is safe). This information is used to support upgrades to 01113 * the running replicated state machine version, and it is transmitted to 01114 * other servers as needed. See #stateMachineUpdaterThreadMain. 01115 * \param minSupported 01116 * The smallest version the local state machine can support. 01117 * \param maxSupported 01118 * The largest version the local state machine can support. 01119 */ 01120 void 01121 setSupportedStateMachineVersions(uint16_t minSupported, 01122 uint16_t maxSupported); 01123 01124 /** 01125 * Start taking a snapshot. Called by the state machine when it wants to 01126 * take a snapshot. 01127 * \param lastIncludedIndex 01128 * The snapshot will cover log entries in the range 01129 * [1, lastIncludedIndex].
01130 * lastIncludedIndex must be committed (must have been previously 01131 * returned by #getNextEntry()). 01132 * \return 01133 * A file the state machine can dump its snapshot into. 01134 */ 01135 std::unique_ptr<Storage::SnapshotFile::Writer> 01136 beginSnapshot(uint64_t lastIncludedIndex); 01137 01138 /** 01139 * Complete taking a snapshot for the log entries in range [1, 01140 * lastIncludedIndex]. Called by the state machine when it is done taking a 01141 * snapshot. 01142 * \param lastIncludedIndex 01143 * The snapshot will cover log entries in the range 01144 * [1, lastIncludedIndex]. 01145 * \param writer 01146 * A writer that has not yet been saved: the consensus module may 01147 * have to discard the snapshot in case it's gotten a better snapshot 01148 * from another server. If this snapshot is to be saved (normal case), 01149 * the consensus module will call save() on it. 01150 */ 01151 void snapshotDone(uint64_t lastIncludedIndex, 01152 std::unique_ptr<Storage::SnapshotFile::Writer> writer); 01153 01154 /** 01155 * Add information about the consensus state to the given structure. 01156 */ 01157 void updateServerStats(Protocol::ServerStats& serverStats) const; 01158 01159 /** 01160 * Print out the contents of this class for debugging purposes. 01161 */ 01162 friend std::ostream& operator<<(std::ostream& os, 01163 const RaftConsensus& raft); 01164 01165 private: 01166 /** 01167 * The roles a server can play in the cluster. See #state. 01168 */ 01169 enum class State { 01170 /** 01171 * A follower does not initiate RPCs. It becomes a candidate with 01172 * startNewElection() when a timeout elapses without hearing from a 01173 * candidate/leader. This is the initial state for servers when they 01174 * start up. 01175 */ 01176 FOLLOWER, 01177 01178 /** 01179 * A candidate sends RequestVote RPCs in an attempt to become a leader. 01180 * It steps down to be a follower if it discovers a current leader, and 01181 * it becomes leader if it collects votes from a quorum.
01182 */ 01183 CANDIDATE, 01184 01185 /** 01186 * A leader sends AppendEntries RPCs to replicate its log onto followers. 01187 * It also sends heartbeats periodically during periods of inactivity 01188 * to delay its followers from becoming candidates. It steps down to be 01189 * a follower if it discovers a server with a higher term, if it can't 01190 * communicate with a quorum, or if it is not part of the latest 01191 * committed configuration. 01192 */ 01193 LEADER, 01194 }; 01195 01196 01197 //// The following private methods MUST acquire the lock. 01198 01199 /** 01200 * Flush log entries to stable storage in the background on leaders. 01201 * Once they're flushed, it tries to advance the #commitIndex. 01202 * This is the method that #leaderDiskThread executes. 01203 */ 01204 void leaderDiskThreadMain(); 01205 01206 /** 01207 * Start new elections when it's time to do so. This is the method that 01208 * #timerThread executes. 01209 */ 01210 void timerThreadMain(); 01211 01212 /** 01213 * Initiate RPCs to a specific server as necessary. 01214 * One thread for each remote server calls this method (see Peer::thread). 01215 */ 01216 void peerThreadMain(std::shared_ptr<Peer> peer); 01217 01218 /** 01219 * Append advance state machine version entries to the log as leader once 01220 * all servers can support a new state machine version. 01221 */ 01222 void stateMachineUpdaterThreadMain(); 01223 01224 /** 01225 * Return to follower state when, as leader, this server is not able to 01226 * communicate with a quorum. This helps two things in cases where a quorum 01227 * is not available to this leader but clients can still communicate with 01228 * the leader. First, it returns to clients in a timely manner so that they 01229 * can try to find another current leader, if one exists. Second, it frees 01230 * up the resources associated with those clients' RPCs on the server. 01231 * This is the method that #stepDownThread executes.
01232 */ 01233 void stepDownThreadMain(); 01234 01235 01236 //// The following private methods MUST NOT acquire the lock (callers are expected to already hold #mutex). 01237 01238 01239 /** 01240 * Move forward #commitIndex if possible. Called only on leaders after 01241 * receiving RPC responses and flushing entries to disk. If commitIndex 01242 * changes, this will notify #stateChanged. It will also change the 01243 * configuration or step down due to a configuration change when 01244 * appropriate. 01245 * 01246 * #commitIndex can jump by more than 1 on new leaders, since their 01247 * #commitIndex may be well out of date until they figure out which log 01248 * entries their followers have. 01249 * 01250 * \pre 01251 * state is LEADER. 01252 */ 01253 void advanceCommitIndex(); 01254 01255 /** 01256 * Append entries to the log, set the configuration if this contains a 01257 * configuration entry, and notify #stateChanged. 01258 */ 01259 void append(const std::vector<const Storage::Log::Entry*>& entries); 01260 01261 /** 01262 * Send an AppendEntries RPC to the server (either a heartbeat or containing 01263 * an entry to replicate). 01264 * \param lockGuard 01265 * Used to temporarily release the lock while invoking the RPC, so as 01266 * to allow for some concurrency. 01267 * \param peer 01268 * State used in communicating with the follower, building the RPC 01269 * request, and processing its result. 01270 */ 01271 void appendEntries(std::unique_lock<Mutex>& lockGuard, Peer& peer); 01272 01273 /** 01274 * Send an InstallSnapshot RPC to the server (containing part of a 01275 * snapshot file to replicate). 01276 * \param lockGuard 01277 * Used to temporarily release the lock while invoking the RPC, so as 01278 * to allow for some concurrency. 01279 * \param peer 01280 * State used in communicating with the follower, building the RPC 01281 * request, and processing its result.
01282 */ 01283 void installSnapshot(std::unique_lock<Mutex>& lockGuard, Peer& peer); 01284 01285 /** 01286 * Transition to being a leader. This is called when a candidate has 01287 * received votes from a quorum. 01288 */ 01289 void becomeLeader(); 01290 01291 /** 01292 * Remove the prefix of the log that is redundant with this server's 01293 * snapshot. 01294 */ 01295 void discardUnneededEntries(); 01296 01297 /** 01298 * Return the term corresponding to log->getLastLogIndex(). This may come 01299 * from the log, from the snapshot, or it may be 0. 01300 */ 01301 uint64_t getLastLogTerm() const; 01302 01303 /** 01304 * Notify the #stateChanged condition variable and cancel all current RPCs. 01305 * This should be called when stepping down, starting a new election, 01306 * becoming leader, or exiting. 01307 */ 01308 void interruptAll(); 01309 01310 /** 01311 * Helper for #appendEntries() to put the right number of entries into the 01312 * request. 01313 * \param nextIndex 01314 * First entry to send to the follower. 01315 * \param request 01316 * AppendEntries request ProtoBuf in which to pack the entries. 01317 * \return 01318 * Number of entries in the request. 01319 */ 01320 uint64_t 01321 packEntries(uint64_t nextIndex, 01322 Protocol::Raft::AppendEntries::Request& request) const; 01323 01324 /** 01325 * Try to read the latest good snapshot from disk. Loads the header of the 01326 * snapshot file, which is used internally by the consensus module. The 01327 * rest of the file reader is kept in #snapshotReader for the state machine 01328 * to process upon a future getNextEntry(). 01329 * 01330 * If the snapshot file on disk is no good, #snapshotReader will remain 01331 * NULL. 01332 */ 01333 void readSnapshot(); 01334 01335 /** 01336 * Append an entry to the log and wait for it to be committed.
01337 */ 01338 std::pair<ClientResult, uint64_t> 01339 replicateEntry(Storage::Log::Entry& entry, 01340 std::unique_lock<Mutex>& lockGuard); 01341 01342 /** 01343 * Send a RequestVote RPC to the server. This is used by candidates to 01344 * request a server's vote and by new leaders to retrieve information about 01345 * the server's log. 01346 * \param lockGuard 01347 * Used to temporarily release the lock while invoking the RPC, so as 01348 * to allow for some concurrency. 01349 * \param peer 01350 * State used in communicating with the follower, building the RPC 01351 * request, and processing its result. 01352 */ 01353 void requestVote(std::unique_lock<Mutex>& lockGuard, Peer& peer); 01354 01355 /** 01356 * Dumps serverId, currentTerm, state, leaderId, and votedFor to the debug 01357 * log. This is intended to be easy to grep and parse. 01358 */ 01359 void printElectionState() const; 01360 01361 /** 01362 * Set the timer to start a new election and notify #stateChanged. 01363 * The timer is set for ELECTION_TIMEOUT plus some random jitter from 01364 * now. 01365 */ 01366 void setElectionTimer(); 01367 01368 /** 01369 * Transitions to being a candidate from being a follower or candidate. 01370 * This is called when a timeout elapses. If the configuration is blank, it 01371 * does nothing. Moreover, if this server forms a quorum (it is the only 01372 * server in the configuration), this will immediately transition to 01373 * leader. 01374 */ 01375 void startNewElection(); 01376 01377 /** 01378 * Transition to being a follower. This is called when we 01379 * receive an RPC request with newer term, receive an RPC response 01380 * indicating our term is stale, or discover a current leader while a 01381 * candidate. In this last case, newTerm will be the same as currentTerm. 01382 * This will call setElectionTimer for you if no election timer is 01383 * currently set.
01384 */ 01385 void stepDown(uint64_t newTerm); 01386 01387 /** 01388 * Persist critical state, such as the term and the vote, to stable 01389 * storage. 01390 */ 01391 void updateLogMetadata(); 01392 01393 /** 01394 * Return true if every entry that might have already been marked committed 01395 * on any leader is marked committed on this leader by the time this call 01396 * returns. 01397 * This is used to provide non-stale read operations to 01398 * clients. It gives up after ELECTION_TIMEOUT, since stepDownThread 01399 * will return to the follower state after that time. 01400 */ 01401 bool upToDateLeader(std::unique_lock<Mutex>& lockGuard) const; 01402 01403 /** 01404 * Print out a ClientResult for debugging purposes. 01405 */ 01406 friend std::ostream& operator<<(std::ostream& os, 01407 ClientResult clientResult); 01408 01409 /** 01410 * Print out a State for debugging purposes. 01411 */ 01412 friend std::ostream& operator<<(std::ostream& os, State state); 01413 01414 /** 01415 * A follower waits for about this much inactivity before becoming a 01416 * candidate and starting a new election. 01417 */ 01418 const std::chrono::nanoseconds ELECTION_TIMEOUT; 01419 01420 /** 01421 * A leader sends RPCs at least this often, even if there is no data to 01422 * send. 01423 */ 01424 const std::chrono::nanoseconds HEARTBEAT_PERIOD; 01425 01426 /** 01427 * A leader will pack at most this many entries into an AppendEntries 01428 * request message. This helps bound processing time when entries are very 01429 * small in size. 01430 * Const except for unit tests. 01431 */ 01432 uint64_t MAX_LOG_ENTRIES_PER_REQUEST; 01433 01434 /** 01435 * A candidate or leader waits this long after an RPC fails before sending 01436 * another one, so as to not overwhelm the network with retries.
01437 */ 01438 const std::chrono::nanoseconds RPC_FAILURE_BACKOFF; 01439 01440 /** 01441 * How long the state machine updater thread should sleep if: 01442 * - The servers do not currently support a common version, or 01443 * - This server has not yet received version information from all other 01444 * servers, or 01445 * - An advance state machine entry failed to commit (probably due to lost 01446 * leadership). 01447 */ 01448 const std::chrono::nanoseconds STATE_MACHINE_UPDATER_BACKOFF; 01449 01450 /** 01451 * Prefer to keep RPC requests under this size. * NOTE(review): the unit is presumably bytes; confirm where this limit * is compared against a request size. 01452 * Const except for unit tests. 01453 */ 01454 uint64_t SOFT_RPC_SIZE_LIMIT; 01455 01456 public: 01457 /** 01458 * This server's unique ID. Not available until init() is called. 01459 */ 01460 uint64_t serverId; 01461 01462 /** 01463 * The addresses that this server is listening on. Not available until 01464 * init() is called. 01465 */ 01466 std::string serverAddresses; 01467 01468 private: 01469 01470 /** 01471 * The LogCabin daemon's top-level objects. 01472 */ 01473 Globals& globals; 01474 01475 /** 01476 * Where the files for the log and snapshots are stored. 01477 */ 01478 Storage::Layout storageLayout; 01479 01480 /** 01481 * Used to create new sessions. 01482 */ 01483 Client::SessionManager sessionManager; 01484 01485 /** 01486 * This class behaves mostly like a monitor. This protects all the state in 01487 * this class and almost all of the Peer class (with some 01488 * documented exceptions). 01489 */ 01490 mutable Mutex mutex; 01491 01492 /** 01493 * Notified when basically anything changes. Specifically, this is notified 01494 * when any of the following events occur: 01495 * - term changes. 01496 * - state changes. 01497 * - log changes. 01498 * - commitIndex changes. 01499 * - exiting is set. 01500 * - numPeerThreads is decremented. 01501 * - configuration changes. 01502 * - startElectionAt changes (see note under startElectionAt). 01503 * - an acknowledgement from a peer is received.
01504 * - a server goes from not caught up to caught up. 01505 * - a heartbeat is scheduled. 01506 * TODO(ongaro): Should there be multiple condition variables? This one is 01507 * used by a lot of threads for a lot of different conditions. 01508 */ 01509 mutable Core::ConditionVariable stateChanged; 01510 01511 /** 01512 * Set to true when this class is about to be destroyed. When this is true, 01513 * threads must exit right away and no more RPCs should be sent or 01514 * processed. 01515 */ 01516 bool exiting; 01517 01518 /** 01519 * The number of Peer::thread threads that are still using this 01520 * RaftConsensus object. When they exit, they decrement this and notify 01521 * #stateChanged. 01522 */ 01523 uint32_t numPeerThreads; 01524 01525 /** 01526 * Provides all storage for this server. Keeps track of all log entries and 01527 * some additional metadata. 01528 * 01529 * If you modify this, be sure to keep #configurationManager consistent. 01530 */ 01531 std::unique_ptr<Storage::Log> log; 01532 01533 /** 01534 * Flag to indicate that #leaderDiskThreadMain should flush recent log 01535 * writes to stable storage. This is always false for followers and 01536 * candidates and is only used for leaders. 01537 * 01538 * When a server steps down, it waits for all syncs to complete, that way 01539 * followers can assume that all of their log entries are durable when 01540 * replying to leaders. 01541 */ 01542 bool logSyncQueued; 01543 01544 /** 01545 * Used for stepDown() to wait on #leaderDiskThread without releasing 01546 * #mutex. This is true while #leaderDiskThread is writing to disk. It's 01547 * set to true while holding #mutex; set to false without #mutex. 01548 */ 01549 std::atomic<bool> leaderDiskThreadWorking; 01550 01551 /** 01552 * Defines the servers that are part of the cluster. See Configuration.
01553 */ 01554 std::unique_ptr<Configuration> configuration; 01555 01556 /** 01557 * Ensures that 'configuration' reflects the latest state of the log and 01558 * snapshot. 01559 */ 01560 std::unique_ptr<ConfigurationManager> configurationManager; 01561 01562 /** 01563 * The latest term this server has seen. This value monotonically increases 01564 * over time. It gets updated in stepDown(), startNewElection(), and when a 01565 * candidate receives a vote response with a newer term. 01566 * \warning 01567 * After setting this value, you must call updateLogMetadata() to 01568 * persist it. 01569 */ 01570 uint64_t currentTerm; 01571 01572 /** 01573 * The server's current role in the cluster (follower, candidate, or 01574 * leader). See #State. 01575 */ 01576 State state; 01577 01578 /** 01579 * The latest good snapshot covers entries 1 through 'lastSnapshotIndex' 01580 * (inclusive). It is known that these are committed. They are safe to 01581 * remove from the log, but it may be advantageous to keep them around for 01582 * a little while (to avoid shipping snapshots to straggling followers). 01583 * Thus, the log may or may not have some of the entries in this range. 01584 */ 01585 uint64_t lastSnapshotIndex; 01586 01587 /** 01588 * The term of the last entry covered by the latest good snapshot, or 0 if 01589 * we have no snapshot. 01590 */ 01591 uint64_t lastSnapshotTerm; 01592 01593 /** 01594 * The cluster time of the last entry covered by the latest good snapshot, 01595 * or 0 if we have no snapshot. 01596 */ 01597 uint64_t lastSnapshotClusterTime; 01598 01599 /** 01600 * The size of the latest good snapshot in bytes, or 0 if we have no 01601 * snapshot. 01602 */ 01603 uint64_t lastSnapshotBytes; 01604 01605 /** 01606 * If not NULL, this is a Storage::SnapshotFile::Reader that covers up through 01607 * lastSnapshotIndex. This is ready for the state machine to process and is 01608 * returned to the state machine in getNextEntry().
It's just a cache which 01609 * can be repopulated with readSnapshot(). 01610 */ 01611 mutable std::unique_ptr<Storage::SnapshotFile::Reader> snapshotReader; 01612 01613 /** 01614 * This is used in handleInstallSnapshot when receiving a snapshot from 01615 * the current leader. The leader is assumed to send at most one snapshot 01616 * at a time, and any partial snapshots here are discarded when the term 01617 * changes. 01618 */ 01619 std::unique_ptr<Storage::SnapshotFile::Writer> snapshotWriter; 01620 01621 /** 01622 * The largest entry ID for which a quorum is known to have stored the same 01623 * entry as this server has. Entries 1 through commitIndex as stored in 01624 * this server's log are guaranteed to never change. This value will 01625 * monotonically increase over time. 01626 */ 01627 uint64_t commitIndex; 01628 01629 /** 01630 * The server ID of the leader for this term. This is used to help point 01631 * clients to the right server. The special value 0 means either there is 01632 * no leader for this term yet or this server does not know who it is yet. 01633 */ 01634 uint64_t leaderId; 01635 01636 /** 01637 * The server ID that this server voted for during this term's election, if 01638 * any. The special value 0 means no vote has been given out during this 01639 * term. 01640 * \warning 01641 * After setting this value, you must call updateLogMetadata() to 01642 * persist it. 01643 */ 01644 uint64_t votedFor; 01645 01646 /** 01647 * A logical clock used to confirm leadership and connectivity. 01648 */ 01649 // TODO(ongaro): rename, explain more 01650 mutable uint64_t currentEpoch; 01651 01652 /** 01653 * Tracks the passage of "cluster time". See ClusterClock. 01654 */ 01655 ClusterClock clusterClock; 01656 01657 /** 01658 * The earliest time at which #timerThread should begin a new election 01659 * with startNewElection(). 01660 * 01661 * It is safe for increases to startElectionAt to not notify the condition 01662 * variable.
Decreases to this value, however, must notify the condition 01663 * variable to make sure the timerThread gets woken in a timely manner. 01664 * Unfortunately, startElectionAt does not monotonically increase because 01665 * of the random jitter that is applied to the follower timeout, and it 01666 * would reduce the jitter's effectiveness for the thread to wait as long 01667 * as the largest startElectionAt value. 01668 */ 01669 TimePoint startElectionAt; 01670 01671 /** 01672 * The earliest time at which RequestVote messages should be processed. 01673 * Until this time, they are rejected, as processing them risks 01674 * causing the cluster leader to needlessly step down. 01675 * For more motivation, see the "disruptive servers" issue in membership 01676 * changes described in the Raft paper/thesis. 01677 * 01678 * This is set to the current time + an election timeout when a heartbeat 01679 * is received, and it's set to infinity for leaders (who begin processing 01680 * RequestVote messages again immediately when they step down). 01681 */ 01682 TimePoint withholdVotesUntil; 01683 01684 /** 01685 * The total number of entries ever truncated from the end of the log. 01686 * This happens only when a new leader tells this server to remove 01687 * extraneous uncommitted entries from its log. 01688 */ 01689 uint64_t numEntriesTruncated; 01690 01691 /** 01692 * The thread that executes leaderDiskThreadMain() to flush log entries to 01693 * stable storage in the background on leaders. 01694 */ 01695 std::thread leaderDiskThread; 01696 01697 /** 01698 * The thread that executes timerThreadMain() to begin new elections 01699 * after periods of inactivity. 01700 */ 01701 std::thread timerThread; 01702 01703 /** 01704 * The thread that executes stateMachineUpdaterThreadMain() to append 01705 * advance state machine version entries to the log on leaders.
01706 */ 01707 std::thread stateMachineUpdaterThread; 01708 01709 /** 01710 * The thread that executes stepDownThreadMain() to return to the follower 01711 * state if the leader becomes disconnected from a quorum of servers. 01712 */ 01713 std::thread stepDownThread; 01714 /** NOTE(review): presumably used to check this object's internal consistency (RaftConsensusInternal::Invariants is declared a friend of this class below); confirm against that class's definition. */ 01715 Invariants invariants; 01716 01717 friend class RaftConsensusInternal::LocalServer; 01718 friend class RaftConsensusInternal::Peer; 01719 friend class RaftConsensusInternal::Invariants; 01720 }; 01721 01722 } // namespace LogCabin::Server 01723 } // namespace LogCabin 01724 01725 #endif /* LOGCABIN_SERVER_RAFTCONSENSUS_H */