LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * Copyright (c) 2014 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <cinttypes> 00018 #include <memory> 00019 #include <mutex> 00020 #include <stdexcept> 00021 #include <string> 00022 00023 #include "Core/Buffer.h" 00024 #include "Core/Time.h" 00025 00026 #ifndef LOGCABIN_RPC_OPAQUECLIENTRPC_H 00027 #define LOGCABIN_RPC_OPAQUECLIENTRPC_H 00028 00029 namespace LogCabin { 00030 namespace RPC { 00031 00032 class ClientSession; // forward declaration 00033 00034 /** 00035 * This class represents an asynchronous remote procedure call. A ClientSession 00036 * returns an instance when an RPC is initiated; this can be used to wait for 00037 * and retrieve the reply. 00038 */ 00039 class OpaqueClientRPC { 00040 public: 00041 /// Clock used for timeouts. 00042 typedef Core::Time::SteadyClock Clock; 00043 /// Type for absolute time values used for timeouts. 00044 typedef Clock::time_point TimePoint; 00045 00046 /** 00047 * State of the RPC. 00048 */ 00049 enum class Status { 00050 /** 00051 * The RPC is still in progress. 00052 */ 00053 NOT_READY, 00054 /** 00055 * The RPC has completed successfully. 00056 */ 00057 OK, 00058 /** 00059 * The RPC has failed with an error (see #getErrorMessage()). 00060 */ 00061 ERROR, 00062 /** 00063 * The RPC was aborted using #cancel(). 00064 */ 00065 CANCELED, 00066 }; 00067 00068 /** 00069 * Default constructor. This doesn't create a valid RPC, but it is useful 00070 * as a placeholder. 00071 */ 00072 OpaqueClientRPC(); 00073 00074 /** 00075 * Move constructor. 00076 */ 00077 OpaqueClientRPC(OpaqueClientRPC&&); 00078 00079 /** 00080 * Destructor. 00081 */ 00082 ~OpaqueClientRPC(); 00083 00084 /** 00085 * Move assignment. 00086 */ 00087 OpaqueClientRPC& operator=(OpaqueClientRPC&&); 00088 00089 /** 00090 * Abort the RPC. 00091 * The caller is no longer interested in its reply. 00092 */ 00093 void cancel(); 00094 00095 /** 00096 * If an error has occurred, return a message describing that error. 00097 * 00098 * All errors indicate that it is unknown whether or not the server 00099 * executed the RPC. Unless the RPC was canceled with #cancel(), the 00100 * ClientSession has been disconnected and is no longer useful for 00101 * initiating new RPCs. 00102 * 00103 * \return 00104 * If an error has occurred, a message describing that error. 00105 * Otherwise, an empty string. 00106 */ 00107 std::string getErrorMessage() const; 00108 00109 /** 00110 * See #Status. 00111 */ 00112 Status getStatus() const; 00113 00114 /** 00115 * Look at the reply buffer. 00116 * 00117 * \return 00118 * If the reply is already available and there were no errors, returns 00119 * a pointer to the reply buffer inside this OpaqueClientRPC object. 00120 * Otherwise, returns NULL. 00121 */ 00122 Core::Buffer* peekReply(); 00123 00124 /** 00125 * Block until the reply is ready, an error has occurred, or the given 00126 * timeout elapses. 00127 * 00128 * This may be used from worker threads only, because OpaqueClientRPC 00129 * objects rely on the event loop servicing their ClientSession in order to 00130 * make progress. 00131 * 00132 * \param timeout 00133 * After this time has elapsed, stop waiting and return. The RPC's 00134 * results will probably not be available yet in this case (status 00135 * will be NOT_READY). 00136 */ 00137 void waitForReply(TimePoint timeout); 00138 00139 private: 00140 00141 /** 00142 * Update the fields of this object if the RPC has not completed. 00143 * Must be called with the lock held. 00144 */ 00145 void update(); 00146 00147 /** 00148 * Protects all the members of this class. 00149 */ 00150 mutable std::mutex mutex; 00151 00152 /** 00153 * The session on which this RPC is executing. 00154 * The session itself will reset this field once the reply has been 00155 * received to eagerly drop its own reference count. 00156 */ 00157 std::shared_ptr<ClientSession> session; 00158 00159 /** 00160 * A token given to the session to look up new information about the 00161 * progress of this RPC's reply. 00162 */ 00163 uint64_t responseToken; 00164 00165 /** 00166 * See #Status. 00167 */ 00168 Status status; 00169 00170 /** 00171 * The payload of a successful reply, once available. 00172 * This becomes valid when #status is OK. 00173 */ 00174 Core::Buffer reply; 00175 00176 /** 00177 * If an error occurred in the RPC then this holds the error message; 00178 * otherwise, this is the empty string. 00179 */ 00180 std::string errorMessage; 00181 00182 // The ClientSession class fills in the members of this object. 00183 friend class ClientSession; 00184 00185 // OpaqueClientRPC is non-copyable. 00186 OpaqueClientRPC(const OpaqueClientRPC&) = delete; 00187 OpaqueClientRPC& operator=(const OpaqueClientRPC&) = delete; 00188 00189 }; // class OpaqueClientRPC 00190 00191 /** 00192 * Output an OpaqueClientRPC::Status to a stream. 00193 * This is helpful for google test output. 00194 */ 00195 ::std::ostream& 00196 operator<<(::std::ostream& os, OpaqueClientRPC::Status status); 00197 00198 00199 } // namespace LogCabin::RPC 00200 } // namespace LogCabin 00201 00202 #endif /* LOGCABIN_RPC_OPAQUECLIENTRPC_H */