// LogCabin
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <unistd.h> 00018 #include <sys/types.h> 00019 #include <sys/wait.h> 00020 00021 #include "Core/Debug.h" 00022 #include "Core/Mutex.h" 00023 #include "Core/ProtoBuf.h" 00024 #include "Core/Random.h" 00025 #include "Core/ThreadId.h" 00026 #include "Core/Util.h" 00027 #include "Server/Globals.h" 00028 #include "Server/RaftConsensus.h" 00029 #include "Server/StateMachine.h" 00030 #include "Storage/SnapshotFile.h" 00031 #include "Tree/ProtoBuf.h" 00032 00033 namespace LogCabin { 00034 namespace Server { 00035 00036 namespace PC = LogCabin::Protocol::Client; 00037 00038 00039 // for testing purposes 00040 bool stateMachineSuppressThreads = false; 00041 uint32_t stateMachineChildSleepMs = 0; 00042 00043 StateMachine::StateMachine(std::shared_ptr<RaftConsensus> consensus, 00044 Core::Config& config, 00045 Globals& globals) 00046 : consensus(consensus) 00047 , globals(globals) 00048 // This configuration option isn't advertised as part of the public API: 00049 // it's only useful for testing. 
00050 , snapshotBlockPercentage( 00051 config.read<uint64_t>("snapshotBlockPercentage", 0)) 00052 , snapshotMinLogSize( 00053 config.read<uint64_t>("snapshotMinLogSize", 64UL * 1024 * 1024)) 00054 , snapshotRatio( 00055 config.read<uint64_t>("snapshotRatio", 4)) 00056 , snapshotWatchdogInterval(std::chrono::milliseconds( 00057 config.read<uint64_t>("snapshotWatchdogMilliseconds", 10000))) 00058 // TODO(ongaro): This should be configurable, but it must be the same for 00059 // every server, so it's dangerous to put it in the config file. Need to 00060 // use the Raft log to agree on this value. Also need to inform clients 00061 // of the value and its changes, so that they can send keep-alives at 00062 // appropriate intervals. For now, servers time out after about an hour, 00063 // and clients send keep-alives every minute. 00064 , sessionTimeoutNanos(1000UL * 1000 * 1000 * 60 * 60) 00065 , unknownRequestMessageBackoff(std::chrono::milliseconds( 00066 config.read<uint64_t>("stateMachineUnknownRequestMessage" 00067 "BackoffMilliseconds", 10000))) 00068 , mutex() 00069 , entriesApplied() 00070 , snapshotSuggested() 00071 , snapshotStarted() 00072 , snapshotCompleted() 00073 , exiting(false) 00074 , childPid(0) 00075 , lastApplied(0) 00076 , lastUnknownRequestMessage(TimePoint::min()) 00077 , numUnknownRequests(0) 00078 , numUnknownRequestsSinceLastMessage(0) 00079 , numSnapshotsAttempted(0) 00080 , numSnapshotsFailed(0) 00081 , numRedundantAdvanceVersionEntries(0) 00082 , numRejectedAdvanceVersionEntries(0) 00083 , numSuccessfulAdvanceVersionEntries(0) 00084 , numTotalAdvanceVersionEntries(0) 00085 , isSnapshotRequested(false) 00086 , maySnapshotAt(TimePoint::min()) 00087 , sessions() 00088 , tree() 00089 , versionHistory() 00090 , writer() 00091 , applyThread() 00092 , snapshotThread() 00093 , snapshotWatchdogThread() 00094 { 00095 versionHistory.insert({0, 1}); 00096 consensus->setSupportedStateMachineVersions(MIN_SUPPORTED_VERSION, 00097 MAX_SUPPORTED_VERSION); 
00098 if (!stateMachineSuppressThreads) { 00099 applyThread = std::thread(&StateMachine::applyThreadMain, this); 00100 snapshotThread = std::thread(&StateMachine::snapshotThreadMain, this); 00101 snapshotWatchdogThread = std::thread( 00102 &StateMachine::snapshotWatchdogThreadMain, this); 00103 } 00104 } 00105 00106 StateMachine::~StateMachine() 00107 { 00108 NOTICE("Shutting down"); 00109 if (consensus) // sometimes missing for testing 00110 consensus->exit(); 00111 if (applyThread.joinable()) 00112 applyThread.join(); 00113 if (snapshotThread.joinable()) 00114 snapshotThread.join(); 00115 if (snapshotWatchdogThread.joinable()) 00116 snapshotWatchdogThread.join(); 00117 NOTICE("Joined with threads"); 00118 } 00119 00120 bool 00121 StateMachine::query(const Query::Request& request, 00122 Query::Response& response) const 00123 { 00124 std::lock_guard<Core::Mutex> lockGuard(mutex); 00125 if (request.has_tree()) { 00126 Tree::ProtoBuf::readOnlyTreeRPC(tree, 00127 request.tree(), 00128 *response.mutable_tree()); 00129 return true; 00130 } 00131 warnUnknownRequest(request, "does not understand the given request"); 00132 return false; 00133 } 00134 00135 void 00136 StateMachine::updateServerStats(Protocol::ServerStats& serverStats) const 00137 { 00138 std::lock_guard<Core::Mutex> lockGuard(mutex); 00139 Core::Time::SteadyTimeConverter time; 00140 serverStats.clear_state_machine(); 00141 Protocol::ServerStats::StateMachine& smStats = 00142 *serverStats.mutable_state_machine(); 00143 smStats.set_snapshotting(childPid != 0); 00144 smStats.set_last_applied(lastApplied); 00145 smStats.set_num_sessions(sessions.size()); 00146 smStats.set_num_unknown_requests(numUnknownRequests); 00147 smStats.set_num_snapshots_attempted(numSnapshotsAttempted); 00148 smStats.set_num_snapshots_failed(numSnapshotsFailed); 00149 smStats.set_num_redundant_advance_version_entries( 00150 numRedundantAdvanceVersionEntries); 00151 smStats.set_num_rejected_advance_version_entries( 00152 
numRejectedAdvanceVersionEntries); 00153 smStats.set_num_successful_advance_version_entries( 00154 numSuccessfulAdvanceVersionEntries); 00155 smStats.set_num_total_advance_version_entries( 00156 numTotalAdvanceVersionEntries); 00157 smStats.set_min_supported_version(MIN_SUPPORTED_VERSION); 00158 smStats.set_max_supported_version(MAX_SUPPORTED_VERSION); 00159 smStats.set_running_version(getVersion(lastApplied)); 00160 smStats.set_may_snapshot_at(time.unixNanos(maySnapshotAt)); 00161 tree.updateServerStats(*smStats.mutable_tree()); 00162 } 00163 00164 void 00165 StateMachine::wait(uint64_t index) const 00166 { 00167 std::unique_lock<Core::Mutex> lockGuard(mutex); 00168 while (lastApplied < index) 00169 entriesApplied.wait(lockGuard); 00170 } 00171 00172 bool 00173 StateMachine::waitForResponse(uint64_t logIndex, 00174 const Command::Request& command, 00175 Command::Response& response) const 00176 { 00177 std::unique_lock<Core::Mutex> lockGuard(mutex); 00178 while (lastApplied < logIndex) 00179 entriesApplied.wait(lockGuard); 00180 00181 // Need to check whether we understood the request at the time it 00182 // was applied using getVersion(logIndex), then reply and return true/false 00183 // based on that. Existing commands have been around since version 1, so we 00184 // skip this check for now. 
00185 uint16_t versionThen = getVersion(logIndex); 00186 00187 if (command.has_tree()) { 00188 const PC::ExactlyOnceRPCInfo& rpcInfo = command.tree().exactly_once(); 00189 auto sessionIt = sessions.find(rpcInfo.client_id()); 00190 if (sessionIt == sessions.end()) { 00191 WARNING("Client %lu session expired but client still active", 00192 rpcInfo.client_id()); 00193 response.mutable_tree()-> 00194 set_status(PC::Status::SESSION_EXPIRED); 00195 return true; 00196 } 00197 const Session& session = sessionIt->second; 00198 auto responseIt = session.responses.find(rpcInfo.rpc_number()); 00199 if (responseIt == session.responses.end()) { 00200 // The response for this RPC has already been removed: the client 00201 // is not waiting for it. This request is just a duplicate that is 00202 // safe to drop. 00203 WARNING("Client %lu asking for discarded response to RPC %lu", 00204 rpcInfo.client_id(), rpcInfo.rpc_number()); 00205 response.mutable_tree()-> 00206 set_status(PC::Status::SESSION_EXPIRED); 00207 return true; 00208 } 00209 response = responseIt->second; 00210 return true; 00211 } else if (command.has_open_session()) { 00212 response.mutable_open_session()-> 00213 set_client_id(logIndex); 00214 return true; 00215 } else if (versionThen >= 2 && command.has_close_session()) { 00216 response.mutable_close_session(); // no fields to set 00217 return true; 00218 } else if (command.has_advance_version()) { 00219 response.mutable_advance_version()-> 00220 set_running_version(versionThen); 00221 return true; 00222 } 00223 // don't warnUnknownRequest here, since we already did so in apply() 00224 return false; 00225 } 00226 00227 bool 00228 StateMachine::isTakingSnapshot() const 00229 { 00230 std::lock_guard<Core::Mutex> lockGuard(mutex); 00231 return childPid != 0; 00232 } 00233 00234 void 00235 StateMachine::startTakingSnapshot() 00236 { 00237 std::unique_lock<Core::Mutex> lockGuard(mutex); 00238 if (childPid == 0) { 00239 NOTICE("Administrator requested snapshot"); 00240 
isSnapshotRequested = true; 00241 snapshotSuggested.notify_all(); 00242 // This waits on numSnapshotsAttempted to change, since waiting on 00243 // childPid != 0 would risk missing an entire snapshot that started and 00244 // completed before this thread was scheduled. 00245 uint64_t nextSnapshot = numSnapshotsAttempted + 1; 00246 while (!exiting && numSnapshotsAttempted < nextSnapshot) { 00247 snapshotStarted.wait(lockGuard); 00248 } 00249 } 00250 } 00251 00252 void 00253 StateMachine::stopTakingSnapshot() 00254 { 00255 std::unique_lock<Core::Mutex> lockGuard(mutex); 00256 int pid = childPid; 00257 if (pid != 0) { 00258 NOTICE("Administrator aborted snapshot"); 00259 killSnapshotProcess(Core::HoldingMutex(lockGuard), SIGTERM); 00260 while (!exiting && pid == childPid) { 00261 snapshotCompleted.wait(lockGuard); 00262 } 00263 } 00264 } 00265 00266 std::chrono::nanoseconds 00267 StateMachine::getInhibit() const 00268 { 00269 std::lock_guard<Core::Mutex> lockGuard(mutex); 00270 TimePoint now = Clock::now(); 00271 if (maySnapshotAt <= now) { 00272 return std::chrono::nanoseconds::zero(); 00273 } else { 00274 return maySnapshotAt - now; 00275 } 00276 } 00277 00278 void 00279 StateMachine::setInhibit(std::chrono::nanoseconds duration) 00280 { 00281 std::lock_guard<Core::Mutex> lockGuard(mutex); 00282 if (duration <= std::chrono::nanoseconds::zero()) { 00283 maySnapshotAt = TimePoint::min(); 00284 NOTICE("Administrator permitted snapshotting"); 00285 } else { 00286 TimePoint now = Clock::now(); 00287 maySnapshotAt = now + duration; 00288 if (maySnapshotAt < now) { // overflow 00289 maySnapshotAt = TimePoint::max(); 00290 } 00291 NOTICE("Administrator inhibited snapshotting for the next %s", 00292 Core::StringUtil::toString(maySnapshotAt - now).c_str()); 00293 } 00294 snapshotSuggested.notify_all(); 00295 } 00296 00297 00298 ////////// StateMachine private methods ////////// 00299 00300 void 00301 StateMachine::apply(const RaftConsensus::Entry& entry) 00302 { 00303 
Command::Request command; 00304 if (!Core::ProtoBuf::parse(entry.command, command)) { 00305 PANIC("Failed to parse protobuf for entry %lu", 00306 entry.index); 00307 } 00308 uint16_t runningVersion = getVersion(entry.index - 1); 00309 if (command.has_tree()) { 00310 PC::ExactlyOnceRPCInfo rpcInfo = command.tree().exactly_once(); 00311 auto it = sessions.find(rpcInfo.client_id()); 00312 if (it == sessions.end()) { 00313 // session does not exist 00314 } else { 00315 // session exists 00316 Session& session = it->second; 00317 expireResponses(session, rpcInfo.first_outstanding_rpc()); 00318 if (rpcInfo.rpc_number() < session.firstOutstandingRPC) { 00319 // response already discarded, do not re-apply 00320 } else { 00321 auto inserted = session.responses.insert( 00322 {rpcInfo.rpc_number(), {}}); 00323 if (inserted.second) { 00324 // response not found, apply and save it 00325 Tree::ProtoBuf::readWriteTreeRPC( 00326 tree, 00327 command.tree(), 00328 *inserted.first->second.mutable_tree()); 00329 session.lastModified = entry.clusterTime; 00330 } else { 00331 // response exists, do not re-apply 00332 } 00333 } 00334 } 00335 } else if (command.has_open_session()) { 00336 uint64_t clientId = entry.index; 00337 Session& session = sessions.insert({clientId, {}}).first->second; 00338 session.lastModified = entry.clusterTime; 00339 } else if (command.has_close_session()) { 00340 if (runningVersion >= 2) { 00341 sessions.erase(command.close_session().client_id()); 00342 } else { 00343 // Command is ignored in version < 2. 00344 warnUnknownRequest(command, "may not process the given request, " 00345 "which was introduced in version 2"); 00346 } 00347 } else if (command.has_advance_version()) { 00348 uint16_t requested = Core::Util::downCast<uint16_t>( 00349 command.advance_version(). 
requested_version()); 00350 if (requested < runningVersion) { 00351 WARNING("Rejecting downgrade of state machine version " 00352 "(running version %u but command at log index %lu wants " 00353 "to switch to version %u)", 00354 runningVersion, 00355 entry.index, 00356 requested); 00357 ++numRejectedAdvanceVersionEntries; 00358 } else if (requested > runningVersion) { 00359 if (requested > MAX_SUPPORTED_VERSION) { 00360 PANIC("Cannot upgrade state machine to version %u (from %u) " 00361 "because this code only supports up to version %u", 00362 requested, 00363 runningVersion, 00364 MAX_SUPPORTED_VERSION); 00365 } else { 00366 NOTICE("Upgrading state machine to version %u (from %u)", 00367 requested, 00368 runningVersion); 00369 versionHistory.insert({entry.index, requested}); 00370 } 00371 ++numSuccessfulAdvanceVersionEntries; 00372 } else { // requested == runningVersion 00373 // nothing to do 00374 // If this stat is high, see note in RaftConsensus.cc. 00375 ++numRedundantAdvanceVersionEntries; 00376 } 00377 ++numTotalAdvanceVersionEntries; 00378 } else { // unknown command 00379 // This is (deterministically) ignored by all state machines running 00380 // the current version. 
00381 warnUnknownRequest(command, "does not understand the given request"); 00382 } 00383 } 00384 00385 void 00386 StateMachine::applyThreadMain() 00387 { 00388 Core::ThreadId::setName("StateMachine"); 00389 try { 00390 while (true) { 00391 RaftConsensus::Entry entry = consensus->getNextEntry(lastApplied); 00392 std::lock_guard<Core::Mutex> lockGuard(mutex); 00393 switch (entry.type) { 00394 case RaftConsensus::Entry::SKIP: 00395 break; 00396 case RaftConsensus::Entry::DATA: 00397 apply(entry); 00398 break; 00399 case RaftConsensus::Entry::SNAPSHOT: 00400 NOTICE("Loading snapshot through entry %lu into state " 00401 "machine", entry.index); 00402 loadSnapshot(*entry.snapshotReader); 00403 NOTICE("Done loading snapshot"); 00404 break; 00405 } 00406 expireSessions(entry.clusterTime); 00407 lastApplied = entry.index; 00408 entriesApplied.notify_all(); 00409 if (shouldTakeSnapshot(lastApplied) && 00410 maySnapshotAt <= Clock::now()) { 00411 snapshotSuggested.notify_all(); 00412 } 00413 } 00414 } catch (const Core::Util::ThreadInterruptedException&) { 00415 NOTICE("exiting"); 00416 std::lock_guard<Core::Mutex> lockGuard(mutex); 00417 exiting = true; 00418 entriesApplied.notify_all(); 00419 snapshotSuggested.notify_all(); 00420 snapshotStarted.notify_all(); 00421 snapshotCompleted.notify_all(); 00422 killSnapshotProcess(Core::HoldingMutex(lockGuard), SIGTERM); 00423 } 00424 } 00425 00426 void 00427 StateMachine::serializeSessions(SnapshotStateMachine::Header& header) const 00428 { 00429 for (auto it = sessions.begin(); it != sessions.end(); ++it) { 00430 SnapshotStateMachine::Session& session = *header.add_session(); 00431 session.set_client_id(it->first); 00432 session.set_last_modified(it->second.lastModified); 00433 session.set_first_outstanding_rpc(it->second.firstOutstandingRPC); 00434 for (auto it2 = it->second.responses.begin(); 00435 it2 != it->second.responses.end(); 00436 ++it2) { 00437 SnapshotStateMachine::Response& response = 00438 
*session.add_rpc_response(); 00439 response.set_rpc_number(it2->first); 00440 *response.mutable_response() = it2->second; 00441 } 00442 } 00443 } 00444 00445 void 00446 StateMachine::expireResponses(Session& session, uint64_t firstOutstandingRPC) 00447 { 00448 if (session.firstOutstandingRPC >= firstOutstandingRPC) 00449 return; 00450 session.firstOutstandingRPC = firstOutstandingRPC; 00451 auto it = session.responses.begin(); 00452 while (it != session.responses.end()) { 00453 if (it->first < session.firstOutstandingRPC) 00454 it = session.responses.erase(it); 00455 else 00456 ++it; 00457 } 00458 } 00459 00460 void 00461 StateMachine::expireSessions(uint64_t clusterTime) 00462 { 00463 auto it = sessions.begin(); 00464 while (it != sessions.end()) { 00465 Session& session = it->second; 00466 uint64_t expireTime = session.lastModified + sessionTimeoutNanos; 00467 if (expireTime < clusterTime) { 00468 uint64_t diffNanos = clusterTime - session.lastModified; 00469 NOTICE("Expiring client %lu's session after %lu.%09lu seconds " 00470 "of cluster time due to inactivity", 00471 it->first, 00472 diffNanos / (1000 * 1000 * 1000UL), 00473 diffNanos % (1000 * 1000 * 1000UL)); 00474 it = sessions.erase(it); 00475 } else { 00476 ++it; 00477 } 00478 } 00479 } 00480 00481 uint16_t 00482 StateMachine::getVersion(uint64_t logIndex) const 00483 { 00484 auto it = versionHistory.upper_bound(logIndex); 00485 --it; 00486 return it->second; 00487 } 00488 00489 void 00490 StateMachine::killSnapshotProcess(Core::HoldingMutex holdingMutex, 00491 int signum) 00492 { 00493 if (childPid != 0) { 00494 int r = kill(childPid, signum); 00495 if (r != 0) { 00496 WARNING("Could not send %s to child process (%d): %s", 00497 strsignal(signum), 00498 childPid, 00499 strerror(errno)); 00500 } 00501 } 00502 } 00503 00504 void 00505 StateMachine::loadSessions(const SnapshotStateMachine::Header& header) 00506 { 00507 sessions.clear(); 00508 for (auto it = header.session().begin(); 00509 it != 
header.session().end(); 00510 ++it) { 00511 Session& session = sessions.insert({it->client_id(), {}}) 00512 .first->second; 00513 session.lastModified = it->last_modified(); 00514 session.firstOutstandingRPC = it->first_outstanding_rpc(); 00515 for (auto it2 = it->rpc_response().begin(); 00516 it2 != it->rpc_response().end(); 00517 ++it2) { 00518 session.responses.insert({it2->rpc_number(), it2->response()}); 00519 } 00520 } 00521 } 00522 00523 void 00524 StateMachine::loadSnapshot(Core::ProtoBuf::InputStream& stream) 00525 { 00526 // Check that this snapshot uses format version 1 00527 uint8_t formatVersion = 0; 00528 uint64_t bytesRead = stream.readRaw(&formatVersion, sizeof(formatVersion)); 00529 if (bytesRead < sizeof(formatVersion)) { 00530 PANIC("Snapshot contents are empty (no format version field)"); 00531 } 00532 if (formatVersion != 1) { 00533 PANIC("Snapshot contents format version read was %u, but this " 00534 "code can only read version 1", 00535 formatVersion); 00536 } 00537 00538 // Load snapshot header 00539 { 00540 SnapshotStateMachine::Header header; 00541 std::string error = stream.readMessage(header); 00542 if (!error.empty()) { 00543 PANIC("Couldn't read state machine header from snapshot: %s", 00544 error.c_str()); 00545 } 00546 loadVersionHistory(header); 00547 loadSessions(header); 00548 } 00549 00550 // Load the tree's state 00551 tree.loadSnapshot(stream); 00552 } 00553 00554 void 00555 StateMachine::loadVersionHistory(const SnapshotStateMachine::Header& header) 00556 { 00557 versionHistory.clear(); 00558 versionHistory.insert({0, 1}); 00559 for (auto it = header.version_update().begin(); 00560 it != header.version_update().end(); 00561 ++it) { 00562 versionHistory.insert({it->log_index(), 00563 Core::Util::downCast<uint16_t>(it->version())}); 00564 } 00565 00566 // The version of the current state machine behavior. 
00567 uint16_t running = versionHistory.rbegin()->second; 00568 if (running < MIN_SUPPORTED_VERSION || 00569 running > MAX_SUPPORTED_VERSION) { 00570 PANIC("State machine version read from snapshot was %u, but this " 00571 "code only supports %u through %u (inclusive)", 00572 running, 00573 MIN_SUPPORTED_VERSION, 00574 MAX_SUPPORTED_VERSION); 00575 } 00576 } 00577 00578 void 00579 StateMachine::serializeVersionHistory( 00580 SnapshotStateMachine::Header& header) const 00581 { 00582 for (auto it = versionHistory.begin(); 00583 it != versionHistory.end(); 00584 ++it) { 00585 SnapshotStateMachine::VersionUpdate& update = 00586 *header.add_version_update(); 00587 update.set_log_index(it->first); 00588 update.set_version(it->second); 00589 } 00590 } 00591 00592 bool 00593 StateMachine::shouldTakeSnapshot(uint64_t lastIncludedIndex) const 00594 { 00595 SnapshotStats::SnapshotStats stats = consensus->getSnapshotStats(); 00596 00597 // print every 10% but not at 100% because then we'd be printing all the 00598 // time 00599 uint64_t curr = 0; 00600 if (lastIncludedIndex > stats.last_snapshot_index()) 00601 curr = lastIncludedIndex - stats.last_snapshot_index(); 00602 uint64_t prev = curr - 1; 00603 uint64_t logEntries = stats.last_log_index() - stats.last_snapshot_index(); 00604 if (curr != logEntries && 00605 10 * prev / logEntries != 10 * curr / logEntries) { 00606 NOTICE("Have applied %lu%% of the %lu total log entries", 00607 100 * curr / logEntries, 00608 logEntries); 00609 } 00610 00611 if (stats.log_bytes() < snapshotMinLogSize) 00612 return false; 00613 if (stats.log_bytes() < stats.last_snapshot_bytes() * snapshotRatio) 00614 return false; 00615 if (lastIncludedIndex < stats.last_snapshot_index()) 00616 return false; 00617 if (lastIncludedIndex < stats.last_log_index() * 3 / 4) 00618 return false; 00619 return true; 00620 } 00621 00622 void 00623 StateMachine::snapshotThreadMain() 00624 { 00625 Core::ThreadId::setName("SnapshotStateMachine"); 00626 
std::unique_lock<Core::Mutex> lockGuard(mutex); 00627 bool wasInhibited = false; 00628 while (!exiting) { 00629 bool inhibited = (maySnapshotAt > Clock::now()); 00630 00631 TimePoint waitUntil = TimePoint::max(); 00632 if (inhibited) 00633 waitUntil = maySnapshotAt; 00634 00635 if (wasInhibited && !inhibited) 00636 NOTICE("Now permitted to take snapshots"); 00637 wasInhibited = inhibited; 00638 00639 if (isSnapshotRequested || 00640 (!inhibited && shouldTakeSnapshot(lastApplied))) { 00641 00642 isSnapshotRequested = false; 00643 takeSnapshot(lastApplied, lockGuard); 00644 continue; 00645 } 00646 00647 snapshotSuggested.wait_until(lockGuard, waitUntil); 00648 } 00649 } 00650 00651 void 00652 StateMachine::snapshotWatchdogThreadMain() 00653 { 00654 using Core::StringUtil::toString; 00655 Core::ThreadId::setName("SnapshotStateMachineWatchdog"); 00656 std::unique_lock<Core::Mutex> lockGuard(mutex); 00657 00658 // The snapshot process that this thread is currently tracking, based on 00659 // numSnapshotsAttempted. If set to ~0UL, this thread is not currently 00660 // tracking a snapshot process. 00661 uint64_t tracking = ~0UL; 00662 // The value of writer->sharedBytesWritten at the "start" time. 00663 uint64_t startProgress = 0; 00664 // The time at the "start" time. 00665 TimePoint startTime = TimePoint::min(); 00666 // Special value for infinite interval. 
00667 const std::chrono::nanoseconds zero = std::chrono::nanoseconds::zero(); 00668 00669 while (!exiting) { 00670 TimePoint waitUntil = TimePoint::max(); 00671 TimePoint now = Clock::now(); 00672 00673 if (childPid > 0) { // there is some child process 00674 uint64_t currentProgress = *writer->sharedBytesWritten.value; 00675 if (tracking == numSnapshotsAttempted) { // tracking current child 00676 if (snapshotWatchdogInterval != zero && 00677 now >= startTime + snapshotWatchdogInterval) { // check 00678 if (currentProgress == startProgress) { 00679 ERROR("Snapshot process (counter %lu, pid %u) made no " 00680 "progress for %s. Killing it. If this happens " 00681 "at all often, you should file a bug to " 00682 "understand the root cause.", 00683 numSnapshotsAttempted, 00684 childPid, 00685 toString(snapshotWatchdogInterval).c_str()); 00686 killSnapshotProcess(Core::HoldingMutex(lockGuard), 00687 SIGKILL); 00688 // Don't kill for another interval, 00689 // hopefully child will be reaped by then. 
00690 } 00691 startProgress = currentProgress; 00692 startTime = now; 00693 } else { 00694 // woke up too early, nothing to do 00695 } 00696 } else { // not yet tracking this child 00697 VERBOSE("Beginning to track snapshot process " 00698 "(counter %lu, pid %u)", 00699 numSnapshotsAttempted, 00700 childPid); 00701 tracking = numSnapshotsAttempted; 00702 startProgress = currentProgress; 00703 startTime = now; 00704 } 00705 if (snapshotWatchdogInterval != zero) 00706 waitUntil = startTime + snapshotWatchdogInterval; 00707 } else { // no child process 00708 if (tracking != ~0UL) { 00709 VERBOSE("Snapshot ended: no longer tracking (counter %lu)", 00710 tracking); 00711 tracking = ~0UL; 00712 } 00713 } 00714 snapshotStarted.wait_until(lockGuard, waitUntil); 00715 } 00716 } 00717 00718 00719 void 00720 StateMachine::takeSnapshot(uint64_t lastIncludedIndex, 00721 std::unique_lock<Core::Mutex>& lockGuard) 00722 { 00723 // Open a snapshot file, then fork a child to write a consistent view of 00724 // the state machine to the snapshot file while this process continues 00725 // accepting requests. 00726 writer = consensus->beginSnapshot(lastIncludedIndex); 00727 // Flush the outstanding changes to the snapshot now so that they 00728 // aren't somehow double-flushed later. 
00729 writer->flushToOS(); 00730 00731 ++numSnapshotsAttempted; 00732 snapshotStarted.notify_all(); 00733 00734 pid_t pid = fork(); 00735 if (pid == -1) { // error 00736 PANIC("Couldn't fork: %s", strerror(errno)); 00737 } else if (pid == 0) { // child 00738 Core::Debug::processName += "-child"; 00739 globals.unblockAllSignals(); 00740 usleep(stateMachineChildSleepMs * 1000); // for testing purposes 00741 if (snapshotBlockPercentage > 0) { // for testing purposes 00742 if (Core::Random::randomRange(0, 100) < snapshotBlockPercentage) { 00743 WARNING("Purposely deadlocking child (probability is %lu%%)", 00744 snapshotBlockPercentage); 00745 std::mutex mutex; 00746 mutex.lock(); 00747 mutex.lock(); // intentional deadlock 00748 } 00749 } 00750 00751 // Format version of snapshot contents is 1. 00752 uint8_t formatVersion = 1; 00753 writer->writeRaw(&formatVersion, sizeof(formatVersion)); 00754 // StateMachine state comes next 00755 { 00756 SnapshotStateMachine::Header header; 00757 serializeVersionHistory(header); 00758 serializeSessions(header); 00759 writer->writeMessage(header); 00760 } 00761 // Then the Tree itself (this one is potentially large) 00762 tree.dumpSnapshot(*writer); 00763 00764 // Flush the changes to the snapshot file before exiting. 
00765 writer->flushToOS(); 00766 _exit(0); 00767 } else { // parent 00768 assert(childPid == 0); 00769 childPid = pid; 00770 int status = 0; 00771 { 00772 // release the lock while blocking on the child to allow 00773 // parallelism 00774 Core::MutexUnlock<Core::Mutex> unlockGuard(lockGuard); 00775 pid = waitpid(pid, &status, 0); 00776 } 00777 childPid = 0; 00778 if (pid == -1) 00779 PANIC("Couldn't waitpid: %s", strerror(errno)); 00780 if (WIFEXITED(status) && WEXITSTATUS(status) == 0) { 00781 NOTICE("Child completed writing state machine contents to " 00782 "snapshot staging file"); 00783 writer->seekToEnd(); 00784 consensus->snapshotDone(lastIncludedIndex, std::move(writer)); 00785 } else if (exiting && 00786 WIFSIGNALED(status) && WTERMSIG(status) == SIGTERM) { 00787 writer->discard(); 00788 writer.reset(); 00789 NOTICE("Child exited from SIGTERM since this process is " 00790 "exiting"); 00791 } else { 00792 writer->discard(); 00793 writer.reset(); 00794 ++numSnapshotsFailed; 00795 ERROR("Snapshot creation failed with status %d. This server will " 00796 "try again, but something might be terribly wrong. 
" 00797 "%lu of %lu snapshots have failed in total.", 00798 status, 00799 numSnapshotsFailed, 00800 numSnapshotsAttempted); 00801 } 00802 snapshotCompleted.notify_all(); 00803 } 00804 } 00805 00806 void 00807 StateMachine::warnUnknownRequest( 00808 const google::protobuf::Message& request, 00809 const char* reason) const 00810 { 00811 ++numUnknownRequests; 00812 TimePoint now = Clock::now(); 00813 if (lastUnknownRequestMessage + unknownRequestMessageBackoff < now) { 00814 lastUnknownRequestMessage = now; 00815 if (numUnknownRequestsSinceLastMessage > 0) { 00816 WARNING("This version of the state machine (%u) %s " 00817 "(and %lu similar warnings " 00818 "were suppressed since the last message): %s", 00819 getVersion(~0UL), 00820 reason, 00821 numUnknownRequestsSinceLastMessage, 00822 Core::ProtoBuf::dumpString(request).c_str()); 00823 } else { 00824 WARNING("This version of the state machine (%u) %s: %s", 00825 getVersion(~0UL), 00826 reason, 00827 Core::ProtoBuf::dumpString(request).c_str()); 00828 } 00829 numUnknownRequestsSinceLastMessage = 0; 00830 } else { 00831 ++numUnknownRequestsSinceLastMessage; 00832 } 00833 } 00834 00835 00836 } // namespace LogCabin::Server 00837 } // namespace LogCabin