LogCabin
Server/Globals.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <signal.h>
00018 
00019 #include "Core/Debug.h"
00020 #include "Core/StringUtil.h"
00021 #include "Protocol/Common.h"
00022 #include "RPC/Server.h"
00023 #include "Server/ClientService.h"
00024 #include "Server/ControlService.h"
00025 #include "Server/Globals.h"
00026 #include "Server/RaftConsensus.h"
00027 #include "Server/RaftService.h"
00028 #include "Server/StateMachine.h"
00029 
00030 namespace LogCabin {
00031 namespace Server {
00032 
00033 ////////// Globals::ExitHandler and Globals::LogRotateHandler //////////
00034 
00035 Globals::ExitHandler::ExitHandler(
00036         Event::Loop& eventLoop,
00037         int signalNumber)
00038     : Signal(signalNumber)
00039     , eventLoop(eventLoop)
00040 {
00041 }
00042 
00043 void
00044 Globals::ExitHandler::handleSignalEvent()
00045 {
00046     NOTICE("%s: shutting down", strsignal(signalNumber));
00047     eventLoop.exit();
00048 }
00049 
00050 Globals::LogRotateHandler::LogRotateHandler(
00051         Event::Loop& eventLoop,
00052         int signalNumber)
00053     : Signal(signalNumber)
00054     , eventLoop(eventLoop)
00055 {
00056 }
00057 
00058 void
00059 Globals::LogRotateHandler::handleSignalEvent()
00060 {
00061     NOTICE("%s: rotating logs", strsignal(signalNumber));
00062     std::string error = Core::Debug::reopenLogFromFilename();
00063     if (!error.empty()) {
00064         PANIC("Failed to rotate log file: %s",
00065               error.c_str());
00066     }
00067     NOTICE("%s: done rotating logs", strsignal(signalNumber));
00068 }
00069 
00070 
00071 ////////// Globals //////////
00072 
00073 Globals::Globals()
00074     : config()
00075     , serverStats(*this)
00076     , eventLoop()
00077     , sigIntBlocker(SIGINT)
00078     , sigTermBlocker(SIGTERM)
00079     , sigUsr1Blocker(SIGUSR1)
00080     , sigUsr2Blocker(SIGUSR2)
00081     , sigIntHandler(eventLoop, SIGINT)
00082     , sigIntMonitor(eventLoop, sigIntHandler)
00083     , sigTermHandler(eventLoop, SIGTERM)
00084     , sigTermMonitor(eventLoop, sigTermHandler)
00085     , sigUsr2Handler(eventLoop, SIGUSR2)
00086     , sigUsr2Monitor(eventLoop, sigUsr2Handler)
00087     , clusterUUID()
00088     , serverId(~0UL)
00089     , raft()
00090     , stateMachine()
00091     , controlService()
00092     , raftService()
00093     , clientService()
00094     , rpcServer()
00095 {
00096 }
00097 
00098 Globals::~Globals()
00099 {
00100     serverStats.exit();
00101 }
00102 
00103 void
00104 Globals::init()
00105 {
00106     std::string uuid = config.read("clusterUUID", std::string(""));
00107     if (!uuid.empty())
00108         clusterUUID.set(uuid);
00109     serverId = config.read<uint64_t>("serverId");
00110     Core::Debug::processName = Core::StringUtil::format("%lu", serverId);
00111     {
00112         ServerStats::Lock serverStatsLock(serverStats);
00113         serverStatsLock->set_server_id(serverId);
00114     }
00115     if (!raft) {
00116         raft.reset(new RaftConsensus(*this));
00117         raft->serverId = serverId;
00118     }
00119 
00120     if (!controlService) {
00121         controlService.reset(new ControlService(*this));
00122     }
00123 
00124     if (!raftService) {
00125         raftService.reset(new RaftService(*this));
00126     }
00127 
00128     if (!clientService) {
00129         clientService.reset(new ClientService(*this));
00130     }
00131 
00132     if (!rpcServer) {
00133         rpcServer.reset(new RPC::Server(eventLoop,
00134                                         Protocol::Common::MAX_MESSAGE_LENGTH));
00135 
00136         uint32_t maxThreads = config.read<uint16_t>("maxThreads", 16);
00137         namespace ServiceId = Protocol::Common::ServiceId;
00138         rpcServer->registerService(ServiceId::CONTROL_SERVICE,
00139                                    controlService,
00140                                    maxThreads);
00141         rpcServer->registerService(ServiceId::RAFT_SERVICE,
00142                                    raftService,
00143                                    maxThreads);
00144         rpcServer->registerService(ServiceId::CLIENT_SERVICE,
00145                                    clientService,
00146                                    maxThreads);
00147 
00148         std::string listenAddressesStr =
00149             config.read<std::string>("listenAddresses");
00150         {
00151             ServerStats::Lock serverStatsLock(serverStats);
00152             serverStatsLock->set_server_id(serverId);
00153             serverStatsLock->set_addresses(listenAddressesStr);
00154         }
00155         std::vector<std::string> listenAddresses =
00156             Core::StringUtil::split(listenAddressesStr, ',');
00157         if (listenAddresses.empty()) {
00158             EXIT("No server addresses specified to listen on");
00159         }
00160         for (auto it = listenAddresses.begin();
00161              it != listenAddresses.end();
00162              ++it) {
00163             RPC::Address address(*it, Protocol::Common::DEFAULT_PORT);
00164             address.refresh(RPC::Address::TimePoint::max());
00165             std::string error = rpcServer->bind(address);
00166             if (!error.empty()) {
00167                 EXIT("Could not listen on address %s: %s",
00168                      address.toString().c_str(),
00169                      error.c_str());
00170             }
00171             NOTICE("Serving on %s",
00172                    address.toString().c_str());
00173         }
00174         raft->serverAddresses = listenAddressesStr;
00175         raft->init();
00176     }
00177 
00178     if (!stateMachine) {
00179         stateMachine.reset(new StateMachine(raft, config, *this));
00180     }
00181 
00182     serverStats.enable();
00183 }
00184 
00185 void
00186 Globals::leaveSignalsBlocked()
00187 {
00188     sigIntBlocker.leaveBlocked();
00189     sigTermBlocker.leaveBlocked();
00190     sigUsr1Blocker.leaveBlocked();
00191     sigUsr2Blocker.leaveBlocked();
00192 }
00193 
00194 void
00195 Globals::run()
00196 {
00197     eventLoop.runForever();
00198 }
00199 
00200 void
00201 Globals::unblockAllSignals()
00202 {
00203     sigIntBlocker.unblock();
00204     sigTermBlocker.unblock();
00205     sigUsr1Blocker.unblock();
00206     sigUsr2Blocker.unblock();
00207 }
00208 
00209 
00210 } // namespace LogCabin::Server
00211 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines