LogCabin
|
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * Copyright (c) 2014-2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include "Client/SessionManager.h" 00018 #include "Core/ProtoBuf.h" 00019 #include "Protocol/Common.h" 00020 #include "RPC/ClientRPC.h" 00021 #include "RPC/ClientSession.h" 00022 #include "build/Protocol/Client.pb.h" 00023 00024 namespace LogCabin { 00025 namespace Client { 00026 00027 SessionManager::SessionManager(Event::Loop& eventLoop, 00028 const Core::Config& config) 00029 : eventLoop(eventLoop) 00030 , config(config) 00031 , skipVerify(false) 00032 { 00033 } 00034 00035 std::shared_ptr<RPC::ClientSession> 00036 SessionManager::createSession(const RPC::Address& address, 00037 RPC::Address::TimePoint timeout, 00038 ClusterUUID* clusterUUID, 00039 ServerId* serverId) 00040 { 00041 std::shared_ptr<RPC::ClientSession> session = 00042 RPC::ClientSession::makeSession( 00043 eventLoop, 00044 address, 00045 Protocol::Common::MAX_MESSAGE_LENGTH, 00046 timeout, 00047 config); 00048 if (!session->getErrorMessage().empty() || skipVerify) 00049 return session; 00050 00051 Protocol::Client::VerifyRecipient::Request request; 00052 if (clusterUUID != NULL) { 00053 std::string uuid = clusterUUID->getOrDefault(); 00054 if (!uuid.empty()) 00055 request.set_cluster_uuid(uuid); 00056 } 00057 if (serverId != NULL) { 00058 std::pair<bool, uint64_t> c = serverId->get(); 00059 if (c.first) 00060 request.set_server_id(c.second); 00061 } 00062 00063 RPC::ClientRPC rpc(session, 00064 Protocol::Common::ServiceId::CLIENT_SERVICE, 00065 1, 00066 Protocol::Client::OpCode::VERIFY_RECIPIENT, 00067 request); 00068 00069 typedef RPC::ClientRPC::Status RPCStatus; 00070 Protocol::Client::VerifyRecipient::Response response; 00071 Protocol::Client::Error error; 00072 RPCStatus status = rpc.waitForReply(&response, &error, timeout); 00073 00074 // Decode the response 00075 switch (status) { 00076 case RPCStatus::OK: 00077 if (response.ok()) { 00078 if (!request.has_cluster_uuid() && 00079 response.has_cluster_uuid() && 00080 !response.cluster_uuid().empty() && 00081 clusterUUID != NULL) { 00082 clusterUUID->set(response.cluster_uuid()); 00083 } 00084 if (!request.has_server_id() && 00085 response.has_server_id() && 00086 serverId != NULL) { 00087 serverId->set(response.server_id()); 00088 } 00089 return session; 00090 } else { 00091 ERROR("Intended recipient was not at %s: %s. " 00092 "Closing session.", 00093 session->toString().c_str(), 00094 response.error().c_str()); 00095 break; 00096 } 00097 case RPCStatus::RPC_FAILED: 00098 break; 00099 case RPCStatus::TIMEOUT: 00100 break; 00101 case RPCStatus::SERVICE_SPECIFIC_ERROR: 00102 // Hmm, we don't know what this server is trying to tell us, 00103 // but something is wrong. The server shouldn't reply back with 00104 // error codes we don't understand. That's why we gave it a 00105 // serverSpecificErrorVersion number in the request header. 00106 PANIC("Unknown error code %u returned in service-specific " 00107 "error. This probably indicates a bug in the server", 00108 error.error_code()); 00109 break; 00110 case RPCStatus::RPC_CANCELED: 00111 PANIC("RPC canceled unexpectedly"); 00112 case RPCStatus::INVALID_SERVICE: 00113 PANIC("The server isn't running the ClientService"); 00114 case RPCStatus::INVALID_REQUEST: 00115 PANIC("The server's ClientService doesn't support the " 00116 "VerifyRecipient RPC or claims the request is malformed"); 00117 } 00118 return RPC::ClientSession::makeErrorSession( 00119 eventLoop, 00120 Core::StringUtil::format("Verifying recipient with %s failed " 00121 "(after connecting over TCP)", 00122 address.toString().c_str())); 00123 } 00124 00125 } // namespace LogCabin::Client 00126 } // namespace LogCabin