LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <signal.h> 00018 00019 #include "Core/Debug.h" 00020 #include "Core/StringUtil.h" 00021 #include "Protocol/Common.h" 00022 #include "RPC/Server.h" 00023 #include "Server/ClientService.h" 00024 #include "Server/ControlService.h" 00025 #include "Server/Globals.h" 00026 #include "Server/RaftConsensus.h" 00027 #include "Server/RaftService.h" 00028 #include "Server/StateMachine.h" 00029 00030 namespace LogCabin { 00031 namespace Server { 00032 00033 ////////// Globals::SigIntHandler ////////// 00034 00035 Globals::ExitHandler::ExitHandler( 00036 Event::Loop& eventLoop, 00037 int signalNumber) 00038 : Signal(signalNumber) 00039 , eventLoop(eventLoop) 00040 { 00041 } 00042 00043 void 00044 Globals::ExitHandler::handleSignalEvent() 00045 { 00046 NOTICE("%s: shutting down", strsignal(signalNumber)); 00047 eventLoop.exit(); 00048 } 00049 00050 Globals::LogRotateHandler::LogRotateHandler( 00051 Event::Loop& eventLoop, 00052 int signalNumber) 00053 : Signal(signalNumber) 00054 , eventLoop(eventLoop) 00055 { 00056 } 00057 00058 void 00059 Globals::LogRotateHandler::handleSignalEvent() 00060 { 00061 NOTICE("%s: rotating logs", strsignal(signalNumber)); 00062 std::string error = Core::Debug::reopenLogFromFilename(); 00063 if (!error.empty()) { 00064 PANIC("Failed to rotate log file: %s", 00065 error.c_str()); 00066 } 00067 NOTICE("%s: done rotating logs", strsignal(signalNumber)); 00068 } 00069 00070 00071 ////////// Globals ////////// 00072 00073 Globals::Globals() 00074 : config() 00075 , serverStats(*this) 00076 , eventLoop() 00077 , sigIntBlocker(SIGINT) 00078 , sigTermBlocker(SIGTERM) 00079 , sigUsr1Blocker(SIGUSR1) 00080 , sigUsr2Blocker(SIGUSR2) 00081 , sigIntHandler(eventLoop, SIGINT) 00082 , sigIntMonitor(eventLoop, sigIntHandler) 00083 , sigTermHandler(eventLoop, SIGTERM) 00084 , sigTermMonitor(eventLoop, sigTermHandler) 00085 , sigUsr2Handler(eventLoop, SIGUSR2) 00086 , sigUsr2Monitor(eventLoop, sigUsr2Handler) 00087 , clusterUUID() 00088 , serverId(~0UL) 00089 , raft() 00090 , stateMachine() 00091 , controlService() 00092 , raftService() 00093 , clientService() 00094 , rpcServer() 00095 { 00096 } 00097 00098 Globals::~Globals() 00099 { 00100 serverStats.exit(); 00101 } 00102 00103 void 00104 Globals::init() 00105 { 00106 std::string uuid = config.read("clusterUUID", std::string("")); 00107 if (!uuid.empty()) 00108 clusterUUID.set(uuid); 00109 serverId = config.read<uint64_t>("serverId"); 00110 Core::Debug::processName = Core::StringUtil::format("%lu", serverId); 00111 { 00112 ServerStats::Lock serverStatsLock(serverStats); 00113 serverStatsLock->set_server_id(serverId); 00114 } 00115 if (!raft) { 00116 raft.reset(new RaftConsensus(*this)); 00117 raft->serverId = serverId; 00118 } 00119 00120 if (!controlService) { 00121 controlService.reset(new ControlService(*this)); 00122 } 00123 00124 if (!raftService) { 00125 raftService.reset(new RaftService(*this)); 00126 } 00127 00128 if (!clientService) { 00129 clientService.reset(new ClientService(*this)); 00130 } 00131 00132 if (!rpcServer) { 00133 rpcServer.reset(new RPC::Server(eventLoop, 00134 Protocol::Common::MAX_MESSAGE_LENGTH)); 00135 00136 uint32_t maxThreads = config.read<uint16_t>("maxThreads", 16); 00137 namespace ServiceId = Protocol::Common::ServiceId; 00138 rpcServer->registerService(ServiceId::CONTROL_SERVICE, 00139 controlService, 00140 maxThreads); 00141 rpcServer->registerService(ServiceId::RAFT_SERVICE, 00142 raftService, 00143 maxThreads); 00144 rpcServer->registerService(ServiceId::CLIENT_SERVICE, 00145 clientService, 00146 maxThreads); 00147 00148 std::string listenAddressesStr = 00149 config.read<std::string>("listenAddresses"); 00150 { 00151 ServerStats::Lock serverStatsLock(serverStats); 00152 serverStatsLock->set_server_id(serverId); 00153 serverStatsLock->set_addresses(listenAddressesStr); 00154 } 00155 std::vector<std::string> listenAddresses = 00156 Core::StringUtil::split(listenAddressesStr, ','); 00157 if (listenAddresses.empty()) { 00158 EXIT("No server addresses specified to listen on"); 00159 } 00160 for (auto it = listenAddresses.begin(); 00161 it != listenAddresses.end(); 00162 ++it) { 00163 RPC::Address address(*it, Protocol::Common::DEFAULT_PORT); 00164 address.refresh(RPC::Address::TimePoint::max()); 00165 std::string error = rpcServer->bind(address); 00166 if (!error.empty()) { 00167 EXIT("Could not listen on address %s: %s", 00168 address.toString().c_str(), 00169 error.c_str()); 00170 } 00171 NOTICE("Serving on %s", 00172 address.toString().c_str()); 00173 } 00174 raft->serverAddresses = listenAddressesStr; 00175 raft->init(); 00176 } 00177 00178 if (!stateMachine) { 00179 stateMachine.reset(new StateMachine(raft, config, *this)); 00180 } 00181 00182 serverStats.enable(); 00183 } 00184 00185 void 00186 Globals::leaveSignalsBlocked() 00187 { 00188 sigIntBlocker.leaveBlocked(); 00189 sigTermBlocker.leaveBlocked(); 00190 sigUsr1Blocker.leaveBlocked(); 00191 sigUsr2Blocker.leaveBlocked(); 00192 } 00193 00194 void 00195 Globals::run() 00196 { 00197 eventLoop.runForever(); 00198 } 00199 00200 void 00201 Globals::unblockAllSignals() 00202 { 00203 sigIntBlocker.unblock(); 00204 sigTermBlocker.unblock(); 00205 sigUsr1Blocker.unblock(); 00206 sigUsr2Blocker.unblock(); 00207 } 00208 00209 00210 } // namespace LogCabin::Server 00211 } // namespace LogCabin