LogCabin
Client/SessionManager.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2014-2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include "Client/SessionManager.h"
00018 #include "Core/ProtoBuf.h"
00019 #include "Protocol/Common.h"
00020 #include "RPC/ClientRPC.h"
00021 #include "RPC/ClientSession.h"
00022 #include "build/Protocol/Client.pb.h"
00023 
00024 namespace LogCabin {
00025 namespace Client {
00026 
00027 SessionManager::SessionManager(Event::Loop& eventLoop,
00028                                const Core::Config& config)
00029     : eventLoop(eventLoop)
00030     , config(config)
00031     , skipVerify(false)
00032 {
00033 }
00034 
00035 std::shared_ptr<RPC::ClientSession>
00036 SessionManager::createSession(const RPC::Address& address,
00037                               RPC::Address::TimePoint timeout,
00038                               ClusterUUID* clusterUUID,
00039                               ServerId* serverId)
00040 {
00041     std::shared_ptr<RPC::ClientSession> session =
00042         RPC::ClientSession::makeSession(
00043                         eventLoop,
00044                         address,
00045                         Protocol::Common::MAX_MESSAGE_LENGTH,
00046                         timeout,
00047                         config);
00048     if (!session->getErrorMessage().empty() || skipVerify)
00049         return session;
00050 
00051     Protocol::Client::VerifyRecipient::Request request;
00052     if (clusterUUID != NULL) {
00053         std::string uuid = clusterUUID->getOrDefault();
00054         if (!uuid.empty())
00055             request.set_cluster_uuid(uuid);
00056     }
00057     if (serverId != NULL) {
00058         std::pair<bool, uint64_t> c = serverId->get();
00059         if (c.first)
00060             request.set_server_id(c.second);
00061     }
00062 
00063     RPC::ClientRPC rpc(session,
00064                        Protocol::Common::ServiceId::CLIENT_SERVICE,
00065                        1,
00066                        Protocol::Client::OpCode::VERIFY_RECIPIENT,
00067                        request);
00068 
00069     typedef RPC::ClientRPC::Status RPCStatus;
00070     Protocol::Client::VerifyRecipient::Response response;
00071     Protocol::Client::Error error;
00072     RPCStatus status = rpc.waitForReply(&response, &error, timeout);
00073 
00074     // Decode the response
00075     switch (status) {
00076         case RPCStatus::OK:
00077             if (response.ok()) {
00078                 if (!request.has_cluster_uuid() &&
00079                     response.has_cluster_uuid() &&
00080                     !response.cluster_uuid().empty() &&
00081                     clusterUUID != NULL) {
00082                     clusterUUID->set(response.cluster_uuid());
00083                 }
00084                 if (!request.has_server_id() &&
00085                     response.has_server_id() &&
00086                     serverId != NULL) {
00087                     serverId->set(response.server_id());
00088                 }
00089                 return session;
00090             } else {
00091                 ERROR("Intended recipient was not at %s: %s. "
00092                       "Closing session.",
00093                       session->toString().c_str(),
00094                       response.error().c_str());
00095                 break;
00096             }
00097         case RPCStatus::RPC_FAILED:
00098             break;
00099         case RPCStatus::TIMEOUT:
00100             break;
00101         case RPCStatus::SERVICE_SPECIFIC_ERROR:
00102             // Hmm, we don't know what this server is trying to tell us,
00103             // but something is wrong. The server shouldn't reply back with
00104             // error codes we don't understand. That's why we gave it a
00105             // serverSpecificErrorVersion number in the request header.
00106             PANIC("Unknown error code %u returned in service-specific "
00107                   "error. This probably indicates a bug in the server",
00108                   error.error_code());
00109             break;
00110         case RPCStatus::RPC_CANCELED:
00111             PANIC("RPC canceled unexpectedly");
00112         case RPCStatus::INVALID_SERVICE:
00113             PANIC("The server isn't running the ClientService");
00114         case RPCStatus::INVALID_REQUEST:
00115             PANIC("The server's ClientService doesn't support the "
00116                   "VerifyRecipient RPC or claims the request is malformed");
00117     }
00118     return RPC::ClientSession::makeErrorSession(
00119         eventLoop,
00120         Core::StringUtil::format("Verifying recipient with %s failed "
00121                                  "(after connecting over TCP)",
00122                                  address.toString().c_str()));
00123 }
00124 
00125 } // namespace LogCabin::Client
00126 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines