LogCabin
RPC/OpaqueClientRPC.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  * Copyright (c) 2014 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include "Core/Mutex.h"
00018 #include "RPC/ClientSession.h"
00019 #include "RPC/OpaqueClientRPC.h"
00020 
00021 namespace LogCabin {
00022 namespace RPC {
00023 
00024 OpaqueClientRPC::OpaqueClientRPC()
00025     : mutex()
00026     , session()
00027     , responseToken(~0UL)
00028     , status(Status::NOT_READY)
00029     , reply()
00030     , errorMessage()
00031 {
00032 }
00033 
00034 OpaqueClientRPC::OpaqueClientRPC(OpaqueClientRPC&& other)
00035     : mutex()
00036     , session(std::move(other.session))
00037     , responseToken(std::move(other.responseToken))
00038     , status(std::move(other.status))
00039     , reply(std::move(other.reply))
00040     , errorMessage(std::move(other.errorMessage))
00041 {
00042 }
00043 
00044 OpaqueClientRPC::~OpaqueClientRPC()
00045 {
00046     cancel();
00047 }
00048 
00049 OpaqueClientRPC&
00050 OpaqueClientRPC::operator=(OpaqueClientRPC&& other)
00051 {
00052     std::lock_guard<std::mutex> mutexGuard(mutex);
00053     session = std::move(other.session);
00054     responseToken = std::move(other.responseToken);
00055     status = std::move(other.status);
00056     reply = std::move(other.reply);
00057     errorMessage = std::move(other.errorMessage);
00058     return *this;
00059 }
00060 
00061 void
00062 OpaqueClientRPC::cancel()
00063 {
00064     std::lock_guard<std::mutex> mutexGuard(mutex);
00065     if (status != Status::NOT_READY)
00066         return;
00067     if (session)
00068         session->cancel(*this);
00069     status = Status::CANCELED;
00070     session.reset();
00071     reply.reset();
00072     errorMessage = "RPC canceled by user";
00073 }
00074 
00075 std::string
00076 OpaqueClientRPC::getErrorMessage() const
00077 {
00078     std::lock_guard<std::mutex> mutexGuard(mutex);
00079     const_cast<OpaqueClientRPC*>(this)->update();
00080     return errorMessage;
00081 }
00082 
00083 OpaqueClientRPC::Status
00084 OpaqueClientRPC::getStatus() const
00085 {
00086     std::lock_guard<std::mutex> mutexGuard(mutex);
00087     const_cast<OpaqueClientRPC*>(this)->update();
00088     return status;
00089 }
00090 
00091 Core::Buffer*
00092 OpaqueClientRPC::peekReply()
00093 {
00094     std::lock_guard<std::mutex> mutexGuard(mutex);
00095     update();
00096     if (status == Status::OK)
00097         return &reply;
00098     else
00099         return NULL;
00100 }
00101 
00102 void
00103 OpaqueClientRPC::waitForReply(TimePoint timeout)
00104 {
00105     std::unique_lock<std::mutex> mutexGuard(mutex);
00106     if (status != Status::NOT_READY)
00107         return;
00108     if (session) {
00109         {
00110             // release the mutex while calling wait()
00111             Core::MutexUnlock<std::mutex> unlockGuard(mutexGuard);
00112             session->wait(*this, timeout);
00113         }
00114         update();
00115     } else {
00116         errorMessage = "This RPC was never associated with a ClientSession.";
00117         status = Status::ERROR;
00118     }
00119 }
00120 
00121 ///// private methods /////
00122 
00123 void
00124 OpaqueClientRPC::update()
00125 {
00126     if (status == Status::NOT_READY && session)
00127         session->update(*this);
00128 }
00129 
00130 ///// exported functions /////
00131 
00132 ::std::ostream&
00133 operator<<(::std::ostream& os, OpaqueClientRPC::Status status)
00134 {
00135     typedef OpaqueClientRPC::Status Status;
00136     switch (status) {
00137         case Status::NOT_READY:
00138             return os << "NOT_READY";
00139         case Status::OK:
00140             return os << "OK";
00141         case Status::ERROR:
00142             return os << "ERROR";
00143         case Status::CANCELED:
00144             return os << "CANCELED";
00145         default:
00146             return os << "(INVALID VALUE)";
00147     }
00148 }
00149 
00150 } // namespace LogCabin::RPC
00151 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines