LogCabin: Client/LeaderRPC.cc
/* Copyright (c) 2012 Stanford University
 * Copyright (c) 2014-2015 Diego Ongaro
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <unistd.h>

#include "Client/Backoff.h"
#include "Client/LeaderRPC.h"
#include "Core/Debug.h"
#include "Core/Util.h"
#include "Protocol/Common.h"
#include "RPC/ClientSession.h"
#include "RPC/ClientRPC.h"

namespace LogCabin {
namespace Client {

//// class LeaderRPCBase ////

std::ostream&
operator<<(std::ostream& os, const LeaderRPCBase::Status& status)
{
    switch (status) {
        case LeaderRPCBase::Status::OK:
            os << "Status::OK";
            break;
        case LeaderRPCBase::Status::TIMEOUT:
            os << "Status::TIMEOUT";
            break;
        case LeaderRPCBase::Status::INVALID_REQUEST:
            os << "Status::INVALID_REQUEST";
            break;
    }
    return os;
}

std::ostream&
operator<<(std::ostream& os, const LeaderRPCBase::Call::Status& status)
{
    switch (status) {
        case LeaderRPCBase::Call::Status::OK:
            os << "Status::OK";
            break;
        case LeaderRPCBase::Call::Status::RETRY:
            os << "Status::RETRY";
            break;
        case LeaderRPCBase::Call::Status::TIMEOUT:
            os << "Status::TIMEOUT";
            break;
        case LeaderRPCBase::Call::Status::INVALID_REQUEST:
            os << "Status::INVALID_REQUEST";
            break;
    }
    return os;
}

//// class LeaderRPC::Call ////

LeaderRPC::Call::Call(LeaderRPC& leaderRPC)
    : leaderRPC(leaderRPC)
    , cachedSession()
    , rpc()
{
}

LeaderRPC::Call::~Call()
{
}

void
LeaderRPC::Call::start(OpCode opCode,
                       const google::protobuf::Message& request,
                       TimePoint timeout)
{
    // Save a reference to the leaderSession
    cachedSession = leaderRPC.getSession(timeout);
    rpc = RPC::ClientRPC(cachedSession,
                         Protocol::Common::ServiceId::CLIENT_SERVICE,
                         1, // serviceSpecificErrorVersion
                         opCode,
                         request);
}

void
LeaderRPC::Call::cancel()
{
    rpc.cancel();
    cachedSession.reset();
}

LeaderRPC::Call::Status
LeaderRPC::Call::wait(google::protobuf::Message& response,
                      TimePoint timeout)
{
    typedef RPC::ClientRPC::Status RPCStatus;
    Protocol::Client::Error error;
    RPCStatus status = rpc.waitForReply(&response, &error, timeout);

    // Decode the response
    switch (status) {
        case RPCStatus::OK:
            leaderRPC.reportSuccess(cachedSession);
            return Call::Status::OK;
        case RPCStatus::SERVICE_SPECIFIC_ERROR:
            switch (error.error_code()) {
                case Protocol::Client::Error::NOT_LEADER:
                    // The server we tried is not the current cluster leader.
                    if (error.has_leader_hint()) {
                        leaderRPC.reportRedirect(cachedSession,
                                                 error.leader_hint());
                    } else {
                        leaderRPC.reportNotLeader(cachedSession);
                    }
                    break;
                default:
                    // Hmm, we don't know what this server is trying to tell
                    // us, but something is wrong. The server shouldn't reply
                    // with error codes we don't understand. That's why we
                    // gave it a serviceSpecificErrorVersion number in the
                    // request header.
                    PANIC("Unknown error code %u returned in service-specific "
                          "error. This probably indicates a bug in the server",
                          error.error_code());
            }
            break;
        case RPCStatus::RPC_FAILED:
            leaderRPC.reportFailure(cachedSession);
            break;
        case RPCStatus::RPC_CANCELED:
            break;
        case RPCStatus::TIMEOUT:
            return Call::Status::TIMEOUT;
        case RPCStatus::INVALID_SERVICE:
            PANIC("The server isn't running the ClientService");
        case RPCStatus::INVALID_REQUEST:
            return Call::Status::INVALID_REQUEST;
    }
    if (timeout < Clock::now())
        return Call::Status::TIMEOUT;
    else
        return Call::Status::RETRY;
}


//// class LeaderRPC ////

LeaderRPC::LeaderRPC(const RPC::Address& hosts,
                     SessionManager::ClusterUUID& clusterUUID,
                     Backoff& sessionCreationBackoff,
                     SessionManager& sessionManager)
    : clusterUUID(clusterUUID)
    , sessionCreationBackoff(sessionCreationBackoff)
    , sessionManager(sessionManager)
    , mutex()
    , isConnecting(false)
    , connected()
    , hosts(hosts)
    , leaderHint()
    , leaderSession() // set by getSession()
    , failuresSinceLastSuccess(0)
{
}

LeaderRPC::~LeaderRPC()
{
    leaderSession.reset();
}

LeaderRPC::Status
LeaderRPC::call(OpCode opCode,
                const google::protobuf::Message& request,
                google::protobuf::Message& response,
                TimePoint timeout)
{
    while (true) {
        Call c(*this);
        c.start(opCode, request, timeout);
        Call::Status callStatus = c.wait(response, timeout);
        switch (callStatus) {
            case Call::Status::OK:
                return Status::OK;
            case Call::Status::TIMEOUT:
                return Status::TIMEOUT;
            case Call::Status::RETRY:
                break;
            case Call::Status::INVALID_REQUEST:
                return Status::INVALID_REQUEST;
        }
    }
}
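
// A hypothetical usage sketch of call(), not part of the original file: it
// shows the blocking interface from a client's point of view. ReadRequest,
// ReadResponse, and OpCode::READ are made-up names for illustration, and the
// Clock typedef is assumed to come from LeaderRPCBase (the code above uses it
// unqualified). call() retries through leader changes internally and returns
// only on OK, TIMEOUT, or INVALID_REQUEST:
//
//     LeaderRPC leaderRPC(hosts, clusterUUID, backoff, sessionManager);
//     Protocol::Client::ReadRequest request;
//     Protocol::Client::ReadResponse response;
//     LeaderRPC::Status s = leaderRPC.call(
//         OpCode::READ, request, response,
//         LeaderRPC::Clock::now() + std::chrono::seconds(5));
//     if (s == LeaderRPC::Status::OK) {
//         // response has been filled in by the leader
//     }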

std::unique_ptr<LeaderRPCBase::Call>
LeaderRPC::makeCall()
{
    return std::unique_ptr<LeaderRPCBase::Call>(new LeaderRPC::Call(*this));
}
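
// A sketch (not from the original source) of the lower-level Call interface
// that makeCall() returns. Unlike call(), the caller drives the retry loop
// itself, presumably so another thread can cancel() an in-flight RPC; the
// loop below is illustrative:
//
//     std::unique_ptr<LeaderRPCBase::Call> c = leaderRPC.makeCall();
//     while (true) {
//         c->start(opCode, request, timeout);
//         LeaderRPCBase::Call::Status s = c->wait(response, timeout);
//         if (s == LeaderRPCBase::Call::Status::RETRY)
//             continue;       // leader changed or RPC failed; try again
//         break;              // OK, TIMEOUT, or INVALID_REQUEST
//     }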

std::shared_ptr<RPC::ClientSession>
LeaderRPC::getSession(TimePoint timeout)
{
    std::unique_lock<std::mutex> lockGuard(mutex);

    // Threads used to hold the mutex while creating a new session, but then to
    // respect timeouts, you'd have to acquire the mutex with a timeout. This
    // condition variable approach seems cleaner to me, where the mutex is only
    // held during computation, not during I/O. See #173. -Diego
    while (isConnecting) {
        // Go to sleep, as another thread is already creating a new session.
        connected.wait_until(lockGuard, timeout);
        if (Clock::now() > timeout) {
            return RPC::ClientSession::makeErrorSession(
                sessionManager.eventLoop,
                "Timed out waiting for another thread to finish creating "
                "a session to the leader");
        }
    }

    if (leaderSession)
        return leaderSession;

    // This thread will create a new session; others should wait.
    isConnecting = true;

    // Determine which address to connect to while still holding the lock.
    RPC::Address address;
    if (leaderHint.empty()) {
        // Hope the next random host is the leader. If that turns out to be
        // false, we will soon find out.
        address = hosts;
    } else {
        // Connect to the leader given by 'leaderHint'.
        address = RPC::Address(leaderHint, Protocol::Common::DEFAULT_PORT);
        // Don't clear leaderHint until down below, in case this thread times
        // out before making any use of it.
    }

    // Don't hang onto the mutex for any of this blocking stuff (doing so would
    // delay other threads with shorter timeouts; see #173).
    std::shared_ptr<RPC::ClientSession> session;
    bool usedHint = true;
    {
        Core::MutexUnlock<std::mutex> unlockGuard(lockGuard);

        // Sleep if we've tried to connect too often recently.
        sessionCreationBackoff.delayAndBegin(timeout);
        if (Clock::now() > timeout) {
            session = RPC::ClientSession::makeErrorSession(
                    sessionManager.eventLoop,
                    "Failed to create session to leader: timeout expired");
            usedHint = false;
        } else {
            address.refresh(timeout);
            VERBOSE("Connecting to: %s", address.toString().c_str());
            session = sessionManager.createSession(
                    address,
                    timeout,
                    &clusterUUID);
        }
    }

    // Assign back to leaderSession only now that we have the lock again.
    leaderSession = session;
    if (usedHint)
        leaderHint.clear();
    // Unblock other threads and return.
    isConnecting = false;
    connected.notify_all();
    return leaderSession;
}
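
// A minimal sketch of what Core::MutexUnlock above presumably does (an
// assumption; the real LogCabin implementation may differ): the inverse of a
// lock guard, releasing the lock on construction and re-acquiring it on
// destruction, so the blocking backoff and connect run without the mutex held:
//
//     template<typename Mutex>
//     class MutexUnlockSketch {
//       public:
//         explicit MutexUnlockSketch(std::unique_lock<Mutex>& guard)
//             : guard(guard) { guard.unlock(); }
//         ~MutexUnlockSketch() { guard.lock(); }
//       private:
//         std::unique_lock<Mutex>& guard;
//     };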

void
LeaderRPC::reportFailure(std::shared_ptr<RPC::ClientSession> cachedSession)
{
    std::lock_guard<std::mutex> lockGuard(mutex);
    if (cachedSession != leaderSession)
        return;
    ++failuresSinceLastSuccess;
    if (Core::Util::isPowerOfTwo(failuresSinceLastSuccess)) {
        NOTICE("RPC to server failed: %s "
               "(there have been %lu failed attempts during this outage)",
               cachedSession->toString().c_str(),
               failuresSinceLastSuccess);
    } else {
        VERBOSE("RPC to server failed: %s "
                "(there have been %lu failed attempts during this outage)",
                cachedSession->toString().c_str(),
                failuresSinceLastSuccess);
    }
    leaderSession.reset();
}

void
LeaderRPC::reportNotLeader(std::shared_ptr<RPC::ClientSession> cachedSession)
{
    std::lock_guard<std::mutex> lockGuard(mutex);
    if (cachedSession != leaderSession)
        return;
    ++failuresSinceLastSuccess;
    if (Core::Util::isPowerOfTwo(failuresSinceLastSuccess)) {
        NOTICE("Server [%s] is not leader, will try random host next "
               "(there have been %lu failed attempts during this outage)",
               cachedSession->toString().c_str(),
               failuresSinceLastSuccess);
    } else {
        VERBOSE("Server [%s] is not leader, will try random host next "
                "(there have been %lu failed attempts during this outage)",
                cachedSession->toString().c_str(),
                failuresSinceLastSuccess);
    }
    leaderSession.reset();
}

void
LeaderRPC::reportRedirect(std::shared_ptr<RPC::ClientSession> cachedSession,
                          const std::string& host)
{
    std::lock_guard<std::mutex> lockGuard(mutex);
    if (cachedSession != leaderSession)
        return;
    ++failuresSinceLastSuccess;
    if (Core::Util::isPowerOfTwo(failuresSinceLastSuccess)) {
        NOTICE("Server [%s] is not leader, will try suggested %s next "
               "(there have been %lu failed attempts during this outage)",
               cachedSession->toString().c_str(),
               host.c_str(),
               failuresSinceLastSuccess);
    } else {
        VERBOSE("Server [%s] is not leader, will try suggested %s next "
                "(there have been %lu failed attempts during this outage)",
                cachedSession->toString().c_str(),
                host.c_str(),
                failuresSinceLastSuccess);
    }
    leaderSession.reset();
    leaderHint = host;
}

void
LeaderRPC::reportSuccess(std::shared_ptr<RPC::ClientSession> cachedSession)
{
    std::lock_guard<std::mutex> lockGuard(mutex);
    if (cachedSession != leaderSession)
        return;
    if (failuresSinceLastSuccess > 0) {
        NOTICE("Successfully connected to leader [%s] after %lu failures",
               cachedSession->toString().c_str(),
               failuresSinceLastSuccess);
        failuresSinceLastSuccess = 0;
    }
}
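
// A note on the NOTICE/VERBOSE split in the report* methods above: logging at
// NOTICE only when failuresSinceLastSuccess is a power of two keeps a long
// outage from flooding the log. NOTICE fires on failures 1, 2, 4, 8, ...,
// i.e. O(log n) lines for n failures, while VERBOSE still records every
// attempt. Assuming Core::Util::isPowerOfTwo has the usual bit-trick form,
// a sketch:
//
//     bool isPowerOfTwo(uint64_t n) {
//         return (n > 0) && ((n & (n - 1)) == 0);
//     }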


} // namespace LogCabin::Client
} // namespace LogCabin