LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include "Core/ProtoBuf.h" 00018 #include "RPC/ServerRPC.h" 00019 00020 namespace LogCabin { 00021 namespace RPC { 00022 00023 using RPC::Protocol::RequestHeaderPrefix; 00024 using RPC::Protocol::RequestHeaderVersion1; 00025 using RPC::Protocol::ResponseHeaderPrefix; 00026 using RPC::Protocol::ResponseHeaderVersion1; 00027 using RPC::Protocol::Status; 00028 00029 ServerRPC::ServerRPC(OpaqueServerRPC opaqueRPC) 00030 : opaqueRPC(std::move(opaqueRPC)) 00031 , active(true) 00032 , service(0) 00033 , serviceSpecificErrorVersion(0) 00034 , opCode(0) 00035 { 00036 const Core::Buffer& request = this->opaqueRPC.request; 00037 00038 // Carefully read the headers. 00039 if (request.getLength() < sizeof(RequestHeaderPrefix)) { 00040 reject(Status::INVALID_REQUEST); 00041 return; 00042 } 00043 RequestHeaderPrefix requestHeaderPrefix = 00044 *static_cast<const RequestHeaderPrefix*>(request.getData()); 00045 requestHeaderPrefix.fromBigEndian(); 00046 if (requestHeaderPrefix.version != 1 || 00047 request.getLength() < sizeof(RequestHeaderVersion1)) { 00048 reject(Status::INVALID_VERSION); 00049 return; 00050 } 00051 RequestHeaderVersion1 requestHeader = 00052 *static_cast<const RequestHeaderVersion1*>(request.getData()); 00053 requestHeader.fromBigEndian(); 00054 00055 service = requestHeader.service; 00056 serviceSpecificErrorVersion = requestHeader.serviceSpecificErrorVersion; 00057 opCode = requestHeader.opCode; 00058 } 00059 00060 ServerRPC::ServerRPC() 00061 : opaqueRPC() 00062 , active(false) 00063 , service(0) 00064 , serviceSpecificErrorVersion(0) 00065 , opCode(0) 00066 { 00067 } 00068 00069 ServerRPC::ServerRPC(ServerRPC&& other) 00070 : opaqueRPC(std::move(other.opaqueRPC)) 00071 , active(other.active) 00072 , service(other.service) 00073 , serviceSpecificErrorVersion(other.serviceSpecificErrorVersion) 00074 , opCode(other.opCode) 00075 { 00076 other.active = false; 00077 } 00078 00079 ServerRPC::~ServerRPC() 00080 { 00081 if (active) { 00082 WARNING("ServerRPC destroyed without a reply (service %u, opcode %u). " 00083 "This may cause the client of the RPC to hang", 00084 service, opCode); 00085 } 00086 } 00087 00088 ServerRPC& 00089 ServerRPC::operator=(ServerRPC&& other) 00090 { 00091 opaqueRPC = std::move(other.opaqueRPC); 00092 active = other.active; 00093 other.active = false; 00094 service = other.service; 00095 serviceSpecificErrorVersion = other.serviceSpecificErrorVersion; 00096 opCode = other.opCode; 00097 return *this; 00098 } 00099 00100 bool 00101 ServerRPC::getRequest(google::protobuf::Message& request) 00102 { 00103 if (!active) 00104 return false; 00105 if (!Core::ProtoBuf::parse(opaqueRPC.request, request, 00106 sizeof(RequestHeaderVersion1))) { 00107 rejectInvalidRequest(); 00108 return false; 00109 } 00110 return true; 00111 } 00112 00113 bool 00114 ServerRPC::getRequest(Core::Buffer& buffer) const 00115 { 00116 if (!active) 00117 return false; 00118 uint64_t bytes = opaqueRPC.request.getLength(); 00119 assert(bytes >= sizeof(RequestHeaderVersion1)); 00120 bytes -= sizeof(RequestHeaderVersion1); 00121 buffer.setData(new char[bytes], 00122 bytes, 00123 Core::Buffer::deleteArrayFn<char>); 00124 memcpy(buffer.getData(), 00125 (static_cast<const char*>(opaqueRPC.request.getData()) + 00126 sizeof(RequestHeaderVersion1)), 00127 bytes); 00128 return true; 00129 } 00130 00131 void 00132 ServerRPC::reply(const google::protobuf::Message& payload) 00133 { 00134 active = false; 00135 Core::Buffer buffer; 00136 Core::ProtoBuf::serialize(payload, buffer, 00137 sizeof(ResponseHeaderVersion1)); 00138 auto& responseHeader = 00139 *static_cast<ResponseHeaderVersion1*>(buffer.getData()); 00140 responseHeader.prefix.status = Status::OK; 00141 responseHeader.prefix.toBigEndian(); 00142 responseHeader.toBigEndian(); 00143 opaqueRPC.response = std::move(buffer); 00144 opaqueRPC.sendReply(); 00145 } 00146 00147 void 00148 ServerRPC::returnError(const google::protobuf::Message& serviceSpecificError) 00149 { 00150 active = false; 00151 Core::Buffer buffer; 00152 Core::ProtoBuf::serialize(serviceSpecificError, buffer, 00153 sizeof(ResponseHeaderVersion1)); 00154 auto& responseHeader = 00155 *static_cast<ResponseHeaderVersion1*>(buffer.getData()); 00156 responseHeader.prefix.status = Status::SERVICE_SPECIFIC_ERROR; 00157 responseHeader.prefix.toBigEndian(); 00158 responseHeader.toBigEndian(); 00159 opaqueRPC.response = std::move(buffer); 00160 opaqueRPC.sendReply(); 00161 } 00162 00163 void 00164 ServerRPC::rejectInvalidService() 00165 { 00166 reject(Status::INVALID_SERVICE); 00167 } 00168 00169 void 00170 ServerRPC::rejectInvalidRequest() 00171 { 00172 reject(Status::INVALID_REQUEST); 00173 } 00174 00175 void 00176 ServerRPC::closeSession() 00177 { 00178 active = false; 00179 opaqueRPC.closeSession(); 00180 } 00181 00182 void 00183 ServerRPC::reject(RPC::Protocol::Status status) 00184 { 00185 active = false; 00186 ResponseHeaderVersion1& responseHeader = *new ResponseHeaderVersion1(); 00187 responseHeader.prefix.status = status; 00188 responseHeader.prefix.toBigEndian(); 00189 responseHeader.toBigEndian(); 00190 opaqueRPC.response.setData( 00191 &responseHeader, 00192 sizeof(responseHeader), 00193 Core::Buffer::deleteObjectFn<ResponseHeaderVersion1*>); 00194 opaqueRPC.sendReply(); 00195 } 00196 00197 00198 } // namespace LogCabin::RPC 00199 } // namespace LogCabin