LogCabin
RPC/ClientSession.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2014-2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <assert.h>
00018 #include <string.h>
00019 #include <sys/epoll.h>
00020 #include <unistd.h>
00021 
00022 #include "Core/Debug.h"
00023 #include "Core/StringUtil.h"
00024 #include "Event/File.h"
00025 #include "Event/Loop.h"
00026 #include "Event/Timer.h"
00027 #include "Protocol/Common.h"
00028 #include "RPC/ClientSession.h"
00029 
00030 namespace LogCabin {
00031 namespace RPC {
00032 
00033 namespace {
00034 
00035 /**
00036  * Exits an event loop when a file event occurs.
00037  * Helper for ClientSession constructor.
00038  */
00039 struct FileNotifier : public Event::File {
00040     FileNotifier(Event::Loop& eventLoop, int fd, Ownership ownership)
00041         : Event::File(fd, ownership)
00042         , eventLoop(eventLoop)
00043         , count(0)
00044     {
00045     }
00046     void handleFileEvent(uint32_t events) {
00047         ++count;
00048         eventLoop.exit();
00049     }
00050     Event::Loop& eventLoop;
00051     uint64_t count;
00052 };
00053 
00054 /**
00055  * Exits an event loop when a timer event occurs.
00056  * Helper for ClientSession constructor.
00057  */
00058 struct TimerNotifier : public Event::Timer {
00059     explicit TimerNotifier(Event::Loop& eventLoop)
00060         : Event::Timer()
00061         , eventLoop(eventLoop)
00062     {
00063     }
00064     void handleTimerEvent() {
00065         eventLoop.exit();
00066     }
00067     Event::Loop& eventLoop;
00068 };
00069 
00070 } // anonymous namespace
00071 
00072 ////////// ClientSession::MessageSocketHandler //////////
00073 
00074 ClientSession::MessageSocketHandler::MessageSocketHandler(
00075         ClientSession& session)
00076     : session(session)
00077 {
00078 }
00079 
00080 void
00081 ClientSession::MessageSocketHandler::handleReceivedMessage(
00082         MessageId messageId,
00083         Core::Buffer message)
00084 {
00085     std::lock_guard<std::mutex> mutexGuard(session.mutex);
00086 
00087     if (messageId == Protocol::Common::PING_MESSAGE_ID) {
00088         if (session.numActiveRPCs > 0 && session.activePing) {
00089             // The server has shown that it is alive for now.
00090             // Let's get suspicious again in another PING_TIMEOUT_NS.
00091             session.activePing = false;
00092             session.timer.schedule(session.PING_TIMEOUT_NS);
00093         } else {
00094             VERBOSE("Received an unexpected ping response. This can happen "
00095                     "for a number of reasons and is no cause for alarm. For "
00096                     "example, this happens if a ping request was sent out, "
00097                     "then all RPCs completed before the ping response "
00098                     "arrived.");
00099         }
00100         return;
00101     }
00102 
00103     auto it = session.responses.find(messageId);
00104     if (it == session.responses.end()) {
00105         VERBOSE("Received an unexpected response with message ID %lu. "
00106                 "This can happen for a number of reasons and is no cause "
00107                 "for alarm. For example, this happens if the RPC was "
00108                 "cancelled before its response arrived.",
00109                 messageId);
00110         return;
00111     }
00112     Response& response = *it->second;
00113     if (response.status == Response::HAS_REPLY) {
00114         WARNING("Received a second response from the server for "
00115                 "message ID %lu. This indicates that either the client or "
00116                 "server is assigning message IDs incorrectly, or "
00117                 "the server is misbehaving. Dropped this response.",
00118                 messageId);
00119         return;
00120     }
00121 
00122     // Book-keeping for timeouts
00123     --session.numActiveRPCs;
00124     if (session.numActiveRPCs == 0)
00125         session.timer.deschedule();
00126     else
00127         session.timer.schedule(session.PING_TIMEOUT_NS);
00128 
00129     // Fill in the response
00130     response.status = Response::HAS_REPLY;
00131     response.reply = std::move(message);
00132     response.ready.notify_all();
00133 }
00134 
00135 void
00136 ClientSession::MessageSocketHandler::handleDisconnect()
00137 {
00138     VERBOSE("Disconnected from server %s",
00139             session.address.toString().c_str());
00140     std::lock_guard<std::mutex> mutexGuard(session.mutex);
00141     if (session.errorMessage.empty()) {
00142         // Fail all current and future RPCs.
00143         session.errorMessage = ("Disconnected from server " +
00144                                 session.address.toString());
00145         // Notify any waiting RPCs.
00146         for (auto it = session.responses.begin();
00147              it != session.responses.end();
00148              ++it) {
00149             Response* response = it->second;
00150             response->ready.notify_all();
00151         }
00152     }
00153 }
00154 
00155 ////////// ClientSession::Response //////////
00156 
00157 ClientSession::Response::Response()
00158     : status(Response::WAITING)
00159     , reply()
00160     , hasWaiter(false)
00161     , ready()
00162 {
00163 }
00164 
00165 ////////// ClientSession::Timer //////////
00166 
00167 ClientSession::Timer::Timer(ClientSession& session)
00168     : Event::Timer()
00169     , session(session)
00170 {
00171 }
00172 
00173 void
00174 ClientSession::Timer::handleTimerEvent()
00175 {
00176     std::lock_guard<std::mutex> mutexGuard(session.mutex);
00177 
00178     // Handle "spurious" wake-ups.
00179     if (!session.messageSocket ||
00180         session.numActiveRPCs == 0 ||
00181         !session.errorMessage.empty()) {
00182         return;
00183     }
00184 
00185     // Send a ping or expire the session.
00186     if (!session.activePing) {
00187         VERBOSE("Getting suspicious of %s: sending ping (have %u RPCs "
00188                 "outstanding)",
00189                 session.address.toString().c_str(),
00190                 session.numActiveRPCs);
00191         session.activePing = true;
00192         session.messageSocket->sendMessage(Protocol::Common::PING_MESSAGE_ID,
00193                                            Core::Buffer());
00194         schedule(session.PING_TIMEOUT_NS);
00195     } else {
00196         VERBOSE("ClientSession to %s timed out: didn't get ping reply in "
00197                 "time, failing %u outstanding RPCs",
00198                 session.address.toString().c_str(),
00199                 session.numActiveRPCs);
00200         // Fail all current and future RPCs.
00201         session.errorMessage = ("Server " +
00202                                 session.address.toString() +
00203                                 " timed out");
00204         // Notify any waiting RPCs.
00205         for (auto it = session.responses.begin();
00206              it != session.responses.end();
00207              ++it) {
00208             Response* response = it->second;
00209             response->ready.notify_all();
00210         }
00211     }
00212 }
00213 
00214 ////////// ClientSession //////////
00215 
00216 std::function<
00217     int(int sockfd,
00218         const struct sockaddr *addr,
00219         socklen_t addrlen)> ClientSession::connectFn = ::connect;
00220 
00221 ClientSession::ClientSession(Event::Loop& eventLoop,
00222                              const Address& address,
00223                              uint32_t maxMessageLength,
00224                              TimePoint timeout,
00225                              const Core::Config& config)
00226     : self() // makeSession will fill this in shortly
00227     , PING_TIMEOUT_NS(config.read<uint64_t>(
00228         "tcpHeartbeatTimeoutMilliseconds", 500) * 1000 * 1000)
00229     , eventLoop(eventLoop)
00230     , address(address)
00231     , messageSocketHandler(*this)
00232     , timer(*this)
00233     , mutex()
00234     , nextMessageId(0)
00235     , responses()
00236     , errorMessage()
00237     , numActiveRPCs(0)
00238     , activePing(false)
00239     , messageSocket()
00240     , timerMonitor(eventLoop, timer)
00241 {
00242     // Be careful not to pass a sockaddr of length 0 to conect(). Although it
00243     // should return -1 EINVAL, on some systems (e.g., RHEL6) it instead
00244     // returns OK but leaves the socket unconnected! See
00245     // https://github.com/logcabin/logcabin/issues/66 for more details.
00246     if (!address.isValid()) {
00247         errorMessage = "Failed to resolve " + address.toString();
00248         return;
00249     }
00250 
00251     // Some TCP connection timeouts appear to be ridiculously long in the wild.
00252     // Limit this to 1 second by default, after which you'd most likely want to
00253     // retry.
00254     timeout = std::min(timeout,
00255                        (Clock::now() +
00256                         std::chrono::milliseconds(
00257                             config.read<uint64_t>(
00258                                 "tcpConnectTimeoutMilliseconds", 1000))));
00259 
00260     // Setting NONBLOCK here makes connect return right away with EINPROGRESS.
00261     // Then we can monitor the fd until it's writable to know when it's done,
00262     // along with a timeout. See man page for connect under EINPROGRESS.
00263     int fd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0);
00264     if (fd < 0) {
00265         errorMessage = "Failed to create socket";
00266         return;
00267     }
00268 
00269     // According to the spec, connect() could return OK done here, but in
00270     // practice it'll return EINPROGRESS.
00271     bool waiting = false;
00272     int r = connectFn(fd,
00273                       address.getSockAddr(),
00274                       address.getSockAddrLen());
00275     if (r != 0) {
00276         switch (errno) {
00277             case EINPROGRESS:
00278                 waiting = true;
00279                 break;
00280             default:
00281                 errorMessage = Core::StringUtil::format(
00282                     "Failed to connect socket to %s: %s",
00283                     address.toString().c_str(),
00284                     strerror(errno));
00285                 close(fd);
00286                 return;
00287         }
00288     }
00289 
00290     if (waiting) {
00291         // This is a pretty heavy-weight method of watching a file descriptor
00292         // for a given period of time. On the other hand, it's only a few lines
00293         // of code with the LogCabin::Event classes, so it's easier for now.
00294         Event::Loop loop;
00295         FileNotifier fileNotifier(loop, fd, Event::File::CALLER_CLOSES_FD);
00296         TimerNotifier timerNotifier(loop);
00297         Event::File::Monitor fileMonitor(loop, fileNotifier, EPOLLOUT);
00298         Event::Timer::Monitor timerMonitor(loop, timerNotifier);
00299         timerNotifier.scheduleAbsolute(timeout);
00300         while (true) {
00301             loop.runForever();
00302             if (fileNotifier.count > 0) {
00303                 int error = 0;
00304                 socklen_t errorlen = sizeof(error);
00305                 r = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &errorlen);
00306                 if (r != 0)
00307                     PANIC("getsockopt failed: %s", strerror(errno));
00308                 if (error != 0) {
00309                     errorMessage = Core::StringUtil::format(
00310                         "Failed to connect socket to %s: %s",
00311                         address.toString().c_str(),
00312                         strerror(error));
00313                 }
00314                 break;
00315             }
00316             if (Clock::now() > timeout) {
00317                 errorMessage = Core::StringUtil::format(
00318                     "Failed to connect socket to %s: timeout expired",
00319                     address.toString().c_str());
00320                 break;
00321             }
00322             WARNING("spurious exit from event loop?");
00323         }
00324     }
00325     if (!errorMessage.empty()) {
00326         close(fd);
00327         return;
00328     }
00329 
00330     messageSocket.reset(new MessageSocket(
00331         messageSocketHandler, eventLoop, fd, maxMessageLength));
00332 }
00333 
00334 std::shared_ptr<ClientSession>
00335 ClientSession::makeSession(Event::Loop& eventLoop,
00336                            const Address& address,
00337                            uint32_t maxMessageLength,
00338                            TimePoint timeout,
00339                            const Core::Config& config)
00340 {
00341     std::shared_ptr<ClientSession> session(
00342         new ClientSession(eventLoop,
00343                           address,
00344                           maxMessageLength,
00345                           timeout,
00346                           config));
00347     session->self = session;
00348     return session;
00349 }
00350 
00351 std::shared_ptr<ClientSession>
00352 ClientSession::makeErrorSession(Event::Loop& eventLoop,
00353                                 const std::string& errorMessage)
00354 {
00355     Core::Config config;
00356     std::shared_ptr<ClientSession> session(
00357         new ClientSession(eventLoop,
00358                           Address(),
00359                           0,
00360                           TimePoint::min(),
00361                           config));
00362     session->self = session;
00363     session->errorMessage = errorMessage;
00364     return session;
00365 }
00366 
00367 
00368 
00369 ClientSession::~ClientSession()
00370 {
00371     timerMonitor.disableForever();
00372     messageSocket.reset();
00373     for (auto it = responses.begin(); it != responses.end(); ++it)
00374         delete it->second;
00375 }
00376 
00377 OpaqueClientRPC
00378 ClientSession::sendRequest(Core::Buffer request)
00379 {
00380     MessageSocket::MessageId messageId;
00381     {
00382         std::lock_guard<std::mutex> mutexGuard(mutex);
00383         messageId = nextMessageId;
00384         ++nextMessageId;
00385         responses[messageId] = new Response();
00386 
00387         ++numActiveRPCs;
00388         if (numActiveRPCs == 1) {
00389             // activePing's value was undefined while numActiveRPCs = 0
00390             activePing = false;
00391             timer.schedule(PING_TIMEOUT_NS);
00392         }
00393     }
00394     // Release the mutex before sending so that receives can be processed
00395     // simultaneously with sends.
00396     if (messageSocket)
00397         messageSocket->sendMessage(messageId, std::move(request));
00398     OpaqueClientRPC rpc;
00399     rpc.session = self.lock();
00400     rpc.responseToken = messageId;
00401     return rpc;
00402 }
00403 
00404 std::string
00405 ClientSession::getErrorMessage() const
00406 {
00407     std::lock_guard<std::mutex> mutexGuard(mutex);
00408     return errorMessage;
00409 }
00410 
00411 std::string
00412 ClientSession::toString() const
00413 {
00414     std::string error = getErrorMessage();
00415     if (error.empty()) {
00416         return "Active session to " + address.toString();
00417     } else {
00418         // error will already include the server's address.
00419         return "Closed session: " + error;
00420     }
00421 }
00422 
00423 ////////// ClientSession private methods //////////
00424 
00425 void
00426 ClientSession::cancel(OpaqueClientRPC& rpc)
00427 {
00428     // The RPC may be holding the last reference to this session. This
00429     // temporary reference makes sure this object isn't destroyed until after
00430     // we return from this method. It must be the first line in this method.
00431     std::shared_ptr<ClientSession> selfGuard(self.lock());
00432 
00433     // There are two ways to cancel an RPC:
00434     // 1. If there's some thread currently blocked in wait(), this method marks
00435     //    the Response's status as CANCELED, and wait() will delete it later.
00436     // 2. If there's no thread currently blocked in wait(), the Response is
00437     //    deleted entirely.
00438     std::lock_guard<std::mutex> mutexGuard(mutex);
00439     auto it = responses.find(rpc.responseToken);
00440     if (it == responses.end())
00441         return;
00442     Response* response = it->second;
00443     if (response->hasWaiter) {
00444         response->status = Response::CANCELED;
00445         response->ready.notify_all();
00446     } else {
00447         delete response;
00448         responses.erase(it);
00449     }
00450 
00451     --numActiveRPCs;
00452     // Even if numActiveRPCs == 0, it's simpler here to just let the timer wake
00453     // up an extra time and clean up. Otherwise, we'd need to grab an
00454     // Event::Loop::Lock prior to the mutex to call deschedule() without
00455     // inducing deadlock.
00456 }
00457 
00458 void
00459 ClientSession::update(OpaqueClientRPC& rpc)
00460 {
00461     // The RPC may be holding the last reference to this session. This
00462     // temporary reference makes sure this object isn't destroyed until after
00463     // we return from this method. It must be the first line in this method.
00464     std::shared_ptr<ClientSession> selfGuard(self.lock());
00465 
00466     std::lock_guard<std::mutex> mutexGuard(mutex);
00467     auto it = responses.find(rpc.responseToken);
00468     if (it == responses.end()) {
00469         // RPC was cancelled, fields set already
00470         assert(rpc.status == OpaqueClientRPC::Status::CANCELED);
00471         return;
00472     }
00473     Response* response = it->second;
00474     if (response->status == Response::HAS_REPLY) {
00475         rpc.reply = std::move(response->reply);
00476         rpc.status = OpaqueClientRPC::Status::OK;
00477     } else if (!errorMessage.empty()) {
00478         rpc.errorMessage = errorMessage;
00479         rpc.status = OpaqueClientRPC::Status::ERROR;
00480     } else {
00481         // If the RPC was canceled, then it'd be marked ready and update()
00482         // wouldn't be called again.
00483         assert(response->status != Response::CANCELED);
00484         return; // not ready
00485     }
00486     rpc.session.reset();
00487 
00488     delete response;
00489     responses.erase(it);
00490 }
00491 
00492 void
00493 ClientSession::wait(const OpaqueClientRPC& rpc, TimePoint timeout)
00494 {
00495     // The RPC may be holding the last reference to this session. This
00496     // temporary reference makes sure this object isn't destroyed until after
00497     // we return from this method. It must be the first line in this method.
00498     std::shared_ptr<ClientSession> selfGuard(self.lock());
00499 
00500     std::unique_lock<std::mutex> mutexGuard(mutex);
00501     while (true) {
00502         auto it = responses.find(rpc.responseToken);
00503         if (it == responses.end())
00504             return; // RPC was cancelled or already updated
00505         Response* response = it->second;
00506         if (response->status == Response::HAS_REPLY) {
00507             return; // RPC has completed
00508         } else if (response->status == Response::CANCELED) {
00509             // RPC was cancelled, finish cleaning up
00510             delete response;
00511             responses.erase(it);
00512             return;
00513         } else if (!errorMessage.empty()) {
00514             return; // session has error
00515         } else if (timeout < Clock::now()) {
00516             return; // timeout
00517         }
00518         response->hasWaiter = true;
00519         response->ready.wait_until(mutexGuard, timeout);
00520         response->hasWaiter = false;
00521     }
00522 }
00523 
00524 } // namespace LogCabin::RPC
00525 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines