LogCabin
|
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * Copyright (c) 2014-2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <assert.h> 00018 #include <string.h> 00019 #include <sys/epoll.h> 00020 #include <unistd.h> 00021 00022 #include "Core/Debug.h" 00023 #include "Core/StringUtil.h" 00024 #include "Event/File.h" 00025 #include "Event/Loop.h" 00026 #include "Event/Timer.h" 00027 #include "Protocol/Common.h" 00028 #include "RPC/ClientSession.h" 00029 00030 namespace LogCabin { 00031 namespace RPC { 00032 00033 namespace { 00034 00035 /** 00036 * Exits an event loop when a file event occurs. 00037 * Helper for ClientSession constructor. 00038 */ 00039 struct FileNotifier : public Event::File { 00040 FileNotifier(Event::Loop& eventLoop, int fd, Ownership ownership) 00041 : Event::File(fd, ownership) 00042 , eventLoop(eventLoop) 00043 , count(0) 00044 { 00045 } 00046 void handleFileEvent(uint32_t events) { 00047 ++count; 00048 eventLoop.exit(); 00049 } 00050 Event::Loop& eventLoop; 00051 uint64_t count; 00052 }; 00053 00054 /** 00055 * Exits an event loop when a timer event occurs. 00056 * Helper for ClientSession constructor. 00057 */ 00058 struct TimerNotifier : public Event::Timer { 00059 explicit TimerNotifier(Event::Loop& eventLoop) 00060 : Event::Timer() 00061 , eventLoop(eventLoop) 00062 { 00063 } 00064 void handleTimerEvent() { 00065 eventLoop.exit(); 00066 } 00067 Event::Loop& eventLoop; 00068 }; 00069 00070 } // anonymous namespace 00071 00072 ////////// ClientSession::MessageSocketHandler ////////// 00073 00074 ClientSession::MessageSocketHandler::MessageSocketHandler( 00075 ClientSession& session) 00076 : session(session) 00077 { 00078 } 00079 00080 void 00081 ClientSession::MessageSocketHandler::handleReceivedMessage( 00082 MessageId messageId, 00083 Core::Buffer message) 00084 { 00085 std::lock_guard<std::mutex> mutexGuard(session.mutex); 00086 00087 if (messageId == Protocol::Common::PING_MESSAGE_ID) { 00088 if (session.numActiveRPCs > 0 && session.activePing) { 00089 // The server has shown that it is alive for now. 00090 // Let's get suspicious again in another PING_TIMEOUT_NS. 00091 session.activePing = false; 00092 session.timer.schedule(session.PING_TIMEOUT_NS); 00093 } else { 00094 VERBOSE("Received an unexpected ping response. This can happen " 00095 "for a number of reasons and is no cause for alarm. For " 00096 "example, this happens if a ping request was sent out, " 00097 "then all RPCs completed before the ping response " 00098 "arrived."); 00099 } 00100 return; 00101 } 00102 00103 auto it = session.responses.find(messageId); 00104 if (it == session.responses.end()) { 00105 VERBOSE("Received an unexpected response with message ID %lu. " 00106 "This can happen for a number of reasons and is no cause " 00107 "for alarm. For example, this happens if the RPC was " 00108 "cancelled before its response arrived.", 00109 messageId); 00110 return; 00111 } 00112 Response& response = *it->second; 00113 if (response.status == Response::HAS_REPLY) { 00114 WARNING("Received a second response from the server for " 00115 "message ID %lu. This indicates that either the client or " 00116 "server is assigning message IDs incorrectly, or " 00117 "the server is misbehaving. Dropped this response.", 00118 messageId); 00119 return; 00120 } 00121 00122 // Book-keeping for timeouts 00123 --session.numActiveRPCs; 00124 if (session.numActiveRPCs == 0) 00125 session.timer.deschedule(); 00126 else 00127 session.timer.schedule(session.PING_TIMEOUT_NS); 00128 00129 // Fill in the response 00130 response.status = Response::HAS_REPLY; 00131 response.reply = std::move(message); 00132 response.ready.notify_all(); 00133 } 00134 00135 void 00136 ClientSession::MessageSocketHandler::handleDisconnect() 00137 { 00138 VERBOSE("Disconnected from server %s", 00139 session.address.toString().c_str()); 00140 std::lock_guard<std::mutex> mutexGuard(session.mutex); 00141 if (session.errorMessage.empty()) { 00142 // Fail all current and future RPCs. 00143 session.errorMessage = ("Disconnected from server " + 00144 session.address.toString()); 00145 // Notify any waiting RPCs. 00146 for (auto it = session.responses.begin(); 00147 it != session.responses.end(); 00148 ++it) { 00149 Response* response = it->second; 00150 response->ready.notify_all(); 00151 } 00152 } 00153 } 00154 00155 ////////// ClientSession::Response ////////// 00156 00157 ClientSession::Response::Response() 00158 : status(Response::WAITING) 00159 , reply() 00160 , hasWaiter(false) 00161 , ready() 00162 { 00163 } 00164 00165 ////////// ClientSession::Timer ////////// 00166 00167 ClientSession::Timer::Timer(ClientSession& session) 00168 : Event::Timer() 00169 , session(session) 00170 { 00171 } 00172 00173 void 00174 ClientSession::Timer::handleTimerEvent() 00175 { 00176 std::lock_guard<std::mutex> mutexGuard(session.mutex); 00177 00178 // Handle "spurious" wake-ups. 00179 if (!session.messageSocket || 00180 session.numActiveRPCs == 0 || 00181 !session.errorMessage.empty()) { 00182 return; 00183 } 00184 00185 // Send a ping or expire the session. 00186 if (!session.activePing) { 00187 VERBOSE("Getting suspicious of %s: sending ping (have %u RPCs " 00188 "outstanding)", 00189 session.address.toString().c_str(), 00190 session.numActiveRPCs); 00191 session.activePing = true; 00192 session.messageSocket->sendMessage(Protocol::Common::PING_MESSAGE_ID, 00193 Core::Buffer()); 00194 schedule(session.PING_TIMEOUT_NS); 00195 } else { 00196 VERBOSE("ClientSession to %s timed out: didn't get ping reply in " 00197 "time, failing %u outstanding RPCs", 00198 session.address.toString().c_str(), 00199 session.numActiveRPCs); 00200 // Fail all current and future RPCs. 00201 session.errorMessage = ("Server " + 00202 session.address.toString() + 00203 " timed out"); 00204 // Notify any waiting RPCs. 00205 for (auto it = session.responses.begin(); 00206 it != session.responses.end(); 00207 ++it) { 00208 Response* response = it->second; 00209 response->ready.notify_all(); 00210 } 00211 } 00212 } 00213 00214 ////////// ClientSession ////////// 00215 00216 std::function< 00217 int(int sockfd, 00218 const struct sockaddr *addr, 00219 socklen_t addrlen)> ClientSession::connectFn = ::connect; 00220 00221 ClientSession::ClientSession(Event::Loop& eventLoop, 00222 const Address& address, 00223 uint32_t maxMessageLength, 00224 TimePoint timeout, 00225 const Core::Config& config) 00226 : self() // makeSession will fill this in shortly 00227 , PING_TIMEOUT_NS(config.read<uint64_t>( 00228 "tcpHeartbeatTimeoutMilliseconds", 500) * 1000 * 1000) 00229 , eventLoop(eventLoop) 00230 , address(address) 00231 , messageSocketHandler(*this) 00232 , timer(*this) 00233 , mutex() 00234 , nextMessageId(0) 00235 , responses() 00236 , errorMessage() 00237 , numActiveRPCs(0) 00238 , activePing(false) 00239 , messageSocket() 00240 , timerMonitor(eventLoop, timer) 00241 { 00242 // Be careful not to pass a sockaddr of length 0 to conect(). Although it 00243 // should return -1 EINVAL, on some systems (e.g., RHEL6) it instead 00244 // returns OK but leaves the socket unconnected! See 00245 // https://github.com/logcabin/logcabin/issues/66 for more details. 00246 if (!address.isValid()) { 00247 errorMessage = "Failed to resolve " + address.toString(); 00248 return; 00249 } 00250 00251 // Some TCP connection timeouts appear to be ridiculously long in the wild. 00252 // Limit this to 1 second by default, after which you'd most likely want to 00253 // retry. 00254 timeout = std::min(timeout, 00255 (Clock::now() + 00256 std::chrono::milliseconds( 00257 config.read<uint64_t>( 00258 "tcpConnectTimeoutMilliseconds", 1000)))); 00259 00260 // Setting NONBLOCK here makes connect return right away with EINPROGRESS. 00261 // Then we can monitor the fd until it's writable to know when it's done, 00262 // along with a timeout. See man page for connect under EINPROGRESS. 00263 int fd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0); 00264 if (fd < 0) { 00265 errorMessage = "Failed to create socket"; 00266 return; 00267 } 00268 00269 // According to the spec, connect() could return OK done here, but in 00270 // practice it'll return EINPROGRESS. 00271 bool waiting = false; 00272 int r = connectFn(fd, 00273 address.getSockAddr(), 00274 address.getSockAddrLen()); 00275 if (r != 0) { 00276 switch (errno) { 00277 case EINPROGRESS: 00278 waiting = true; 00279 break; 00280 default: 00281 errorMessage = Core::StringUtil::format( 00282 "Failed to connect socket to %s: %s", 00283 address.toString().c_str(), 00284 strerror(errno)); 00285 close(fd); 00286 return; 00287 } 00288 } 00289 00290 if (waiting) { 00291 // This is a pretty heavy-weight method of watching a file descriptor 00292 // for a given period of time. On the other hand, it's only a few lines 00293 // of code with the LogCabin::Event classes, so it's easier for now. 00294 Event::Loop loop; 00295 FileNotifier fileNotifier(loop, fd, Event::File::CALLER_CLOSES_FD); 00296 TimerNotifier timerNotifier(loop); 00297 Event::File::Monitor fileMonitor(loop, fileNotifier, EPOLLOUT); 00298 Event::Timer::Monitor timerMonitor(loop, timerNotifier); 00299 timerNotifier.scheduleAbsolute(timeout); 00300 while (true) { 00301 loop.runForever(); 00302 if (fileNotifier.count > 0) { 00303 int error = 0; 00304 socklen_t errorlen = sizeof(error); 00305 r = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &errorlen); 00306 if (r != 0) 00307 PANIC("getsockopt failed: %s", strerror(errno)); 00308 if (error != 0) { 00309 errorMessage = Core::StringUtil::format( 00310 "Failed to connect socket to %s: %s", 00311 address.toString().c_str(), 00312 strerror(error)); 00313 } 00314 break; 00315 } 00316 if (Clock::now() > timeout) { 00317 errorMessage = Core::StringUtil::format( 00318 "Failed to connect socket to %s: timeout expired", 00319 address.toString().c_str()); 00320 break; 00321 } 00322 WARNING("spurious exit from event loop?"); 00323 } 00324 } 00325 if (!errorMessage.empty()) { 00326 close(fd); 00327 return; 00328 } 00329 00330 messageSocket.reset(new MessageSocket( 00331 messageSocketHandler, eventLoop, fd, maxMessageLength)); 00332 } 00333 00334 std::shared_ptr<ClientSession> 00335 ClientSession::makeSession(Event::Loop& eventLoop, 00336 const Address& address, 00337 uint32_t maxMessageLength, 00338 TimePoint timeout, 00339 const Core::Config& config) 00340 { 00341 std::shared_ptr<ClientSession> session( 00342 new ClientSession(eventLoop, 00343 address, 00344 maxMessageLength, 00345 timeout, 00346 config)); 00347 session->self = session; 00348 return session; 00349 } 00350 00351 std::shared_ptr<ClientSession> 00352 ClientSession::makeErrorSession(Event::Loop& eventLoop, 00353 const std::string& errorMessage) 00354 { 00355 Core::Config config; 00356 std::shared_ptr<ClientSession> session( 00357 new ClientSession(eventLoop, 00358 Address(), 00359 0, 00360 TimePoint::min(), 00361 config)); 00362 session->self = session; 00363 session->errorMessage = errorMessage; 00364 return session; 00365 } 00366 00367 00368 00369 ClientSession::~ClientSession() 00370 { 00371 timerMonitor.disableForever(); 00372 messageSocket.reset(); 00373 for (auto it = responses.begin(); it != responses.end(); ++it) 00374 delete it->second; 00375 } 00376 00377 OpaqueClientRPC 00378 ClientSession::sendRequest(Core::Buffer request) 00379 { 00380 MessageSocket::MessageId messageId; 00381 { 00382 std::lock_guard<std::mutex> mutexGuard(mutex); 00383 messageId = nextMessageId; 00384 ++nextMessageId; 00385 responses[messageId] = new Response(); 00386 00387 ++numActiveRPCs; 00388 if (numActiveRPCs == 1) { 00389 // activePing's value was undefined while numActiveRPCs = 0 00390 activePing = false; 00391 timer.schedule(PING_TIMEOUT_NS); 00392 } 00393 } 00394 // Release the mutex before sending so that receives can be processed 00395 // simultaneously with sends. 00396 if (messageSocket) 00397 messageSocket->sendMessage(messageId, std::move(request)); 00398 OpaqueClientRPC rpc; 00399 rpc.session = self.lock(); 00400 rpc.responseToken = messageId; 00401 return rpc; 00402 } 00403 00404 std::string 00405 ClientSession::getErrorMessage() const 00406 { 00407 std::lock_guard<std::mutex> mutexGuard(mutex); 00408 return errorMessage; 00409 } 00410 00411 std::string 00412 ClientSession::toString() const 00413 { 00414 std::string error = getErrorMessage(); 00415 if (error.empty()) { 00416 return "Active session to " + address.toString(); 00417 } else { 00418 // error will already include the server's address. 00419 return "Closed session: " + error; 00420 } 00421 } 00422 00423 ////////// ClientSession private methods ////////// 00424 00425 void 00426 ClientSession::cancel(OpaqueClientRPC& rpc) 00427 { 00428 // The RPC may be holding the last reference to this session. This 00429 // temporary reference makes sure this object isn't destroyed until after 00430 // we return from this method. It must be the first line in this method. 00431 std::shared_ptr<ClientSession> selfGuard(self.lock()); 00432 00433 // There are two ways to cancel an RPC: 00434 // 1. If there's some thread currently blocked in wait(), this method marks 00435 // the Response's status as CANCELED, and wait() will delete it later. 00436 // 2. If there's no thread currently blocked in wait(), the Response is 00437 // deleted entirely. 00438 std::lock_guard<std::mutex> mutexGuard(mutex); 00439 auto it = responses.find(rpc.responseToken); 00440 if (it == responses.end()) 00441 return; 00442 Response* response = it->second; 00443 if (response->hasWaiter) { 00444 response->status = Response::CANCELED; 00445 response->ready.notify_all(); 00446 } else { 00447 delete response; 00448 responses.erase(it); 00449 } 00450 00451 --numActiveRPCs; 00452 // Even if numActiveRPCs == 0, it's simpler here to just let the timer wake 00453 // up an extra time and clean up. Otherwise, we'd need to grab an 00454 // Event::Loop::Lock prior to the mutex to call deschedule() without 00455 // inducing deadlock. 00456 } 00457 00458 void 00459 ClientSession::update(OpaqueClientRPC& rpc) 00460 { 00461 // The RPC may be holding the last reference to this session. This 00462 // temporary reference makes sure this object isn't destroyed until after 00463 // we return from this method. It must be the first line in this method. 00464 std::shared_ptr<ClientSession> selfGuard(self.lock()); 00465 00466 std::lock_guard<std::mutex> mutexGuard(mutex); 00467 auto it = responses.find(rpc.responseToken); 00468 if (it == responses.end()) { 00469 // RPC was cancelled, fields set already 00470 assert(rpc.status == OpaqueClientRPC::Status::CANCELED); 00471 return; 00472 } 00473 Response* response = it->second; 00474 if (response->status == Response::HAS_REPLY) { 00475 rpc.reply = std::move(response->reply); 00476 rpc.status = OpaqueClientRPC::Status::OK; 00477 } else if (!errorMessage.empty()) { 00478 rpc.errorMessage = errorMessage; 00479 rpc.status = OpaqueClientRPC::Status::ERROR; 00480 } else { 00481 // If the RPC was canceled, then it'd be marked ready and update() 00482 // wouldn't be called again. 00483 assert(response->status != Response::CANCELED); 00484 return; // not ready 00485 } 00486 rpc.session.reset(); 00487 00488 delete response; 00489 responses.erase(it); 00490 } 00491 00492 void 00493 ClientSession::wait(const OpaqueClientRPC& rpc, TimePoint timeout) 00494 { 00495 // The RPC may be holding the last reference to this session. This 00496 // temporary reference makes sure this object isn't destroyed until after 00497 // we return from this method. It must be the first line in this method. 00498 std::shared_ptr<ClientSession> selfGuard(self.lock()); 00499 00500 std::unique_lock<std::mutex> mutexGuard(mutex); 00501 while (true) { 00502 auto it = responses.find(rpc.responseToken); 00503 if (it == responses.end()) 00504 return; // RPC was cancelled or already updated 00505 Response* response = it->second; 00506 if (response->status == Response::HAS_REPLY) { 00507 return; // RPC has completed 00508 } else if (response->status == Response::CANCELED) { 00509 // RPC was cancelled, finish cleaning up 00510 delete response; 00511 responses.erase(it); 00512 return; 00513 } else if (!errorMessage.empty()) { 00514 return; // session has error 00515 } else if (timeout < Clock::now()) { 00516 return; // timeout 00517 } 00518 response->hasWaiter = true; 00519 response->ready.wait_until(mutexGuard, timeout); 00520 response->hasWaiter = false; 00521 } 00522 } 00523 00524 } // namespace LogCabin::RPC 00525 } // namespace LogCabin