/* Copyright (c) 2012 Stanford University
 * Copyright (c) 2014-2015 Diego Ongaro
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <unistd.h>

#include "Client/Backoff.h"
#include "Client/LeaderRPC.h"
#include "Core/Debug.h"
#include "Core/Util.h"
#include "Protocol/Common.h"
#include "RPC/ClientSession.h"
#include "RPC/ClientRPC.h"

namespace LogCabin {
namespace Client {

//// class LeaderRPCBase ////

std::ostream&
operator<<(std::ostream& os, const LeaderRPCBase::Status& status)
{
    switch (status) {
        case LeaderRPCBase::Status::OK:
            os << "Status::OK";
            break;
        case LeaderRPCBase::Status::TIMEOUT:
            os << "Status::TIMEOUT";
            break;
        case LeaderRPCBase::Status::INVALID_REQUEST:
            os << "Status::INVALID_REQUEST";
            break;
    }
    return os;
}

std::ostream&
operator<<(std::ostream& os, const LeaderRPCBase::Call::Status& status)
{
    switch (status) {
        case LeaderRPCBase::Call::Status::OK:
            os << "Status::OK";
            break;
        case LeaderRPCBase::Call::Status::RETRY:
            os << "Status::RETRY";
            break;
        case LeaderRPCBase::Call::Status::TIMEOUT:
            os << "Status::TIMEOUT";
            break;
        case LeaderRPCBase::Call::Status::INVALID_REQUEST:
            os << "Status::INVALID_REQUEST";
            break;
    }
    return os;
}

//// class LeaderRPC::Call ////

LeaderRPC::Call::Call(LeaderRPC& leaderRPC)
    : leaderRPC(leaderRPC)
    , cachedSession()
    , rpc()
{
}

LeaderRPC::Call::~Call()
{
}

void
LeaderRPC::Call::start(OpCode opCode,
                       const google::protobuf::Message& request,
                       TimePoint timeout)
{
    // Save a reference to the leader session, so that any errors can later
    // be reported against the same session this RPC actually used.
    cachedSession = leaderRPC.getSession(timeout);
    rpc = RPC::ClientRPC(cachedSession,
                         Protocol::Common::ServiceId::CLIENT_SERVICE,
                         1, // service-specific error version
                         opCode,
                         request);
}

void
LeaderRPC::Call::cancel()
{
    rpc.cancel();
    cachedSession.reset();
}

LeaderRPC::Call::Status
LeaderRPC::Call::wait(google::protobuf::Message& response,
                      TimePoint timeout)
{
    typedef RPC::ClientRPC::Status RPCStatus;
    Protocol::Client::Error error;
    RPCStatus status = rpc.waitForReply(&response, &error, timeout);

    // Decode the response
    switch (status) {
        case RPCStatus::OK:
            leaderRPC.reportSuccess(cachedSession);
            return Call::Status::OK;
        case RPCStatus::SERVICE_SPECIFIC_ERROR:
            switch (error.error_code()) {
                case Protocol::Client::Error::NOT_LEADER:
                    // The server we tried is not the current cluster leader.
                    if (error.has_leader_hint()) {
                        leaderRPC.reportRedirect(cachedSession,
                                                 error.leader_hint());
                    } else {
                        leaderRPC.reportNotLeader(cachedSession);
                    }
                    break;
                default:
                    // Hmm, we don't know what this server is trying to tell
                    // us, but something is wrong. The server shouldn't reply
                    // with error codes we don't understand: that's why we
                    // gave it a service-specific error version number in the
                    // request header.
                    PANIC("Unknown error code %u returned in service-specific "
                          "error. This probably indicates a bug in the server",
                          error.error_code());
            }
            break;
        case RPCStatus::RPC_FAILED:
            leaderRPC.reportFailure(cachedSession);
            break;
        case RPCStatus::RPC_CANCELED:
            // Canceled by another thread; fall through to the timeout/retry
            // check below.
            break;
        case RPCStatus::TIMEOUT:
            return Call::Status::TIMEOUT;
        case RPCStatus::INVALID_SERVICE:
            PANIC("The server isn't running the ClientService");
        case RPCStatus::INVALID_REQUEST:
            return Call::Status::INVALID_REQUEST;
    }
    if (timeout < Clock::now())
        return Call::Status::TIMEOUT;
    else
        return Call::Status::RETRY;
}
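// A minimal usage sketch (not part of the original file): callers needing an
// asynchronous RPC obtain a Call through LeaderRPCBase::makeCall() (defined
// below), start it, and wait for the reply, while another thread holding a
// reference may invoke cancel() to abort the wait. The opCode, request, and
// response here are illustrative placeholders, not real LogCabin operations.
//
//     std::unique_ptr<LeaderRPCBase::Call> call = leaderRPC.makeCall();
//     call->start(opCode, request, timeout);
//     // ... elsewhere, a shutdown path might invoke call->cancel() ...
//     switch (call->wait(response, timeout)) {
//         case LeaderRPCBase::Call::Status::OK:      /* use response */ break;
//         case LeaderRPCBase::Call::Status::RETRY:   /* start a new Call */ break;
//         case LeaderRPCBase::Call::Status::TIMEOUT: /* give up */ break;
//         case LeaderRPCBase::Call::Status::INVALID_REQUEST: /* report */ break;
//     }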
//// class LeaderRPC ////

LeaderRPC::LeaderRPC(const RPC::Address& hosts,
                     SessionManager::ClusterUUID& clusterUUID,
                     Backoff& sessionCreationBackoff,
                     SessionManager& sessionManager)
    : clusterUUID(clusterUUID)
    , sessionCreationBackoff(sessionCreationBackoff)
    , sessionManager(sessionManager)
    , mutex()
    , isConnecting(false)
    , connected()
    , hosts(hosts)
    , leaderHint()
    , leaderSession() // set by getSession()
    , failuresSinceLastSuccess(0)
{
}

LeaderRPC::~LeaderRPC()
{
    leaderSession.reset();
}

LeaderRPC::Status
LeaderRPC::call(OpCode opCode,
                const google::protobuf::Message& request,
                google::protobuf::Message& response,
                TimePoint timeout)
{
    while (true) {
        Call c(*this);
        c.start(opCode, request, timeout);
        Call::Status callStatus = c.wait(response, timeout);
        switch (callStatus) {
            case Call::Status::OK:
                return Status::OK;
            case Call::Status::TIMEOUT:
                return Status::TIMEOUT;
            case Call::Status::RETRY:
                break;
            case Call::Status::INVALID_REQUEST:
                return Status::INVALID_REQUEST;
        }
    }
}
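// A hedged sketch of the synchronous interface above: call() hides leader
// discovery and retries entirely, returning only once the RPC succeeds, the
// deadline passes, or the request is rejected as invalid. The message types
// and opcode below are assumed placeholders, not verified LogCabin names.
//
//     Protocol::Client::StateMachineQuery::Request request;   // assumed type
//     Protocol::Client::StateMachineQuery::Response response; // assumed type
//     LeaderRPC::Status status = leaderRPC.call(
//         Protocol::Client::OpCode::STATE_MACHINE_QUERY,      // assumed opcode
//         request,
//         response,
//         Clock::now() + std::chrono::seconds(5));
//     if (status == LeaderRPC::Status::TIMEOUT)
//         /* no leader could be reached within 5 seconds */;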
std::unique_ptr<LeaderRPCBase::Call>
LeaderRPC::makeCall()
{
    return std::unique_ptr<LeaderRPCBase::Call>(new LeaderRPC::Call(*this));
}

std::shared_ptr<RPC::ClientSession>
LeaderRPC::getSession(TimePoint timeout)
{
    std::unique_lock<std::mutex> lockGuard(mutex);

    // Threads used to hold the mutex while creating a new session, but then
    // to respect timeouts, you'd have to acquire the mutex with a timeout.
    // This condition variable approach seems cleaner to me, where the mutex
    // is only held during computation, not during I/O. See #173. -Diego
    while (isConnecting) {
        // Go to sleep, as another thread is already creating a new session.
        connected.wait_until(lockGuard, timeout);
        if (Clock::now() > timeout) {
            return RPC::ClientSession::makeErrorSession(
                sessionManager.eventLoop,
                "Timed out waiting for another thread to create a session "
                "to the leader");
        }
    }

    if (leaderSession)
        return leaderSession;

    // This thread will create a new session; others should wait.
    isConnecting = true;

    // Determine which address to connect to while still holding the lock.
    RPC::Address address;
    if (leaderHint.empty()) {
        // Hope the next random host is the leader. If that turns out to be
        // false, we will soon find out.
        address = hosts;
    } else {
        // Connect to the leader given by 'leaderHint'.
        address = RPC::Address(leaderHint, Protocol::Common::DEFAULT_PORT);
        // Don't clear leaderHint until down below, in case this thread times
        // out before making any use of it.
    }

    // Don't hang onto the mutex for any of this blocking stuff (doing so
    // would delay other threads with shorter timeouts; see #173).
    std::shared_ptr<RPC::ClientSession> session;
    bool usedHint = true;
    {
        Core::MutexUnlock<std::mutex> unlockGuard(lockGuard);

        // Sleep if we've tried to connect too much recently.
        sessionCreationBackoff.delayAndBegin(timeout);
        if (Clock::now() > timeout) {
            session = RPC::ClientSession::makeErrorSession(
                sessionManager.eventLoop,
                "Failed to create session to leader: timeout expired");
            usedHint = false;
        } else {
            address.refresh(timeout);
            VERBOSE("Connecting to: %s", address.toString().c_str());
            session = sessionManager.createSession(
                address,
                timeout,
                &clusterUUID);
        }
    }

    // Assign back to leaderSession only now that we have the lock again.
    leaderSession = session;
    if (usedHint)
        leaderHint.clear();
    // Unblock other threads and return.
    isConnecting = false;
    connected.notify_all();
    return leaderSession;
}
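// A distilled, standalone sketch of the synchronization pattern getSession()
// uses (see #173): exactly one thread performs the blocking connect, the
// mutex is dropped around all I/O, and each waiter enforces its own deadline
// on a condition variable rather than blocking on the mutex itself. The
// names below are illustrative, not members of this class.
//
//     std::unique_lock<std::mutex> lock(mutex);
//     while (connecting) {
//         cv.wait_until(lock, deadline);
//         if (Clock::now() > deadline)
//             return errorSession();        // honor this caller's timeout
//     }
//     if (cached)
//         return cached;
//     connecting = true;
//     lock.unlock();                        // do the slow I/O unlocked
//     auto session = blockingConnect(deadline);
//     lock.lock();
//     cached = session;
//     connecting = false;
//     cv.notify_all();                      // wake the sleeping waiters
//     return cached;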
void
LeaderRPC::reportFailure(std::shared_ptr<RPC::ClientSession> cachedSession)
{
    std::lock_guard<std::mutex> lockGuard(mutex);
    if (cachedSession != leaderSession)
        return;
    ++failuresSinceLastSuccess;
    if (Core::Util::isPowerOfTwo(failuresSinceLastSuccess)) {
        NOTICE("RPC to server failed: %s "
               "(there have been %lu failed attempts during this outage)",
               cachedSession->toString().c_str(),
               failuresSinceLastSuccess);
    } else {
        VERBOSE("RPC to server failed: %s "
                "(there have been %lu failed attempts during this outage)",
                cachedSession->toString().c_str(),
                failuresSinceLastSuccess);
    }
    leaderSession.reset();
}

void
LeaderRPC::reportNotLeader(std::shared_ptr<RPC::ClientSession> cachedSession)
{
    std::lock_guard<std::mutex> lockGuard(mutex);
    if (cachedSession != leaderSession)
        return;
    ++failuresSinceLastSuccess;
    if (Core::Util::isPowerOfTwo(failuresSinceLastSuccess)) {
        NOTICE("Server [%s] is not leader, will try random host next "
               "(there have been %lu failed attempts during this outage)",
               cachedSession->toString().c_str(),
               failuresSinceLastSuccess);
    } else {
        VERBOSE("Server [%s] is not leader, will try random host next "
                "(there have been %lu failed attempts during this outage)",
                cachedSession->toString().c_str(),
                failuresSinceLastSuccess);
    }
    leaderSession.reset();
}

void
LeaderRPC::reportRedirect(std::shared_ptr<RPC::ClientSession> cachedSession,
                          const std::string& host)
{
    std::lock_guard<std::mutex> lockGuard(mutex);
    if (cachedSession != leaderSession)
        return;
    ++failuresSinceLastSuccess;
    if (Core::Util::isPowerOfTwo(failuresSinceLastSuccess)) {
        NOTICE("Server [%s] is not leader, will try suggested %s next "
               "(there have been %lu failed attempts during this outage)",
               cachedSession->toString().c_str(),
               host.c_str(),
               failuresSinceLastSuccess);
    } else {
        VERBOSE("Server [%s] is not leader, will try suggested %s next "
                "(there have been %lu failed attempts during this outage)",
                cachedSession->toString().c_str(),
                host.c_str(),
                failuresSinceLastSuccess);
    }
    leaderSession.reset();
    leaderHint = host;
}

void
LeaderRPC::reportSuccess(std::shared_ptr<RPC::ClientSession> cachedSession)
{
    std::lock_guard<std::mutex> lockGuard(mutex);
    if (cachedSession != leaderSession)
        return;
    if (failuresSinceLastSuccess > 0) {
        NOTICE("Successfully connected to leader [%s] after %lu failures",
               cachedSession->toString().c_str(),
               failuresSinceLastSuccess);
        failuresSinceLastSuccess = 0;
    }
}


} // namespace LogCabin::Client
} // namespace LogCabin
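// A standalone sketch (assumed names; not part of the original file) of the
// log-throttling idiom the report* methods above share: during an outage,
// log at NOTICE only when the running failure count is a power of two
// (1, 2, 4, 8, ...), so the NOTICE log grows logarithmically with the length
// of the outage while every individual failure remains visible at VERBOSE.
//
//     #include <cstdint>
//     #include <cstdio>
//
//     // Local stand-in for Core::Util::isPowerOfTwo.
//     bool isPowerOfTwo(uint64_t n) {
//         return n != 0 && (n & (n - 1)) == 0;
//     }
//
//     int main() {
//         for (uint64_t failures = 1; failures <= 10; ++failures) {
//             if (isPowerOfTwo(failures))
//                 std::printf("NOTICE:  attempt %lu failed\n", failures);
//             else
//                 std::printf("VERBOSE: attempt %lu failed\n", failures);
//         }
//         // NOTICE fires only for attempts 1, 2, 4, and 8.
//     }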