LogCabin
|
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <memory> 00018 #include <mutex> 00019 #include <thread> 00020 #include <unordered_map> 00021 00022 #include "build/Protocol/Client.pb.h" 00023 #include "build/Server/SnapshotStateMachine.pb.h" 00024 #include "Core/ConditionVariable.h" 00025 #include "Core/Config.h" 00026 #include "Core/Mutex.h" 00027 #include "Core/Time.h" 00028 #include "Tree/Tree.h" 00029 00030 #ifndef LOGCABIN_SERVER_STATEMACHINE_H 00031 #define LOGCABIN_SERVER_STATEMACHINE_H 00032 00033 namespace LogCabin { 00034 namespace Server { 00035 00036 // forward declaration 00037 class Globals; 00038 class RaftConsensus; 00039 00040 /** 00041 * Interprets and executes operations that have been committed into the Raft 00042 * log. 00043 * 00044 * Version history: 00045 * - Version 1 of the State Machine shipped with LogCabin v1.0.0. 00046 * - Version 2 added the CloseSession command, which clients can use when they 00047 * gracefully shut down. 00048 */ 00049 class StateMachine { 00050 public: 00051 typedef Protocol::Client::StateMachineCommand Command; 00052 typedef Protocol::Client::StateMachineQuery Query; 00053 00054 enum { 00055 /** 00056 * This state machine code can behave like all versions between 00057 * MIN_SUPPORTED_VERSION and MAX_SUPPORTED_VERSION, inclusive. 00058 */ 00059 MIN_SUPPORTED_VERSION = 1, 00060 /** 00061 * This state machine code can behave like all versions between 00062 * MIN_SUPPORTED_VERSION and MAX_SUPPORTED_VERSION, inclusive. 00063 */ 00064 MAX_SUPPORTED_VERSION = 2, 00065 }; 00066 00067 00068 StateMachine(std::shared_ptr<RaftConsensus> consensus, 00069 Core::Config& config, 00070 Globals& globals); 00071 ~StateMachine(); 00072 00073 /** 00074 * Called by ClientService to execute read-only queries on the state 00075 * machine. 00076 * \warning 00077 * Be sure to wait() first! 00078 */ 00079 bool query(const Query::Request& request, 00080 Query::Response& response) const; 00081 00082 /** 00083 * Add information about the state machine state to the given structure. 00084 */ 00085 void updateServerStats(Protocol::ServerStats& serverStats) const; 00086 00087 /** 00088 * Return once the state machine has applied at least the given entry. 00089 */ 00090 void wait(uint64_t index) const; 00091 00092 /** 00093 * Called by ClientService to get a response for a read-write command on 00094 * the state machine. 00095 * \param logIndex 00096 * The index in the log where the command was committed. 00097 * \param command 00098 * The request. 00099 * \param[out] response 00100 * If the return value is true, the response will be filled in here. 00101 * Otherwise, this will be unmodified. 00102 */ 00103 bool waitForResponse(uint64_t logIndex, 00104 const Command::Request& command, 00105 Command::Response& response) const; 00106 00107 /** 00108 * Return true if the server is currently taking a snapshot and false 00109 * otherwise. 00110 */ 00111 bool isTakingSnapshot() const; 00112 00113 /** 00114 * If the state machine is not taking a snapshot, this starts one. This 00115 * method will return after the snapshot has been started (it may have 00116 * already completed). 00117 */ 00118 void startTakingSnapshot(); 00119 00120 /** 00121 * If the server is currently taking a snapshot, abort it. This method will 00122 * return after the existing snapshot has been stopped (it's possible that 00123 * another snapshot will have already started). 00124 */ 00125 void stopTakingSnapshot(); 00126 00127 /** 00128 * Return the time for which the state machine will not take any automated 00129 * snapshots. 00130 * \return 00131 * Zero or positive duration. 00132 */ 00133 std::chrono::nanoseconds getInhibit() const; 00134 00135 /** 00136 * Disable automated snapshots for the given duration. 00137 * \param duration 00138 * If zero or negative, immediately enables automated snapshotting. 00139 * If positive, disables automated snapshotting for the given duration 00140 * (or until a later call to #setInhibit()). 00141 * Note that this will not stop the current snapshot; the caller should 00142 * invoke #stopTakingSnapshot() separately if desired. 00143 */ 00144 void setInhibit(std::chrono::nanoseconds duration); 00145 00146 00147 private: 00148 // forward declaration 00149 struct Session; 00150 00151 /// Clock used by watchdog timer thread. 00152 typedef Core::Time::SteadyClock Clock; 00153 /// Point in time of Clock. 00154 typedef Clock::time_point TimePoint; 00155 00156 /** 00157 * Invoked once per committed entry from the Raft log. 00158 */ 00159 void apply(const RaftConsensus::Entry& entry); 00160 00161 /** 00162 * Main function for thread that waits for new commands from Raft. 00163 */ 00164 void applyThreadMain(); 00165 00166 /** 00167 * Return the #sessions table as a protobuf message for writing into a 00168 * snapshot. 00169 */ 00170 void serializeSessions(SnapshotStateMachine::Header& header) const; 00171 00172 /** 00173 * Update the session and clean up unnecessary responses. 00174 * \param session 00175 * Affected session. 00176 * \param firstOutstandingRPC 00177 * New value for the first outstanding RPC for a session. 00178 */ 00179 void expireResponses(Session& session, uint64_t firstOutstandingRPC); 00180 00181 /** 00182 * Remove old sessions. 00183 * \param clusterTime 00184 * Sessions are kept if they have been modified during the last 00185 * timeout period going backwards from the given time. 00186 */ 00187 void expireSessions(uint64_t clusterTime); 00188 00189 /** 00190 * Return the version of the state machine behavior as of the given log 00191 * index. Note that this is based on versionHistory internally, so if 00192 * you're changing that variable at the given index, update it first. 00193 */ 00194 uint16_t getVersion(uint64_t logIndex) const; 00195 00196 /** 00197 * If there is a current snapshot process, send it a signal and return 00198 * immediately. 00199 */ 00200 void killSnapshotProcess(Core::HoldingMutex holdingMutex, int signum); 00201 00202 /** 00203 * Restore the #sessions table from a snapshot. 00204 */ 00205 void loadSessions(const SnapshotStateMachine::Header& header); 00206 00207 /** 00208 * Read all of the state machine state from a snapshot file 00209 * (including version, sessions, and tree). 00210 */ 00211 void loadSnapshot(Core::ProtoBuf::InputStream& stream); 00212 00213 /** 00214 * Restore the #versionHistory table from a snapshot. 00215 */ 00216 void loadVersionHistory(const SnapshotStateMachine::Header& header); 00217 00218 /** 00219 * Return the #versionHistory table as a protobuf message for writing into 00220 * a snapshot. 00221 */ 00222 void serializeVersionHistory(SnapshotStateMachine::Header& header) const; 00223 00224 /** 00225 * Return true if it is time to create a new snapshot. 00226 * This is called by applyThread as an optimization to avoid waking up 00227 * snapshotThread upon applying every single entry. 00228 * \warning 00229 * Callers should take care to honor maySnapshotAt; this method 00230 * ignores it. 00231 */ 00232 bool shouldTakeSnapshot(uint64_t lastIncludedIndex) const; 00233 00234 /** 00235 * Main function for thread that calls takeSnapshot when appropriate. 00236 */ 00237 void snapshotThreadMain(); 00238 00239 /** 00240 * Main function for thread that checks the progress of the child process. 00241 */ 00242 void snapshotWatchdogThreadMain(); 00243 00244 /** 00245 * Called by snapshotThreadMain to actually take the snapshot. 00246 */ 00247 void takeSnapshot(uint64_t lastIncludedIndex, 00248 std::unique_lock<Core::Mutex>& lockGuard); 00249 00250 /** 00251 * Called to log a debug message if appropriate when the state machine 00252 * encounters a query or command that is not understood by the current 00253 * running version. 00254 * \param request 00255 * Problematic command/query. 00256 * \param reason 00257 * Explains why 'request' is problematic. Should complete the sentence 00258 * "This version of the state machine (%lu) " + reason, and it should 00259 * not contain end punctuation. 00260 */ 00261 void warnUnknownRequest(const google::protobuf::Message& request, 00262 const char* reason) const; 00263 00264 /** 00265 * Consensus module from which this state machine pulls commands and 00266 * snapshots. 00267 */ 00268 std::shared_ptr<RaftConsensus> consensus; 00269 00270 /** 00271 * Server-wide globals. Used to unblock signal handlers in child process. 00272 */ 00273 Globals& globals; 00274 00275 /** 00276 * Used for testing the snapshot watchdog thread. The probability that a 00277 * snapshotting process will deadlock on purpose before starting, as a 00278 * percentage. 00279 */ 00280 uint64_t snapshotBlockPercentage; 00281 00282 /** 00283 * Size in bytes of smallest log to snapshot. 00284 */ 00285 uint64_t snapshotMinLogSize; 00286 00287 /** 00288 * Maximum log size as multiple of last snapshot size until server should 00289 * snapshot. 00290 */ 00291 uint64_t snapshotRatio; 00292 00293 /** 00294 * After this much time has elapsed without any progress, the snapshot 00295 * watchdog thread will kill the snapshotting process. A special value of 0 00296 * disables the watchdog entirely. 00297 */ 00298 std::chrono::nanoseconds snapshotWatchdogInterval; 00299 00300 /** 00301 * The time interval after which to remove an inactive client session, in 00302 * nanoseconds of cluster time. 00303 */ 00304 uint64_t sessionTimeoutNanos; 00305 00306 /** 00307 * The state machine logs messages when it receives a command or query that 00308 * is not understood in the current running version. This controls the 00309 * minimum interval between such messages to prevent spamming the debug 00310 * log. 00311 */ 00312 std::chrono::milliseconds unknownRequestMessageBackoff; 00313 00314 /** 00315 * Protects against concurrent access for all members of this class (except 00316 * 'consensus', which is itself a monitor. 00317 */ 00318 mutable Core::Mutex mutex; 00319 00320 /** 00321 * Notified when lastApplied changes after some entry got applied. 00322 * Also notified upon exiting. 00323 * This is used for client threads to wait; see wait(). 00324 */ 00325 mutable Core::ConditionVariable entriesApplied; 00326 00327 /** 00328 * Notified when shouldTakeSnapshot(lastApplied) becomes true. 00329 * Also notified upon exiting and when #maySnapshotAt changes. 00330 * This is used for snapshotThread to wake up only when necessary. 00331 */ 00332 mutable Core::ConditionVariable snapshotSuggested; 00333 00334 /** 00335 * Notified when a snapshot process is forked. 00336 * Also notified upon exiting. 00337 * This is used so that the watchdog thread knows to begin checking the 00338 * progress of the child process, and also in #startTakingSnapshot(). 00339 */ 00340 mutable Core::ConditionVariable snapshotStarted; 00341 00342 /** 00343 * Notified when a snapshot process is joined. 00344 * Also notified upon exiting. 00345 * This is used so that #stopTakingSnapshot() knows when it's done. 00346 */ 00347 mutable Core::ConditionVariable snapshotCompleted; 00348 00349 /** 00350 * applyThread sets this to true to signal that the server is shutting 00351 * down. 00352 */ 00353 bool exiting; 00354 00355 /** 00356 * The PID of snapshotThread's child process, if any. This is used by 00357 * applyThread to signal exits: if applyThread is exiting, it sends SIGTERM 00358 * to this child process. A childPid of 0 indicates that there is no child 00359 * process. 00360 */ 00361 pid_t childPid; 00362 00363 /** 00364 * The index of the last log entry that this state machine has applied. 00365 * This variable is only written to by applyThread, so applyThread is free 00366 * to access this variable without holding 'mutex'. Other readers must hold 00367 * 'mutex'. 00368 */ 00369 uint64_t lastApplied; 00370 00371 /** 00372 * The time when warnUnknownRequest() last printed a debug message. Used to 00373 * prevent spamming the debug log. 00374 */ 00375 mutable TimePoint lastUnknownRequestMessage; 00376 00377 /** 00378 * Total number of commands/queries that this state machine either did not 00379 * understand or could not process because they were introduced in a newer 00380 * version. 00381 */ 00382 mutable uint64_t numUnknownRequests; 00383 00384 /** 00385 * The number of debug messages suppressed by warnUnknownRequest() since 00386 * lastUnknownRequestMessage. Used to prevent spamming the debug log. 00387 */ 00388 mutable uint64_t numUnknownRequestsSinceLastMessage; 00389 00390 /** 00391 * The number of times a snapshot has been started. 00392 * In addition to being a useful stat, the watchdog thread uses this to 00393 * know whether it's been watching the same snapshot or whether a new one 00394 * has been started, and startTakingSnapshot() waits for this to change 00395 * before returning. 00396 */ 00397 uint64_t numSnapshotsAttempted; 00398 00399 /** 00400 * The number of times a snapshot child process has failed to exit cleanly. 00401 */ 00402 uint64_t numSnapshotsFailed; 00403 00404 /** 00405 * The number of times a log entry was processed to advance the state 00406 * machine's running version, but the state machine was already at that 00407 * version. 00408 */ 00409 uint64_t numRedundantAdvanceVersionEntries; 00410 00411 /** 00412 * The number of times a log entry was processed to advance the state 00413 * machine's running version, but the state machine was already at a larger 00414 * version. 00415 */ 00416 uint64_t numRejectedAdvanceVersionEntries; 00417 00418 /** 00419 * The number of times a log entry was processed to successfully advance 00420 * the state machine's running version, where the state machine was 00421 * previously at a smaller version. 00422 */ 00423 uint64_t numSuccessfulAdvanceVersionEntries; 00424 00425 /** 00426 * The number of times any log entry to advance the state machine's running 00427 * version was processed. Should be the sum of redundant, rejected, and 00428 * successful counts. 00429 */ 00430 uint64_t numTotalAdvanceVersionEntries; 00431 00432 /** 00433 * Set to true when an administrator has asked the server to take a 00434 * snapshot; set to false once the server starts any snapshot. 00435 * Snapshots that are requested due to this flag are permitted to begin 00436 * even if automated snapshots have been inhibited with #setInhibit(). 00437 */ 00438 bool isSnapshotRequested; 00439 00440 /** 00441 * The time at which the server may begin to take automated snapshots. 00442 * Normally this is set to some time in the past. When automated snapshots 00443 * are inhibited with #setInhibit(), this will be set to a future time. 00444 */ 00445 TimePoint maySnapshotAt; 00446 00447 /** 00448 * Tracks state for a particular client. 00449 * Used to prevent duplicate processing of duplicate RPCs. 00450 */ 00451 struct Session { 00452 Session() 00453 : lastModified(0) 00454 , firstOutstandingRPC(0) 00455 , responses() 00456 { 00457 } 00458 /** 00459 * When the session was last active, measured in cluster time 00460 * (roughly the number of nanoseconds that the cluster has maintained a 00461 * leader). 00462 */ 00463 uint64_t lastModified; 00464 /** 00465 * Largest firstOutstandingRPC number processed from this client. 00466 */ 00467 uint64_t firstOutstandingRPC; 00468 /** 00469 * Maps from RPC numbers to responses. 00470 * Responses for RPCs numbered less that firstOutstandingRPC are 00471 * discarded from this map. 00472 */ 00473 std::unordered_map<uint64_t, 00474 Protocol::Client::StateMachineCommand::Response> 00475 responses; 00476 }; 00477 00478 /** 00479 * Client ID to Session map. 00480 */ 00481 std::unordered_map<uint64_t, Session> sessions; 00482 00483 /** 00484 * The hierarchical key-value store. Used in readOnlyTreeRPC and 00485 * readWriteTreeRPC. 00486 */ 00487 Tree::Tree tree; 00488 00489 /** 00490 * The log position when the state machine was updated to each new version. 00491 * First component: log index. Second component: version number. 00492 * Used to evolve state machine over time. 00493 * 00494 * This is used by getResponse() to determine the running version at a 00495 * given log index (to determine whether a command would have been 00496 * applied), and it's used elsewhere to determine the state machine's 00497 * current running version. 00498 * 00499 * Invariant: the pair (index 0, version 1) is always present. 00500 */ 00501 std::map<uint64_t, uint16_t> versionHistory; 00502 00503 /** 00504 * The file that the snapshot is being written into. Also used by to track 00505 * the progress of the child process for the watchdog thread. 00506 * This is non-empty if and only if childPid > 0. 00507 */ 00508 std::unique_ptr<Storage::SnapshotFile::Writer> writer; 00509 00510 /** 00511 * Repeatedly calls into the consensus module to get commands to process 00512 * and applies them. 00513 */ 00514 std::thread applyThread; 00515 00516 /** 00517 * Takes snapshots with the help of a child process. 00518 */ 00519 std::thread snapshotThread; 00520 00521 /** 00522 * Watches the child process to make sure it's writing to #writer, and 00523 * kills it otherwise. This is to detect any possible deadlock that might 00524 * occur if a thread in the parent at the time of the fork held a lock that 00525 * the child process then tried to access. 00526 * See https://github.com/logcabin/logcabin/issues/121 for more rationale. 00527 */ 00528 std::thread snapshotWatchdogThread; 00529 }; 00530 00531 } // namespace LogCabin::Server 00532 } // namespace LogCabin 00533 00534 #endif // LOGCABIN_SERVER_STATEMACHINE_H