LogCabin
Client/LeaderRPC.h
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  * Copyright (c) 2014-2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
#include <cinttypes>
#include <deque>
#include <iosfwd>
#include <memory>
#include <mutex>
#include <string>

#include "build/Protocol/Client.pb.h"
#include "Client/SessionManager.h"
#include "Core/ConditionVariable.h"
#include "RPC/Address.h"
#include "RPC/ClientRPC.h"
00027 
00028 #ifndef LOGCABIN_CLIENT_LEADERRPC_H
00029 #define LOGCABIN_CLIENT_LEADERRPC_H
00030 
00031 namespace LogCabin {
00032 
// Forward declarations of types from other LogCabin modules.
// RPC::ClientSession is only held through std::shared_ptr in this header, so
// its complete definition is not required here.
// NOTE(review): Core::Config and Event::Loop are not referenced elsewhere in
// this header; they are kept in case includers rely on these declarations.
namespace Core { class Config; }
namespace Event { class Loop; }
namespace RPC { class ClientSession; }
00047 
00048 namespace Client {
00049 
// Forward declaration: LeaderRPC only holds a Backoff by reference, so the
// full class definition is not needed in this header.
class Backoff;
00052 
00053 /**
00054  * This class is used to send RPCs from clients to the leader of the LogCabin
00055  * cluster. It automatically finds and connects to the leader and transparently
00056  * rolls over to a new leader when necessary.
00057  *
00058  * There are two implementations of this interface: LeaderRPC is probably the
00059  * one you're interested in. LeaderRPCMock is used for unit testing only.
00060  */
00061 class LeaderRPCBase {
00062   public:
00063     /// Clock used for timeouts.
00064     typedef RPC::ClientRPC::Clock Clock;
00065     /// Type for absolute time values used for timeouts.
00066     typedef RPC::ClientRPC::TimePoint TimePoint;
00067 
00068     /**
00069      * RPC operation code.
00070      */
00071     typedef Protocol::Client::OpCode OpCode;
00072 
00073     /**
00074      * Return type for LeaderRPCBase::call().
00075      */
00076     enum class Status {
00077         /**
00078          * The RPC completed scucessfully.
00079          */
00080         OK,
00081         /**
00082          * The given timeout elapsed before the RPC completed.
00083          */
00084         TIMEOUT,
00085         /**
00086          * The server rejected the request, probably because it doesn't
00087          * support the opcode, or maybe the request arguments were invalid.
00088          */
00089         INVALID_REQUEST,
00090     };
00091 
00092     /**
00093      * Print out a Status for debugging purposes.
00094      */
00095     friend std::ostream& operator<<(std::ostream& os, const Status& server);
00096 
00097     /// Constructor.
00098     LeaderRPCBase() {}
00099 
00100     /// Destructor.
00101     virtual ~LeaderRPCBase() {}
00102 
00103     /**
00104      * Execute an RPC on the cluster leader.
00105      * This class guarantees that the RPC will be executed at least once.
00106      * \param opCode
00107      *      RPC operation code. The caller must guarantee that this is a valid
00108      *      opCode. (If the server rejects it, this will PANIC.)
00109      * \param request
00110      *      The parameters for the operation. The caller must guarantee that
00111      *      this is a well-formed request. (If the server rejects it, this will
00112      *      PANIC.)
00113      * \param timeout
00114      *      After this time has elapsed, stop waiting and return TIMEOUT.
00115      *      In this case, response will be left unmodified.
00116      * \param[out] response
00117      *      The response to the operation will be filled in here.
00118      */
00119     virtual Status call(OpCode opCode,
00120                         const google::protobuf::Message& request,
00121                         google::protobuf::Message& response,
00122                         TimePoint timeout) = 0;
00123 
00124     /**
00125      * An asynchronous version of call(). This allows multiple RPCs to be
00126      * executed concurrently, or canceling an RPC that is running on a separate
00127      * thread.
00128      */
00129     class Call {
00130       public:
00131         /**
00132          * Return type for LeaderRPCBase::Call::wait().
00133          */
00134         enum class Status {
00135             /**
00136              * The RPC completed scucessfully.
00137              */
00138             OK,
00139             /**
00140              * The RPC did not succeed, nor did it timeout.
00141              * The caller should try again.
00142              * TODO(ongaro): this is a bit ugly
00143              */
00144             RETRY,
00145             /**
00146              * The given timeout elapsed before the RPC completed.
00147              */
00148             TIMEOUT,
00149             /**
00150              * The server rejected the request, probably because it doesn't
00151              * support the opcode, or maybe the request arguments were invalid.
00152              */
00153             INVALID_REQUEST,
00154         };
00155 
00156         /**
00157          * Print out a Status for debugging purposes.
00158          */
00159         friend std::ostream& operator<<(std::ostream& os,
00160                                         const Status& server);
00161 
00162         /**
00163          * Constructor.
00164          */
00165         Call() {}
00166         /**
00167          * Destructor.
00168          */
00169         virtual ~Call() {}
00170         /**
00171          * Invoke the RPC.
00172          * \param opCode
00173          *      RPC operation code. The caller must guarantee that this is a
00174          *      valid opCode. (If the server rejects it, this will PANIC.)
00175          * \param request
00176          *      The parameters for the operation. The caller must guarantee
00177          *      that this is a well-formed request. (If the server rejects it,
00178          *      this will PANIC.)
00179          * \param timeout
00180          *      After this time has elapsed, stop trying to initiate the
00181          *      connection to the leader and use an invalid session, which will
00182          *      cause the RPC to fail later.
00183          */
00184         virtual void start(OpCode opCode,
00185                            const google::protobuf::Message& request,
00186                            TimePoint timeout) = 0;
00187         /**
00188          * Cancel the RPC. This may only be called after start(), but it may
00189          * be called safely from a separate thread.
00190          */
00191         virtual void cancel() = 0;
00192         /**
00193          * Wait for the RPC to complete.
00194          * \param[out] response
00195          *      If successful, the response to the operation will be filled in
00196          *      here.
00197          * \param timeout
00198          *      After this time has elapsed, stop waiting and return TIMEOUT.
00199          *      In this case, response will be left unmodified.
00200          * \return
00201          *      True if the RPC completed successfully, false otherwise. If
00202          *      this returns false, it is the callers responsibility to start
00203          *      over to achieve the same at-most-once semantics as #call().
00204          */
00205         virtual Status wait(google::protobuf::Message& response,
00206                             TimePoint timeout) = 0;
00207     };
00208 
00209     /**
00210      * Return a new Call object.
00211      */
00212     virtual std::unique_ptr<Call> makeCall() = 0;
00213 
00214     // LeaderRPCBase is not copyable
00215     LeaderRPCBase(const LeaderRPCBase&) = delete;
00216     LeaderRPCBase& operator=(const LeaderRPCBase&) = delete;
00217 };
00218 
00219 
00220 /**
00221  * This is the implementation of LeaderRPCBase that uses the RPC system.
00222  * (The other implementation, LeaderRPCMock, is only used for testing.)
00223  */
00224 class LeaderRPC : public LeaderRPCBase {
00225   public:
00226     /**
00227      * Constructor.
00228      * \param hosts
00229      *      Describe the servers to connect to. This class assumes that
00230      *      refreshing 'hosts' will result in a random host that might be the
00231      *      current cluster leader.
00232      * \param clusterUUID
00233      *      Keeps track of the unique ID for this cluster, if known.
00234      * \param sessionCreationBackoff
00235      *      Used to rate-limit new TCP connections.
00236      * \param sessionManager
00237      *      Used to create new sessions.
00238      */
00239     LeaderRPC(const RPC::Address& hosts,
00240               SessionManager::ClusterUUID& clusterUUID,
00241               Backoff& sessionCreationBackoff,
00242               SessionManager& sessionManager);
00243 
00244     /// Destructor.
00245     ~LeaderRPC();
00246 
00247     /// See LeaderRPCBase::call.
00248     Status call(OpCode opCode,
00249                 const google::protobuf::Message& request,
00250                 google::protobuf::Message& response,
00251                 TimePoint timeout);
00252 
00253     /// See LeaderRPCBase::makeCall().
00254     std::unique_ptr<LeaderRPCBase::Call> makeCall();
00255 
00256   private:
00257 
00258     /// See LeaderRPCBase::Call.
00259     class Call : public LeaderRPCBase::Call {
00260       public:
00261         explicit Call(LeaderRPC& leaderRPC);
00262         ~Call();
00263         void start(OpCode opCode,
00264                    const google::protobuf::Message& request,
00265                    TimePoint timeout);
00266         void cancel();
00267         Status wait(google::protobuf::Message& response,
00268                     TimePoint timeout);
00269         LeaderRPC& leaderRPC;
00270         /**
00271          * Copy of leaderSession when the RPC was started (might have changed
00272          * since).
00273          */
00274         std::shared_ptr<RPC::ClientSession> cachedSession;
00275         /**
00276          * RPC object which may be canceled.
00277          */
00278         RPC::ClientRPC rpc;
00279     };
00280 
00281     /**
00282      * Return a session connected to the most likely cluster leader, creating
00283      * it if necessary.
00284      * \param timeout
00285      *      After this time has elapsed, stop trying to initiate the connection
00286      *      and return an invalid session.
00287      * \return
00288      *      Session on which to execute RPCs.
00289      */
00290     std::shared_ptr<RPC::ClientSession>
00291     getSession(TimePoint timeout);
00292 
00293     /**
00294      * Notify this class that an RPC on the given session failed. This will
00295      * usually cause this class to connect to a random server next time
00296      * getSession() is called.
00297      * \param cachedSession
00298      *      Session previously returned by getSession(). This is used to detect
00299      *      races in which some other thread has already solved the problem.
00300      */
00301     void
00302     reportFailure(std::shared_ptr<RPC::ClientSession> cachedSession);
00303 
00304     /**
00305      * Notify this class that a non-leader server rejected an RPC. This will
00306      * usually cause this class to connect to a random server next time
00307      * getSession() is called.
00308      * \param cachedSession
00309      *      Session previously returned by getSession(). This is used to detect
00310      *      races in which some other thread has already solved the problem.
00311      */
00312     void
00313     reportNotLeader(std::shared_ptr<RPC::ClientSession> cachedSession);
00314 
00315     /**
00316      * Notify this class that an RPC on the given session was redirected by a
00317      * non-leader server. This will usually cause this class to connect to the
00318      * given host the next time getSession() is called.
00319      * \param cachedSession
00320      *      Session previously returned by getSession(). This is used to detect
00321      *      races in which some other thread has already solved the problem.
00322      * \param host
00323      *      Address of the server that is likely the leader.
00324      */
00325     void
00326     reportRedirect(std::shared_ptr<RPC::ClientSession> cachedSession,
00327                    const std::string& host);
00328 
00329     /**
00330      * Notify this class that an RPC on the given session reached a leader.
00331      * This is just here for debug log messages.
00332      * \param cachedSession
00333      *      Session previously returned by getSession(). This is used to detect
00334      *      races in which some other thread has already solved any problems.
00335      */
00336     void
00337     reportSuccess(std::shared_ptr<RPC::ClientSession> cachedSession);
00338 
00339     /**
00340      * Keeps track of the unique ID for this cluster, if known.
00341      */
00342     SessionManager::ClusterUUID& clusterUUID;
00343 
00344     /**
00345      * Used to rate-limit the creation of ClientSession objects (TCP
00346      * connections).
00347      */
00348     Backoff& sessionCreationBackoff;
00349 
00350     /**
00351      * Used to create new sessions.
00352      */
00353     SessionManager& sessionManager;
00354 
00355     /**
00356      * Protects all of the following member variables in this class.
00357      */
00358     std::mutex mutex;
00359 
00360     /**
00361      * Set to true when some thread is already initiating a new session.
00362      * When this is already true, other threads wait on #connected
00363      * rather than starting additional sessions.
00364      */
00365     bool isConnecting;
00366 
00367     /**
00368      * Notified when #isConnecting becomes false (when #leaderSession is set).
00369      */
00370     Core::ConditionVariable connected;
00371 
00372     /**
00373      * An address referring to the hosts in the LogCabin cluster. A random host
00374      * is selected from here when this class doesn't know who the cluster
00375      * leader is.
00376      */
00377     RPC::Address hosts;
00378 
00379     /**
00380      * If nonempty, the address of the server that is likely to be the
00381      * current leader.
00382      */
00383      std::string leaderHint;
00384 
00385     /**
00386      * The goal is to get this session connected to the cluster leader.
00387      * This is never null, but it might sometimes point to the wrong host.
00388      */
00389     std::shared_ptr<RPC::ClientSession> leaderSession;
00390 
00391     /**
00392      * The number of attempted RPCs that have not successfully reached a leader
00393      * since the last time an RPC did. Used for rate-limiting and summarizing
00394      * log messages: they're only printed when this number reaches a power of
00395      * two.
00396      */
00397     uint64_t failuresSinceLastSuccess;
00398 };
00399 
00400 } // namespace LogCabin::Client
00401 } // namespace LogCabin
00402 
00403 #endif /* LOGCABIN_CLIENT_LEADERRPC_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines