LogCabin
Server/StateMachine.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <unistd.h>
00018 #include <sys/types.h>
00019 #include <sys/wait.h>
00020 
00021 #include "Core/Debug.h"
00022 #include "Core/Mutex.h"
00023 #include "Core/ProtoBuf.h"
00024 #include "Core/Random.h"
00025 #include "Core/ThreadId.h"
00026 #include "Core/Util.h"
00027 #include "Server/Globals.h"
00028 #include "Server/RaftConsensus.h"
00029 #include "Server/StateMachine.h"
00030 #include "Storage/SnapshotFile.h"
00031 #include "Tree/ProtoBuf.h"
00032 
00033 namespace LogCabin {
00034 namespace Server {
00035 
00036 namespace PC = LogCabin::Protocol::Client;
00037 
00038 
// Knobs that exist for testing purposes only.
// When true, the StateMachine constructor does not start its worker threads.
bool stateMachineSuppressThreads{false};
// Milliseconds the forked snapshot child sleeps before writing the snapshot.
uint32_t stateMachineChildSleepMs{0};
00042 
00043 StateMachine::StateMachine(std::shared_ptr<RaftConsensus> consensus,
00044                            Core::Config& config,
00045                            Globals& globals)
00046     : consensus(consensus)
00047     , globals(globals)
00048       // This configuration option isn't advertised as part of the public API:
00049       // it's only useful for testing.
00050     , snapshotBlockPercentage(
00051             config.read<uint64_t>("snapshotBlockPercentage", 0))
00052     , snapshotMinLogSize(
00053             config.read<uint64_t>("snapshotMinLogSize", 64UL * 1024 * 1024))
00054     , snapshotRatio(
00055             config.read<uint64_t>("snapshotRatio", 4))
00056     , snapshotWatchdogInterval(std::chrono::milliseconds(
00057             config.read<uint64_t>("snapshotWatchdogMilliseconds", 10000)))
00058       // TODO(ongaro): This should be configurable, but it must be the same for
00059       // every server, so it's dangerous to put it in the config file. Need to
00060       // use the Raft log to agree on this value. Also need to inform clients
00061       // of the value and its changes, so that they can send keep-alives at
00062       // appropriate intervals. For now, servers time out after about an hour,
00063       // and clients send keep-alives every minute.
00064     , sessionTimeoutNanos(1000UL * 1000 * 1000 * 60 * 60)
00065     , unknownRequestMessageBackoff(std::chrono::milliseconds(
00066             config.read<uint64_t>("stateMachineUnknownRequestMessage"
00067                                   "BackoffMilliseconds", 10000)))
00068     , mutex()
00069     , entriesApplied()
00070     , snapshotSuggested()
00071     , snapshotStarted()
00072     , snapshotCompleted()
00073     , exiting(false)
00074     , childPid(0)
00075     , lastApplied(0)
00076     , lastUnknownRequestMessage(TimePoint::min())
00077     , numUnknownRequests(0)
00078     , numUnknownRequestsSinceLastMessage(0)
00079     , numSnapshotsAttempted(0)
00080     , numSnapshotsFailed(0)
00081     , numRedundantAdvanceVersionEntries(0)
00082     , numRejectedAdvanceVersionEntries(0)
00083     , numSuccessfulAdvanceVersionEntries(0)
00084     , numTotalAdvanceVersionEntries(0)
00085     , isSnapshotRequested(false)
00086     , maySnapshotAt(TimePoint::min())
00087     , sessions()
00088     , tree()
00089     , versionHistory()
00090     , writer()
00091     , applyThread()
00092     , snapshotThread()
00093     , snapshotWatchdogThread()
00094 {
00095     versionHistory.insert({0, 1});
00096     consensus->setSupportedStateMachineVersions(MIN_SUPPORTED_VERSION,
00097                                                 MAX_SUPPORTED_VERSION);
00098     if (!stateMachineSuppressThreads) {
00099         applyThread = std::thread(&StateMachine::applyThreadMain, this);
00100         snapshotThread = std::thread(&StateMachine::snapshotThreadMain, this);
00101         snapshotWatchdogThread = std::thread(
00102                 &StateMachine::snapshotWatchdogThreadMain, this);
00103     }
00104 }
00105 
00106 StateMachine::~StateMachine()
00107 {
00108     NOTICE("Shutting down");
00109     if (consensus) // sometimes missing for testing
00110         consensus->exit();
00111     if (applyThread.joinable())
00112         applyThread.join();
00113     if (snapshotThread.joinable())
00114         snapshotThread.join();
00115     if (snapshotWatchdogThread.joinable())
00116         snapshotWatchdogThread.join();
00117     NOTICE("Joined with threads");
00118 }
00119 
00120 bool
00121 StateMachine::query(const Query::Request& request,
00122                     Query::Response& response) const
00123 {
00124     std::lock_guard<Core::Mutex> lockGuard(mutex);
00125     if (request.has_tree()) {
00126         Tree::ProtoBuf::readOnlyTreeRPC(tree,
00127                                         request.tree(),
00128                                         *response.mutable_tree());
00129         return true;
00130     }
00131     warnUnknownRequest(request, "does not understand the given request");
00132     return false;
00133 }
00134 
00135 void
00136 StateMachine::updateServerStats(Protocol::ServerStats& serverStats) const
00137 {
00138     std::lock_guard<Core::Mutex> lockGuard(mutex);
00139     Core::Time::SteadyTimeConverter time;
00140     serverStats.clear_state_machine();
00141     Protocol::ServerStats::StateMachine& smStats =
00142         *serverStats.mutable_state_machine();
00143     smStats.set_snapshotting(childPid != 0);
00144     smStats.set_last_applied(lastApplied);
00145     smStats.set_num_sessions(sessions.size());
00146     smStats.set_num_unknown_requests(numUnknownRequests);
00147     smStats.set_num_snapshots_attempted(numSnapshotsAttempted);
00148     smStats.set_num_snapshots_failed(numSnapshotsFailed);
00149     smStats.set_num_redundant_advance_version_entries(
00150         numRedundantAdvanceVersionEntries);
00151     smStats.set_num_rejected_advance_version_entries(
00152         numRejectedAdvanceVersionEntries);
00153     smStats.set_num_successful_advance_version_entries(
00154         numSuccessfulAdvanceVersionEntries);
00155     smStats.set_num_total_advance_version_entries(
00156         numTotalAdvanceVersionEntries);
00157     smStats.set_min_supported_version(MIN_SUPPORTED_VERSION);
00158     smStats.set_max_supported_version(MAX_SUPPORTED_VERSION);
00159     smStats.set_running_version(getVersion(lastApplied));
00160     smStats.set_may_snapshot_at(time.unixNanos(maySnapshotAt));
00161     tree.updateServerStats(*smStats.mutable_tree());
00162 }
00163 
00164 void
00165 StateMachine::wait(uint64_t index) const
00166 {
00167     std::unique_lock<Core::Mutex> lockGuard(mutex);
00168     while (lastApplied < index)
00169         entriesApplied.wait(lockGuard);
00170 }
00171 
00172 bool
00173 StateMachine::waitForResponse(uint64_t logIndex,
00174                               const Command::Request& command,
00175                               Command::Response& response) const
00176 {
00177     std::unique_lock<Core::Mutex> lockGuard(mutex);
00178     while (lastApplied < logIndex)
00179         entriesApplied.wait(lockGuard);
00180 
00181     // Need to check whether we understood the request at the time it
00182     // was applied using getVersion(logIndex), then reply and return true/false
00183     // based on that. Existing commands have been around since version 1, so we
00184     // skip this check for now.
00185     uint16_t versionThen = getVersion(logIndex);
00186 
00187     if (command.has_tree()) {
00188         const PC::ExactlyOnceRPCInfo& rpcInfo = command.tree().exactly_once();
00189         auto sessionIt = sessions.find(rpcInfo.client_id());
00190         if (sessionIt == sessions.end()) {
00191             WARNING("Client %lu session expired but client still active",
00192                     rpcInfo.client_id());
00193             response.mutable_tree()->
00194                 set_status(PC::Status::SESSION_EXPIRED);
00195             return true;
00196         }
00197         const Session& session = sessionIt->second;
00198         auto responseIt = session.responses.find(rpcInfo.rpc_number());
00199         if (responseIt == session.responses.end()) {
00200             // The response for this RPC has already been removed: the client
00201             // is not waiting for it. This request is just a duplicate that is
00202             // safe to drop.
00203             WARNING("Client %lu asking for discarded response to RPC %lu",
00204                     rpcInfo.client_id(), rpcInfo.rpc_number());
00205             response.mutable_tree()->
00206                 set_status(PC::Status::SESSION_EXPIRED);
00207             return true;
00208         }
00209         response = responseIt->second;
00210         return true;
00211     } else if (command.has_open_session()) {
00212         response.mutable_open_session()->
00213             set_client_id(logIndex);
00214         return true;
00215     } else if (versionThen >= 2 && command.has_close_session()) {
00216         response.mutable_close_session(); // no fields to set
00217         return true;
00218     } else if (command.has_advance_version()) {
00219         response.mutable_advance_version()->
00220             set_running_version(versionThen);
00221         return true;
00222     }
00223     // don't warnUnknownRequest here, since we already did so in apply()
00224     return false;
00225 }
00226 
00227 bool
00228 StateMachine::isTakingSnapshot() const
00229 {
00230     std::lock_guard<Core::Mutex> lockGuard(mutex);
00231     return childPid != 0;
00232 }
00233 
00234 void
00235 StateMachine::startTakingSnapshot()
00236 {
00237     std::unique_lock<Core::Mutex> lockGuard(mutex);
00238     if (childPid == 0) {
00239         NOTICE("Administrator requested snapshot");
00240         isSnapshotRequested = true;
00241         snapshotSuggested.notify_all();
00242         // This waits on numSnapshotsAttempted to change, since waiting on
00243         // childPid != 0 would risk missing an entire snapshot that started and
00244         // completed before this thread was scheduled.
00245         uint64_t nextSnapshot = numSnapshotsAttempted + 1;
00246         while (!exiting && numSnapshotsAttempted < nextSnapshot) {
00247             snapshotStarted.wait(lockGuard);
00248         }
00249     }
00250 }
00251 
00252 void
00253 StateMachine::stopTakingSnapshot()
00254 {
00255     std::unique_lock<Core::Mutex> lockGuard(mutex);
00256     int pid = childPid;
00257     if (pid != 0) {
00258         NOTICE("Administrator aborted snapshot");
00259         killSnapshotProcess(Core::HoldingMutex(lockGuard), SIGTERM);
00260         while (!exiting && pid == childPid) {
00261             snapshotCompleted.wait(lockGuard);
00262         }
00263     }
00264 }
00265 
00266 std::chrono::nanoseconds
00267 StateMachine::getInhibit() const
00268 {
00269     std::lock_guard<Core::Mutex> lockGuard(mutex);
00270     TimePoint now = Clock::now();
00271     if (maySnapshotAt <= now) {
00272         return std::chrono::nanoseconds::zero();
00273     } else {
00274         return maySnapshotAt - now;
00275     }
00276 }
00277 
00278 void
00279 StateMachine::setInhibit(std::chrono::nanoseconds duration)
00280 {
00281     std::lock_guard<Core::Mutex> lockGuard(mutex);
00282     if (duration <= std::chrono::nanoseconds::zero()) {
00283         maySnapshotAt = TimePoint::min();
00284         NOTICE("Administrator permitted snapshotting");
00285     } else {
00286         TimePoint now = Clock::now();
00287         maySnapshotAt = now + duration;
00288         if (maySnapshotAt < now) { // overflow
00289             maySnapshotAt = TimePoint::max();
00290         }
00291         NOTICE("Administrator inhibited snapshotting for the next %s",
00292                Core::StringUtil::toString(maySnapshotAt - now).c_str());
00293     }
00294     snapshotSuggested.notify_all();
00295 }
00296 
00297 
00298 ////////// StateMachine private methods //////////
00299 
00300 void
00301 StateMachine::apply(const RaftConsensus::Entry& entry)
00302 {
00303     Command::Request command;
00304     if (!Core::ProtoBuf::parse(entry.command, command)) {
00305         PANIC("Failed to parse protobuf for entry %lu",
00306               entry.index);
00307     }
00308     uint16_t runningVersion = getVersion(entry.index - 1);
00309     if (command.has_tree()) {
00310         PC::ExactlyOnceRPCInfo rpcInfo = command.tree().exactly_once();
00311         auto it = sessions.find(rpcInfo.client_id());
00312         if (it == sessions.end()) {
00313             // session does not exist
00314         } else {
00315             // session exists
00316             Session& session = it->second;
00317             expireResponses(session, rpcInfo.first_outstanding_rpc());
00318             if (rpcInfo.rpc_number() < session.firstOutstandingRPC) {
00319                 // response already discarded, do not re-apply
00320             } else {
00321                 auto inserted = session.responses.insert(
00322                                                 {rpcInfo.rpc_number(), {}});
00323                 if (inserted.second) {
00324                     // response not found, apply and save it
00325                     Tree::ProtoBuf::readWriteTreeRPC(
00326                         tree,
00327                         command.tree(),
00328                         *inserted.first->second.mutable_tree());
00329                     session.lastModified = entry.clusterTime;
00330                 } else {
00331                     // response exists, do not re-apply
00332                 }
00333             }
00334         }
00335     } else if (command.has_open_session()) {
00336         uint64_t clientId = entry.index;
00337         Session& session = sessions.insert({clientId, {}}).first->second;
00338         session.lastModified = entry.clusterTime;
00339     } else if (command.has_close_session()) {
00340         if (runningVersion >= 2) {
00341             sessions.erase(command.close_session().client_id());
00342         } else {
00343             // Command is ignored in version < 2.
00344             warnUnknownRequest(command, "may not process the given request, "
00345                                "which was introduced in version 2");
00346         }
00347     } else if (command.has_advance_version()) {
00348         uint16_t requested = Core::Util::downCast<uint16_t>(
00349                 command.advance_version(). requested_version());
00350         if (requested < runningVersion) {
00351             WARNING("Rejecting downgrade of state machine version "
00352                     "(running version %u but command at log index %lu wants "
00353                     "to switch to version %u)",
00354                     runningVersion,
00355                     entry.index,
00356                     requested);
00357             ++numRejectedAdvanceVersionEntries;
00358         } else if (requested > runningVersion) {
00359             if (requested > MAX_SUPPORTED_VERSION) {
00360                 PANIC("Cannot upgrade state machine to version %u (from %u) "
00361                       "because this code only supports up to version %u",
00362                       requested,
00363                       runningVersion,
00364                       MAX_SUPPORTED_VERSION);
00365             } else {
00366                 NOTICE("Upgrading state machine to version %u (from %u)",
00367                        requested,
00368                        runningVersion);
00369                 versionHistory.insert({entry.index, requested});
00370             }
00371             ++numSuccessfulAdvanceVersionEntries;
00372         } else { // requested == runningVersion
00373             // nothing to do
00374             // If this stat is high, see note in RaftConsensus.cc.
00375             ++numRedundantAdvanceVersionEntries;
00376         }
00377         ++numTotalAdvanceVersionEntries;
00378     } else { // unknown command
00379         // This is (deterministically) ignored by all state machines running
00380         // the current version.
00381         warnUnknownRequest(command, "does not understand the given request");
00382     }
00383 }
00384 
00385 void
00386 StateMachine::applyThreadMain()
00387 {
00388     Core::ThreadId::setName("StateMachine");
00389     try {
00390         while (true) {
00391             RaftConsensus::Entry entry = consensus->getNextEntry(lastApplied);
00392             std::lock_guard<Core::Mutex> lockGuard(mutex);
00393             switch (entry.type) {
00394                 case RaftConsensus::Entry::SKIP:
00395                     break;
00396                 case RaftConsensus::Entry::DATA:
00397                     apply(entry);
00398                     break;
00399                 case RaftConsensus::Entry::SNAPSHOT:
00400                     NOTICE("Loading snapshot through entry %lu into state "
00401                            "machine", entry.index);
00402                     loadSnapshot(*entry.snapshotReader);
00403                     NOTICE("Done loading snapshot");
00404                     break;
00405             }
00406             expireSessions(entry.clusterTime);
00407             lastApplied = entry.index;
00408             entriesApplied.notify_all();
00409             if (shouldTakeSnapshot(lastApplied) &&
00410                 maySnapshotAt <= Clock::now()) {
00411                 snapshotSuggested.notify_all();
00412             }
00413         }
00414     } catch (const Core::Util::ThreadInterruptedException&) {
00415         NOTICE("exiting");
00416         std::lock_guard<Core::Mutex> lockGuard(mutex);
00417         exiting = true;
00418         entriesApplied.notify_all();
00419         snapshotSuggested.notify_all();
00420         snapshotStarted.notify_all();
00421         snapshotCompleted.notify_all();
00422         killSnapshotProcess(Core::HoldingMutex(lockGuard), SIGTERM);
00423     }
00424 }
00425 
00426 void
00427 StateMachine::serializeSessions(SnapshotStateMachine::Header& header) const
00428 {
00429     for (auto it = sessions.begin(); it != sessions.end(); ++it) {
00430         SnapshotStateMachine::Session& session = *header.add_session();
00431         session.set_client_id(it->first);
00432         session.set_last_modified(it->second.lastModified);
00433         session.set_first_outstanding_rpc(it->second.firstOutstandingRPC);
00434         for (auto it2 = it->second.responses.begin();
00435              it2 != it->second.responses.end();
00436              ++it2) {
00437             SnapshotStateMachine::Response& response =
00438                 *session.add_rpc_response();
00439             response.set_rpc_number(it2->first);
00440             *response.mutable_response() = it2->second;
00441         }
00442     }
00443 }
00444 
00445 void
00446 StateMachine::expireResponses(Session& session, uint64_t firstOutstandingRPC)
00447 {
00448     if (session.firstOutstandingRPC >= firstOutstandingRPC)
00449         return;
00450     session.firstOutstandingRPC = firstOutstandingRPC;
00451     auto it = session.responses.begin();
00452     while (it != session.responses.end()) {
00453         if (it->first < session.firstOutstandingRPC)
00454             it = session.responses.erase(it);
00455         else
00456             ++it;
00457     }
00458 }
00459 
00460 void
00461 StateMachine::expireSessions(uint64_t clusterTime)
00462 {
00463     auto it = sessions.begin();
00464     while (it != sessions.end()) {
00465         Session& session = it->second;
00466         uint64_t expireTime = session.lastModified + sessionTimeoutNanos;
00467         if (expireTime < clusterTime) {
00468             uint64_t diffNanos = clusterTime - session.lastModified;
00469             NOTICE("Expiring client %lu's session after %lu.%09lu seconds "
00470                    "of cluster time due to inactivity",
00471                    it->first,
00472                    diffNanos / (1000 * 1000 * 1000UL),
00473                    diffNanos % (1000 * 1000 * 1000UL));
00474             it = sessions.erase(it);
00475         } else {
00476             ++it;
00477         }
00478     }
00479 }
00480 
00481 uint16_t
00482 StateMachine::getVersion(uint64_t logIndex) const
00483 {
00484     auto it = versionHistory.upper_bound(logIndex);
00485     --it;
00486     return it->second;
00487 }
00488 
00489 void
00490 StateMachine::killSnapshotProcess(Core::HoldingMutex holdingMutex,
00491                                   int signum)
00492 {
00493     if (childPid != 0) {
00494         int r = kill(childPid, signum);
00495         if (r != 0) {
00496             WARNING("Could not send %s to child process (%d): %s",
00497                     strsignal(signum),
00498                     childPid,
00499                     strerror(errno));
00500         }
00501     }
00502 }
00503 
00504 void
00505 StateMachine::loadSessions(const SnapshotStateMachine::Header& header)
00506 {
00507     sessions.clear();
00508     for (auto it = header.session().begin();
00509          it != header.session().end();
00510          ++it) {
00511         Session& session = sessions.insert({it->client_id(), {}})
00512                                                         .first->second;
00513         session.lastModified = it->last_modified();
00514         session.firstOutstandingRPC = it->first_outstanding_rpc();
00515         for (auto it2 = it->rpc_response().begin();
00516              it2 != it->rpc_response().end();
00517              ++it2) {
00518             session.responses.insert({it2->rpc_number(), it2->response()});
00519         }
00520     }
00521 }
00522 
00523 void
00524 StateMachine::loadSnapshot(Core::ProtoBuf::InputStream& stream)
00525 {
00526     // Check that this snapshot uses format version 1
00527     uint8_t formatVersion = 0;
00528     uint64_t bytesRead = stream.readRaw(&formatVersion, sizeof(formatVersion));
00529     if (bytesRead < sizeof(formatVersion)) {
00530         PANIC("Snapshot contents are empty (no format version field)");
00531     }
00532     if (formatVersion != 1) {
00533         PANIC("Snapshot contents format version read was %u, but this "
00534               "code can only read version 1",
00535               formatVersion);
00536     }
00537 
00538     // Load snapshot header
00539     {
00540         SnapshotStateMachine::Header header;
00541         std::string error = stream.readMessage(header);
00542         if (!error.empty()) {
00543             PANIC("Couldn't read state machine header from snapshot: %s",
00544                   error.c_str());
00545         }
00546         loadVersionHistory(header);
00547         loadSessions(header);
00548     }
00549 
00550     // Load the tree's state
00551     tree.loadSnapshot(stream);
00552 }
00553 
00554 void
00555 StateMachine::loadVersionHistory(const SnapshotStateMachine::Header& header)
00556 {
00557     versionHistory.clear();
00558     versionHistory.insert({0, 1});
00559     for (auto it = header.version_update().begin();
00560          it != header.version_update().end();
00561          ++it) {
00562         versionHistory.insert({it->log_index(),
00563                                Core::Util::downCast<uint16_t>(it->version())});
00564     }
00565 
00566     // The version of the current state machine behavior.
00567     uint16_t running = versionHistory.rbegin()->second;
00568     if (running < MIN_SUPPORTED_VERSION ||
00569         running > MAX_SUPPORTED_VERSION) {
00570         PANIC("State machine version read from snapshot was %u, but this "
00571               "code only supports %u through %u (inclusive)",
00572               running,
00573               MIN_SUPPORTED_VERSION,
00574               MAX_SUPPORTED_VERSION);
00575     }
00576 }
00577 
00578 void
00579 StateMachine::serializeVersionHistory(
00580         SnapshotStateMachine::Header& header) const
00581 {
00582     for (auto it = versionHistory.begin();
00583          it != versionHistory.end();
00584          ++it) {
00585         SnapshotStateMachine::VersionUpdate& update =
00586             *header.add_version_update();
00587         update.set_log_index(it->first);
00588         update.set_version(it->second);
00589     }
00590 }
00591 
00592 bool
00593 StateMachine::shouldTakeSnapshot(uint64_t lastIncludedIndex) const
00594 {
00595     SnapshotStats::SnapshotStats stats = consensus->getSnapshotStats();
00596 
00597     // print every 10% but not at 100% because then we'd be printing all the
00598     // time
00599     uint64_t curr = 0;
00600     if (lastIncludedIndex > stats.last_snapshot_index())
00601         curr = lastIncludedIndex - stats.last_snapshot_index();
00602     uint64_t prev = curr - 1;
00603     uint64_t logEntries = stats.last_log_index() - stats.last_snapshot_index();
00604     if (curr != logEntries &&
00605         10 * prev / logEntries != 10 * curr / logEntries) {
00606         NOTICE("Have applied %lu%% of the %lu total log entries",
00607                100 * curr / logEntries,
00608                logEntries);
00609     }
00610 
00611     if (stats.log_bytes() < snapshotMinLogSize)
00612         return false;
00613     if (stats.log_bytes() < stats.last_snapshot_bytes() * snapshotRatio)
00614         return false;
00615     if (lastIncludedIndex < stats.last_snapshot_index())
00616         return false;
00617     if (lastIncludedIndex < stats.last_log_index() * 3 / 4)
00618         return false;
00619     return true;
00620 }
00621 
00622 void
00623 StateMachine::snapshotThreadMain()
00624 {
00625     Core::ThreadId::setName("SnapshotStateMachine");
00626     std::unique_lock<Core::Mutex> lockGuard(mutex);
00627     bool wasInhibited = false;
00628     while (!exiting) {
00629         bool inhibited = (maySnapshotAt > Clock::now());
00630 
00631         TimePoint waitUntil = TimePoint::max();
00632         if (inhibited)
00633             waitUntil = maySnapshotAt;
00634 
00635         if (wasInhibited && !inhibited)
00636             NOTICE("Now permitted to take snapshots");
00637         wasInhibited = inhibited;
00638 
00639         if (isSnapshotRequested ||
00640             (!inhibited && shouldTakeSnapshot(lastApplied))) {
00641 
00642             isSnapshotRequested = false;
00643             takeSnapshot(lastApplied, lockGuard);
00644             continue;
00645         }
00646 
00647         snapshotSuggested.wait_until(lockGuard, waitUntil);
00648     }
00649 }
00650 
00651 void
00652 StateMachine::snapshotWatchdogThreadMain()
00653 {
00654     using Core::StringUtil::toString;
00655     Core::ThreadId::setName("SnapshotStateMachineWatchdog");
00656     std::unique_lock<Core::Mutex> lockGuard(mutex);
00657 
00658     // The snapshot process that this thread is currently tracking, based on
00659     // numSnapshotsAttempted. If set to ~0UL, this thread is not currently
00660     // tracking a snapshot process.
00661     uint64_t tracking = ~0UL;
00662     // The value of writer->sharedBytesWritten at the "start" time.
00663     uint64_t startProgress = 0;
00664     // The time at the "start" time.
00665     TimePoint startTime = TimePoint::min();
00666     // Special value for infinite interval.
00667     const std::chrono::nanoseconds zero = std::chrono::nanoseconds::zero();
00668 
00669     while (!exiting) {
00670         TimePoint waitUntil = TimePoint::max();
00671         TimePoint now = Clock::now();
00672 
00673         if (childPid > 0) { // there is some child process
00674             uint64_t currentProgress = *writer->sharedBytesWritten.value;
00675             if (tracking == numSnapshotsAttempted) { // tracking current child
00676                 if (snapshotWatchdogInterval != zero &&
00677                     now >= startTime + snapshotWatchdogInterval) { // check
00678                     if (currentProgress == startProgress) {
00679                         ERROR("Snapshot process (counter %lu, pid %u) made no "
00680                               "progress for %s. Killing it. If this happens "
00681                               "at all often, you should file a bug to "
00682                               "understand the root cause.",
00683                               numSnapshotsAttempted,
00684                               childPid,
00685                               toString(snapshotWatchdogInterval).c_str());
00686                         killSnapshotProcess(Core::HoldingMutex(lockGuard),
00687                                             SIGKILL);
00688                         // Don't kill for another interval,
00689                         // hopefully child will be reaped by then.
00690                     }
00691                     startProgress = currentProgress;
00692                     startTime = now;
00693                 } else {
00694                     // woke up too early, nothing to do
00695                 }
00696             } else { // not yet tracking this child
00697                 VERBOSE("Beginning to track snapshot process "
00698                         "(counter %lu, pid %u)",
00699                         numSnapshotsAttempted,
00700                         childPid);
00701                 tracking = numSnapshotsAttempted;
00702                 startProgress = currentProgress;
00703                 startTime = now;
00704             }
00705             if (snapshotWatchdogInterval != zero)
00706                 waitUntil = startTime + snapshotWatchdogInterval;
00707         } else { // no child process
00708             if (tracking != ~0UL) {
00709                 VERBOSE("Snapshot ended: no longer tracking (counter %lu)",
00710                         tracking);
00711                 tracking = ~0UL;
00712             }
00713         }
00714         snapshotStarted.wait_until(lockGuard, waitUntil);
00715     }
00716 }
00717 
00718 
00719 void
00720 StateMachine::takeSnapshot(uint64_t lastIncludedIndex,
00721                            std::unique_lock<Core::Mutex>& lockGuard)
00722 {
00723     // Open a snapshot file, then fork a child to write a consistent view of
00724     // the state machine to the snapshot file while this process continues
00725     // accepting requests.
00726     writer = consensus->beginSnapshot(lastIncludedIndex);
00727     // Flush the outstanding changes to the snapshot now so that they
00728     // aren't somehow double-flushed later.
00729     writer->flushToOS();
00730 
00731     ++numSnapshotsAttempted;
00732     snapshotStarted.notify_all();
00733 
00734     pid_t pid = fork();
00735     if (pid == -1) { // error
00736         PANIC("Couldn't fork: %s", strerror(errno));
00737     } else if (pid == 0) { // child
00738         Core::Debug::processName += "-child";
00739         globals.unblockAllSignals();
00740         usleep(stateMachineChildSleepMs * 1000); // for testing purposes
00741         if (snapshotBlockPercentage > 0) { // for testing purposes
00742             if (Core::Random::randomRange(0, 100) < snapshotBlockPercentage) {
00743                 WARNING("Purposely deadlocking child (probability is %lu%%)",
00744                         snapshotBlockPercentage);
00745                 std::mutex mutex;
00746                 mutex.lock();
00747                 mutex.lock(); // intentional deadlock
00748             }
00749         }
00750 
00751         // Format version of snapshot contents is 1.
00752         uint8_t formatVersion = 1;
00753         writer->writeRaw(&formatVersion, sizeof(formatVersion));
00754         // StateMachine state comes next
00755         {
00756             SnapshotStateMachine::Header header;
00757             serializeVersionHistory(header);
00758             serializeSessions(header);
00759             writer->writeMessage(header);
00760         }
00761         // Then the Tree itself (this one is potentially large)
00762         tree.dumpSnapshot(*writer);
00763 
00764         // Flush the changes to the snapshot file before exiting.
00765         writer->flushToOS();
00766         _exit(0);
00767     } else { // parent
00768         assert(childPid == 0);
00769         childPid = pid;
00770         int status = 0;
00771         {
00772             // release the lock while blocking on the child to allow
00773             // parallelism
00774             Core::MutexUnlock<Core::Mutex> unlockGuard(lockGuard);
00775             pid = waitpid(pid, &status, 0);
00776         }
00777         childPid = 0;
00778         if (pid == -1)
00779             PANIC("Couldn't waitpid: %s", strerror(errno));
00780         if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
00781             NOTICE("Child completed writing state machine contents to "
00782                    "snapshot staging file");
00783             writer->seekToEnd();
00784             consensus->snapshotDone(lastIncludedIndex, std::move(writer));
00785         } else if (exiting &&
00786                    WIFSIGNALED(status) && WTERMSIG(status) == SIGTERM) {
00787             writer->discard();
00788             writer.reset();
00789             NOTICE("Child exited from SIGTERM since this process is "
00790                    "exiting");
00791         } else {
00792             writer->discard();
00793             writer.reset();
00794             ++numSnapshotsFailed;
00795             ERROR("Snapshot creation failed with status %d. This server will "
00796                   "try again, but something might be terribly wrong. "
00797                   "%lu of %lu snapshots have failed in total.",
00798                   status,
00799                   numSnapshotsFailed,
00800                   numSnapshotsAttempted);
00801         }
00802         snapshotCompleted.notify_all();
00803     }
00804 }
00805 
00806 void
00807 StateMachine::warnUnknownRequest(
00808         const google::protobuf::Message& request,
00809         const char* reason) const
00810 {
00811     ++numUnknownRequests;
00812     TimePoint now = Clock::now();
00813     if (lastUnknownRequestMessage + unknownRequestMessageBackoff < now) {
00814         lastUnknownRequestMessage = now;
00815         if (numUnknownRequestsSinceLastMessage > 0) {
00816             WARNING("This version of the state machine (%u) %s "
00817                     "(and %lu similar warnings "
00818                     "were suppressed since the last message): %s",
00819                     getVersion(~0UL),
00820                     reason,
00821                     numUnknownRequestsSinceLastMessage,
00822                     Core::ProtoBuf::dumpString(request).c_str());
00823         } else {
00824             WARNING("This version of the state machine (%u) %s: %s",
00825                     getVersion(~0UL),
00826                     reason,
00827                     Core::ProtoBuf::dumpString(request).c_str());
00828         }
00829         numUnknownRequestsSinceLastMessage = 0;
00830     } else {
00831         ++numUnknownRequestsSinceLastMessage;
00832     }
00833 }
00834 
00835 
00836 } // namespace LogCabin::Server
00837 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines