LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * Copyright (c) 2014 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include "Core/Debug.h" 00018 #include "Core/ProtoBuf.h" 00019 #include "RPC/Protocol.h" 00020 #include "RPC/ClientRPC.h" 00021 #include "RPC/ClientSession.h" 00022 00023 namespace LogCabin { 00024 namespace RPC { 00025 00026 using RPC::Protocol::RequestHeaderPrefix; 00027 using RPC::Protocol::RequestHeaderVersion1; 00028 using RPC::Protocol::ResponseHeaderPrefix; 00029 using RPC::Protocol::ResponseHeaderVersion1; 00030 typedef RPC::Protocol::Status ProtocolStatus; 00031 00032 ClientRPC::ClientRPC(std::shared_ptr<RPC::ClientSession> session, 00033 uint16_t service, 00034 uint8_t serviceSpecificErrorVersion, 00035 uint16_t opCode, 00036 const google::protobuf::Message& request) 00037 : service(service) 00038 , opCode(opCode) 00039 , opaqueRPC() // placeholder, set again below 00040 { 00041 // Serialize the request into a Buffer 00042 Core::Buffer requestBuffer; 00043 Core::ProtoBuf::serialize(request, requestBuffer, 00044 sizeof(RequestHeaderVersion1)); 00045 auto& requestHeader = 00046 *static_cast<RequestHeaderVersion1*>(requestBuffer.getData()); 00047 requestHeader.prefix.version = 1; 00048 requestHeader.prefix.toBigEndian(); 00049 requestHeader.service = service; 00050 requestHeader.serviceSpecificErrorVersion = serviceSpecificErrorVersion; 00051 requestHeader.opCode = opCode; 00052 requestHeader.toBigEndian(); 00053 00054 // Send the request to the server 00055 assert(session); // makes debugging more obvious for somewhat common error 00056 opaqueRPC = session->sendRequest(std::move(requestBuffer)); 00057 } 00058 00059 ClientRPC::ClientRPC() 00060 : service(0) 00061 , opCode(0) 00062 , opaqueRPC() 00063 { 00064 } 00065 00066 ClientRPC::ClientRPC(ClientRPC&& other) 00067 : service(other.service) 00068 , opCode(other.opCode) 00069 , opaqueRPC(std::move(other.opaqueRPC)) 00070 { 00071 } 00072 00073 ClientRPC::~ClientRPC() 00074 { 00075 } 00076 00077 ClientRPC& 00078 ClientRPC::operator=(ClientRPC&& other) 00079 { 00080 service = other.service; 00081 opCode = other.opCode; 00082 opaqueRPC = std::move(other.opaqueRPC); 00083 return *this; 00084 } 00085 00086 void 00087 ClientRPC::cancel() 00088 { 00089 opaqueRPC.cancel(); 00090 } 00091 00092 bool 00093 ClientRPC::isReady() 00094 { 00095 return opaqueRPC.getStatus() != OpaqueClientRPC::Status::NOT_READY; 00096 } 00097 00098 ClientRPC::Status 00099 ClientRPC::waitForReply(google::protobuf::Message* response, 00100 google::protobuf::Message* serviceSpecificError, 00101 TimePoint timeout) 00102 { 00103 opaqueRPC.waitForReply(timeout); 00104 switch (opaqueRPC.getStatus()) { 00105 case OpaqueClientRPC::Status::NOT_READY: 00106 if (Clock::now() > timeout) { 00107 return Status::TIMEOUT; 00108 } else { 00109 PANIC("Waited for RPC but not ready and " 00110 "timeout hasn't elapsed (timeout=%s, now=%s)", 00111 Core::StringUtil::toString(timeout).c_str(), 00112 Core::StringUtil::toString(Clock::now()).c_str()); 00113 } 00114 break; 00115 case OpaqueClientRPC::Status::OK: 00116 break; 00117 case OpaqueClientRPC::Status::ERROR: 00118 return Status::RPC_FAILED; 00119 case OpaqueClientRPC::Status::CANCELED: 00120 return Status::RPC_CANCELED; 00121 } 00122 const Core::Buffer& responseBuffer = *opaqueRPC.peekReply(); 00123 00124 // Extract the response's status field. 00125 if (responseBuffer.getLength() < sizeof(ResponseHeaderPrefix)) { 00126 PANIC("The response from the server for RPC to service %u, opcode " 00127 "%u was too short to be valid (%lu bytes). This probably " 00128 "indicates network or memory corruption.", 00129 service, opCode, responseBuffer.getLength()); 00130 } 00131 ResponseHeaderPrefix responseHeaderPrefix = 00132 *static_cast<const ResponseHeaderPrefix*>(responseBuffer.getData()); 00133 responseHeaderPrefix.fromBigEndian(); 00134 if (responseHeaderPrefix.status == ProtocolStatus::INVALID_VERSION) { 00135 // The server doesn't understand this version of the header 00136 // protocol. Since this library only runs version 1 of the 00137 // protocol, this shouldn't happen if servers continue supporting 00138 // version 1. 00139 PANIC("This client is too old to talk to the server. " 00140 "You'll need to update your client library."); 00141 } 00142 00143 if (responseBuffer.getLength() < sizeof(ResponseHeaderVersion1)) { 00144 PANIC("The response from the server for RPC to service %u, opcode " 00145 "%u was too short to be valid. This probably indicates " 00146 "network or memory corruption.", 00147 service, opCode); 00148 } 00149 ResponseHeaderVersion1 responseHeader = 00150 *static_cast<const ResponseHeaderVersion1*>(responseBuffer.getData()); 00151 responseHeader.fromBigEndian(); 00152 00153 switch (responseHeader.prefix.status) { 00154 00155 // The RPC succeeded. Parse the response into a protocol buffer. 00156 case ProtocolStatus::OK: 00157 if (response != NULL && 00158 !Core::ProtoBuf::parse(responseBuffer, *response, 00159 sizeof(responseHeader))) { 00160 PANIC("Could not parse the protocol buffer out of the server " 00161 "response for RPC to service %u, opcode %u", 00162 service, opCode); 00163 } 00164 return Status::OK; 00165 00166 // The RPC failed in a service-specific way. Parse the response into a 00167 // protocol buffer. 00168 case ProtocolStatus::SERVICE_SPECIFIC_ERROR: 00169 if (serviceSpecificError != NULL && 00170 !Core::ProtoBuf::parse(responseBuffer, *serviceSpecificError, 00171 sizeof(responseHeader))) { 00172 PANIC("Could not parse the protocol buffer out of the " 00173 "service-specific error details for RPC to service " 00174 "%u, opcode %u", 00175 service, opCode); 00176 } 00177 return Status::SERVICE_SPECIFIC_ERROR; 00178 00179 // The server is not running the requested service. 00180 case ProtocolStatus::INVALID_SERVICE: 00181 return Status::INVALID_SERVICE; 00182 00183 // The server disliked our request, probably because it doesn't support 00184 // the opcode, or maybe the request arguments were invalid. 00185 case ProtocolStatus::INVALID_REQUEST: 00186 return Status::INVALID_REQUEST; 00187 00188 default: 00189 // The server shouldn't reply back with status codes we don't 00190 // understand. That's why we gave it a version number in the 00191 // request header. 00192 PANIC("Unknown status %u returned from server after sending it " 00193 "protocol version 1 in the request header for RPC to " 00194 "service %u, opcode %u. This probably indicates a bug in " 00195 "the server.", 00196 uint32_t(responseHeader.prefix.status), service, opCode); 00197 } 00198 00199 } 00200 00201 std::string 00202 ClientRPC::getErrorMessage() const 00203 { 00204 return opaqueRPC.getErrorMessage(); 00205 } 00206 00207 ::std::ostream& 00208 operator<<(::std::ostream& os, ClientRPC::Status status) 00209 { 00210 typedef ClientRPC::Status Status; 00211 switch (status) { 00212 case Status::OK: 00213 return os << "OK"; 00214 case Status::SERVICE_SPECIFIC_ERROR: 00215 return os << "SERVICE_SPECIFIC_ERROR"; 00216 case Status::RPC_FAILED: 00217 return os << "RPC_FAILED"; 00218 case Status::RPC_CANCELED: 00219 return os << "RPC_CANCELED"; 00220 case Status::TIMEOUT: 00221 return os << "TIMEOUT"; 00222 case Status::INVALID_SERVICE: 00223 return os << "INVALID_SERVICE"; 00224 case Status::INVALID_REQUEST: 00225 return os << "INVALID_REQUEST"; 00226 default: 00227 return os << "(INVALID STATUS VALUE)"; 00228 } 00229 } 00230 00231 } // namespace LogCabin::RPC 00232 } // namespace LogCabin