// LogCabin
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * Copyright (c) 2014-2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <memory> 00018 #include <set> 00019 #include <string> 00020 #include <thread> 00021 00022 #include "build/Protocol/ServerControl.pb.h" 00023 #include "include/LogCabin/Client.h" 00024 #include "Client/Backoff.h" 00025 #include "Client/LeaderRPC.h" 00026 #include "Client/SessionManager.h" 00027 #include "Core/ConditionVariable.h" 00028 #include "Core/Config.h" 00029 #include "Core/Mutex.h" 00030 #include "Core/Time.h" 00031 #include "Event/Loop.h" 00032 00033 #ifndef LOGCABIN_CLIENT_CLIENTIMPL_H 00034 #define LOGCABIN_CLIENT_CLIENTIMPL_H 00035 00036 namespace LogCabin { 00037 namespace Client { 00038 00039 /** 00040 * A predicate on tree operations. 00041 * First component: the absolute path corresponding to the 'path' 00042 * argument of setCondition(), or empty if no condition is set. 00043 * Second component: the file contents given as the 'value' argument of 00044 * setCondition(). 00045 */ 00046 typedef std::pair<std::string, std::string> Condition; 00047 00048 /** 00049 * The implementation of the client library. 
00050 * This is wrapped by Client::Cluster and Client::Log for usability. 00051 */ 00052 class ClientImpl { 00053 public: 00054 /// Clock used for timeouts. 00055 typedef LeaderRPC::Clock Clock; 00056 /// Type for absolute time values used for timeouts. 00057 typedef LeaderRPC::TimePoint TimePoint; 00058 00059 /** 00060 * Return the absolute time when the calling operation should timeout. 00061 * \param relTimeoutNanos 00062 * The number of nanoseconds from now to time out, or 0 for no 00063 * timeout. 00064 */ 00065 static TimePoint absTimeout(uint64_t relTimeoutNanos); 00066 00067 /// Constructor. 00068 explicit ClientImpl(const std::map<std::string, std::string>& options = 00069 std::map<std::string, std::string>()); 00070 /// Destructor. 00071 virtual ~ClientImpl(); 00072 00073 /** 00074 * Initialize this object. This must be called directly after the 00075 * constructor. 00076 * \param hosts 00077 * A string describing the hosts in the cluster. This should be of the 00078 * form host:port, where host is usually a DNS name that resolves to 00079 * multiple IP addresses, or a comma-delimited list. 00080 */ 00081 void init(const std::string& hosts); 00082 00083 /** 00084 * Called by init() to do any necessary initialization of the derived 00085 * class. 00086 */ 00087 virtual void initDerived(); 00088 00089 GetConfigurationResult getConfiguration(TimePoint timeout); 00090 ConfigurationResult setConfiguration( 00091 uint64_t oldId, 00092 const Configuration& newConfiguration, 00093 TimePoint timeout); 00094 00095 /// See Cluster::getServerInfo. 00096 Result getServerInfo(const std::string& host, 00097 TimePoint timeout, 00098 Server& info); 00099 00100 /** 00101 * Return the canonicalized path name resulting from accessing path 00102 * relative to workingDirectory. 00103 * \return 00104 * Status and error message. Possible errors are: 00105 * - INVALID_ARGUMENT if path is relative and workingDirectory is not 00106 * an absolute path. 
00107 * - INVALID_ARGUMENT if path attempts to access above the root 00108 * directory. 00109 */ 00110 Result canonicalize(const std::string& path, 00111 const std::string& workingDirectory, 00112 std::string& canonical); 00113 00114 /// See Tree::makeDirectory. 00115 Result makeDirectory(const std::string& path, 00116 const std::string& workingDirectory, 00117 const Condition& condition, 00118 TimePoint timeout); 00119 00120 /// See Tree::listDirectory. 00121 Result listDirectory(const std::string& path, 00122 const std::string& workingDirectory, 00123 const Condition& condition, 00124 TimePoint timeout, 00125 std::vector<std::string>& children); 00126 00127 /// See Tree::removeDirectory. 00128 Result removeDirectory(const std::string& path, 00129 const std::string& workingDirectory, 00130 const Condition& condition, 00131 TimePoint timeout); 00132 00133 /// See Tree::write. 00134 Result write(const std::string& path, 00135 const std::string& workingDirectory, 00136 const std::string& contents, 00137 const Condition& condition, 00138 TimePoint timeout); 00139 00140 /// See Tree::read. 00141 Result read(const std::string& path, 00142 const std::string& workingDirectory, 00143 const Condition& condition, 00144 TimePoint timeout, 00145 std::string& contents); 00146 00147 /// See Tree::removeFile. 00148 Result removeFile(const std::string& path, 00149 const std::string& workingDirectory, 00150 const Condition& condition, 00151 TimePoint timeout); 00152 00153 /** 00154 * Low-level interface to ServerControl service used by 00155 * Client/ServerControl.cc. 00156 */ 00157 Result serverControl(const std::string& host, 00158 TimePoint timeout, 00159 Protocol::ServerControl::OpCode opCode, 00160 const google::protobuf::Message& request, 00161 google::protobuf::Message& response); 00162 00163 protected: 00164 00165 /** 00166 * Options/settings. 00167 */ 00168 const Core::Config config; 00169 00170 /** 00171 * The Event::Loop used to drive the underlying RPC mechanism. 
00172 */ 00173 Event::Loop eventLoop; 00174 00175 /** 00176 * A unique ID for the cluster that this client may connect to. This is 00177 * initialized to a value from the options map passed to the 00178 * Client::Cluster constructor. If it's not set then, it may be set later 00179 * as a result of learning a UUID from some server. 00180 */ 00181 SessionManager::ClusterUUID clusterUUID; 00182 00183 /** 00184 * Used to create new sessions. 00185 */ 00186 SessionManager sessionManager; 00187 00188 /** 00189 * Used to rate-limit the creation of ClientSession objects (TCP 00190 * connections). 00191 */ 00192 Backoff sessionCreationBackoff; 00193 00194 /** 00195 * Describes the hosts in the cluster. 00196 */ 00197 std::string hosts; 00198 00199 /** 00200 * Used to send RPCs to the leader of the LogCabin cluster. 00201 */ 00202 std::unique_ptr<LeaderRPCBase> leaderRPC; 00203 00204 /** 00205 * This class helps with providing exactly-once semantics for read-write 00206 * RPCs. For example, it assigns sequence numbers to RPCs, which servers 00207 * then use to prevent duplicate processing of duplicate requests. 00208 * 00209 * This class is implemented in a monitor style. 00210 */ 00211 class ExactlyOnceRPCHelper { 00212 public: 00213 /** 00214 * Constructor. 00215 * \param client 00216 * Used to open a session with the cluster. Should not be NULL. 00217 */ 00218 explicit ExactlyOnceRPCHelper(ClientImpl* client); 00219 /** 00220 * Destructor. 00221 */ 00222 ~ExactlyOnceRPCHelper(); 00223 /** 00224 * Prepare to shut down (join with thread). 00225 */ 00226 void exit(); 00227 /** 00228 * Call this before sending an RPC. 00229 * \param timeout 00230 * If this timeout elapses before a session can be opened with the 00231 * cluster, this method will return early and the returned 00232 * information will have a client_id set to 0, which is not a 00233 * valid ID. 00234 * \return 00235 * Info to be used with read-write RPCs, or if the timeout 00236 * elapsed, a client_id set to 0. 
00237 */ 00238 Protocol::Client::ExactlyOnceRPCInfo getRPCInfo(TimePoint timeout); 00239 /** 00240 * Call this after receiving an RPCs response. 00241 */ 00242 void doneWithRPC(const Protocol::Client::ExactlyOnceRPCInfo&); 00243 00244 private: 00245 00246 /** 00247 * Internal version of getRPCInfo() to avoid deadlock with self. 00248 */ 00249 Protocol::Client::ExactlyOnceRPCInfo getRPCInfo( 00250 Core::HoldingMutex holdingMutex, 00251 TimePoint timeout); 00252 /** 00253 * Internal version of doneWithRPC() to avoid deadlock with self. 00254 */ 00255 void doneWithRPC(const Protocol::Client::ExactlyOnceRPCInfo&, 00256 Core::HoldingMutex holdingMutex); 00257 /** 00258 * Main function for keep-alive thread. Periodically makes 00259 * requests to the cluster to keep the client's session active. 00260 */ 00261 void keepAliveThreadMain(); 00262 00263 /** 00264 * Used to open a session with the cluster. 00265 * const and non-NULL except for unit tests. 00266 */ 00267 ClientImpl* client; 00268 /** 00269 * Protects all the members of this class. 00270 */ 00271 mutable Core::Mutex mutex; 00272 /** 00273 * The numbers of the RPCs for which this client is still awaiting a 00274 * response. 00275 */ 00276 std::set<uint64_t> outstandingRPCNumbers; 00277 /** 00278 * The client's session ID as returned by the open session RPC, or 0 if 00279 * one has not yet been assigned. 00280 */ 00281 uint64_t clientId; 00282 /** 00283 * The number to assign to the next RPC. 00284 */ 00285 uint64_t nextRPCNumber; 00286 /** 00287 * keepAliveThread blocks on this. Notified when lastKeepAliveStart, 00288 * keepAliveIntervalMs, or exiting changes. 00289 */ 00290 Core::ConditionVariable keepAliveCV; 00291 /** 00292 * Flag to keepAliveThread that it should shut down. 00293 */ 00294 bool exiting; 00295 /** 00296 * Time just before the last keep-alive or read-write request to the 00297 * cluster was made. 
The next keep-alive request will be invoked 00298 * keepAliveIntervalMs after this, if no intervening requests are made. 00299 */ 00300 TimePoint lastKeepAliveStart; 00301 /** 00302 * How often session keep-alive requests are sent during periods of 00303 * inactivity. 00304 */ 00305 std::chrono::milliseconds keepAliveInterval; 00306 /** 00307 * How long to wait for the CloseSession RPC before giving up. 00308 */ 00309 std::chrono::milliseconds sessionCloseTimeout; 00310 00311 /** 00312 * If set, this is an ongoing keep-alive RPC. This call is canceled to 00313 * interrupt #keepAliveThread when exiting. 00314 */ 00315 std::unique_ptr<LeaderRPCBase::Call> keepAliveCall; 00316 00317 /** 00318 * Runs keepAliveThreadMain(). 00319 * Since this thread would be unexpected/wasteful for clients that only 00320 * issue read-only requests (or no requests at all), it is spawned 00321 * lazily, if/when the client opens its session with the cluster (upon 00322 * its first read-write request). 00323 */ 00324 std::thread keepAliveThread; 00325 00326 // ExactlyOnceRPCHelper is not copyable. 00327 ExactlyOnceRPCHelper(const ExactlyOnceRPCHelper&) = delete; 00328 ExactlyOnceRPCHelper& operator=(const ExactlyOnceRPCHelper&) = delete; 00329 } exactlyOnceRPCHelper; 00330 00331 /** 00332 * A thread that runs the Event::Loop. 00333 */ 00334 std::thread eventLoopThread; 00335 00336 // ClientImpl is not copyable 00337 ClientImpl(const ClientImpl&) = delete; 00338 ClientImpl& operator=(const ClientImpl&) = delete; 00339 }; 00340 00341 } // namespace LogCabin::Client 00342 } // namespace LogCabin 00343 00344 #endif /* LOGCABIN_CLIENT_CLIENTIMPL_H */