LogCabin
|
00001 /* Copyright (c) 2015 Diego Ongaro 00002 * 00003 * Permission to use, copy, modify, and distribute this software for any 00004 * purpose with or without fee is hereby granted, provided that the above 00005 * copyright notice and this permission notice appear in all copies. 00006 * 00007 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00008 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00009 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00010 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00011 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00012 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00013 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00014 */ 00015 00016 #include <signal.h> 00017 00018 #include "Core/ProtoBuf.h" 00019 #include "Core/ThreadId.h" 00020 #include "Core/Time.h" 00021 #include "Event/Signal.h" 00022 #include "Server/Globals.h" 00023 #include "Server/RaftConsensus.h" 00024 #include "Server/StateMachine.h" 00025 #include "Server/ServerStats.h" 00026 00027 namespace LogCabin { 00028 namespace Server { 00029 00030 //// class ServerStats::Lock //// 00031 00032 ServerStats::Lock::Lock(ServerStats& wrapper) 00033 : wrapper(wrapper) 00034 , lockGuard(wrapper.mutex) 00035 { 00036 } 00037 00038 ServerStats::Lock::~Lock() 00039 { 00040 } 00041 00042 Protocol::ServerStats* 00043 ServerStats::Lock::operator->() 00044 { 00045 return &wrapper.stats; 00046 } 00047 00048 Protocol::ServerStats& 00049 ServerStats::Lock::operator*() 00050 { 00051 return wrapper.stats; 00052 } 00053 00054 //// class ServerStats::SignalHandler //// 00055 00056 ServerStats::SignalHandler::SignalHandler(ServerStats& serverStats) 00057 : Signal(SIGUSR1) 00058 , serverStats(serverStats) 00059 { 00060 } 00061 00062 void 00063 ServerStats::SignalHandler::handleSignalEvent() 00064 { 00065 NOTICE("Received %s: dumping ServerStats", strsignal(signalNumber)); 00066 std::lock_guard<Core::Mutex> lockGuard(serverStats.mutex); 00067 serverStats.isStatsDumpRequested = true; 00068 serverStats.statsDumpRequested.notify_all(); 00069 } 00070 00071 //// class ServerStats::Deferred //// 00072 00073 ServerStats::Deferred::Deferred(ServerStats& serverStats) 00074 : signalHandler(serverStats) 00075 , signalMonitor(serverStats.globals.eventLoop, signalHandler) 00076 , dumpInterval(std::chrono::milliseconds( 00077 serverStats.globals.config.read<uint64_t>( 00078 "statsDumpIntervalMilliseconds", 60000))) 00079 , statsDumper() 00080 { 00081 statsDumper = std::thread(&ServerStats::statsDumperMain, &serverStats); 00082 } 00083 00084 //// class ServerStats //// 00085 00086 ServerStats::ServerStats(Globals& globals) 00087 : globals(globals) 00088 , mutex() 00089 , statsDumpRequested() 00090 , exiting(false) 00091 , isStatsDumpRequested(false) 00092 , lastDumped(SteadyClock::now()) 00093 , stats() 00094 , deferred() 00095 { 00096 } 00097 00098 ServerStats::~ServerStats() 00099 { 00100 } 00101 00102 void 00103 ServerStats::enable() 00104 { 00105 Lock lock(*this); 00106 if (!deferred) { 00107 // Defer construction of timer so that TimerHandler constructor can 00108 // access globals.config. 00109 deferred.reset(new Deferred(*this)); 00110 } 00111 } 00112 00113 void 00114 ServerStats::exit() 00115 { 00116 { 00117 std::lock_guard<Core::Mutex> lockGuard(mutex); 00118 exiting = true; 00119 statsDumpRequested.notify_all(); 00120 } 00121 if (deferred && deferred->statsDumper.joinable()) 00122 deferred->statsDumper.join(); 00123 deferred.reset(); 00124 } 00125 00126 void 00127 ServerStats::dumpToDebugLog() const 00128 { 00129 std::unique_lock<Core::Mutex> lockGuard(mutex); 00130 dumpToDebugLog(lockGuard); 00131 } 00132 00133 Protocol::ServerStats 00134 ServerStats::getCurrent() const 00135 { 00136 std::unique_lock<Core::Mutex> lockGuard(mutex); 00137 return getCurrent(lockGuard); 00138 } 00139 00140 ////////// ServerStats private ////////// 00141 00142 void 00143 ServerStats::dumpToDebugLog(std::unique_lock<Core::Mutex>& lockGuard) const 00144 { 00145 isStatsDumpRequested = false; 00146 Protocol::ServerStats currentStats = getCurrent(lockGuard); 00147 NOTICE("ServerStats:\n%s", 00148 Core::ProtoBuf::dumpString(currentStats).c_str()); 00149 SteadyClock::time_point now = SteadyClock::now(); 00150 if (lastDumped < now) 00151 lastDumped = now; 00152 } 00153 00154 Protocol::ServerStats 00155 ServerStats::getCurrent(std::unique_lock<Core::Mutex>& lockGuard) const 00156 { 00157 int64_t startTime = std::chrono::nanoseconds( 00158 Core::Time::SystemClock::now().time_since_epoch()).count(); 00159 Protocol::ServerStats copy = stats; 00160 copy.set_start_at(startTime); 00161 if (deferred.get() != NULL) { // enabled 00162 // release lock to avoid deadlock and for concurrency 00163 Core::MutexUnlock<Core::Mutex> unlockGuard(lockGuard); 00164 globals.raft->updateServerStats(copy); 00165 globals.stateMachine->updateServerStats(copy); 00166 } 00167 copy.set_end_at(std::chrono::nanoseconds( 00168 Core::Time::SystemClock::now().time_since_epoch()).count()); 00169 return copy; 00170 } 00171 00172 void 00173 ServerStats::statsDumperMain() 00174 { 00175 typedef SteadyClock::time_point TimePoint; 00176 00177 Core::ThreadId::setName("StatsDumper"); 00178 std::unique_lock<Core::Mutex> lockGuard(mutex); 00179 while (!exiting) { 00180 00181 // Calculate time for next periodic dump. 00182 TimePoint nextDump = TimePoint::max(); 00183 if (deferred->dumpInterval > std::chrono::nanoseconds::zero()) { 00184 nextDump = lastDumped + deferred->dumpInterval; 00185 if (nextDump < lastDumped) // overflow 00186 nextDump = TimePoint::max(); 00187 } 00188 00189 // Dump out stats to debug log 00190 if (isStatsDumpRequested || SteadyClock::now() >= nextDump) { 00191 dumpToDebugLog(lockGuard); 00192 continue; 00193 } 00194 00195 // Wait until next time 00196 statsDumpRequested.wait_until(lockGuard, nextDump); 00197 } 00198 00199 NOTICE("Shutting down"); 00200 } 00201 00202 00203 } // namespace LogCabin::Server 00204 } // namespace LogCabin