LogCabin
Server/StateMachine.h
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
#include <sys/types.h>

#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>

#include "build/Protocol/Client.pb.h"
#include "build/Server/SnapshotStateMachine.pb.h"
#include "Core/ConditionVariable.h"
#include "Core/Config.h"
#include "Core/Mutex.h"
#include "Core/Time.h"
#include "Tree/Tree.h"
00029 
00030 #ifndef LOGCABIN_SERVER_STATEMACHINE_H
00031 #define LOGCABIN_SERVER_STATEMACHINE_H
00032 
00033 namespace LogCabin {
00034 namespace Server {
00035 
// Classes from elsewhere in LogCabin::Server that this header refers to.
// NOTE(review): this header also names RaftConsensus::Entry, which needs the
// complete RaftConsensus definition rather than this forward declaration;
// presumably it currently arrives transitively through one of the includes
// above -- confirm, and consider including it explicitly.
class RaftConsensus;
class Globals;
00039 
00040 /**
00041  * Interprets and executes operations that have been committed into the Raft
00042  * log.
00043  *
00044  * Version history:
00045  * - Version 1 of the State Machine shipped with LogCabin v1.0.0.
00046  * - Version 2 added the CloseSession command, which clients can use when they
00047  *   gracefully shut down.
00048  */
00049 class StateMachine {
00050   public:
00051     typedef Protocol::Client::StateMachineCommand Command;
00052     typedef Protocol::Client::StateMachineQuery Query;
00053 
00054     enum {
00055         /**
00056          * This state machine code can behave like all versions between
00057          * MIN_SUPPORTED_VERSION and MAX_SUPPORTED_VERSION, inclusive.
00058          */
00059         MIN_SUPPORTED_VERSION = 1,
00060         /**
00061          * This state machine code can behave like all versions between
00062          * MIN_SUPPORTED_VERSION and MAX_SUPPORTED_VERSION, inclusive.
00063          */
00064         MAX_SUPPORTED_VERSION = 2,
00065     };
00066 
00067 
00068     StateMachine(std::shared_ptr<RaftConsensus> consensus,
00069                  Core::Config& config,
00070                  Globals& globals);
00071     ~StateMachine();
00072 
00073     /**
00074      * Called by ClientService to execute read-only queries on the state
00075      * machine.
00076      * \warning
00077      *      Be sure to wait() first!
00078      */
00079     bool query(const Query::Request& request,
00080                Query::Response& response) const;
00081 
00082     /**
00083      * Add information about the state machine state to the given structure.
00084      */
00085     void updateServerStats(Protocol::ServerStats& serverStats) const;
00086 
00087     /**
00088      * Return once the state machine has applied at least the given entry.
00089      */
00090     void wait(uint64_t index) const;
00091 
00092     /**
00093      * Called by ClientService to get a response for a read-write command on
00094      * the state machine.
00095      * \param logIndex
00096      *      The index in the log where the command was committed.
00097      * \param command
00098      *      The request.
00099      * \param[out] response
00100      *      If the return value is true, the response will be filled in here.
00101      *      Otherwise, this will be unmodified.
00102      */
00103     bool waitForResponse(uint64_t logIndex,
00104                          const Command::Request& command,
00105                          Command::Response& response) const;
00106 
00107     /**
00108      * Return true if the server is currently taking a snapshot and false
00109      * otherwise.
00110      */
00111     bool isTakingSnapshot() const;
00112 
00113     /**
00114      * If the state machine is not taking a snapshot, this starts one. This
00115      * method will return after the snapshot has been started (it may have
00116      * already completed).
00117      */
00118     void startTakingSnapshot();
00119 
00120     /**
00121      * If the server is currently taking a snapshot, abort it. This method will
00122      * return after the existing snapshot has been stopped (it's possible that
00123      * another snapshot will have already started).
00124      */
00125     void stopTakingSnapshot();
00126 
00127     /**
00128      * Return the time for which the state machine will not take any automated
00129      * snapshots.
00130      * \return
00131      *      Zero or positive duration.
00132      */
00133     std::chrono::nanoseconds getInhibit() const;
00134 
00135     /**
00136      * Disable automated snapshots for the given duration.
00137      * \param duration
00138      *      If zero or negative, immediately enables automated snapshotting.
00139      *      If positive, disables automated snapshotting for the given duration
00140      *      (or until a later call to #setInhibit()).
00141      * Note that this will not stop the current snapshot; the caller should
00142      * invoke #stopTakingSnapshot() separately if desired.
00143      */
00144     void setInhibit(std::chrono::nanoseconds duration);
00145 
00146 
00147   private:
00148     // forward declaration
00149     struct Session;
00150 
00151     /// Clock used by watchdog timer thread.
00152     typedef Core::Time::SteadyClock Clock;
00153     /// Point in time of Clock.
00154     typedef Clock::time_point TimePoint;
00155 
00156     /**
00157      * Invoked once per committed entry from the Raft log.
00158      */
00159     void apply(const RaftConsensus::Entry& entry);
00160 
00161     /**
00162      * Main function for thread that waits for new commands from Raft.
00163      */
00164     void applyThreadMain();
00165 
00166     /**
00167      * Return the #sessions table as a protobuf message for writing into a
00168      * snapshot.
00169      */
00170     void serializeSessions(SnapshotStateMachine::Header& header) const;
00171 
00172     /**
00173      * Update the session and clean up unnecessary responses.
00174      * \param session
00175      *      Affected session.
00176      * \param firstOutstandingRPC
00177      *      New value for the first outstanding RPC for a session.
00178      */
00179     void expireResponses(Session& session, uint64_t firstOutstandingRPC);
00180 
00181     /**
00182      * Remove old sessions.
00183      * \param clusterTime
00184      *      Sessions are kept if they have been modified during the last
00185      *      timeout period going backwards from the given time.
00186      */
00187     void expireSessions(uint64_t clusterTime);
00188 
00189     /**
00190      * Return the version of the state machine behavior as of the given log
00191      * index. Note that this is based on versionHistory internally, so if
00192      * you're changing that variable at the given index, update it first.
00193      */
00194     uint16_t getVersion(uint64_t logIndex) const;
00195 
00196     /**
00197      * If there is a current snapshot process, send it a signal and return
00198      * immediately.
00199      */
00200     void killSnapshotProcess(Core::HoldingMutex holdingMutex, int signum);
00201 
00202     /**
00203      * Restore the #sessions table from a snapshot.
00204      */
00205     void loadSessions(const SnapshotStateMachine::Header& header);
00206 
00207     /**
00208      * Read all of the state machine state from a snapshot file
00209      * (including version, sessions, and tree).
00210      */
00211     void loadSnapshot(Core::ProtoBuf::InputStream& stream);
00212 
00213     /**
00214      * Restore the #versionHistory table from a snapshot.
00215      */
00216     void loadVersionHistory(const SnapshotStateMachine::Header& header);
00217 
00218     /**
00219      * Return the #versionHistory table as a protobuf message for writing into
00220      * a snapshot.
00221      */
00222     void serializeVersionHistory(SnapshotStateMachine::Header& header) const;
00223 
00224     /**
00225      * Return true if it is time to create a new snapshot.
00226      * This is called by applyThread as an optimization to avoid waking up
00227      * snapshotThread upon applying every single entry.
00228      * \warning
00229      *      Callers should take care to honor maySnapshotAt; this method
00230      *      ignores it.
00231      */
00232     bool shouldTakeSnapshot(uint64_t lastIncludedIndex) const;
00233 
00234     /**
00235      * Main function for thread that calls takeSnapshot when appropriate.
00236      */
00237     void snapshotThreadMain();
00238 
00239     /**
00240      * Main function for thread that checks the progress of the child process.
00241      */
00242     void snapshotWatchdogThreadMain();
00243 
00244     /**
00245      * Called by snapshotThreadMain to actually take the snapshot.
00246      */
00247     void takeSnapshot(uint64_t lastIncludedIndex,
00248                       std::unique_lock<Core::Mutex>& lockGuard);
00249 
00250     /**
00251      * Called to log a debug message if appropriate when the state machine
00252      * encounters a query or command that is not understood by the current
00253      * running version.
00254      * \param request
00255      *      Problematic command/query.
00256      * \param reason
00257      *      Explains why 'request' is problematic. Should complete the sentence
00258      *      "This version of the state machine (%lu) " + reason, and it should
00259      *      not contain end punctuation.
00260      */
00261     void warnUnknownRequest(const google::protobuf::Message& request,
00262                             const char* reason) const;
00263 
00264     /**
00265      * Consensus module from which this state machine pulls commands and
00266      * snapshots.
00267      */
00268     std::shared_ptr<RaftConsensus> consensus;
00269 
00270     /**
00271      * Server-wide globals. Used to unblock signal handlers in child process.
00272      */
00273     Globals& globals;
00274 
00275     /**
00276      * Used for testing the snapshot watchdog thread. The probability that a
00277      * snapshotting process will deadlock on purpose before starting, as a
00278      * percentage.
00279      */
00280     uint64_t snapshotBlockPercentage;
00281 
00282     /**
00283      * Size in bytes of smallest log to snapshot.
00284      */
00285     uint64_t snapshotMinLogSize;
00286 
00287     /**
00288      * Maximum log size as multiple of last snapshot size until server should
00289      * snapshot.
00290      */
00291     uint64_t snapshotRatio;
00292 
00293     /**
00294      * After this much time has elapsed without any progress, the snapshot
00295      * watchdog thread will kill the snapshotting process. A special value of 0
00296      * disables the watchdog entirely.
00297      */
00298     std::chrono::nanoseconds snapshotWatchdogInterval;
00299 
00300     /**
00301      * The time interval after which to remove an inactive client session, in
00302      * nanoseconds of cluster time.
00303      */
00304     uint64_t sessionTimeoutNanos;
00305 
00306     /**
00307      * The state machine logs messages when it receives a command or query that
00308      * is not understood in the current running version. This controls the
00309      * minimum interval between such messages to prevent spamming the debug
00310      * log.
00311      */
00312     std::chrono::milliseconds unknownRequestMessageBackoff;
00313 
00314     /**
00315      * Protects against concurrent access for all members of this class (except
00316      * 'consensus', which is itself a monitor.
00317      */
00318     mutable Core::Mutex mutex;
00319 
00320     /**
00321      * Notified when lastApplied changes after some entry got applied.
00322      * Also notified upon exiting.
00323      * This is used for client threads to wait; see wait().
00324      */
00325     mutable Core::ConditionVariable entriesApplied;
00326 
00327     /**
00328      * Notified when shouldTakeSnapshot(lastApplied) becomes true.
00329      * Also notified upon exiting and when #maySnapshotAt changes.
00330      * This is used for snapshotThread to wake up only when necessary.
00331      */
00332     mutable Core::ConditionVariable snapshotSuggested;
00333 
00334     /**
00335      * Notified when a snapshot process is forked.
00336      * Also notified upon exiting.
00337      * This is used so that the watchdog thread knows to begin checking the
00338      * progress of the child process, and also in #startTakingSnapshot().
00339      */
00340     mutable Core::ConditionVariable snapshotStarted;
00341 
00342     /**
00343      * Notified when a snapshot process is joined.
00344      * Also notified upon exiting.
00345      * This is used so that #stopTakingSnapshot() knows when it's done.
00346      */
00347     mutable Core::ConditionVariable snapshotCompleted;
00348 
00349     /**
00350      * applyThread sets this to true to signal that the server is shutting
00351      * down.
00352      */
00353     bool exiting;
00354 
00355     /**
00356      * The PID of snapshotThread's child process, if any. This is used by
00357      * applyThread to signal exits: if applyThread is exiting, it sends SIGTERM
00358      * to this child process. A childPid of 0 indicates that there is no child
00359      * process.
00360      */
00361     pid_t childPid;
00362 
00363     /**
00364      * The index of the last log entry that this state machine has applied.
00365      * This variable is only written to by applyThread, so applyThread is free
00366      * to access this variable without holding 'mutex'. Other readers must hold
00367      * 'mutex'.
00368      */
00369     uint64_t lastApplied;
00370 
00371     /**
00372      * The time when warnUnknownRequest() last printed a debug message. Used to
00373      * prevent spamming the debug log.
00374      */
00375     mutable TimePoint lastUnknownRequestMessage;
00376 
00377     /**
00378      * Total number of commands/queries that this state machine either did not
00379      * understand or could not process because they were introduced in a newer
00380      * version.
00381      */
00382     mutable uint64_t numUnknownRequests;
00383 
00384     /**
00385      * The number of debug messages suppressed by warnUnknownRequest() since
00386      * lastUnknownRequestMessage. Used to prevent spamming the debug log.
00387      */
00388     mutable uint64_t numUnknownRequestsSinceLastMessage;
00389 
00390     /**
00391      * The number of times a snapshot has been started.
00392      * In addition to being a useful stat, the watchdog thread uses this to
00393      * know whether it's been watching the same snapshot or whether a new one
00394      * has been started, and startTakingSnapshot() waits for this to change
00395      * before returning.
00396      */
00397     uint64_t numSnapshotsAttempted;
00398 
00399     /**
00400      * The number of times a snapshot child process has failed to exit cleanly.
00401      */
00402     uint64_t numSnapshotsFailed;
00403 
00404     /**
00405      * The number of times a log entry was processed to advance the state
00406      * machine's running version, but the state machine was already at that
00407      * version.
00408      */
00409     uint64_t numRedundantAdvanceVersionEntries;
00410 
00411     /**
00412      * The number of times a log entry was processed to advance the state
00413      * machine's running version, but the state machine was already at a larger
00414      * version.
00415      */
00416     uint64_t numRejectedAdvanceVersionEntries;
00417 
00418     /**
00419      * The number of times a log entry was processed to successfully advance
00420      * the state machine's running version, where the state machine was
00421      * previously at a smaller version.
00422      */
00423     uint64_t numSuccessfulAdvanceVersionEntries;
00424 
00425     /**
00426      * The number of times any log entry to advance the state machine's running
00427      * version was processed. Should be the sum of redundant, rejected, and
00428      * successful counts.
00429      */
00430     uint64_t numTotalAdvanceVersionEntries;
00431 
00432     /**
00433      * Set to true when an administrator has asked the server to take a
00434      * snapshot; set to false once the server starts any snapshot.
00435      * Snapshots that are requested due to this flag are permitted to begin
00436      * even if automated snapshots have been inhibited with #setInhibit().
00437      */
00438     bool isSnapshotRequested;
00439 
00440     /**
00441      * The time at which the server may begin to take automated snapshots.
00442      * Normally this is set to some time in the past. When automated snapshots
00443      * are inhibited with #setInhibit(), this will be set to a future time.
00444      */
00445     TimePoint maySnapshotAt;
00446 
00447     /**
00448      * Tracks state for a particular client.
00449      * Used to prevent duplicate processing of duplicate RPCs.
00450      */
00451     struct Session {
00452         Session()
00453             : lastModified(0)
00454             , firstOutstandingRPC(0)
00455             , responses()
00456         {
00457         }
00458         /**
00459          * When the session was last active, measured in cluster time
00460          * (roughly the number of nanoseconds that the cluster has maintained a
00461          * leader).
00462          */
00463         uint64_t lastModified;
00464         /**
00465          * Largest firstOutstandingRPC number processed from this client.
00466          */
00467         uint64_t firstOutstandingRPC;
00468         /**
00469          * Maps from RPC numbers to responses.
00470          * Responses for RPCs numbered less that firstOutstandingRPC are
00471          * discarded from this map.
00472          */
00473         std::unordered_map<uint64_t,
00474                            Protocol::Client::StateMachineCommand::Response>
00475             responses;
00476     };
00477 
00478     /**
00479      * Client ID to Session map.
00480      */
00481     std::unordered_map<uint64_t, Session> sessions;
00482 
00483     /**
00484      * The hierarchical key-value store. Used in readOnlyTreeRPC and
00485      * readWriteTreeRPC.
00486      */
00487     Tree::Tree tree;
00488 
00489     /**
00490      * The log position when the state machine was updated to each new version.
00491      * First component: log index. Second component: version number.
00492      * Used to evolve state machine over time.
00493      *
00494      * This is used by getResponse() to determine the running version at a
00495      * given log index (to determine whether a command would have been
00496      * applied), and it's used elsewhere to determine the state machine's
00497      * current running version.
00498      *
00499      * Invariant: the pair (index 0, version 1) is always present.
00500      */
00501     std::map<uint64_t, uint16_t> versionHistory;
00502 
00503     /**
00504      * The file that the snapshot is being written into. Also used by to track
00505      * the progress of the child process for the watchdog thread.
00506      * This is non-empty if and only if childPid > 0.
00507      */
00508     std::unique_ptr<Storage::SnapshotFile::Writer> writer;
00509 
00510     /**
00511      * Repeatedly calls into the consensus module to get commands to process
00512      * and applies them.
00513      */
00514     std::thread applyThread;
00515 
00516     /**
00517      * Takes snapshots with the help of a child process.
00518      */
00519     std::thread snapshotThread;
00520 
00521     /**
00522      * Watches the child process to make sure it's writing to #writer, and
00523      * kills it otherwise. This is to detect any possible deadlock that might
00524      * occur if a thread in the parent at the time of the fork held a lock that
00525      * the child process then tried to access.
00526      * See https://github.com/logcabin/logcabin/issues/121 for more rationale.
00527      */
00528     std::thread snapshotWatchdogThread;
00529 };
00530 
00531 } // namespace LogCabin::Server
00532 } // namespace LogCabin
00533 
00534 #endif // LOGCABIN_SERVER_STATEMACHINE_H
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines