LogCabin
RPC/ServerRPC.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include "Core/ProtoBuf.h"
00018 #include "RPC/ServerRPC.h"
00019 
00020 namespace LogCabin {
00021 namespace RPC {
00022 
00023 using RPC::Protocol::RequestHeaderPrefix;
00024 using RPC::Protocol::RequestHeaderVersion1;
00025 using RPC::Protocol::ResponseHeaderPrefix;
00026 using RPC::Protocol::ResponseHeaderVersion1;
00027 using RPC::Protocol::Status;
00028 
00029 ServerRPC::ServerRPC(OpaqueServerRPC opaqueRPC)
00030     : opaqueRPC(std::move(opaqueRPC))
00031     , active(true)
00032     , service(0)
00033     , serviceSpecificErrorVersion(0)
00034     , opCode(0)
00035 {
00036     const Core::Buffer& request = this->opaqueRPC.request;
00037 
00038     // Carefully read the headers.
00039     if (request.getLength() < sizeof(RequestHeaderPrefix)) {
00040         reject(Status::INVALID_REQUEST);
00041         return;
00042     }
00043     RequestHeaderPrefix requestHeaderPrefix =
00044         *static_cast<const RequestHeaderPrefix*>(request.getData());
00045     requestHeaderPrefix.fromBigEndian();
00046     if (requestHeaderPrefix.version != 1 ||
00047         request.getLength() < sizeof(RequestHeaderVersion1)) {
00048         reject(Status::INVALID_VERSION);
00049         return;
00050     }
00051     RequestHeaderVersion1 requestHeader =
00052         *static_cast<const RequestHeaderVersion1*>(request.getData());
00053     requestHeader.fromBigEndian();
00054 
00055     service = requestHeader.service;
00056     serviceSpecificErrorVersion = requestHeader.serviceSpecificErrorVersion;
00057     opCode = requestHeader.opCode;
00058 }
00059 
00060 ServerRPC::ServerRPC()
00061     : opaqueRPC()
00062     , active(false)
00063     , service(0)
00064     , serviceSpecificErrorVersion(0)
00065     , opCode(0)
00066 {
00067 }
00068 
00069 ServerRPC::ServerRPC(ServerRPC&& other)
00070     : opaqueRPC(std::move(other.opaqueRPC))
00071     , active(other.active)
00072     , service(other.service)
00073     , serviceSpecificErrorVersion(other.serviceSpecificErrorVersion)
00074     , opCode(other.opCode)
00075 {
00076     other.active = false;
00077 }
00078 
00079 ServerRPC::~ServerRPC()
00080 {
00081     if (active) {
00082         WARNING("ServerRPC destroyed without a reply (service %u, opcode %u). "
00083                 "This may cause the client of the RPC to hang",
00084                 service, opCode);
00085     }
00086 }
00087 
00088 ServerRPC&
00089 ServerRPC::operator=(ServerRPC&& other)
00090 {
00091     opaqueRPC = std::move(other.opaqueRPC);
00092     active = other.active;
00093     other.active = false;
00094     service = other.service;
00095     serviceSpecificErrorVersion = other.serviceSpecificErrorVersion;
00096     opCode = other.opCode;
00097     return *this;
00098 }
00099 
00100 bool
00101 ServerRPC::getRequest(google::protobuf::Message& request)
00102 {
00103     if (!active)
00104         return false;
00105     if (!Core::ProtoBuf::parse(opaqueRPC.request, request,
00106                                sizeof(RequestHeaderVersion1))) {
00107         rejectInvalidRequest();
00108         return false;
00109     }
00110     return true;
00111 }
00112 
00113 bool
00114 ServerRPC::getRequest(Core::Buffer& buffer) const
00115 {
00116     if (!active)
00117         return false;
00118     uint64_t bytes = opaqueRPC.request.getLength();
00119     assert(bytes >= sizeof(RequestHeaderVersion1));
00120     bytes -= sizeof(RequestHeaderVersion1);
00121     buffer.setData(new char[bytes],
00122                    bytes,
00123                    Core::Buffer::deleteArrayFn<char>);
00124     memcpy(buffer.getData(),
00125            (static_cast<const char*>(opaqueRPC.request.getData()) +
00126             sizeof(RequestHeaderVersion1)),
00127            bytes);
00128     return true;
00129 }
00130 
00131 void
00132 ServerRPC::reply(const google::protobuf::Message& payload)
00133 {
00134     active = false;
00135     Core::Buffer buffer;
00136     Core::ProtoBuf::serialize(payload, buffer,
00137                               sizeof(ResponseHeaderVersion1));
00138     auto& responseHeader =
00139         *static_cast<ResponseHeaderVersion1*>(buffer.getData());
00140     responseHeader.prefix.status = Status::OK;
00141     responseHeader.prefix.toBigEndian();
00142     responseHeader.toBigEndian();
00143     opaqueRPC.response = std::move(buffer);
00144     opaqueRPC.sendReply();
00145 }
00146 
00147 void
00148 ServerRPC::returnError(const google::protobuf::Message& serviceSpecificError)
00149 {
00150     active = false;
00151     Core::Buffer buffer;
00152     Core::ProtoBuf::serialize(serviceSpecificError, buffer,
00153                               sizeof(ResponseHeaderVersion1));
00154     auto& responseHeader =
00155         *static_cast<ResponseHeaderVersion1*>(buffer.getData());
00156     responseHeader.prefix.status = Status::SERVICE_SPECIFIC_ERROR;
00157     responseHeader.prefix.toBigEndian();
00158     responseHeader.toBigEndian();
00159     opaqueRPC.response = std::move(buffer);
00160     opaqueRPC.sendReply();
00161 }
00162 
00163 void
00164 ServerRPC::rejectInvalidService()
00165 {
00166     reject(Status::INVALID_SERVICE);
00167 }
00168 
00169 void
00170 ServerRPC::rejectInvalidRequest()
00171 {
00172     reject(Status::INVALID_REQUEST);
00173 }
00174 
00175 void
00176 ServerRPC::closeSession()
00177 {
00178     active = false;
00179     opaqueRPC.closeSession();
00180 }
00181 
00182 void
00183 ServerRPC::reject(RPC::Protocol::Status status)
00184 {
00185     active = false;
00186     ResponseHeaderVersion1& responseHeader = *new ResponseHeaderVersion1();
00187     responseHeader.prefix.status = status;
00188     responseHeader.prefix.toBigEndian();
00189     responseHeader.toBigEndian();
00190     opaqueRPC.response.setData(
00191         &responseHeader,
00192         sizeof(responseHeader),
00193         Core::Buffer::deleteObjectFn<ResponseHeaderVersion1*>);
00194     opaqueRPC.sendReply();
00195 }
00196 
00197 
00198 } // namespace LogCabin::RPC
00199 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines