LogCabin
|
00001 /* Copyright (c) 2015 Diego Ongaro 00002 * 00003 * Permission to use, copy, modify, and distribute this software for any 00004 * purpose with or without fee is hereby granted, provided that the above 00005 * copyright notice and this permission notice appear in all copies. 00006 * 00007 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00008 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00009 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00010 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00011 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00012 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00013 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00014 */ 00015 00016 #include <unistd.h> 00017 00018 #include "build/Protocol/ServerControl.pb.h" 00019 #include "Core/Debug.h" 00020 #include "RPC/ServerRPC.h" 00021 #include "Server/ControlService.h" 00022 #include "Server/Globals.h" 00023 #include "Server/RaftConsensus.h" 00024 #include "Server/StateMachine.h" 00025 00026 namespace LogCabin { 00027 namespace Server { 00028 00029 ControlService::ControlService(Globals& globals) 00030 : globals(globals) 00031 { 00032 } 00033 00034 ControlService::~ControlService() 00035 { 00036 } 00037 00038 void 00039 ControlService::handleRPC(RPC::ServerRPC rpc) 00040 { 00041 using Protocol::ServerControl::OpCode; 00042 00043 // Call the appropriate RPC handler based on the request's opCode. 00044 switch (rpc.getOpCode()) { 00045 case OpCode::DEBUG_FILENAME_GET: 00046 debugFilenameGet(std::move(rpc)); 00047 break; 00048 case OpCode::DEBUG_FILENAME_SET: 00049 debugFilenameSet(std::move(rpc)); 00050 break; 00051 case OpCode::DEBUG_POLICY_GET: 00052 debugPolicyGet(std::move(rpc)); 00053 break; 00054 case OpCode::DEBUG_POLICY_SET: 00055 debugPolicySet(std::move(rpc)); 00056 break; 00057 case OpCode::DEBUG_ROTATE: 00058 debugRotate(std::move(rpc)); 00059 break; 00060 case OpCode::SERVER_INFO_GET: 00061 serverInfoGet(std::move(rpc)); 00062 break; 00063 case OpCode::SERVER_STATS_DUMP: 00064 serverStatsDump(std::move(rpc)); 00065 break; 00066 case OpCode::SERVER_STATS_GET: 00067 serverStatsGet(std::move(rpc)); 00068 break; 00069 case OpCode::SNAPSHOT_CONTROL: 00070 snapshotControl(std::move(rpc)); 00071 break; 00072 case OpCode::SNAPSHOT_INHIBIT_GET: 00073 snapshotInhibitGet(std::move(rpc)); 00074 break; 00075 case OpCode::SNAPSHOT_INHIBIT_SET: 00076 snapshotInhibitSet(std::move(rpc)); 00077 break; 00078 default: 00079 WARNING("Client sent request with bad op code (%u) to " 00080 "ControlService", rpc.getOpCode()); 00081 rpc.rejectInvalidRequest(); 00082 } 00083 } 00084 00085 std::string 00086 ControlService::getName() const 00087 { 00088 return "ControlService"; 00089 } 00090 00091 /** 00092 * Place this at the top of each RPC handler. Afterwards, 'request' will refer 00093 * to the protocol buffer for the request with all required fields set. 00094 * 'response' will be an empty protocol buffer for you to fill in the response. 00095 */ 00096 #define PRELUDE(rpcClass) \ 00097 Protocol::ServerControl::rpcClass::Request request; \ 00098 Protocol::ServerControl::rpcClass::Response response; \ 00099 if (!rpc.getRequest(request)) \ 00100 return; 00101 00102 ////////// RPC handlers ////////// 00103 00104 void 00105 ControlService::debugFilenameGet(RPC::ServerRPC rpc) 00106 { 00107 PRELUDE(DebugFilenameGet); 00108 response.set_filename(Core::Debug::getLogFilename()); 00109 rpc.reply(response); 00110 } 00111 00112 void 00113 ControlService::debugFilenameSet(RPC::ServerRPC rpc) 00114 { 00115 PRELUDE(DebugFilenameSet); 00116 std::string prev = Core::Debug::getLogFilename(); 00117 NOTICE("Switching to log file %s", 00118 request.filename().c_str()); 00119 std::string error = Core::Debug::setLogFilename(request.filename()); 00120 if (error.empty()) { 00121 NOTICE("Switched from log file %s", 00122 prev.c_str()); 00123 } else { 00124 ERROR("Failed to switch to log file %s: %s", 00125 request.filename().c_str(), 00126 error.c_str()); 00127 response.set_error(error); 00128 } 00129 rpc.reply(response); 00130 } 00131 00132 void 00133 ControlService::debugPolicyGet(RPC::ServerRPC rpc) 00134 { 00135 PRELUDE(DebugPolicyGet); 00136 response.set_policy( 00137 Core::Debug::logPolicyToString( 00138 Core::Debug::getLogPolicy())); 00139 rpc.reply(response); 00140 } 00141 00142 void 00143 ControlService::debugPolicySet(RPC::ServerRPC rpc) 00144 { 00145 PRELUDE(DebugPolicySet); 00146 NOTICE("Switching to log policy %s", 00147 request.policy().c_str()); 00148 Core::Debug::setLogPolicy( 00149 Core::Debug::logPolicyFromString( 00150 request.policy())); 00151 rpc.reply(response); 00152 } 00153 00154 void 00155 ControlService::debugRotate(RPC::ServerRPC rpc) 00156 { 00157 PRELUDE(DebugRotate); 00158 NOTICE("Rotating logs"); 00159 std::string error = Core::Debug::reopenLogFromFilename(); 00160 if (error.empty()) { 00161 NOTICE("Done rotating logs"); 00162 } else { 00163 ERROR("Failed to rotate log file: %s", 00164 error.c_str()); 00165 response.set_error(error); 00166 } 00167 rpc.reply(response); 00168 } 00169 00170 void 00171 ControlService::serverInfoGet(RPC::ServerRPC rpc) 00172 { 00173 PRELUDE(ServerInfoGet); 00174 response.set_server_id(globals.raft->serverId); 00175 response.set_addresses(globals.raft->serverAddresses); 00176 response.set_process_id(uint64_t(getpid())); 00177 rpc.reply(response); 00178 } 00179 00180 void 00181 ControlService::serverStatsDump(RPC::ServerRPC rpc) 00182 { 00183 PRELUDE(ServerStatsDump); 00184 NOTICE("Requested dump of ServerStats through ServerControl RPC"); 00185 globals.serverStats.dumpToDebugLog(); 00186 rpc.reply(response); 00187 } 00188 00189 void 00190 ControlService::serverStatsGet(RPC::ServerRPC rpc) 00191 { 00192 PRELUDE(ServerStatsGet); 00193 *response.mutable_server_stats() = globals.serverStats.getCurrent(); 00194 rpc.reply(response); 00195 } 00196 00197 void 00198 ControlService::snapshotControl(RPC::ServerRPC rpc) 00199 { 00200 PRELUDE(SnapshotControl); 00201 using Protocol::ServerControl::SnapshotCommand; 00202 switch (request.command()) { 00203 case SnapshotCommand::START_SNAPSHOT: 00204 globals.stateMachine->startTakingSnapshot(); 00205 break; 00206 case SnapshotCommand::STOP_SNAPSHOT: 00207 globals.stateMachine->stopTakingSnapshot(); 00208 break; 00209 case SnapshotCommand::RESTART_SNAPSHOT: 00210 globals.stateMachine->stopTakingSnapshot(); 00211 globals.stateMachine->startTakingSnapshot(); 00212 break; 00213 case SnapshotCommand::UNKNOWN_SNAPSHOT_COMMAND: // fallthrough 00214 default: 00215 response.set_error("Unknown SnapshotControl command"); 00216 } 00217 rpc.reply(response); 00218 } 00219 00220 void 00221 ControlService::snapshotInhibitGet(RPC::ServerRPC rpc) 00222 { 00223 PRELUDE(SnapshotInhibitGet); 00224 std::chrono::nanoseconds duration = globals.stateMachine->getInhibit(); 00225 assert(duration >= std::chrono::nanoseconds::zero()); 00226 response.set_nanoseconds(uint64_t(duration.count())); 00227 rpc.reply(response); 00228 } 00229 00230 void 00231 ControlService::snapshotInhibitSet(RPC::ServerRPC rpc) 00232 { 00233 PRELUDE(SnapshotInhibitSet); 00234 bool abort = true; 00235 std::chrono::nanoseconds duration; 00236 if (request.has_nanoseconds()) { 00237 duration = std::chrono::nanoseconds(request.nanoseconds()); 00238 if (request.nanoseconds() > 0 && 00239 duration < std::chrono::nanoseconds::zero()) { // overflow 00240 duration = std::chrono::nanoseconds::max(); 00241 } 00242 if (request.nanoseconds() == 0) 00243 abort = false; 00244 } else { 00245 duration = std::chrono::nanoseconds::max(); 00246 } 00247 globals.stateMachine->setInhibit(duration); 00248 if (abort) { 00249 globals.stateMachine->stopTakingSnapshot(); 00250 } 00251 rpc.reply(response); 00252 } 00253 00254 00255 } // namespace LogCabin::Server 00256 } // namespace LogCabin