LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <string.h> 00018 00019 #include "build/Protocol/Client.pb.h" 00020 #include "Core/Buffer.h" 00021 #include "Core/ProtoBuf.h" 00022 #include "Core/Time.h" 00023 #include "RPC/ServerRPC.h" 00024 #include "Server/RaftConsensus.h" 00025 #include "Server/ClientService.h" 00026 #include "Server/Globals.h" 00027 #include "Server/StateMachine.h" 00028 00029 namespace LogCabin { 00030 namespace Server { 00031 00032 typedef RaftConsensus::ClientResult Result; 00033 00034 ClientService::ClientService(Globals& globals) 00035 : globals(globals) 00036 { 00037 } 00038 00039 ClientService::~ClientService() 00040 { 00041 } 00042 00043 void 00044 ClientService::handleRPC(RPC::ServerRPC rpc) 00045 { 00046 using Protocol::Client::OpCode; 00047 00048 // Call the appropriate RPC handler based on the request's opCode. 00049 switch (rpc.getOpCode()) { 00050 case OpCode::GET_SERVER_INFO: 00051 getServerInfo(std::move(rpc)); 00052 break; 00053 case OpCode::VERIFY_RECIPIENT: 00054 verifyRecipient(std::move(rpc)); 00055 break; 00056 case OpCode::GET_CONFIGURATION: 00057 getConfiguration(std::move(rpc)); 00058 break; 00059 case OpCode::SET_CONFIGURATION: 00060 setConfiguration(std::move(rpc)); 00061 break; 00062 case OpCode::STATE_MACHINE_COMMAND: 00063 stateMachineCommand(std::move(rpc)); 00064 break; 00065 case OpCode::STATE_MACHINE_QUERY: 00066 stateMachineQuery(std::move(rpc)); 00067 break; 00068 default: 00069 WARNING("Received RPC request with unknown opcode %u: " 00070 "rejecting it as invalid request", 00071 rpc.getOpCode()); 00072 rpc.rejectInvalidRequest(); 00073 } 00074 } 00075 00076 std::string 00077 ClientService::getName() const 00078 { 00079 return "ClientService"; 00080 } 00081 00082 00083 /** 00084 * Place this at the top of each RPC handler. Afterwards, 'request' will refer 00085 * to the protocol buffer for the request with all required fields set. 00086 * 'response' will be an empty protocol buffer for you to fill in the response. 00087 */ 00088 #define PRELUDE(rpcClass) \ 00089 Protocol::Client::rpcClass::Request request; \ 00090 Protocol::Client::rpcClass::Response response; \ 00091 if (!rpc.getRequest(request)) \ 00092 return; 00093 00094 ////////// RPC handlers ////////// 00095 00096 00097 void 00098 ClientService::getServerInfo(RPC::ServerRPC rpc) 00099 { 00100 PRELUDE(GetServerInfo); 00101 Protocol::Client::Server& info = *response.mutable_server_info(); 00102 info.set_server_id(globals.raft->serverId); 00103 info.set_addresses(globals.raft->serverAddresses); 00104 rpc.reply(response); 00105 } 00106 00107 void 00108 ClientService::getConfiguration(RPC::ServerRPC rpc) 00109 { 00110 PRELUDE(GetConfiguration); 00111 Protocol::Raft::SimpleConfiguration configuration; 00112 uint64_t id; 00113 Result result = globals.raft->getConfiguration(configuration, id); 00114 if (result == Result::RETRY || result == Result::NOT_LEADER) { 00115 Protocol::Client::Error error; 00116 error.set_error_code(Protocol::Client::Error::NOT_LEADER); 00117 std::string leaderHint = globals.raft->getLeaderHint(); 00118 if (!leaderHint.empty()) 00119 error.set_leader_hint(leaderHint); 00120 rpc.returnError(error); 00121 return; 00122 } 00123 response.set_id(id); 00124 for (auto it = configuration.servers().begin(); 00125 it != configuration.servers().end(); 00126 ++it) { 00127 Protocol::Client::Server* server = response.add_servers(); 00128 server->set_server_id(it->server_id()); 00129 server->set_addresses(it->addresses()); 00130 } 00131 rpc.reply(response); 00132 } 00133 00134 void 00135 ClientService::setConfiguration(RPC::ServerRPC rpc) 00136 { 00137 PRELUDE(SetConfiguration); 00138 Result result = globals.raft->setConfiguration(request, response); 00139 if (result == Result::RETRY || result == Result::NOT_LEADER) { 00140 Protocol::Client::Error error; 00141 error.set_error_code(Protocol::Client::Error::NOT_LEADER); 00142 std::string leaderHint = globals.raft->getLeaderHint(); 00143 if (!leaderHint.empty()) 00144 error.set_leader_hint(leaderHint); 00145 rpc.returnError(error); 00146 return; 00147 } 00148 rpc.reply(response); 00149 } 00150 00151 void 00152 ClientService::stateMachineCommand(RPC::ServerRPC rpc) 00153 { 00154 PRELUDE(StateMachineCommand); 00155 Core::Buffer cmdBuffer; 00156 rpc.getRequest(cmdBuffer); 00157 std::pair<Result, uint64_t> result = globals.raft->replicate(cmdBuffer); 00158 if (result.first == Result::RETRY || result.first == Result::NOT_LEADER) { 00159 Protocol::Client::Error error; 00160 error.set_error_code(Protocol::Client::Error::NOT_LEADER); 00161 std::string leaderHint = globals.raft->getLeaderHint(); 00162 if (!leaderHint.empty()) 00163 error.set_leader_hint(leaderHint); 00164 rpc.returnError(error); 00165 return; 00166 } 00167 assert(result.first == Result::SUCCESS); 00168 uint64_t logIndex = result.second; 00169 if (!globals.stateMachine->waitForResponse(logIndex, request, response)) { 00170 rpc.rejectInvalidRequest(); 00171 return; 00172 } 00173 rpc.reply(response); 00174 } 00175 00176 void 00177 ClientService::stateMachineQuery(RPC::ServerRPC rpc) 00178 { 00179 PRELUDE(StateMachineQuery); 00180 std::pair<Result, uint64_t> result = globals.raft->getLastCommitIndex(); 00181 if (result.first == Result::RETRY || result.first == Result::NOT_LEADER) { 00182 Protocol::Client::Error error; 00183 error.set_error_code(Protocol::Client::Error::NOT_LEADER); 00184 std::string leaderHint = globals.raft->getLeaderHint(); 00185 if (!leaderHint.empty()) 00186 error.set_leader_hint(leaderHint); 00187 rpc.returnError(error); 00188 return; 00189 } 00190 assert(result.first == Result::SUCCESS); 00191 uint64_t logIndex = result.second; 00192 globals.stateMachine->wait(logIndex); 00193 if (!globals.stateMachine->query(request, response)) 00194 rpc.rejectInvalidRequest(); 00195 rpc.reply(response); 00196 } 00197 00198 void 00199 ClientService::verifyRecipient(RPC::ServerRPC rpc) 00200 { 00201 PRELUDE(VerifyRecipient); 00202 00203 std::string clusterUUID = globals.clusterUUID.getOrDefault(); 00204 uint64_t serverId = globals.serverId; 00205 00206 if (!clusterUUID.empty()) 00207 response.set_cluster_uuid(clusterUUID); 00208 response.set_server_id(serverId); 00209 00210 if (request.has_cluster_uuid() && 00211 !request.cluster_uuid().empty() && 00212 !clusterUUID.empty() && 00213 clusterUUID != request.cluster_uuid()) { 00214 response.set_ok(false); 00215 response.set_error(Core::StringUtil::format( 00216 "Mismatched cluster UUIDs: request intended for %s, " 00217 "but this server is in %s", 00218 request.cluster_uuid().c_str(), 00219 clusterUUID.c_str())); 00220 } else if (request.has_server_id() && 00221 serverId != request.server_id()) { 00222 response.set_ok(false); 00223 response.set_error(Core::StringUtil::format( 00224 "Mismatched server IDs: request intended for %lu, " 00225 "but this server is %lu", 00226 request.server_id(), 00227 serverId)); 00228 } else { 00229 response.set_ok(true); 00230 if (clusterUUID.empty() && 00231 request.has_cluster_uuid() && 00232 !request.cluster_uuid().empty()) { 00233 NOTICE("Adopting cluster UUID %s", 00234 request.cluster_uuid().c_str()); 00235 globals.clusterUUID.set(request.cluster_uuid()); 00236 response.set_cluster_uuid(request.cluster_uuid()); 00237 } 00238 } 00239 rpc.reply(response); 00240 } 00241 00242 } // namespace LogCabin::Server 00243 } // namespace LogCabin