LogCabin
RPC/ClientSession.h
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2014-2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <memory>
00018 #include <mutex>
00019 #include <string>
00020 #include <unordered_map>
00021 
00022 #include "Core/Buffer.h"
00023 #include "Core/ConditionVariable.h"
00024 #include "Core/Config.h"
00025 #include "Event/Timer.h"
00026 #include "RPC/Address.h"
00027 #include "RPC/OpaqueClientRPC.h"
00028 #include "RPC/MessageSocket.h"
00029 
00030 #ifndef LOGCABIN_RPC_CLIENTSESSION_H
00031 #define LOGCABIN_RPC_CLIENTSESSION_H
00032 
00033 namespace LogCabin {
00034 
00035 // Forward declaration: this header only uses Event::Loop by reference.
00036 namespace Event {
00037 class Loop;
00038 };
00039 
00040 namespace RPC {
00041 
00042 /**
00043  * A ClientSession is used to initiate OpaqueClientRPCs. It encapsulates a
00044  * connection to a server. Sessions can be relatively expensive to create, so
00045  * clients should keep them around.
00046  */
00047 class ClientSession {
00048   public:
00049     /// Clock used for timeouts.
00050     typedef Address::Clock Clock;
00051     /// Type for absolute time values used for timeouts.
00052     typedef Address::TimePoint TimePoint;
00053 
00054   private:
00055     /**
00056      * This constructor is private because the class must be allocated in a
00057      * particular way. See #makeSession().
00058      */
00059     ClientSession(Event::Loop& eventLoop,
00060                   const Address& address,
00061                   uint32_t maxMessageLength,
00062                   TimePoint timeout,
00063                   const Core::Config& config);
00064   public:
00065     /**
00066      * Return a new ClientSession object. This object is managed by a
00067      * std::shared_ptr to ensure that it remains alive while there are
00068      * outstanding RPCs.
00069      *
00070      * This should only be used from worker threads, as it invokes possibly
00071      * long-running syscalls.
00072      *
00073      * \param eventLoop
00074      *      Event::Loop that will be used to find out when the underlying
00075      *      socket may be read from or written to without blocking.
00076      * \param address
00077      *      The RPC server address on which to connect.
00078      * \param maxMessageLength
00079      *      The maximum number of bytes to allow per request/response. This
00080      *      exists to limit the amount of buffer space a single RPC can use.
00081      *      Attempting to send longer requests will PANIC; attempting to
00082      *      receive longer responses will disconnect the underlying socket.
00083      * \param timeout
00084      *      After this time has elapsed, stop trying to initiate the connection
00085      *      and leave the session in an error state.
00086      * \param config
00087      *      General settings. This object does not keep a reference.
00088      */
00089     static std::shared_ptr<ClientSession>
00090     makeSession(Event::Loop& eventLoop,
00091                 const Address& address,
00092                 uint32_t maxMessageLength,
00093                 TimePoint timeout,
00094                 const Core::Config& config);
00095 
00096     /**
00097      * Return a ClientSession object that's already in an error state. This can
00098      * be useful for delaying errors until an RPC is waited on.
00099      * \param eventLoop
00100      *      Ignored but usually readily available to callers and needed to
00101      *      satisfy type requirements.
00102      * \param errorMessage
00103      *      Description of the error, as will be returned by getErrorMessage()
00104      *      later.
00105      */
00106     static std::shared_ptr<ClientSession>
00107     makeErrorSession(Event::Loop& eventLoop,
00108                      const std::string& errorMessage);
00109 
00110     /**
00111      * Destructor.
00112      */
00113     ~ClientSession();
00114 
00115     /**
00116      * Initiate an RPC.
00117      * This method is safe to call from any thread.
00118      * \param request
00119      *      The contents of the RPC request.
00120      * \return
00121      *      This is used to wait for and retrieve the reply to the RPC.
00122      */
00123     OpaqueClientRPC sendRequest(Core::Buffer request);
00124 
00125     /**
00126      * If the socket has been disconnected, return a descriptive message.
00127      * The suggested way to detect errors is to wait until an RPC returns an
00128      * error. This method can be used to detect errors earlier.
00129      *
00130      * This method is safe to call from any thread.
00131      *
00132      * \return
00133      *      If an error has occurred, a message describing that error.
00134      *      Otherwise, an empty string.
00135      */
00136     std::string getErrorMessage() const;
00137 
00138     /**
00139      * Return a string describing this session. It will include the address of
00140      * the server and, if the session has an error, the error message.
00141      */
00142     std::string toString() const;
00143 
00144   private:
00145 
00146     /**
00147      * This handles events from #messageSocket.
00148      */
00149     class MessageSocketHandler : public MessageSocket::Handler {
00150       public:
00151         explicit MessageSocketHandler(ClientSession& clientSession);
00152         void handleReceivedMessage(MessageId messageId, Core::Buffer message);
00153         void handleDisconnect();
00154         ClientSession& session;
00155     };
00156 
00157     /**
00158      * This contains an expected response for an OpaqueClientRPC object.
00159      * This is created when the OpaqueClientRPC is created; it is deleted when
00160      * the OpaqueClientRPC object is either canceled or updated with a
00161      * response/error.
00162      */
00163     struct Response {
00164         /**
00165          * Constructor.
00166          */
00167         Response();
00168         /**
00169          * Current state of the RPC.
00170          */
00171         enum {
00172           /**
00173            * Waiting for a reply from the server.
00174            */
00175           WAITING,
00176           /**
00177            * Received a reply (find it in #reply).
00178            */
00179           HAS_REPLY,
00180           /**
00181            * The RPC has been canceled by another thread.
00182            */
00183           CANCELED,
00184         } status;
00185         /**
00186          * The contents of the response. This is valid when
00187          * #status is HAS_REPLY.
00188          */
00189         Core::Buffer reply;
00190         /**
00191          * If true, a thread is blocked waiting on #ready,
00192          * and this object may not be deleted.
00193          */
00194         bool hasWaiter;
00195         /**
00196          * OpaqueClientRPC objects wait on this condition variable inside of
00197          * wait(). It is notified when a new response arrives, the session
00198          * is disconnected, or the RPC is canceled.
00199          */
00200         Core::ConditionVariable ready;
00201     };
00202 
00203     /**
00204      * This is used to time out RPCs and sessions when the server is no longer
00205      * responding. After a timeout period, the client will send a ping to the
00206      * server. If no response is received within another timeout period, the
00207      * session is closed.
00208      */
00209     class Timer : public Event::Timer {
00210       public:
00211         explicit Timer(ClientSession& session);
00212         void handleTimerEvent();
00213         ClientSession& session;
00214     };
00215 
00216     // The cancel(), update(), and wait() methods are used by OpaqueClientRPC.
00217     friend class OpaqueClientRPC;
00218 
00219     /**
00220      * Called by the RPC when it is no longer interested in its response.
00221      *
00222      * This may be called while holding the RPC's lock.
00223      *
00224      * TODO(ongaro): It'd be nice to cancel sending the request if it hasn't
00225      * already gone out, but I guess that's going to be a pretty rare case.
00226      */
00227     void cancel(OpaqueClientRPC& rpc);
00228 
00229     /**
00230      * Called by the RPC when it wants to learn of its response
00231      * (non-blocking).
00232      *
00233      * This must be called while holding the RPC's lock.
00234      */
00235     void update(OpaqueClientRPC& rpc);
00236 
00237     /**
00238      * Called by the RPC to wait for its response (blocking). The caller should
00239      * call update() after this returns to learn of the response.
00240      *
00241      * This must not be called while holding the RPC's lock.
00242      * \param rpc
00243      *      Wait for response to this.
00244      * \param timeout
00245      *      After this time has elapsed, stop waiting and return. The RPC's
00246      *      results will probably not be available yet in this case.
00247      */
00248     void wait(const OpaqueClientRPC& rpc, TimePoint timeout);
00249 
00250     /**
00251      * This is used to keep this object alive while there are outstanding RPCs.
00252      */
00253     std::weak_ptr<ClientSession> self;
00254 
00255     /**
00256      * The number of nanoseconds to wait until the client gets suspicious
00257      * about the server not responding. After this amount of time elapses, the
00258      * client will send a ping to the server. If no response is received within
00259      * another PING_TIMEOUT_NS nanoseconds, the session is closed.
00260      *
00261      * TODO(ongaro): How should this value be chosen?
00262      * Ideally, you probably want this to be set to something like the 99-th
00263      * percentile of your RPC latency.
00264      *
00265      * TODO(ongaro): How does this interact with TCP?
00266      */
00267     const uint64_t PING_TIMEOUT_NS;
00268 
00269     /**
00270      * The event loop that is used for non-blocking I/O.
00271      */
00272     Event::Loop& eventLoop;
00273 
00274     /**
00275      * The RPC server address provided to the constructor.
00276      */
00277     const Address address;
00278 
00279     /**
00280      * Receives events from #messageSocket.
00281      */
00282     MessageSocketHandler messageSocketHandler;
00283 
00284     /**
00285      * This is used to time out RPCs and sessions when the server is no longer
00286      * responding. See Timer.
00287      */
00288     Timer timer;
00289 
00290     /**
00291      * This mutex protects several members of this class:
00292      *  - #nextMessageId
00293      *  - #responses
00294      *  - #errorMessage
00295      *  - #numActiveRPCs
00296      *  - #activePing
00297      */
00298     mutable std::mutex mutex;
00299 
00300     /**
00301      * The message ID to assign to the next RPC. These start at 0 and
00302      * increment from there.
00303      */
00304     MessageSocket::MessageId nextMessageId;
00305 
00306     /**
00307      * A map from MessageId to Response objects that is used to store the
00308      * response to RPCs and look it up for OpaqueClientRPC objects. The
00309      * Response objects mapped to must be deleted manually when removed from
00310      * this map (gcc 4.4 doesn't support mapping to non-copyable objects).
00311      */
00312     std::unordered_map<MessageSocket::MessageId, Response*> responses;
00313 
00314     /**
00315      * If this session is disconnected then this holds the error message.
00316      * All new RPCs will be immediately 'ready' with this error message.
00317      * Otherwise, this is the empty string.
00318      */
00319     std::string errorMessage;
00320 
00321     /**
00322      * The number of outstanding RPC requests that have been sent but whose
00323      * responses have not yet been received. This does not include ping
00324      * requests sent by the #timer (which aren't real RPCs).
00325      * This is used to determine when to schedule the timer: the timer is
00326      * scheduled if numActiveRPCs is non-zero.
00327      */
00328     uint32_t numActiveRPCs;
00329 
00330     /**
00331      * When numActiveRPCs is > 0, this field indicates that we are waiting for
00332      * a ping response as evidence that the server is still alive.
00333      * When numActiveRPCs = 0, this field is undefined.
00334      */
00335     bool activePing;
00336 
00337     /**
00338      * The MessageSocket used to send RPC requests and receive RPC responses.
00339      * This may be NULL if the socket was never created. In this case,
00340      * #errorMessage will be set.
00341      */
00342     std::unique_ptr<MessageSocket> messageSocket;
00343 
00344     /**
00345      * Registers timer with the event loop.
00346      */
00347     Event::Timer::Monitor timerMonitor;
00348 
00349     /**
00350      * Usually set to connect() but mocked out in some unit tests.
00351      * NOTE(review): <functional> and <sys/socket.h> are not included directly by this header; confirm they arrive transitively. */
00352     static std::function<
00353         int(int sockfd,
00354             const struct sockaddr *addr,
00355             socklen_t addrlen)> connectFn;
00356 
00357     // ClientSession is non-copyable.
00358     ClientSession(const ClientSession&) = delete;
00359     ClientSession& operator=(const ClientSession&) = delete;
00360 }; // class ClientSession
00361 
00362 } // namespace LogCabin::RPC
00363 } // namespace LogCabin
00364 
00365 #endif /* LOGCABIN_RPC_CLIENTSESSION_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines