LogCabin
|
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * Copyright (c) 2014-2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <memory> 00018 #include <mutex> 00019 #include <string> 00020 #include <unordered_map> 00021 00022 #include "Core/Buffer.h" 00023 #include "Core/ConditionVariable.h" 00024 #include "Core/Config.h" 00025 #include "Event/Timer.h" 00026 #include "RPC/Address.h" 00027 #include "RPC/OpaqueClientRPC.h" 00028 #include "RPC/MessageSocket.h" 00029 00030 #ifndef LOGCABIN_RPC_CLIENTSESSION_H 00031 #define LOGCABIN_RPC_CLIENTSESSION_H 00032 00033 namespace LogCabin { 00034 00035 // forward declaration 00036 namespace Event { 00037 class Loop; 00038 }; 00039 00040 namespace RPC { 00041 00042 /** 00043 * A ClientSession is used to initiate OpaqueClientRPCs. It encapsulates a 00044 * connection to a server. Sessions can be relatively expensive to create, so 00045 * clients should keep them around. 00046 */ 00047 class ClientSession { 00048 public: 00049 /// Clock used for timeouts. 00050 typedef Address::Clock Clock; 00051 /// Type for absolute time values used for timeouts. 00052 typedef Address::TimePoint TimePoint; 00053 00054 private: 00055 /** 00056 * This constructor is private because the class must be allocated in a 00057 * particular way. See #makeSession(). 00058 */ 00059 ClientSession(Event::Loop& eventLoop, 00060 const Address& address, 00061 uint32_t maxMessageLength, 00062 TimePoint timeout, 00063 const Core::Config& config); 00064 public: 00065 /** 00066 * Return a new ClientSession object. This object is managed by a 00067 * std::shared_ptr to ensure that it remains alive while there are 00068 * outstanding RPCs. 00069 * 00070 * This should only be used from worker threads, as it invokes possibly 00071 * long-running syscalls. 00072 * 00073 * \param eventLoop 00074 * Event::Loop that will be used to find out when the underlying 00075 * socket may be read from or written to without blocking. 00076 * \param address 00077 * The RPC server address on which to connect. 00078 * \param maxMessageLength 00079 * The maximum number of bytes to allow per request/response. This 00080 * exists to limit the amount of buffer space a single RPC can use. 00081 * Attempting to send longer requests will PANIC; attempting to 00082 * receive longer requests will disconnect the underlying socket. 00083 * \param timeout 00084 * After this time has elapsed, stop trying to initiate the connection 00085 * and leave the session in an error state. 00086 * \param config 00087 * General settings. This object does not keep a reference. 00088 */ 00089 static std::shared_ptr<ClientSession> 00090 makeSession(Event::Loop& eventLoop, 00091 const Address& address, 00092 uint32_t maxMessageLength, 00093 TimePoint timeout, 00094 const Core::Config& config); 00095 00096 /** 00097 * Return a ClientSession object that's already in an error state. This can 00098 * be useful for delaying errors until an RPC is waited on. 00099 * \param eventLoop 00100 * Ignored but usually readily available to callers and needed to 00101 * satisfy type requirements. 00102 * \param errorMessage 00103 * Description of the error, as will be returned by getErrorMessage() 00104 * later. 00105 */ 00106 static std::shared_ptr<ClientSession> 00107 makeErrorSession(Event::Loop& eventLoop, 00108 const std::string& errorMessage); 00109 00110 /** 00111 * Destructor. 00112 */ 00113 ~ClientSession(); 00114 00115 /** 00116 * Initiate an RPC. 00117 * This method is safe to call from any thread. 00118 * \param request 00119 * The contents of the RPC request. 00120 * \return 00121 * This is be used to wait for and retrieve the reply to the RPC. 00122 */ 00123 OpaqueClientRPC sendRequest(Core::Buffer request); 00124 00125 /** 00126 * If the socket has been disconnected, return a descriptive message. 00127 * The suggested way to detect errors is to wait until an RPC returns an 00128 * error. This method can be used to detect errors earlier. 00129 * 00130 * This method is safe to call from any thread. 00131 * 00132 * \return 00133 * If an error has occurred, a message describing that error. 00134 * Otherwise, an empty string. 00135 */ 00136 std::string getErrorMessage() const; 00137 00138 /** 00139 * Return a string describing this session. It will include the address of 00140 * the server and, if the session has an error, the error message. 00141 */ 00142 std::string toString() const; 00143 00144 private: 00145 00146 /** 00147 * This handles events from #messageSocket. 00148 */ 00149 class MessageSocketHandler : public MessageSocket::Handler { 00150 public: 00151 explicit MessageSocketHandler(ClientSession& clientSession); 00152 void handleReceivedMessage(MessageId messageId, Core::Buffer message); 00153 void handleDisconnect(); 00154 ClientSession& session; 00155 }; 00156 00157 /** 00158 * This contains an expected response for a OpaqueClientRPC object. 00159 * This is created when the OpaqueClientRPC is created; it is deleted when 00160 * the OpaqueClientRPC object is either canceled or updated with a 00161 * response/error. 00162 */ 00163 struct Response { 00164 /** 00165 * Constructor. 00166 */ 00167 Response(); 00168 /** 00169 * Current state of the RPC. 00170 */ 00171 enum { 00172 /** 00173 * Waiting for a reply from the server. 00174 */ 00175 WAITING, 00176 /** 00177 * Received a reply (find it in #reply). 00178 */ 00179 HAS_REPLY, 00180 /** 00181 * The RPC has been canceled by another thread. 00182 */ 00183 CANCELED, 00184 } status; 00185 /** 00186 * The contents of the response. This is valid when 00187 * #status is HAS_REPLY. 00188 */ 00189 Core::Buffer reply; 00190 /** 00191 * If true, a thread is blocked waiting on #ready, 00192 * and this object may not be deleted. 00193 */ 00194 bool hasWaiter; 00195 /** 00196 * OpaqueClientRPC objects wait on this condition variable inside of 00197 * wait(). It is notified when a new response arrives, the session 00198 * is disconnected, or the RPC is canceled. 00199 */ 00200 Core::ConditionVariable ready; 00201 }; 00202 00203 /** 00204 * This is used to time out RPCs and sessions when the server is no longer 00205 * responding. After a timeout period, the client will send a ping to the 00206 * server. If no response is received within another timeout period, the 00207 * session is closed. 00208 */ 00209 class Timer : public Event::Timer { 00210 public: 00211 explicit Timer(ClientSession& session); 00212 void handleTimerEvent(); 00213 ClientSession& session; 00214 }; 00215 00216 // The cancel(), update(), and wait() methods are used by OpaqueClientRPC. 00217 friend class OpaqueClientRPC; 00218 00219 /** 00220 * Called by the RPC when it is no longer interested in its response. 00221 * 00222 * This may be called while holding the RPC's lock. 00223 * 00224 * TODO(ongaro): It'd be nice to cancel sending the request if it hasn't 00225 * already gone out, but I guess that's going to be a pretty rare case. 00226 */ 00227 void cancel(OpaqueClientRPC& rpc); 00228 00229 /** 00230 * Called by the RPC when it wants to be learn of its response 00231 * (non-blocking). 00232 * 00233 * This must be called while holding the RPC's lock. 00234 */ 00235 void update(OpaqueClientRPC& rpc); 00236 00237 /** 00238 * Called by the RPC to wait for its response (blocking). The caller should 00239 * call update() after this returns to learn of the response. 00240 * 00241 * This must not be called while holding the RPC's lock. 00242 * \param rpc 00243 * Wait for response to this. 00244 * \param timeout 00245 * After this time has elapsed, stop waiting and return. The RPC's 00246 * results will probably not be available yet in this case. 00247 */ 00248 void wait(const OpaqueClientRPC& rpc, TimePoint timeout); 00249 00250 /** 00251 * This is used to keep this object alive while there are outstanding RPCs. 00252 */ 00253 std::weak_ptr<ClientSession> self; 00254 00255 /** 00256 * The number of nanoseconds to wait until the client gets suspicious 00257 * about the server not responding. After this amount of time elapses, the 00258 * client will send a ping to the server. If no response is received within 00259 * another PING_TIMEOUT_NS milliseconds, the session is closed. 00260 * 00261 * TODO(ongaro): How should this value be chosen? 00262 * Ideally, you probably want this to be set to something like the 99-th 00263 * percentile of your RPC latency. 00264 * 00265 * TODO(ongaro): How does this interact with TCP? 00266 */ 00267 const uint64_t PING_TIMEOUT_NS; 00268 00269 /** 00270 * The event loop that is used for non-blocking I/O. 00271 */ 00272 Event::Loop& eventLoop; 00273 00274 /** 00275 * The RPC server address provided to the constructor. 00276 */ 00277 const Address address; 00278 00279 /** 00280 * Receives events from #messageSocket. 00281 */ 00282 MessageSocketHandler messageSocketHandler; 00283 00284 /** 00285 * This is used to time out RPCs and sessions when the server is no longer 00286 * responding. See Timer. 00287 */ 00288 Timer timer; 00289 00290 /** 00291 * This mutex protects several members of this class: 00292 * - #nextMessageId 00293 * - #responses 00294 * - #errorMessage 00295 * - #numActiveRPCs 00296 * - #activePing 00297 */ 00298 mutable std::mutex mutex; 00299 00300 /** 00301 * The message ID to assign to the next RPC. These start at 0 and 00302 * increment from there. 00303 */ 00304 MessageSocket::MessageId nextMessageId; 00305 00306 /** 00307 * A map from MessageId to Response objects that is used to store the 00308 * response to RPCs and look it up for OpaqueClientRPC objects. The 00309 * Response objects mapped to must be deleted manually when removed from 00310 * this map (gcc 4.4 doesn't support mapping to non-copyable objects). 00311 */ 00312 std::unordered_map<MessageSocket::MessageId, Response*> responses; 00313 00314 /** 00315 * If this session is disconnected then this holds the error message. 00316 * All new RPCs will be immediately 'ready' with this error message. 00317 * Otherwise, this is the empty string. 00318 */ 00319 std::string errorMessage; 00320 00321 /** 00322 * The number of outstanding RPC requests that have been sent but whose 00323 * responses have not yet been received. This does not include ping 00324 * requests sent by the #timer (which aren't real RPCs). 00325 * This is used to determine when to schedule the timer: the timer is 00326 * scheduled if numActiveRPCs is non-zero. 00327 */ 00328 uint32_t numActiveRPCs; 00329 00330 /** 00331 * When numActiveRPCs is > 0, this field indicates that we are waiting for 00332 * a ping response as evidence that the server is still alive. 00333 * When numActiveRPCs = 0, this field is undefined. 00334 */ 00335 bool activePing; 00336 00337 /** 00338 * The MessageSocket used to send RPC requests and receive RPC responses. 00339 * This may be NULL if the socket was never created. In this case, 00340 * #errorMessage will be set. 00341 */ 00342 std::unique_ptr<MessageSocket> messageSocket; 00343 00344 /** 00345 * Registers timer with the event loop. 00346 */ 00347 Event::Timer::Monitor timerMonitor; 00348 00349 /** 00350 * Usually set to connect() but mocked out in some unit tests. 00351 */ 00352 static std::function< 00353 int(int sockfd, 00354 const struct sockaddr *addr, 00355 socklen_t addrlen)> connectFn; 00356 00357 // ClientSession is non-copyable. 00358 ClientSession(const ClientSession&) = delete; 00359 ClientSession& operator=(const ClientSession&) = delete; 00360 }; // class ClientSession 00361 00362 } // namespace LogCabin::RPC 00363 } // namespace LogCabin 00364 00365 #endif /* LOGCABIN_RPC_CLIENTSESSION_H */