LogCabin
RPC/ClientRPC.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  * Copyright (c) 2014 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include "Core/Debug.h"
00018 #include "Core/ProtoBuf.h"
00019 #include "RPC/Protocol.h"
00020 #include "RPC/ClientRPC.h"
00021 #include "RPC/ClientSession.h"
00022 
00023 namespace LogCabin {
00024 namespace RPC {
00025 
00026 using RPC::Protocol::RequestHeaderPrefix;
00027 using RPC::Protocol::RequestHeaderVersion1;
00028 using RPC::Protocol::ResponseHeaderPrefix;
00029 using RPC::Protocol::ResponseHeaderVersion1;
00030 typedef RPC::Protocol::Status ProtocolStatus;
00031 
00032 ClientRPC::ClientRPC(std::shared_ptr<RPC::ClientSession> session,
00033                      uint16_t service,
00034                      uint8_t serviceSpecificErrorVersion,
00035                      uint16_t opCode,
00036                      const google::protobuf::Message& request)
00037     : service(service)
00038     , opCode(opCode)
00039     , opaqueRPC() // placeholder, set again below
00040 {
00041     // Serialize the request into a Buffer
00042     Core::Buffer requestBuffer;
00043     Core::ProtoBuf::serialize(request, requestBuffer,
00044                               sizeof(RequestHeaderVersion1));
00045     auto& requestHeader =
00046         *static_cast<RequestHeaderVersion1*>(requestBuffer.getData());
00047     requestHeader.prefix.version = 1;
00048     requestHeader.prefix.toBigEndian();
00049     requestHeader.service = service;
00050     requestHeader.serviceSpecificErrorVersion = serviceSpecificErrorVersion;
00051     requestHeader.opCode = opCode;
00052     requestHeader.toBigEndian();
00053 
00054     // Send the request to the server
00055     assert(session); // makes debugging more obvious for somewhat common error
00056     opaqueRPC = session->sendRequest(std::move(requestBuffer));
00057 }
00058 
00059 ClientRPC::ClientRPC()
00060     : service(0)
00061     , opCode(0)
00062     , opaqueRPC()
00063 {
00064 }
00065 
00066 ClientRPC::ClientRPC(ClientRPC&& other)
00067     : service(other.service)
00068     , opCode(other.opCode)
00069     , opaqueRPC(std::move(other.opaqueRPC))
00070 {
00071 }
00072 
00073 ClientRPC::~ClientRPC()
00074 {
00075 }
00076 
00077 ClientRPC&
00078 ClientRPC::operator=(ClientRPC&& other)
00079 {
00080     service = other.service;
00081     opCode = other.opCode;
00082     opaqueRPC = std::move(other.opaqueRPC);
00083     return *this;
00084 }
00085 
00086 void
00087 ClientRPC::cancel()
00088 {
00089     opaqueRPC.cancel();
00090 }
00091 
00092 bool
00093 ClientRPC::isReady()
00094 {
00095     return opaqueRPC.getStatus() != OpaqueClientRPC::Status::NOT_READY;
00096 }
00097 
00098 ClientRPC::Status
00099 ClientRPC::waitForReply(google::protobuf::Message* response,
00100                         google::protobuf::Message* serviceSpecificError,
00101                         TimePoint timeout)
00102 {
00103     opaqueRPC.waitForReply(timeout);
00104     switch (opaqueRPC.getStatus()) {
00105         case OpaqueClientRPC::Status::NOT_READY:
00106             if (Clock::now() > timeout) {
00107                 return Status::TIMEOUT;
00108             } else {
00109                 PANIC("Waited for RPC but not ready and "
00110                       "timeout hasn't elapsed (timeout=%s, now=%s)",
00111                       Core::StringUtil::toString(timeout).c_str(),
00112                       Core::StringUtil::toString(Clock::now()).c_str());
00113             }
00114             break;
00115         case OpaqueClientRPC::Status::OK:
00116             break;
00117         case OpaqueClientRPC::Status::ERROR:
00118             return Status::RPC_FAILED;
00119         case OpaqueClientRPC::Status::CANCELED:
00120             return Status::RPC_CANCELED;
00121     }
00122     const Core::Buffer& responseBuffer = *opaqueRPC.peekReply();
00123 
00124     // Extract the response's status field.
00125     if (responseBuffer.getLength() < sizeof(ResponseHeaderPrefix)) {
00126         PANIC("The response from the server for RPC to service %u, opcode "
00127               "%u was too short to be valid (%lu bytes). This probably "
00128               "indicates network or memory corruption.",
00129               service, opCode, responseBuffer.getLength());
00130     }
00131     ResponseHeaderPrefix responseHeaderPrefix =
00132         *static_cast<const ResponseHeaderPrefix*>(responseBuffer.getData());
00133     responseHeaderPrefix.fromBigEndian();
00134     if (responseHeaderPrefix.status == ProtocolStatus::INVALID_VERSION) {
00135         // The server doesn't understand this version of the header
00136         // protocol. Since this library only runs version 1 of the
00137         // protocol, this shouldn't happen if servers continue supporting
00138         // version 1.
00139         PANIC("This client is too old to talk to the server. "
00140               "You'll need to update your client library.");
00141     }
00142 
00143     if (responseBuffer.getLength() < sizeof(ResponseHeaderVersion1)) {
00144         PANIC("The response from the server for RPC to service %u, opcode "
00145               "%u was too short to be valid. This probably indicates "
00146               "network or memory corruption.",
00147               service, opCode);
00148     }
00149     ResponseHeaderVersion1 responseHeader =
00150         *static_cast<const ResponseHeaderVersion1*>(responseBuffer.getData());
00151     responseHeader.fromBigEndian();
00152 
00153     switch (responseHeader.prefix.status) {
00154 
00155         // The RPC succeeded. Parse the response into a protocol buffer.
00156         case ProtocolStatus::OK:
00157             if (response != NULL &&
00158                 !Core::ProtoBuf::parse(responseBuffer, *response,
00159                                        sizeof(responseHeader))) {
00160                 PANIC("Could not parse the protocol buffer out of the server "
00161                       "response for RPC to service %u, opcode %u",
00162                       service, opCode);
00163             }
00164             return Status::OK;
00165 
00166         // The RPC failed in a service-specific way. Parse the response into a
00167         // protocol buffer.
00168         case ProtocolStatus::SERVICE_SPECIFIC_ERROR:
00169             if (serviceSpecificError != NULL &&
00170                 !Core::ProtoBuf::parse(responseBuffer, *serviceSpecificError,
00171                                        sizeof(responseHeader))) {
00172                 PANIC("Could not parse the protocol buffer out of the "
00173                       "service-specific error details for RPC to service "
00174                       "%u, opcode %u",
00175                       service, opCode);
00176             }
00177             return Status::SERVICE_SPECIFIC_ERROR;
00178 
00179         // The server is not running the requested service.
00180         case ProtocolStatus::INVALID_SERVICE:
00181             return Status::INVALID_SERVICE;
00182 
00183         // The server disliked our request, probably because it doesn't support
00184         // the opcode, or maybe the request arguments were invalid.
00185         case ProtocolStatus::INVALID_REQUEST:
00186             return Status::INVALID_REQUEST;
00187 
00188         default:
00189             // The server shouldn't reply back with status codes we don't
00190             // understand. That's why we gave it a version number in the
00191             // request header.
00192             PANIC("Unknown status %u returned from server after sending it "
00193                   "protocol version 1 in the request header for RPC to "
00194                   "service %u, opcode %u. This probably indicates a bug in "
00195                   "the server.",
00196                   uint32_t(responseHeader.prefix.status), service, opCode);
00197     }
00198 
00199 }
00200 
00201 std::string
00202 ClientRPC::getErrorMessage() const
00203 {
00204     return opaqueRPC.getErrorMessage();
00205 }
00206 
00207 ::std::ostream&
00208 operator<<(::std::ostream& os, ClientRPC::Status status)
00209 {
00210     typedef ClientRPC::Status Status;
00211     switch (status) {
00212         case Status::OK:
00213             return os << "OK";
00214         case Status::SERVICE_SPECIFIC_ERROR:
00215             return os << "SERVICE_SPECIFIC_ERROR";
00216         case Status::RPC_FAILED:
00217             return os << "RPC_FAILED";
00218         case Status::RPC_CANCELED:
00219             return os << "RPC_CANCELED";
00220         case Status::TIMEOUT:
00221             return os << "TIMEOUT";
00222         case Status::INVALID_SERVICE:
00223             return os << "INVALID_SERVICE";
00224         case Status::INVALID_REQUEST:
00225             return os << "INVALID_REQUEST";
00226         default:
00227             return os << "(INVALID STATUS VALUE)";
00228     }
00229 }
00230 
00231 } // namespace LogCabin::RPC
00232 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines