LogCabin
Server/RaftConsensus.h
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <chrono>
00018 #include <deque>
00019 #include <functional>
00020 #include <memory>
00021 #include <thread>
00022 #include <unordered_map>
00023 
00024 #include "build/Protocol/Client.pb.h"
00025 #include "build/Protocol/Raft.pb.h"
00026 #include "build/Protocol/ServerStats.pb.h"
00027 #include "build/Server/SnapshotStats.pb.h"
00028 #include "Client/SessionManager.h"
00029 #include "Core/CompatAtomic.h"
00030 #include "Core/ConditionVariable.h"
00031 #include "Core/Mutex.h"
00032 #include "Core/Time.h"
00033 #include "RPC/ClientRPC.h"
00034 #include "Storage/Layout.h"
00035 #include "Storage/Log.h"
00036 #include "Storage/SnapshotFile.h"
00037 
00038 #ifndef LOGCABIN_SERVER_RAFTCONSENSUS_H
00039 #define LOGCABIN_SERVER_RAFTCONSENSUS_H
00040 
00041 namespace LogCabin {
00042 
00043 // forward declarations
00044 namespace Event {
00045 class Loop;
00046 }
00047 namespace RPC {
00048 class ClientSession;
00049 }
00050 
00051 namespace Server {
00052 
00053 // forward declaration
00054 class Globals;
00055 
00056 // forward declaration
00057 class RaftConsensus;
00058 
00059 namespace RaftConsensusInternal {
00060 
00061 
00062 class Invariants {
00063   public:
00064     explicit Invariants(RaftConsensus&);
00065     ~Invariants();
00066     void checkAll();
00067   private:
00068     void checkBasic();
00069     void checkPeerBasic();
00070     void checkDelta();
00071     void checkPeerDelta();
00072 
00073     const RaftConsensus& consensus;
00074     uint64_t errors;
00075     struct ConsensusSnapshot;
00076     std::unique_ptr<ConsensusSnapshot> previous;
00077 };
00078 
00079 
00080 /**
00081  * True if this should actually spawn threads, false otherwise.
00082  * Normally set to true, but many unit tests set this to false.
00083  */
00084 extern bool startThreads;
00085 
00086 /**
00087  * Reads the current time. This will refer to the best clock available on our
00088  * system, which may or may not be monotonic.
00089  */
00090 typedef LogCabin::Core::Time::SteadyClock Clock;
00091 
00092 /**
00093  * Some point in time relative to the Clock's epoch.
00094  */
00095 typedef Clock::time_point TimePoint;
00096 
00097 typedef Core::Mutex Mutex;
00098 
00099 /**
00100  * A base class for known servers in the cluster, including this process (see
00101  * LocalServer) and others (see Peer). This tracks various bits of state for
00102  * each server, which is used when we are a candidate or leader. This class
00103  * does not do any internal locking; it should be accessed only while holding
00104  * the RaftConsensus lock.
00105  */
00106 class Server {
00107   public:
00108     /**
00109      * Constructor.
00110      */
00111     explicit Server(uint64_t serverId);
00112     /**
00113      * Destructor.
00114      */
00115     virtual ~Server();
00116     /**
00117      * Begin requesting the Server's vote in the current election. Return
00118      * immediately. The condition variable in RaftConsensus will be notified
00119      * separately.
00120      */
00121     virtual void beginRequestVote() = 0;
00122     /**
00123      * Begin replicating to the Server in the current term. Return
00124      * immediately. The condition variable in RaftConsensus will be notified
00125      * separately.
00126      */
00127     virtual void beginLeadership() = 0;
00128     /**
00129      * Inform any threads belonging to this Server to exit. Return immediately.
00130      * The condition variable in RaftConsensus will be notified separately.
00131      */
00132     virtual void exit() = 0;
00133     /**
00134      * Return the latest time this Server acknowledged our current term.
00135      */
00136     virtual uint64_t getLastAckEpoch() const = 0;
00137     /**
00138      * Return the largest entry ID for which this Server is known to share the
00139      * same entries up to and including this entry with our log.
00140      * This is used for advancing the leader's commitIndex.
00141      * Monotonically increases within a term.
00142      *
00143      * \warning
00144      *      Only valid when we're leader.
00145      */
00146     virtual uint64_t getMatchIndex() const = 0;
00147     /**
00148      * Return true if this Server has awarded us its vote for this term.
00149      */
00150     virtual bool haveVote() const = 0;
00151     /**
00152      * Cancel any outstanding RPCs to this Server.
00153      * The condition variable in RaftConsensus will be notified separately.
00154      */
00155     virtual void interrupt() = 0;
00156     /**
00157      * Return true once this Server is ready to be added to the cluster. This
00158      * means it has received enough of our log to where it is not expected to
00159      * cause an availability problem when added to the cluster configuration.
00160      * Should monotonically change from false to true.
00161      */
00162     virtual bool isCaughtUp() const = 0;
00163     /**
00164      * Make the next heartbeat RPC happen soon. Return immediately.
00165      * The condition variable in RaftConsensus will be notified separately.
00166      */
00167     virtual void scheduleHeartbeat() = 0;
00168     /**
00169      * Write this Server's state into the given structure. Used for
00170      * diagnostics.
00171      */
00172     virtual void
00173     updatePeerStats(Protocol::ServerStats::Raft::Peer& peerStats,
00174                     Core::Time::SteadyTimeConverter& time) const = 0;
00175 
00176     /**
00177      * Print out a Server for debugging purposes.
00178      */
00179     friend std::ostream& operator<<(std::ostream& os, const Server& server);
00180 
00181     /**
00182      * Virtual method for operator<<.
00183      */
00184     virtual std::ostream& dumpToStream(std::ostream& os) const = 0;
00185 
00186     /**
00187      * The ID of this server.
00188      */
00189     const uint64_t serverId;
00190     /**
00191      * The network addresses at which this server may be available
00192      * (comma-delimited)
00193      */
00194     std::string addresses;
00195 
00196     /**
00197      * If true, minStateMachineVersion and maxStateMachineVersion are set
00198      * (although they may be stale).
00199      */
00200     bool haveStateMachineSupportedVersions;
00201     /**
00202      * If haveStateMachineSupportedVersions is true, the smallest version of
00203      * the state machine commands/behavior that the server can support.
00204      */
00205     uint16_t minStateMachineVersion;
00206     /**
00207      * If haveStateMachineSupportedVersions is true, the largest version of
00208      * the state machine commands/behavior that the server can support.
00209      */
00210     uint16_t maxStateMachineVersion;
00211 
00212     /**
00213      * Used internally by Configuration for garbage collection.
00214      */
00215     bool gcFlag;
00216 };
00217 
00218 
00219 /**
00220  * A type of Server for the local process. There will only be one instance of
00221  * this class. Most of these methods don't do much, but they are needed to
00222  * satisfy the Server interface.
00223  */
00224 class LocalServer : public Server {
00225   public:
00226     LocalServer(uint64_t serverId, RaftConsensus& consensus);
00227     ~LocalServer();
00228     void exit();
00229     void beginRequestVote();
00230     void beginLeadership();
00231     uint64_t getMatchIndex() const;
00232     bool haveVote() const;
00233     uint64_t getLastAckEpoch() const;
00234     void interrupt();
00235     bool isCaughtUp() const;
00236     void scheduleHeartbeat();
00237     std::ostream& dumpToStream(std::ostream& os) const;
00238     void updatePeerStats(Protocol::ServerStats::Raft::Peer& peerStats,
00239                          Core::Time::SteadyTimeConverter& time) const;
00240     RaftConsensus& consensus;
00241     /**
00242      * The index of the last log entry that has been flushed to disk.
00243      * Valid for leaders only. Returned by getMatchIndex() and used to
00244      * advance the leader's commitIndex.
00245      */
00246     uint64_t lastSyncedIndex;
00247 };
00248 
00249 /**
00250  * Represents another server in the cluster. One of these exists for each other
00251  * server. In addition to tracking state for each other server, this class
00252  * provides a thread that executes RaftConsensus::peerThreadMain().
00253  *
00254  * This class has no internal locking: in general, the RaftConsensus lock
00255  * should be held when accessing this class, but there are some exceptions
00256  * noted below.
00257  */
00258 class Peer : public Server {
00259   public:
00260     /**
00261      * Constructor.
00262      */
00263     Peer(uint64_t serverId, RaftConsensus& consensus);
00264 
00265     /**
00266      * Destructor.
00267      */
00268     ~Peer();
00269 
00270     // Methods implemented from Server interface.
00271     void beginRequestVote();
00272     void beginLeadership();
00273     void exit();
00274     uint64_t getLastAckEpoch() const;
00275     uint64_t getMatchIndex() const;
00276     bool haveVote() const;
00277     bool isCaughtUp() const;
00278     void interrupt();
00279     void scheduleHeartbeat();
00280 
00281     /**
00282      * Returned by callRPC().
00283      */
00284     enum class CallStatus {
00285         /**
00286          * The RPC succeeded and the response was filled in.
00287          */
00288         OK,
00289         /**
00290          * No reply was received from the server. Maybe the connection was
00291          * dropped; maybe the RPC was canceled.
00292          */
00293         FAILED,
00294         /**
00295          * The server does not support this RPC or didn't like the arguments
00296          * given.
00297          */
00298         INVALID_REQUEST,
00299     };
00300 
00301     /**
00302      * Execute a remote procedure call on the server's RaftService. As this
00303      * operation might take a while, it should be called without RaftConsensus
00304      * lock.
00305      * \param[in] opCode
00306      *      The RPC opcode to execute (see Protocol::Raft::OpCode).
00307      * \param[in] request
00308      *      The request that was received from the other server.
00309      * \param[out] response
00310      *      Where the reply should be placed, if status is OK.
00311      * \param[in] lockGuard
00312      *      The Raft lock, which is released internally to allow for I/O
00313      *      concurrency.
00314      * \return
00315      *      See CallStatus.
00316      */
00317     CallStatus
00318     callRPC(Protocol::Raft::OpCode opCode,
00319             const google::protobuf::Message& request,
00320             google::protobuf::Message& response,
00321             std::unique_lock<Mutex>& lockGuard);
00322 
00323     /**
00324      * Launch this Peer's thread, which should run
00325      * RaftConsensus::peerThreadMain.
00326      * \param self
00327      *      A shared_ptr to this object, which the detached thread uses to make
00328      *      sure this object doesn't go away.
00329      */
00330     void startThread(std::shared_ptr<Peer> self);
00331     std::ostream& dumpToStream(std::ostream& os) const;
00332     void updatePeerStats(Protocol::ServerStats::Raft::Peer& peerStats,
00333                          Core::Time::SteadyTimeConverter& time) const;
00334 
00335   private:
00336 
00337     /**
00338      * Get the current session for this server. (This is cached in the #session
00339      * member for efficiency.) As this operation might take a while, it should
00340      * be called without RaftConsensus lock.
00341      */
00342     std::shared_ptr<RPC::ClientSession>
00343     getSession(std::unique_lock<Mutex>& lockGuard);
00344 
00345   public:
00346 
00347     /**
00348      * Used in startThread.
00349      * TODO(ongaro): reconsider
00350      */
00351     RaftConsensus& consensus;
00352 
00353     /**
00354      * A reference to the server's event loop, needed to construct new
00355      * sessions.
00356      */
00357     Event::Loop& eventLoop;
00358 
00359     /**
00360      * Set to true when thread should exit.
00361      */
00362     bool exiting;
00363 
00364     /**
00365      * Set to true if the server has responded to our RequestVote request in
00366      * the current term, false otherwise.
00367      */
00368     bool requestVoteDone;
00369 
00370     /**
00371      * See #haveVote().
00372      */
00373     bool haveVote_;
00374 
00375     /**
00376      * Indicates that the leader and the follower aren't necessarily
00377      * synchronized. The leader should not send large amounts of data (with
00378      * many log entries or large chunks of a snapshot file) to the follower
00379      * while this flag is true. For example, the follower might have been
00380      * disconnected, or the leader might not know where the follower's log
00381      * diverges from its own. It's better to sync up using small RPCs like
00382      * heartbeats, then begin/resume sending bulk data after receiving an
00383      * acknowledgment.
00384      *
00385      * Only used when leader.
00386      */
00387     bool suppressBulkData;
00388 
00389     /**
00390      * The index of the next entry to send to the follower. Only used when
00391      * leader. Minimum value of 1.
00392      */
00393     uint64_t nextIndex;
00394 
00395     /**
00396      * See #getMatchIndex().
00397      */
00398     uint64_t matchIndex;
00399 
00400     /**
00401      * See #getLastAckEpoch().
00402      */
00403     uint64_t lastAckEpoch;
00404 
00405     /**
00406      * When the next heartbeat should be sent to the follower.
00407      * Only valid while we're leader. The leader sends heartbeats periodically
00408      * if it has no new data to send, to stop the follower from starting a new
00409      * election.
00410      * \invariant
00411      *      This is never more than HEARTBEAT_PERIOD in the future, since
00412      *      new leaders don't reset it.
00413      */
00414     TimePoint nextHeartbeatTime;
00415 
00416     /**
00417      * The minimum time at which the next RPC should be sent.
00418      * Only valid while we're a candidate or leader. This is set when an RPC
00419      * fails so as to not overwhelm the network with retries (some RPCs fail
00420      * without timing out, for example if the remote kernel rejects the
00421      * connection).
00422      */
00423     TimePoint backoffUntil;
00424 
00425     /**
00426      * Counts RPC failures to issue fewer warnings.
00427      * Accessed only from callRPC() without holding the lock.
00428      */
00429     uint64_t rpcFailuresSinceLastWarning;
00430 
00431     // Used for isCaughtUp. TODO(ongaro): doc precisely
00432     uint64_t lastCatchUpIterationMs;
00433     TimePoint thisCatchUpIterationStart;
00434     uint64_t thisCatchUpIterationGoalId;
00435 
00436     /**
00437      * See #isCaughtUp().
00438      */
00439     bool isCaughtUp_;
00440 
00441     /**
00442      * A snapshot file to be sent to the follower, or NULL.
00443      * TODO(ongaro): It'd be better to destroy this as soon as this server
00444      * steps down, but peers don't have a hook for that right now.
00445      */
00446     std::unique_ptr<Storage::FilesystemUtil::FileContents> snapshotFile;
00447     /**
00448      * The number of bytes of 'snapshotFile' that have been acknowledged by the
00449      * follower already. Send starting here next time.
00450      */
00451     uint64_t snapshotFileOffset;
00452     /**
00453      * The last log index that 'snapshotFile' corresponds to. This is used to
00454      * set the follower's #nextIndex accordingly after we're done sending it
00455      * the snapshot.
00456      */
00457     uint64_t lastSnapshotIndex;
00458 
00459   private:
00460 
00461     /**
00462      * Caches the result of getSession().
00463      */
00464     std::shared_ptr<RPC::ClientSession> session;
00465 
00466     /**
00467      * callRPC() places its RPC here so that interrupt() may cancel it.
00468      * Setting this member and canceling the RPC must be done while holding the
00469      * Raft lock; waiting on the RPC is done without holding that lock.
00470      */
00471     RPC::ClientRPC rpc;
00472 
00473     // Peer is not copyable.
00474     Peer(const Peer&) = delete;
00475     Peer& operator=(const Peer&) = delete;
00476 };
00477 
00478 /**
00479  * A configuration defines the servers that are part of the cluster. This class
00480  * does not do any internal locking; it should be accessed only while holding
00481  * the RaftConsensus lock.
00482  */
00483 class Configuration {
00484   public:
00485     typedef std::shared_ptr<Server> ServerRef;
00486     typedef std::function<bool(Server&)> Predicate;
00487     typedef std::function<uint64_t(Server&)> GetValue;
00488     typedef std::function<void(Server&)> SideEffect;
00489 
00490   private:
00491     /**
00492      * A list of servers in which a simple majority constitutes a quorum.
00493      */
00494     struct SimpleConfiguration {
00495         SimpleConfiguration();
00496         ~SimpleConfiguration();
00497         bool all(const Predicate& predicate) const;
00498         bool contains(ServerRef server) const;
00499         void forEach(const SideEffect& sideEffect);
00500         uint64_t min(const GetValue& getValue) const;
00501         bool quorumAll(const Predicate& predicate) const;
00502         uint64_t quorumMin(const GetValue& getValue) const;
00503         std::vector<ServerRef> servers;
00504     };
00505 
00506   public:
00507     /**
00508      * See #state.
00509      */
00510     enum class State {
00511         /**
00512          * The configuration specifies no servers. Servers that are new to the
00513          * cluster and have empty logs start in this state.
00514          */
00515         BLANK,
00516         /**
00517          * The configuration specifies a single list of servers: a quorum
00518          * requires any majority of this list.
00519          */
00520         STABLE,
00521         /**
00522          * The configuration specifies two lists of servers: a quorum requires
00523          * any majority of the first list, but the servers in the second list
00524          * also receive log entries.
00525          */
00526         STAGING,
00527         /**
00528          * The configuration specifies two lists of servers: a quorum requires
00529          * any majority of the first list and any majority of the second.
00530          */
00531         TRANSITIONAL,
00532     };
00533 
00534     /**
00535      * Constructor.
00536      */
00537     Configuration(uint64_t serverId, RaftConsensus& consensus);
00538 
00539     /**
00540      * Destructor.
00541      */
00542     ~Configuration();
00543 
00544     /**
00545      * Apply a function to every known server, including the local, old, new,
00546      * and staging servers. The function will only be called once for each
00547      * server, even if a server exists in more than one of these categories.
00548      */
00549     void forEach(const SideEffect& sideEffect);
00550 
00551     /**
00552      * Return true if the given server may be part of a quorum, false
00553      * otherwise.
00554      */
00555     bool hasVote(ServerRef server) const;
00556 
00557     /**
00558      * Lookup the network addresses for a particular server
00559      * (comma-delimited).
00560      * Returns empty string if not found.
00561      */
00562     std::string lookupAddress(uint64_t serverId) const;
00563 
00564     /**
00565      * Return true if there exists a quorum for which every server satisfies
00566      * the predicate, false otherwise.
00567      */
00568     bool quorumAll(const Predicate& predicate) const;
00569 
00570     /**
00571      * Return the smallest value of any server in the quorum of servers that
00572      * have the largest values.
00573      * \return
00574      *      Largest value for which every server in a quorum has a value
00575      *      greater than or equal to this one. 0 if the configuration is BLANK.
00576      */
00577     uint64_t quorumMin(const GetValue& getValue) const;
00578 
00579     /**
00580      * Remove the staging servers, if any. Return to the configuration state
00581      * prior to a preceding call to setStagingServers.
00582      */
00583     void resetStagingServers();
00584 
00585     /**
00586      * Set the state of this object as if it had just been constructed.
00587      */
00588     void reset();
00589 
00590     /**
00591      * Set the configuration. Any existing staging servers are dropped.
00592      * \param newId
00593      *      The log entry ID of the configuration.
00594      * \param newDescription
00595      *      The IDs and addresses of the servers in the configuration. If any
00596      *      newServers are listed in the description, it is considered
00597      *      TRANSITIONAL; otherwise, it is STABLE.
00598      */
00599     void setConfiguration(
00600             uint64_t newId,
00601             const Protocol::Raft::Configuration& newDescription);
00602 
00603     /**
00604      * Add servers that are to mirror the log but that may not have a vote
00605      * (listeners). This can only be called on a STABLE configuration and makes
00606      * it STAGING.
00607      * TODO(ongaro): that might be a sign of a poor interface. descriptions
00608      * should probably have three sets, as john said.
00609      */
00610     void setStagingServers(
00611             const Protocol::Raft::SimpleConfiguration& stagingServers);
00612 
00613     /**
00614      * Return true if every server in the staging set satisfies the predicate,
00615      * false otherwise.
00616      */
00617     bool stagingAll(const Predicate& predicate) const;
00618 
00619     /**
00620      * Return the smallest value of any server in the staging set.
00621      * \return
00622      *      Minimum value on any server in the staging set, or 0 if the staging
00623      *      set is empty.
00624      */
00625     uint64_t stagingMin(const GetValue& getValue) const;
00626 
00627     /**
00628      * Write the configuration servers' state into the given structure. Used
00629      * for diagnostics.
00630      */
00631     void updateServerStats(Protocol::ServerStats& serverStats,
00632                            Core::Time::SteadyTimeConverter& time) const;
00633 
00634     /**
00635      * Print out a State for debugging purposes.
00636      */
00637     friend std::ostream& operator<<(std::ostream& os, State state);
00638 
00639     /**
00640      * Print out a Configuration for debugging purposes.
00641      */
00642     friend std::ostream& operator<<(std::ostream& os,
00643                                     const Configuration& configuration);
00644 
00645   private:
00646     /**
00647      * If no server by the given ID is known, construct a new one.
00648      * \return
00649      *      Return the existing or new server.
00650      * TODO(ongaro): this name and signature is misleading
00651      */
00652     ServerRef getServer(uint64_t newServerId);
00653 
00654     /**
00655      * Used for constructing Server instances.
00656      */
00657     RaftConsensus& consensus;
00658 
00659     /**
00660      * A map from server ID to Server of every server, including the local,
00661      * previous, new, and staging servers.
00662      */
00663     std::unordered_map<uint64_t, ServerRef> knownServers;
00664 
00665   public:
00666     /**
00667      * This server.
00668      */
00669     std::shared_ptr<LocalServer> localServer;
00670 
00671     /**
00672      * Specifies the meaning of #oldServers and #newServers.
00673      */
00674     State state;
00675 
00676     /**
00677      * The ID of the current configuration. This is the same as the entry ID in
00678      * which this configuration's description is written to the log.
00679      */
00680     uint64_t id;
00681 
00682     /**
00683      * A description of the current configuration.
00684      */
00685     Protocol::Raft::Configuration description;
00686 
00687   private:
00688     /**
00689      * A majority of these servers are necessary for a quorum under 
00690      * STABLE, STAGING, and TRANSITIONAL configurations. (Under TRANSITIONAL, a
00691      * majority of newServers is also needed.)
00692      */
00693     SimpleConfiguration oldServers;
00694 
00695     /**
00696      * A majority of these servers are necessary for a quorum under
00697      * TRANSITIONAL configurations. Under STAGING configurations, these servers
00698      * receive log entries but do not participate in elections.
00699      */
00700     SimpleConfiguration newServers;
00701 
00702     friend class Invariants;
00703 };
00704 
00705 /**
00706  * Ensures the current configuration reflects the latest state of the log and
00707  * snapshot.
00708  */
00709 class ConfigurationManager {
00710   public:
00711     /**
00712      * Constructor.
00713      * \param configuration
00714      *      The configuration that this object is in charge of setting.
00715      */
00716     explicit ConfigurationManager(Configuration& configuration);
00717 
00718     /**
00719      * Destructor.
00720      */
00721     ~ConfigurationManager();
00722 
00723     /**
00724      * Called when a new configuration is added to the log.
00725      * \param index
00726      *      The log index of this configuration (equivalently, its ID).
00727      * \param description
00728      *      The serializable representation of the configuration.
00729      */
00730     void add(uint64_t index,
00731              const Protocol::Raft::Configuration& description);
00732     /**
00733      * Called when a log prefix is truncated (after saving a snapshot that
00734      * covers this prefix).
00735      * \param firstIndexKept
00736      *      The log entries in range [1, firstIndexKept) are being discarded.
00737      */
00738     void truncatePrefix(uint64_t firstIndexKept);
00739     /**
00740      * Called when a log suffix is truncated (when the leader doesn't agree
00741      * with this server's log).
00742      * \param lastIndexKept
00743      *      The log entries in range (lastIndexKept, infinity) are being
00744      *      discarded.
00745      */
00746     void truncateSuffix(uint64_t lastIndexKept);
00747     /**
00748      * Called when a new snapshot is saved.
00749      * Only the latest such configuration is kept.
00750      * \param index
00751      *      The log index of this configuration (equivalently, its ID).
00752      * \param description
00753      *      The serializable representation of the configuration.
00754      */
00755     void setSnapshot(uint64_t index,
00756                      const Protocol::Raft::Configuration& description);
00757 
00758     /**
00759      * Return the configuration as of a particular log index.
00760      * This is useful when taking snapshots.
00761      * \param lastIncludedIndex
00762      *      Configurations greater than this index will be ignored.
00763      * \return
00764      *      The index and description of the configuration with the largest
00765      *      index in the range [1, lastIncludedIndex].
00766      */
00767     std::pair<uint64_t, Protocol::Raft::Configuration>
00768     getLatestConfigurationAsOf(uint64_t lastIncludedIndex) const;
00769 
00770   private:
00771 
00772     /**
00773      * Helper function called after changing this object's state.
00774      * - Make sure the snapshot configuration is in the descriptions map.
00775      * - Set configuration to the configuration with the largest index in the
00776      *   descriptions map, or reset it the map is empty.
00777      */
00778     void restoreInvariants();
00779 
00780     /**
00781      * Defines the servers that are part of the cluster. See Configuration.
00782      */
00783     Configuration& configuration;
00784 
00785     /**
00786      * This contains all the cluster configurations found in the log, plus one
00787      * additional configuration from the latest snapshot.
00788      *
00789      * It is used to find the right configuration when taking a snapshot and
00790      * truncating the end of the log. It must be kept consistent with the log
00791      * when it is loaded, as the log grows, as it gets truncated from the
00792      * beginning for snapshots, and as it gets truncated from the end upon
00793      * conflicts with the leader.
00794      *
00795      * The key is the entry ID where the configuration belongs in the log; the
00796      * value is the serializable form of the configuration.
00797      */
00798     std::map<uint64_t, Protocol::Raft::Configuration> descriptions;
00799 
00800     /**
00801      * This reflects the configuration found in this server's latest snapshot,
00802      * or {0, {}} if this server has no snapshot.
00803      */
00804     std::pair<uint64_t, Protocol::Raft::Configuration> snapshot;
00805 
00806     friend class Invariants;
00807 };
00808 
00809 /**
00810  * This is the rough equivalent of a SteadyClock that can be shared across the
00811  * network with other Raft servers. The cluster time approximately tracks how
00812  * long the cluster has been available with a working leader.
00813  *
00814  * Cluster time is measured in nanoseconds and progresses at about the same
00815  * rate as a normal clock when the cluster is operational. While there's a
00816  * stable leader, the nanoseconds increase according to that leader's
00817  * SteadyClock. When a new leader takes over, it starts ticking from cluster
00818  * time value it finds in its last entry/snapshot, so some cluster time may be
00819  * unaccounted for between the last leader replicating its final entry and then
00820  * losing leadership.
00821  *
00822  * The StateMachine uses cluster time to expire client sessions. Cluster times
00823  * in committed log entries monotonically increase, so the state machine will
00824  * see cluster times monotonically increase.
00825  *
00826  * Before cluster time, client sessions were expired based on the SystemClock.
00827  * That meant that if the SystemClock jumped forwards drastically, all clients
00828  * would expire. That's undesirable, so cluster time was introduced in #90
00829  * ("make client session timeout use monotonic clock") to address this.
00830  */
00831 class ClusterClock {
00832   public:
00833     /// Default constructor.
00834     ClusterClock();
00835 
00836     /**
00837      * Reset to the given cluster time, assuming it's the current time right
00838      * now. This is used, for example, when a follower gets a new log entry
00839      * (which includes a cluster time) from the leader.
00840      */
00841     void newEpoch(uint64_t clusterTime);
00842 
00843     /**
00844      * Called by leaders to generate a new cluster time for a new log entry.
00845      * This is equivalent to the following but slightly more efficient:
00846      *   uint64_t now = c.interpolate();
00847      *   c.newEpoch(now);
00848      *   return now;
00849      */
00850     uint64_t leaderStamp();
00851 
00852     /**
00853      * Return the best approximation of the current cluster time, assuming
00854      * there's been a leader all along.
00855      */
00856     uint64_t interpolate() const;
00857 
00858     /**
00859      * Invariant: this is equal to the cluster time in:
00860      * - the last log entry, if any, or
00861      * - the last snapshot, if any, or
00862      * - 0.
00863      */
00864     uint64_t clusterTimeAtEpoch;
00865 
00866     /**
00867      * The local SteadyClock time when clusterTimeAtEpoch was set.
00868      */
00869     Core::Time::SteadyClock::time_point localTimeAtEpoch;
00870 };
00871 
00872 } // namespace RaftConsensusInternal
00873 
00874 /**
00875  * An implementation of the Raft consensus algorithm. The algorithm is
00876  * described at https://raftconsensus.github.io
00877  * . In brief, Raft divides time into terms and elects a leader at the
00878  * beginning of each term. This election mechanism guarantees that the emerging
00879  * leader has at least all committed log entries. Once a candidate has received
00880  * votes from a quorum, it replicates its own log entries in order to the
00881  * followers. The leader is the only machine that serves client requests.
00882  */
00883 class RaftConsensus {
00884   public:
00885     typedef RaftConsensusInternal::Invariants Invariants;
00886     typedef RaftConsensusInternal::Server Server;
00887     typedef RaftConsensusInternal::LocalServer LocalServer;
00888     typedef RaftConsensusInternal::Peer Peer;
00889     typedef RaftConsensusInternal::Configuration Configuration;
00890     typedef RaftConsensusInternal::ConfigurationManager ConfigurationManager;
00891     typedef RaftConsensusInternal::ClusterClock ClusterClock;
00892     typedef RaftConsensusInternal::Mutex Mutex;
00893     typedef RaftConsensusInternal::Clock Clock;
00894     typedef RaftConsensusInternal::TimePoint TimePoint;
00895 
00896     /**
00897      * This is returned by getNextEntry().
00898      */
00899     struct Entry {
00900         /// Default constructor.
00901         Entry();
00902         /// Move constructor.
00903         Entry(Entry&& other);
00904         /// Destructor.
00905         ~Entry();
00906 
00907         /**
00908          * The Raft log index for this entry (or the last one a snapshot
00909          * covers). Pass this as the lastIndex argument to the next call to
00910          * getNextEntry().
00911          */
00912         uint64_t index;
00913 
00914         /**
00915          * The type of the entry.
00916          */
00917         enum {
00918             /**
00919              * This is a normal entry containing a client request for the state
00920              * machine. The 'data' field contains that request, and
00921              * 'snapshotReader' is not set.
00922              */
00923             DATA,
00924             /**
00925              * This is a snapshot: the state machine should clear its state and
00926              * load in the snapshot. The 'data' field is not set, and the
00927              * 'snapshotReader' should be used to read the snapshot contents
00928              * from.
00929              */
00930             SNAPSHOT,
00931             /**
00932              * Some entries should be ignored by the state machine (they are
00933              * consumed internally by the consensus module). For client service
00934              * threads to know when a state machine is up-to-date, it's easiest
00935              * for the state machine to get empty entries back for these, and
00936              * simply call back into getNextEntry() again with the next ID,
00937              * Entries of type 'SKIP' will have neither 'data' nor
00938              * 'snapshotReader' set.
00939              */
00940             SKIP,
00941         } type;
00942 
00943         /**
00944          * The client request for entries of type 'DATA'.
00945          */
00946         Core::Buffer command;
00947 
00948         /**
00949          * A handle to the snapshot file for entries of type 'SNAPSHOT'.
00950          */
00951         std::unique_ptr<Storage::SnapshotFile::Reader> snapshotReader;
00952 
00953         /**
00954          * Cluster time when leader created entry/snapshot. This is valid for
00955          * entries of all types.
00956          */
00957         uint64_t clusterTime;
00958 
00959         // copy and assign not allowed
00960         Entry(const Entry&) = delete;
00961         Entry& operator=(const Entry&) = delete;
00962     };
00963 
00964     enum class ClientResult {
00965         /**
00966          * Request completed successfully.
00967          */
00968         SUCCESS,
00969         /**
00970          * Returned by setConfiguration() if the configuration could not be
00971          * set because the previous configuration was unsuitable or because the
00972          * new servers could not be caught up.
00973          */
00974         FAIL,
00975         /**
00976          * Returned by getConfiguration() if the configuration is not stable or
00977          * is not committed. The client should wait and retry later.
00978          */
00979         RETRY,
00980         /**
00981          * Cannot process the request because this server is not leader or
00982          * temporarily lost its leadership.
00983          */
00984         NOT_LEADER,
00985     };
00986 
00987     /**
00988      * Constructor.
00989      * \param globals
00990      *      Handle to LogCabin's top-level objects.
00991      */
00992     explicit RaftConsensus(Globals& globals);
00993 
00994     /**
00995      * Destructor.
00996      */
00997     ~RaftConsensus();
00998 
00999     /// Initialize. Must be called before any other method.
01000     void init();
01001 
01002     /// Signal the consensus module to exit (shut down threads, etc).
01003     void exit();
01004 
01005     /**
01006      * Initialize the log with a configuration consisting of just this server.
01007      * This should be called just once the very first time the very first
01008      * server in your cluster is started.
01009      * PANICs if any log entries or snapshots already exist.
01010      */
01011     void bootstrapConfiguration();
01012 
01013     /**
01014      * Get the current leader's active, committed, simple cluster
01015      * configuration.
01016      */
01017     ClientResult getConfiguration(
01018             Protocol::Raft::SimpleConfiguration& configuration,
01019             uint64_t& id) const;
01020 
01021     /**
01022      * Return the most recent entry ID that has been externalized by the
01023      * replicated log. This is used to provide non-stale reads to the state
01024      * machine.
01025      */
01026     std::pair<ClientResult, uint64_t> getLastCommitIndex() const;
01027 
01028     /**
01029      * Return the network address for a recent leader, if known,
01030      * or empty string otherwise.
01031      */
01032     std::string getLeaderHint() const;
01033 
01034     /**
01035      * This returns the entry following lastIndex in the replicated log. Some
01036      * entries may be used internally by the consensus module. These will have
01037      * Entry.hasData set to false. The reason these are exposed to the state
01038      * machine is that the state machine waits to be caught up to the latest
01039      * committed entry in the replicated log sometimes, but if that entry
01040      * was for internal use, it would would otherwise never reach the state
01041      * machine.
01042      * \throw Core::Util::ThreadInterruptedException
01043      *      Thread should exit.
01044      */
01045     Entry getNextEntry(uint64_t lastIndex) const;
01046 
01047     /**
01048      * Return statistics that may be useful in deciding when to snapshot.
01049      */
01050     SnapshotStats::SnapshotStats getSnapshotStats() const;
01051 
01052     /**
01053      * Process an AppendEntries RPC from another server. Called by RaftService.
01054      * \param[in] request
01055      *      The request that was received from the other server.
01056      * \param[out] response
01057      *      Where the reply should be placed.
01058      */
01059     void handleAppendEntries(
01060                 const Protocol::Raft::AppendEntries::Request& request,
01061                 Protocol::Raft::AppendEntries::Response& response);
01062 
01063     /**
01064      * Process an InstallSnapshot RPC from another server. Called by
01065      * RaftService.
01066      * \param[in] request
01067      *      The request that was received from the other server.
01068      * \param[out] response
01069      *      Where the reply should be placed.
01070      */
01071     void handleInstallSnapshot(
01072                 const Protocol::Raft::InstallSnapshot::Request& request,
01073                 Protocol::Raft::InstallSnapshot::Response& response);
01074 
01075     /**
01076      * Process a RequestVote RPC from another server. Called by RaftService.
01077      * \param[in] request
01078      *      The request that was received from the other server.
01079      * \param[out] response
01080      *      Where the reply should be placed.
01081      */
01082     void handleRequestVote(const Protocol::Raft::RequestVote::Request& request,
01083                            Protocol::Raft::RequestVote::Response& response);
01084 
01085     /**
01086      * Submit an operation to the replicated log.
01087      * \param operation
01088      *      If the cluster accepts this operation, then it will be added to the
01089      *      log and the state machine will eventually apply it.
01090      * \return
01091      *      First component is status code. If SUCCESS, second component is the
01092      *      log index at which the entry has been committed to the replicated
01093      *      log.
01094      */
01095     std::pair<ClientResult, uint64_t> replicate(const Core::Buffer& operation);
01096 
01097     /**
01098      * Change the cluster's configuration.
01099      * Returns successfully once operation completed and old servers are no
01100      * longer needed.
01101      * \return
01102      *      NOT_LEADER, or other code with response filled in.
01103      */
01104     ClientResult
01105     setConfiguration(
01106             const Protocol::Client::SetConfiguration::Request& request,
01107             Protocol::Client::SetConfiguration::Response& response);
01108 
01109     /**
01110      * Register which versions of client commands/behavior the local state
01111      * machine supports. Invoked just once on boot (though calling this
01112      * multiple times is safe). This information is used to support upgrades to
01113      * the running replicated state machine version, and it is transmitted to
01114      * other servers as needed. See #stateMachineUpdaterThreadMain.
01115      * \param minSupported
01116      *      The smallest version the local state machine can support.
01117      * \param maxSupported
01118      *      The largest version the local state machine can support.
01119      */
01120     void
01121     setSupportedStateMachineVersions(uint16_t minSupported,
01122                                      uint16_t maxSupported);
01123 
01124     /**
01125      * Start taking a snapshot. Called by the state machine when it wants to
01126      * take a snapshot.
01127      * \param lastIncludedIndex
01128      *      The snapshot will cover log entries in the range
01129      *      [1, lastIncludedIndex].
01130      *      lastIncludedIndex must be committed (must have been previously
01131      *      returned by #getNextEntry()).
01132      * \return
01133      *      A file the state machine can dump its snapshot into.
01134      */
01135     std::unique_ptr<Storage::SnapshotFile::Writer>
01136     beginSnapshot(uint64_t lastIncludedIndex);
01137 
01138     /**
01139      * Complete taking a snapshot for the log entries in range [1,
01140      * lastIncludedIndex]. Called by the state machine when it is done taking a
01141      * snapshot.
01142      * \param lastIncludedIndex
01143      *      The snapshot will cover log entries in the range
01144      *      [1, lastIncludedIndex].
01145      * \param writer
01146      *      A writer that has not yet been saved: the consensus module may
01147      *      have to discard the snapshot in case it's gotten a better snapshot
01148      *      from another server. If this snapshot is to be saved (normal case),
01149      *      the consensus module will call save() on it.
01150      */
01151     void snapshotDone(uint64_t lastIncludedIndex,
01152                       std::unique_ptr<Storage::SnapshotFile::Writer> writer);
01153 
01154     /**
01155      * Add information about the consensus state to the given structure.
01156      */
01157     void updateServerStats(Protocol::ServerStats& serverStats) const;
01158 
01159     /**
01160      * Print out the contents of this class for debugging purposes.
01161      */
01162     friend std::ostream& operator<<(std::ostream& os,
01163                                     const RaftConsensus& raft);
01164 
01165   private:
01166     /**
01167      * See #state.
01168      */
01169     enum class State {
01170         /**
01171          * A follower does not initiate RPCs. It becomes a candidate with
01172          * startNewElection() when a timeout elapses without hearing from a
01173          * candidate/leader. This is the initial state for servers when they
01174          * start up.
01175          */
01176         FOLLOWER,
01177 
01178         /**
01179          * A candidate sends RequestVote RPCs in an attempt to become a leader.
01180          * It steps down to be a follower if it discovers a current leader, and
01181          * it becomes leader if it collects votes from a quorum.
01182          */
01183         CANDIDATE,
01184 
01185         /**
01186          * A leader sends AppendEntries RPCs to replicate its log onto followers.
01187          * It also sends heartbeats periodically during periods of inactivity
01188          * to delay its followers from becoming candidates. It steps down to be
01189          * a follower if it discovers a server with a higher term, if it can't
01190          * communicate with a quorum, or if it is not part of the latest
01191          * committed configuration.
01192          */
01193         LEADER,
01194     };
01195 
01196 
01197     //// The following private methods MUST acquire the lock.
01198 
01199     /**
01200      * Flush log entries to stable storage in the background on leaders.
01201      * Once they're flushed, it tries to advance the #commitIndex.
01202      * This is the method that #leaderDiskThread executes.
01203      */
01204     void leaderDiskThreadMain();
01205 
01206     /**
01207      * Start new elections when it's time to do so. This is the method that
01208      * #timerThread executes.
01209      */
01210     void timerThreadMain();
01211 
01212     /**
01213      * Initiate RPCs to a specific server as necessary.
01214      * One thread for each remote server calls this method (see Peer::thread).
01215      */
01216     void peerThreadMain(std::shared_ptr<Peer> peer);
01217 
01218     /**
01219      * Append advance state machine version entries to the log as leader once
01220      * all servers can support a new state machine version.
01221      */
01222     void stateMachineUpdaterThreadMain();
01223 
01224     /**
01225      * Return to follower state when, as leader, this server is not able to
01226      * communicate with a quorum. This helps two things in cases where a quorum
01227      * is not available to this leader but clients can still communicate with
01228      * the leader. First, it returns to clients in a timely manner so that they
01229      * can try to find another current leader, if one exists. Second, it frees
01230      * up the resources associated with those client's RPCs on the server.
01231      * This is the method that #stepDownThread executes.
01232      */
01233     void stepDownThreadMain();
01234 
01235 
01236     //// The following private methods MUST NOT acquire the lock.
01237 
01238 
01239     /**
01240      * Move forward #commitIndex if possible. Called only on leaders after
01241      * receiving RPC responses and flushing entries to disk. If commitIndex
01242      * changes, this will notify #stateChanged. It will also change the
01243      * configuration or step down due to a configuration change when
01244      * appropriate.
01245      *
01246      * #commitIndex can jump by more than 1 on new leaders, since their
01247      * #commitIndex may be well out of date until they figure out which log
01248      * entries their followers have.
01249      *
01250      * \pre
01251      *      state is LEADER.
01252      */
01253     void advanceCommitIndex();
01254 
01255     /**
01256      * Append entries to the log, set the configuration if this contains a
01257      * configuration entry, and notify #stateChanged.
01258      */
01259     void append(const std::vector<const Storage::Log::Entry*>& entries);
01260 
01261     /**
01262      * Send an AppendEntries RPC to the server (either a heartbeat or containing
01263      * an entry to replicate).
01264      * \param lockGuard
01265      *      Used to temporarily release the lock while invoking the RPC, so as
01266      *      to allow for some concurrency.
01267      * \param peer
01268      *      State used in communicating with the follower, building the RPC
01269      *      request, and processing its result.
01270      */
01271     void appendEntries(std::unique_lock<Mutex>& lockGuard, Peer& peer);
01272 
01273     /**
01274      * Send an InstallSnapshot RPC to the server (containing part of a
01275      * snapshot file to replicate).
01276      * \param lockGuard
01277      *      Used to temporarily release the lock while invoking the RPC, so as
01278      *      to allow for some concurrency.
01279      * \param peer
01280      *      State used in communicating with the follower, building the RPC
01281      *      request, and processing its result.
01282      */
01283     void installSnapshot(std::unique_lock<Mutex>& lockGuard, Peer& peer);
01284 
01285     /**
01286      * Transition to being a leader. This is called when a candidate has
01287      * received votes from a quorum.
01288      */
01289     void becomeLeader();
01290 
01291     /**
01292      * Remove the prefix of the log that is redundant with this server's
01293      * snapshot.
01294      */
01295     void discardUnneededEntries();
01296 
01297     /**
01298      * Return the term corresponding to log->getLastLogIndex(). This may come
01299      * from the log, from the snapshot, or it may be 0.
01300      */
01301     uint64_t getLastLogTerm() const;
01302 
01303     /**
01304      * Notify the #stateChanged condition variable and cancel all current RPCs.
01305      * This should be called when stepping down, starting a new election,
01306      * becoming leader, or exiting.
01307      */
01308     void interruptAll();
01309 
01310     /**
01311      * Helper for #appendEntries() to put the right number of entries into the
01312      * request.
01313      * \param nextIndex
01314      *      First entry to send to the follower.
01315      * \param request
01316      *      AppendEntries request ProtoBuf in which to pack the entries.
01317      * \return
01318      *      Number of entries in the request.
01319      */
01320     uint64_t
01321     packEntries(uint64_t nextIndex,
01322                 Protocol::Raft::AppendEntries::Request& request) const;
01323 
01324     /**
01325      * Try to read the latest good snapshot from disk. Loads the header of the
01326      * snapshot file, which is used internally by the consensus module. The
01327      * rest of the file reader is kept in #snapshotReader for the state machine
01328      * to process upon a future getNextEntry().
01329      *
01330      * If the snapshot file on disk is no good, #snapshotReader will remain
01331      * NULL.
01332      */
01333     void readSnapshot();
01334 
01335     /**
01336      * Append an entry to the log and wait for it to be committed.
01337      */
01338     std::pair<ClientResult, uint64_t>
01339     replicateEntry(Storage::Log::Entry& entry,
01340                    std::unique_lock<Mutex>& lockGuard);
01341 
01342     /**
01343      * Send a RequestVote RPC to the server. This is used by candidates to
01344      * request a server's vote and by new leaders to retrieve information about
01345      * the server's log.
01346      * \param lockGuard
01347      *      Used to temporarily release the lock while invoking the RPC, so as
01348      *      to allow for some concurrency.
01349      * \param peer
01350      *      State used in communicating with the follower, building the RPC
01351      *      request, and processing its result.
01352      */
01353     void requestVote(std::unique_lock<Mutex>& lockGuard, Peer& peer);
01354 
01355     /**
01356      * Dumps serverId, currentTerm, state, leaderId, and votedFor to the debug
01357      * log. This is intended to be easy to grep and parse.
01358      */
01359     void printElectionState() const;
01360 
01361     /**
01362      * Set the timer to start a new election and notify #stateChanged.
01363      * The timer is set for ELECTION_TIMEOUT plus some random jitter from
01364      * now.
01365      */
01366     void setElectionTimer();
01367 
01368     /**
01369      * Transitions to being a candidate from being a follower or candidate.
01370      * This is called when a timeout elapses. If the configuration is blank, it
01371      * does nothing. Moreover, if this server forms a quorum (it is the only
01372      * server in the configuration), this will immediately transition to
01373      * leader.
01374      */
01375     void startNewElection();
01376 
01377     /**
01378      * Transition to being a follower. This is called when we
01379      * receive an RPC request with newer term, receive an RPC response
01380      * indicating our term is stale, or discover a current leader while a
01381      * candidate. In this last case, newTerm will be the same as currentTerm.
01382      * This will call setElectionTimer for you if no election timer is
01383      * currently set.
01384      */
01385     void stepDown(uint64_t newTerm);
01386 
01387     /**
01388      * Persist critical state, such as the term and the vote, to stable
01389      * storage.
01390      */
01391     void updateLogMetadata();
01392 
01393     /**
01394      * Return true if every entry that might have already been marked committed
01395      * on any leader is marked committed on this leader by the time this call
01396      * returns.
01397      * This is used to provide non-stale read operations to
01398      * clients. It gives up after ELECTION_TIMEOUT, since stepDownThread
01399      * will return to the follower state after that time.
01400      */
01401     bool upToDateLeader(std::unique_lock<Mutex>& lockGuard) const;
01402 
01403     /**
01404      * Print out a ClientResult for debugging purposes.
01405      */
01406     friend std::ostream& operator<<(std::ostream& os,
01407                                     ClientResult clientResult);
01408 
01409     /**
01410      * Print out a State for debugging purposes.
01411      */
01412     friend std::ostream& operator<<(std::ostream& os, State state);
01413 
01414     /**
01415      * A follower waits for about this much inactivity before becoming a
01416      * candidate and starting a new election.
01417      */
01418     const std::chrono::nanoseconds ELECTION_TIMEOUT;
01419 
01420     /**
01421      * A leader sends RPCs at least this often, even if there is no data to
01422      * send.
01423      */
01424     const std::chrono::nanoseconds HEARTBEAT_PERIOD;
01425 
01426     /**
01427      * A leader will pack at most this many entries into an AppendEntries
01428      * request message. This helps bound processing time when entries are very
01429      * small in size.
01430      * Const except for unit tests.
01431      */
01432     uint64_t MAX_LOG_ENTRIES_PER_REQUEST;
01433 
01434     /**
01435      * A candidate or leader waits this long after an RPC fails before sending
01436      * another one, so as to not overwhelm the network with retries.
01437      */
01438     const std::chrono::nanoseconds RPC_FAILURE_BACKOFF;
01439 
01440     /**
01441      * How long the state machine updater thread should sleep if:
01442      * - The servers do not currently support a common version, or
01443      * - This server has not yet received version information from all other
01444      *   servers, or
01445      * - An advance state machine entry failed to commit (probably due to lost
01446      *   leadership).
01447      */
01448     const std::chrono::nanoseconds STATE_MACHINE_UPDATER_BACKOFF;
01449 
01450     /**
01451      * Prefer to keep RPC requests under this size.
01452      * Const except for unit tests.
01453      */
01454     uint64_t SOFT_RPC_SIZE_LIMIT;
01455 
01456   public:
01457     /**
01458      * This server's unique ID. Not available until init() is called.
01459      */
01460     uint64_t serverId;
01461 
01462     /**
01463      * The addresses that this server is listening on. Not available until
01464      * init() is called.
01465      */
01466     std::string serverAddresses;
01467 
01468   private:
01469 
01470     /**
01471      * The LogCabin daemon's top-level objects.
01472      */
01473     Globals& globals;
01474 
01475     /**
01476      * Where the files for the log and snapshots are stored.
01477      */
01478     Storage::Layout storageLayout;
01479 
01480     /**
01481      * Used to create new sessions.
01482      */
01483     Client::SessionManager sessionManager;
01484 
01485     /**
01486      * This class behaves mostly like a monitor. This protects all the state in
01487      * this class and almost all of the Peer class (with some
01488      * documented exceptions).
01489      */
01490     mutable Mutex mutex;
01491 
01492     /**
01493      * Notified when basically anything changes. Specifically, this is notified
01494      * when any of the following events occur:
01495      *  - term changes.
01496      *  - state changes.
01497      *  - log changes.
01498      *  - commitIndex changes.
01499      *  - exiting is set.
01500      *  - numPeerThreads is decremented.
01501      *  - configuration changes.
01502      *  - startElectionAt changes (see note under startElectionAt).
01503      *  - an acknowledgement from a peer is received.
01504      *  - a server goes from not caught up to caught up.
01505      *  - a heartbeat is scheduled.
01506      * TODO(ongaro): Should there be multiple condition variables? This one is
01507      * used by a lot of threads for a lot of different conditions.
01508      */
01509     mutable Core::ConditionVariable stateChanged;
01510 
01511     /**
01512      * Set to true when this class is about to be destroyed. When this is true,
01513      * threads must exit right away and no more RPCs should be sent or
01514      * processed.
01515      */
01516     bool exiting;
01517 
01518     /**
01519      * The number of Peer::thread threads that are still using this
01520      * RaftConsensus object. When they exit, they decrement this and notify
01521      * #stateChanged.
01522      */
01523     uint32_t numPeerThreads;
01524 
01525     /**
01526      * Provides all storage for this server. Keeps track of all log entries and
01527      * some additional metadata.
01528      *
01529      * If you modify this, be sure to keep #configurationManager consistent.
01530      */
01531     std::unique_ptr<Storage::Log> log;
01532 
01533     /**
01534      * Flag to indicate that #leaderDiskThreadMain should flush recent log
01535      * writes to stable storage. This is always false for followers and
01536      * candidates and is only used for leaders.
01537      *
01538      * When a server steps down, it waits for all syncs to complete, that way
01539      * followers can assume that all of their log entries are durable when
01540      * replying to leaders.
01541      */
01542     bool logSyncQueued;
01543 
01544     /**
01545      * Used for stepDown() to wait on #leaderDiskThread without releasing
01546      * #mutex. This is true while #leaderDiskThread is writing to disk. It's
01547      * set to true while holding #mutex; set to false without #mutex.
01548      */
01549     std::atomic<bool> leaderDiskThreadWorking;
01550 
01551     /**
01552      * Defines the servers that are part of the cluster. See Configuration.
01553      */
01554     std::unique_ptr<Configuration> configuration;
01555 
01556     /**
01557      * Ensures that 'configuration' reflects the latest state of the log and
01558      * snapshot.
01559      */
01560     std::unique_ptr<ConfigurationManager> configurationManager;
01561 
01562     /**
01563      * The latest term this server has seen. This value monotonically increases
01564      * over time. It gets updated in stepDown(), startNewElection(), and when a
01565      * candidate receives a vote response with a newer term.
01566      * \warning
01567      *      After setting this value, you must call updateLogMetadata() to
01568      *      persist it.
01569      */
01570     uint64_t currentTerm;
01571 
01572     /**
01573      * The server's current role in the cluster (follower, candidate, or
01574      * leader). See #State.
01575      */
01576     State state;
01577 
01578     /**
01579      * The latest good snapshot covers entries 1 through 'lastSnapshotIndex'
01580      * (inclusive). It is known that these are committed. They are safe to
01581      * remove from the log, but it may be advantageous to keep them around for
01582      * a little while (to avoid shipping snapshots to straggling followers).
01583      * Thus, the log may or may not have some of the entries in this range.
01584      */
01585     uint64_t lastSnapshotIndex;
01586 
01587     /**
01588      * The term of the last entry covered by the latest good snapshot, or 0 if
01589      * we have no snapshot.
01590      */
01591     uint64_t lastSnapshotTerm;
01592 
01593     /**
01594      * The cluster time of the last entry covered by the latest good snapshot,
01595      * or 0 if we have no snapshot.
01596      */
01597     uint64_t lastSnapshotClusterTime;
01598 
01599     /**
01600      * The size of the latest good snapshot in bytes, or 0 if we have no
01601      * snapshot.
01602      */
01603     uint64_t lastSnapshotBytes;
01604 
01605     /**
01606      * If not NULL, this is a Storage::SnapshotFile::Reader that covers up through
01607      * lastSnapshotIndex. This is ready for the state machine to process and is
01608      * returned to the state machine in getNextEntry(). It's just a cache which
01609      * can be repopulated with readSnapshot().
01610      */
01611     mutable std::unique_ptr<Storage::SnapshotFile::Reader> snapshotReader;
01612 
01613     /**
01614      * This is used in handleInstallSnapshot when receiving a snapshot from
01615      * the current leader. The leader is assumed to send at most one snapshot
01616      * at a time, and any partial snapshots here are discarded when the term
01617      * changes.
01618      */
01619     std::unique_ptr<Storage::SnapshotFile::Writer> snapshotWriter;
01620 
01621     /**
01622      * The largest entry ID for which a quorum is known to have stored the same
01623      * entry as this server has. Entries 1 through commitIndex as stored in
01624      * this server's log are guaranteed to never change. This value will
01625      * monotonically increase over time.
01626      */
01627     uint64_t commitIndex;
01628 
01629     /**
01630      * The server ID of the leader for this term. This is used to help point
01631      * clients to the right server. The special value 0 means either there is
01632      * no leader for this term yet or this server does not know who it is yet.
01633      */
01634     uint64_t leaderId;
01635 
01636     /**
01637      * The server ID that this server voted for during this term's election, if
01638      * any. The special value 0 means no vote has been given out during this
01639      * term.
01640      * \warning
01641      *      After setting this value, you must call updateLogMetadata() to
01642      *      persist it.
01643      */
01644     uint64_t votedFor;
01645 
01646     /**
01647      * A logical clock used to confirm leadership and connectivity.
01648      */
01649     // TODO(ongaro): rename, explain more
01650     mutable uint64_t currentEpoch;
01651 
01652     /**
01653      * Tracks the passage of "cluster time". See ClusterClock.
01654      */
01655     ClusterClock clusterClock;
01656 
01657     /**
01658      * The earliest time at which #timerThread should begin a new election
01659      * with startNewElection().
01660      *
01661      * It is safe for increases to startElectionAt to not notify the condition
01662      * variable. Decreases to this value, however, must notify the condition
01663      * variable to make sure the timerThread gets woken in a timely manner.
01664      * Unfortunately, startElectionAt does not monotonically increase because
01665      * of the random jitter that is applied to the follower timeout, and it
01666      * would reduce the jitter's effectiveness for the thread to wait as long
01667      * as the largest startElectionAt value.
01668      */
01669     TimePoint startElectionAt;
01670 
01671     /**
01672      * The earliest time at which RequestVote messages should be processed.
01673      * Until this time, they are rejected, as processing them risks
01674      * causing the cluster leader to needlessly step down.
01675      * For more motivation, see the "disruptive servers" issue in membership
01676      * changes described in the Raft paper/thesis.
01677      *
01678      * This is set to the current time + an election timeout when a heartbeat
01679      * is received, and it's set to infinity for leaders (who begin processing
01680      * RequestVote messages again immediately when they step down).
01681      */
01682     TimePoint withholdVotesUntil;
01683 
01684     /**
01685      * The total number of entries ever truncated from the end of the log.
01686      * This happens only when a new leader tells this server to remove
01687      * extraneous uncommitted entries from its log.
01688      */
01689     uint64_t numEntriesTruncated;
01690 
01691     /**
01692      * The thread that executes leaderDiskThreadMain() to flush log entries to
01693      * stable storage in the background on leaders.
01694      */
01695     std::thread leaderDiskThread;
01696 
01697     /**
01698      * The thread that executes timerThreadMain() to begin new elections
01699      * after periods of inactivity.
01700      */
01701     std::thread timerThread;
01702 
01703     /**
01704      * The thread that executes stateMachineUpdaterThreadMain() to append
01705      * advance state machine version entries to the log on leaders.
01706      */
01707     std::thread stateMachineUpdaterThread;
01708 
01709     /**
01710      * The thread that executes stepDownThreadMain() to return to the follower
01711      * state if the leader becomes disconnected from a quorum of servers.
01712      */
01713     std::thread stepDownThread;
01714 
01715     Invariants invariants;
01716 
01717     friend class RaftConsensusInternal::LocalServer;
01718     friend class RaftConsensusInternal::Peer;
01719     friend class RaftConsensusInternal::Invariants;
01720 };
01721 
01722 } // namespace LogCabin::Server
01723 } // namespace LogCabin
01724 
01725 #endif /* LOGCABIN_SERVER_RAFTCONSENSUS_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines