LogCabin
Server/ClientService.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <string.h>
00018 
00019 #include "build/Protocol/Client.pb.h"
00020 #include "Core/Buffer.h"
00021 #include "Core/ProtoBuf.h"
00022 #include "Core/Time.h"
00023 #include "RPC/ServerRPC.h"
00024 #include "Server/RaftConsensus.h"
00025 #include "Server/ClientService.h"
00026 #include "Server/Globals.h"
00027 #include "Server/StateMachine.h"
00028 
00029 namespace LogCabin {
00030 namespace Server {
00031 
00032 typedef RaftConsensus::ClientResult Result;
00033 
00034 ClientService::ClientService(Globals& globals)
00035     : globals(globals)
00036 {
00037 }
00038 
00039 ClientService::~ClientService()
00040 {
00041 }
00042 
00043 void
00044 ClientService::handleRPC(RPC::ServerRPC rpc)
00045 {
00046     using Protocol::Client::OpCode;
00047 
00048     // Call the appropriate RPC handler based on the request's opCode.
00049     switch (rpc.getOpCode()) {
00050         case OpCode::GET_SERVER_INFO:
00051             getServerInfo(std::move(rpc));
00052             break;
00053         case OpCode::VERIFY_RECIPIENT:
00054             verifyRecipient(std::move(rpc));
00055             break;
00056         case OpCode::GET_CONFIGURATION:
00057             getConfiguration(std::move(rpc));
00058             break;
00059         case OpCode::SET_CONFIGURATION:
00060             setConfiguration(std::move(rpc));
00061             break;
00062         case OpCode::STATE_MACHINE_COMMAND:
00063             stateMachineCommand(std::move(rpc));
00064             break;
00065         case OpCode::STATE_MACHINE_QUERY:
00066             stateMachineQuery(std::move(rpc));
00067             break;
00068         default:
00069             WARNING("Received RPC request with unknown opcode %u: "
00070                     "rejecting it as invalid request",
00071                     rpc.getOpCode());
00072             rpc.rejectInvalidRequest();
00073     }
00074 }
00075 
00076 std::string
00077 ClientService::getName() const
00078 {
00079     return "ClientService";
00080 }
00081 
00082 
/**
 * Boilerplate for the first line of every RPC handler below. It expects a
 * variable named 'rpc' to be in scope and declares two locals:
 *  - 'request': the request protocol buffer for the given RPC class, filled
 *    in from 'rpc' with all required fields set;
 *  - 'response': an empty response protocol buffer for the handler to fill.
 * If rpc.getRequest() reports failure, the enclosing handler returns
 * immediately without doing anything further.
 */
#define PRELUDE(rpcClass) \
    Protocol::Client::rpcClass::Request request; \
    Protocol::Client::rpcClass::Response response; \
    if (!rpc.getRequest(request)) { \
        return; \
    }
00093 
00094 ////////// RPC handlers //////////
00095 
00096 
00097 void
00098 ClientService::getServerInfo(RPC::ServerRPC rpc)
00099 {
00100     PRELUDE(GetServerInfo);
00101     Protocol::Client::Server& info = *response.mutable_server_info();
00102     info.set_server_id(globals.raft->serverId);
00103     info.set_addresses(globals.raft->serverAddresses);
00104     rpc.reply(response);
00105 }
00106 
00107 void
00108 ClientService::getConfiguration(RPC::ServerRPC rpc)
00109 {
00110     PRELUDE(GetConfiguration);
00111     Protocol::Raft::SimpleConfiguration configuration;
00112     uint64_t id;
00113     Result result = globals.raft->getConfiguration(configuration, id);
00114     if (result == Result::RETRY || result == Result::NOT_LEADER) {
00115         Protocol::Client::Error error;
00116         error.set_error_code(Protocol::Client::Error::NOT_LEADER);
00117         std::string leaderHint = globals.raft->getLeaderHint();
00118         if (!leaderHint.empty())
00119             error.set_leader_hint(leaderHint);
00120         rpc.returnError(error);
00121         return;
00122     }
00123     response.set_id(id);
00124     for (auto it = configuration.servers().begin();
00125          it != configuration.servers().end();
00126          ++it) {
00127         Protocol::Client::Server* server = response.add_servers();
00128         server->set_server_id(it->server_id());
00129         server->set_addresses(it->addresses());
00130     }
00131     rpc.reply(response);
00132 }
00133 
00134 void
00135 ClientService::setConfiguration(RPC::ServerRPC rpc)
00136 {
00137     PRELUDE(SetConfiguration);
00138     Result result = globals.raft->setConfiguration(request, response);
00139     if (result == Result::RETRY || result == Result::NOT_LEADER) {
00140         Protocol::Client::Error error;
00141         error.set_error_code(Protocol::Client::Error::NOT_LEADER);
00142         std::string leaderHint = globals.raft->getLeaderHint();
00143         if (!leaderHint.empty())
00144             error.set_leader_hint(leaderHint);
00145         rpc.returnError(error);
00146         return;
00147     }
00148     rpc.reply(response);
00149 }
00150 
00151 void
00152 ClientService::stateMachineCommand(RPC::ServerRPC rpc)
00153 {
00154     PRELUDE(StateMachineCommand);
00155     Core::Buffer cmdBuffer;
00156     rpc.getRequest(cmdBuffer);
00157     std::pair<Result, uint64_t> result = globals.raft->replicate(cmdBuffer);
00158     if (result.first == Result::RETRY || result.first == Result::NOT_LEADER) {
00159         Protocol::Client::Error error;
00160         error.set_error_code(Protocol::Client::Error::NOT_LEADER);
00161         std::string leaderHint = globals.raft->getLeaderHint();
00162         if (!leaderHint.empty())
00163             error.set_leader_hint(leaderHint);
00164         rpc.returnError(error);
00165         return;
00166     }
00167     assert(result.first == Result::SUCCESS);
00168     uint64_t logIndex = result.second;
00169     if (!globals.stateMachine->waitForResponse(logIndex, request, response)) {
00170         rpc.rejectInvalidRequest();
00171         return;
00172     }
00173     rpc.reply(response);
00174 }
00175 
00176 void
00177 ClientService::stateMachineQuery(RPC::ServerRPC rpc)
00178 {
00179     PRELUDE(StateMachineQuery);
00180     std::pair<Result, uint64_t> result = globals.raft->getLastCommitIndex();
00181     if (result.first == Result::RETRY || result.first == Result::NOT_LEADER) {
00182         Protocol::Client::Error error;
00183         error.set_error_code(Protocol::Client::Error::NOT_LEADER);
00184         std::string leaderHint = globals.raft->getLeaderHint();
00185         if (!leaderHint.empty())
00186             error.set_leader_hint(leaderHint);
00187         rpc.returnError(error);
00188         return;
00189     }
00190     assert(result.first == Result::SUCCESS);
00191     uint64_t logIndex = result.second;
00192     globals.stateMachine->wait(logIndex);
00193     if (!globals.stateMachine->query(request, response))
00194         rpc.rejectInvalidRequest();
00195     rpc.reply(response);
00196 }
00197 
00198 void
00199 ClientService::verifyRecipient(RPC::ServerRPC rpc)
00200 {
00201     PRELUDE(VerifyRecipient);
00202 
00203     std::string clusterUUID = globals.clusterUUID.getOrDefault();
00204     uint64_t serverId = globals.serverId;
00205 
00206     if (!clusterUUID.empty())
00207         response.set_cluster_uuid(clusterUUID);
00208     response.set_server_id(serverId);
00209 
00210     if (request.has_cluster_uuid() &&
00211         !request.cluster_uuid().empty() &&
00212         !clusterUUID.empty() &&
00213         clusterUUID != request.cluster_uuid()) {
00214         response.set_ok(false);
00215         response.set_error(Core::StringUtil::format(
00216            "Mismatched cluster UUIDs: request intended for %s, "
00217            "but this server is in %s",
00218            request.cluster_uuid().c_str(),
00219            clusterUUID.c_str()));
00220     } else if (request.has_server_id() &&
00221                serverId != request.server_id()) {
00222         response.set_ok(false);
00223         response.set_error(Core::StringUtil::format(
00224            "Mismatched server IDs: request intended for %lu, "
00225            "but this server is %lu",
00226            request.server_id(),
00227            serverId));
00228     } else {
00229         response.set_ok(true);
00230         if (clusterUUID.empty() &&
00231             request.has_cluster_uuid() &&
00232             !request.cluster_uuid().empty()) {
00233             NOTICE("Adopting cluster UUID %s",
00234                    request.cluster_uuid().c_str());
00235             globals.clusterUUID.set(request.cluster_uuid());
00236             response.set_cluster_uuid(request.cluster_uuid());
00237         }
00238     }
00239     rpc.reply(response);
00240 }
00241 
00242 } // namespace LogCabin::Server
00243 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines