LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * Copyright (c) 2014-2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <cinttypes> 00018 #include <deque> 00019 #include <memory> 00020 #include <mutex> 00021 00022 #include "build/Protocol/Client.pb.h" 00023 #include "Client/SessionManager.h" 00024 #include "Core/ConditionVariable.h" 00025 #include "RPC/Address.h" 00026 #include "RPC/ClientRPC.h" 00027 00028 #ifndef LOGCABIN_CLIENT_LEADERRPC_H 00029 #define LOGCABIN_CLIENT_LEADERRPC_H 00030 00031 namespace LogCabin { 00032 00033 // forward declaration 00034 namespace Core { 00035 class Config; 00036 } 00037 00038 // forward declaration 00039 namespace Event { 00040 class Loop; 00041 } 00042 00043 // forward declaration 00044 namespace RPC { 00045 class ClientSession; 00046 } 00047 00048 namespace Client { 00049 00050 // forward declaration 00051 class Backoff; 00052 00053 /** 00054 * This class is used to send RPCs from clients to the leader of the LogCabin 00055 * cluster. It automatically finds and connects to the leader and transparently 00056 * rolls over to a new leader when necessary. 00057 * 00058 * There are two implementations of this interface: LeaderRPC is probably the 00059 * one you're interested in. LeaderRPCMock is used for unit testing only. 00060 */ 00061 class LeaderRPCBase { 00062 public: 00063 /// Clock used for timeouts. 00064 typedef RPC::ClientRPC::Clock Clock; 00065 /// Type for absolute time values used for timeouts. 00066 typedef RPC::ClientRPC::TimePoint TimePoint; 00067 00068 /** 00069 * RPC operation code. 00070 */ 00071 typedef Protocol::Client::OpCode OpCode; 00072 00073 /** 00074 * Return type for LeaderRPCBase::call(). 00075 */ 00076 enum class Status { 00077 /** 00078 * The RPC completed scucessfully. 00079 */ 00080 OK, 00081 /** 00082 * The given timeout elapsed before the RPC completed. 00083 */ 00084 TIMEOUT, 00085 /** 00086 * The server rejected the request, probably because it doesn't 00087 * support the opcode, or maybe the request arguments were invalid. 00088 */ 00089 INVALID_REQUEST, 00090 }; 00091 00092 /** 00093 * Print out a Status for debugging purposes. 00094 */ 00095 friend std::ostream& operator<<(std::ostream& os, const Status& server); 00096 00097 /// Constructor. 00098 LeaderRPCBase() {} 00099 00100 /// Destructor. 00101 virtual ~LeaderRPCBase() {} 00102 00103 /** 00104 * Execute an RPC on the cluster leader. 00105 * This class guarantees that the RPC will be executed at least once. 00106 * \param opCode 00107 * RPC operation code. The caller must guarantee that this is a valid 00108 * opCode. (If the server rejects it, this will PANIC.) 00109 * \param request 00110 * The parameters for the operation. The caller must guarantee that 00111 * this is a well-formed request. (If the server rejects it, this will 00112 * PANIC.) 00113 * \param timeout 00114 * After this time has elapsed, stop waiting and return TIMEOUT. 00115 * In this case, response will be left unmodified. 00116 * \param[out] response 00117 * The response to the operation will be filled in here. 00118 */ 00119 virtual Status call(OpCode opCode, 00120 const google::protobuf::Message& request, 00121 google::protobuf::Message& response, 00122 TimePoint timeout) = 0; 00123 00124 /** 00125 * An asynchronous version of call(). This allows multiple RPCs to be 00126 * executed concurrently, or canceling an RPC that is running on a separate 00127 * thread. 00128 */ 00129 class Call { 00130 public: 00131 /** 00132 * Return type for LeaderRPCBase::Call::wait(). 00133 */ 00134 enum class Status { 00135 /** 00136 * The RPC completed scucessfully. 00137 */ 00138 OK, 00139 /** 00140 * The RPC did not succeed, nor did it timeout. 00141 * The caller should try again. 00142 * TODO(ongaro): this is a bit ugly 00143 */ 00144 RETRY, 00145 /** 00146 * The given timeout elapsed before the RPC completed. 00147 */ 00148 TIMEOUT, 00149 /** 00150 * The server rejected the request, probably because it doesn't 00151 * support the opcode, or maybe the request arguments were invalid. 00152 */ 00153 INVALID_REQUEST, 00154 }; 00155 00156 /** 00157 * Print out a Status for debugging purposes. 00158 */ 00159 friend std::ostream& operator<<(std::ostream& os, 00160 const Status& server); 00161 00162 /** 00163 * Constructor. 00164 */ 00165 Call() {} 00166 /** 00167 * Destructor. 00168 */ 00169 virtual ~Call() {} 00170 /** 00171 * Invoke the RPC. 00172 * \param opCode 00173 * RPC operation code. The caller must guarantee that this is a 00174 * valid opCode. (If the server rejects it, this will PANIC.) 00175 * \param request 00176 * The parameters for the operation. The caller must guarantee 00177 * that this is a well-formed request. (If the server rejects it, 00178 * this will PANIC.) 00179 * \param timeout 00180 * After this time has elapsed, stop trying to initiate the 00181 * connection to the leader and use an invalid session, which will 00182 * cause the RPC to fail later. 00183 */ 00184 virtual void start(OpCode opCode, 00185 const google::protobuf::Message& request, 00186 TimePoint timeout) = 0; 00187 /** 00188 * Cancel the RPC. This may only be called after start(), but it may 00189 * be called safely from a separate thread. 00190 */ 00191 virtual void cancel() = 0; 00192 /** 00193 * Wait for the RPC to complete. 00194 * \param[out] response 00195 * If successful, the response to the operation will be filled in 00196 * here. 00197 * \param timeout 00198 * After this time has elapsed, stop waiting and return TIMEOUT. 00199 * In this case, response will be left unmodified. 00200 * \return 00201 * True if the RPC completed successfully, false otherwise. If 00202 * this returns false, it is the callers responsibility to start 00203 * over to achieve the same at-most-once semantics as #call(). 00204 */ 00205 virtual Status wait(google::protobuf::Message& response, 00206 TimePoint timeout) = 0; 00207 }; 00208 00209 /** 00210 * Return a new Call object. 00211 */ 00212 virtual std::unique_ptr<Call> makeCall() = 0; 00213 00214 // LeaderRPCBase is not copyable 00215 LeaderRPCBase(const LeaderRPCBase&) = delete; 00216 LeaderRPCBase& operator=(const LeaderRPCBase&) = delete; 00217 }; 00218 00219 00220 /** 00221 * This is the implementation of LeaderRPCBase that uses the RPC system. 00222 * (The other implementation, LeaderRPCMock, is only used for testing.) 00223 */ 00224 class LeaderRPC : public LeaderRPCBase { 00225 public: 00226 /** 00227 * Constructor. 00228 * \param hosts 00229 * Describe the servers to connect to. This class assumes that 00230 * refreshing 'hosts' will result in a random host that might be the 00231 * current cluster leader. 00232 * \param clusterUUID 00233 * Keeps track of the unique ID for this cluster, if known. 00234 * \param sessionCreationBackoff 00235 * Used to rate-limit new TCP connections. 00236 * \param sessionManager 00237 * Used to create new sessions. 00238 */ 00239 LeaderRPC(const RPC::Address& hosts, 00240 SessionManager::ClusterUUID& clusterUUID, 00241 Backoff& sessionCreationBackoff, 00242 SessionManager& sessionManager); 00243 00244 /// Destructor. 00245 ~LeaderRPC(); 00246 00247 /// See LeaderRPCBase::call. 00248 Status call(OpCode opCode, 00249 const google::protobuf::Message& request, 00250 google::protobuf::Message& response, 00251 TimePoint timeout); 00252 00253 /// See LeaderRPCBase::makeCall(). 00254 std::unique_ptr<LeaderRPCBase::Call> makeCall(); 00255 00256 private: 00257 00258 /// See LeaderRPCBase::Call. 00259 class Call : public LeaderRPCBase::Call { 00260 public: 00261 explicit Call(LeaderRPC& leaderRPC); 00262 ~Call(); 00263 void start(OpCode opCode, 00264 const google::protobuf::Message& request, 00265 TimePoint timeout); 00266 void cancel(); 00267 Status wait(google::protobuf::Message& response, 00268 TimePoint timeout); 00269 LeaderRPC& leaderRPC; 00270 /** 00271 * Copy of leaderSession when the RPC was started (might have changed 00272 * since). 00273 */ 00274 std::shared_ptr<RPC::ClientSession> cachedSession; 00275 /** 00276 * RPC object which may be canceled. 00277 */ 00278 RPC::ClientRPC rpc; 00279 }; 00280 00281 /** 00282 * Return a session connected to the most likely cluster leader, creating 00283 * it if necessary. 00284 * \param timeout 00285 * After this time has elapsed, stop trying to initiate the connection 00286 * and return an invalid session. 00287 * \return 00288 * Session on which to execute RPCs. 00289 */ 00290 std::shared_ptr<RPC::ClientSession> 00291 getSession(TimePoint timeout); 00292 00293 /** 00294 * Notify this class that an RPC on the given session failed. This will 00295 * usually cause this class to connect to a random server next time 00296 * getSession() is called. 00297 * \param cachedSession 00298 * Session previously returned by getSession(). This is used to detect 00299 * races in which some other thread has already solved the problem. 00300 */ 00301 void 00302 reportFailure(std::shared_ptr<RPC::ClientSession> cachedSession); 00303 00304 /** 00305 * Notify this class that a non-leader server rejected an RPC. This will 00306 * usually cause this class to connect to a random server next time 00307 * getSession() is called. 00308 * \param cachedSession 00309 * Session previously returned by getSession(). This is used to detect 00310 * races in which some other thread has already solved the problem. 00311 */ 00312 void 00313 reportNotLeader(std::shared_ptr<RPC::ClientSession> cachedSession); 00314 00315 /** 00316 * Notify this class that an RPC on the given session was redirected by a 00317 * non-leader server. This will usually cause this class to connect to the 00318 * given host the next time getSession() is called. 00319 * \param cachedSession 00320 * Session previously returned by getSession(). This is used to detect 00321 * races in which some other thread has already solved the problem. 00322 * \param host 00323 * Address of the server that is likely the leader. 00324 */ 00325 void 00326 reportRedirect(std::shared_ptr<RPC::ClientSession> cachedSession, 00327 const std::string& host); 00328 00329 /** 00330 * Notify this class that an RPC on the given session reached a leader. 00331 * This is just here for debug log messages. 00332 * \param cachedSession 00333 * Session previously returned by getSession(). This is used to detect 00334 * races in which some other thread has already solved any problems. 00335 */ 00336 void 00337 reportSuccess(std::shared_ptr<RPC::ClientSession> cachedSession); 00338 00339 /** 00340 * Keeps track of the unique ID for this cluster, if known. 00341 */ 00342 SessionManager::ClusterUUID& clusterUUID; 00343 00344 /** 00345 * Used to rate-limit the creation of ClientSession objects (TCP 00346 * connections). 00347 */ 00348 Backoff& sessionCreationBackoff; 00349 00350 /** 00351 * Used to create new sessions. 00352 */ 00353 SessionManager& sessionManager; 00354 00355 /** 00356 * Protects all of the following member variables in this class. 00357 */ 00358 std::mutex mutex; 00359 00360 /** 00361 * Set to true when some thread is already initiating a new session. 00362 * When this is already true, other threads wait on #connected 00363 * rather than starting additional sessions. 00364 */ 00365 bool isConnecting; 00366 00367 /** 00368 * Notified when #isConnecting becomes false (when #leaderSession is set). 00369 */ 00370 Core::ConditionVariable connected; 00371 00372 /** 00373 * An address referring to the hosts in the LogCabin cluster. A random host 00374 * is selected from here when this class doesn't know who the cluster 00375 * leader is. 00376 */ 00377 RPC::Address hosts; 00378 00379 /** 00380 * If nonempty, the address of the server that is likely to be the 00381 * current leader. 00382 */ 00383 std::string leaderHint; 00384 00385 /** 00386 * The goal is to get this session connected to the cluster leader. 00387 * This is never null, but it might sometimes point to the wrong host. 00388 */ 00389 std::shared_ptr<RPC::ClientSession> leaderSession; 00390 00391 /** 00392 * The number of attempted RPCs that have not successfully reached a leader 00393 * since the last time an RPC did. Used for rate-limiting and summarizing 00394 * log messages: they're only printed when this number reaches a power of 00395 * two. 00396 */ 00397 uint64_t failuresSinceLastSuccess; 00398 }; 00399 00400 } // namespace LogCabin::Client 00401 } // namespace LogCabin 00402 00403 #endif /* LOGCABIN_CLIENT_LEADERRPC_H */