LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * Copyright (c) 2014 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include "Core/Mutex.h" 00018 #include "RPC/ClientSession.h" 00019 #include "RPC/OpaqueClientRPC.h" 00020 00021 namespace LogCabin { 00022 namespace RPC { 00023 00024 OpaqueClientRPC::OpaqueClientRPC() 00025 : mutex() 00026 , session() 00027 , responseToken(~0UL) 00028 , status(Status::NOT_READY) 00029 , reply() 00030 , errorMessage() 00031 { 00032 } 00033 00034 OpaqueClientRPC::OpaqueClientRPC(OpaqueClientRPC&& other) 00035 : mutex() 00036 , session(std::move(other.session)) 00037 , responseToken(std::move(other.responseToken)) 00038 , status(std::move(other.status)) 00039 , reply(std::move(other.reply)) 00040 , errorMessage(std::move(other.errorMessage)) 00041 { 00042 } 00043 00044 OpaqueClientRPC::~OpaqueClientRPC() 00045 { 00046 cancel(); 00047 } 00048 00049 OpaqueClientRPC& 00050 OpaqueClientRPC::operator=(OpaqueClientRPC&& other) 00051 { 00052 std::lock_guard<std::mutex> mutexGuard(mutex); 00053 session = std::move(other.session); 00054 responseToken = std::move(other.responseToken); 00055 status = std::move(other.status); 00056 reply = std::move(other.reply); 00057 errorMessage = std::move(other.errorMessage); 00058 return *this; 00059 } 00060 00061 void 00062 OpaqueClientRPC::cancel() 00063 { 00064 std::lock_guard<std::mutex> mutexGuard(mutex); 00065 if (status != Status::NOT_READY) 00066 return; 00067 if (session) 00068 session->cancel(*this); 00069 status = Status::CANCELED; 00070 session.reset(); 00071 reply.reset(); 00072 errorMessage = "RPC canceled by user"; 00073 } 00074 00075 std::string 00076 OpaqueClientRPC::getErrorMessage() const 00077 { 00078 std::lock_guard<std::mutex> mutexGuard(mutex); 00079 const_cast<OpaqueClientRPC*>(this)->update(); 00080 return errorMessage; 00081 } 00082 00083 OpaqueClientRPC::Status 00084 OpaqueClientRPC::getStatus() const 00085 { 00086 std::lock_guard<std::mutex> mutexGuard(mutex); 00087 const_cast<OpaqueClientRPC*>(this)->update(); 00088 return status; 00089 } 00090 00091 Core::Buffer* 00092 OpaqueClientRPC::peekReply() 00093 { 00094 std::lock_guard<std::mutex> mutexGuard(mutex); 00095 update(); 00096 if (status == Status::OK) 00097 return &reply; 00098 else 00099 return NULL; 00100 } 00101 00102 void 00103 OpaqueClientRPC::waitForReply(TimePoint timeout) 00104 { 00105 std::unique_lock<std::mutex> mutexGuard(mutex); 00106 if (status != Status::NOT_READY) 00107 return; 00108 if (session) { 00109 { 00110 // release the mutex while calling wait() 00111 Core::MutexUnlock<std::mutex> unlockGuard(mutexGuard); 00112 session->wait(*this, timeout); 00113 } 00114 update(); 00115 } else { 00116 errorMessage = "This RPC was never associated with a ClientSession."; 00117 status = Status::ERROR; 00118 } 00119 } 00120 00121 ///// private methods ///// 00122 00123 void 00124 OpaqueClientRPC::update() 00125 { 00126 if (status == Status::NOT_READY && session) 00127 session->update(*this); 00128 } 00129 00130 ///// exported functions ///// 00131 00132 ::std::ostream& 00133 operator<<(::std::ostream& os, OpaqueClientRPC::Status status) 00134 { 00135 typedef OpaqueClientRPC::Status Status; 00136 switch (status) { 00137 case Status::NOT_READY: 00138 return os << "NOT_READY"; 00139 case Status::OK: 00140 return os << "OK"; 00141 case Status::ERROR: 00142 return os << "ERROR"; 00143 case Status::CANCELED: 00144 return os << "CANCELED"; 00145 default: 00146 return os << "(INVALID VALUE)"; 00147 } 00148 } 00149 00150 } // namespace LogCabin::RPC 00151 } // namespace LogCabin