LogCabin
|
00001 /* Copyright (c) 2011-2014 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <deque> 00018 #include <memory> 00019 #include <string> 00020 #include <unordered_set> 00021 00022 #include "Core/CompatHash.h" 00023 #include "RPC/MessageSocket.h" 00024 00025 #ifndef LOGCABIN_RPC_OPAQUESERVER_H 00026 #define LOGCABIN_RPC_OPAQUESERVER_H 00027 00028 namespace LogCabin { 00029 00030 // forward declaration 00031 namespace Core { 00032 class Buffer; 00033 }; 00034 00035 // forward declaration 00036 namespace Event { 00037 class Loop; 00038 }; 00039 00040 namespace RPC { 00041 00042 // forward declarations 00043 class Address; 00044 class OpaqueServerRPC; 00045 00046 /** 00047 * An OpaqueServer listens for incoming RPCs over TCP connections. 00048 * OpaqueServers can be created from any thread, but they will always run on 00049 * the thread running the Event::Loop. 00050 */ 00051 class OpaqueServer { 00052 public: 00053 00054 /** 00055 * An interface for handling events generated by an OpaqueServer. 00056 * The Handler's lifetime must outlive that of the OpaqueServer. 00057 */ 00058 class Handler { 00059 public: 00060 /** 00061 * Destructor. 00062 */ 00063 virtual ~Handler() {} 00064 00065 /** 00066 * This method is overridden by a subclass and invoked when a new RPC 00067 * arrives. This will be called from the Event::Loop thread, so it must 00068 * return quickly. It should call OpaqueServerRPC::sendReply() if and 00069 * when it wants to respond to the RPC request. 00070 */ 00071 virtual void handleRPC(OpaqueServerRPC serverRPC) = 0; 00072 }; 00073 00074 /** 00075 * Constructor. This object won't actually do anything until bind() is 00076 * called. 00077 * \param handler 00078 * Handles inbound RPCs. 00079 * \param eventLoop 00080 * Event::Loop that will be used to find out when the underlying 00081 * socket may be read from or written to without blocking. 00082 * \param maxMessageLength 00083 * The maximum number of bytes to allow per request/response. This 00084 * exists to limit the amount of buffer space a single RPC can use. 00085 * Attempting to send longer responses will PANIC; attempting to 00086 * receive longer requests will disconnect the underlying socket. 00087 */ 00088 OpaqueServer(Handler& handler, 00089 Event::Loop& eventLoop, 00090 uint32_t maxMessageLength); 00091 00092 /** 00093 * Destructor. OpaqueServerRPC objects originating from this OpaqueServer 00094 * may be kept around after this destructor returns; however, they won't 00095 * actually send replies anymore. 00096 */ 00097 ~OpaqueServer(); 00098 00099 /** 00100 * Listen on an address for new client connections. You can call this 00101 * multiple times to listen on multiple addresses. (But if you call this 00102 * twice with the same address, the second time will always throw an 00103 * error.) 00104 * This method is thread-safe. 00105 * \param listenAddress 00106 * The TCP address on listen for new client connections. 00107 * \return 00108 * An error message if this was not able to listen on the given 00109 * address; the empty string otherwise. 00110 */ 00111 std::string bind(const Address& listenAddress); 00112 00113 private: 00114 00115 // forward declaration 00116 struct SocketWithHandler; 00117 00118 /** 00119 * Receives events from a MessageSocket. 00120 */ 00121 class MessageSocketHandler : public MessageSocket::Handler { 00122 public: 00123 explicit MessageSocketHandler(OpaqueServer* server); 00124 void handleReceivedMessage(MessageId messageId, Core::Buffer message); 00125 void handleDisconnect(); 00126 00127 /** 00128 * The OpaqueServer which keeps a strong reference to this object, or 00129 * NULL if the server has been/is being destroyed. Used to invoke the 00130 * server's rpcHandler when receiving an RPC request, or to drop the 00131 * server's reference to this socket when disconnecting. 00132 * 00133 * May only be accessed with an Event::Loop::Lock or from the event 00134 * loop, since the OpaqueServer may set this to NULL under the same 00135 * rules. 00136 */ 00137 OpaqueServer* server; 00138 00139 /** 00140 * A weak reference to this object, used to give OpaqueServerRPCs a way 00141 * to send their replies back on their originating socket. 00142 * This may be empty when the SocketWithHandler is shutting down. 00143 */ 00144 std::weak_ptr<SocketWithHandler> self; 00145 00146 // MessageSocketHandler is not copyable. 00147 MessageSocketHandler(const MessageSocketHandler&) = delete; 00148 MessageSocketHandler& operator=(const MessageSocketHandler&) = delete; 00149 }; 00150 00151 /** 00152 * Couples a MessageSocketHandler with a MessageSocket (monitor) and 00153 * destroys them in the right order (monitor first). 00154 * 00155 * This class is reference-counted with std::shared_ptr. Usually, one 00156 * strong reference exists in OpaqueServer::sockets, which keeps this 00157 * object alive. Weak references exist OpaqueServerRPC objects and in 00158 * MessageSocketHandler::self (to copy into OpaqueServerRPC objects). 00159 */ 00160 struct SocketWithHandler { 00161 public: 00162 /** 00163 * Return a newly constructed SocketWithHandler, with the handler's 00164 * self field pointing to itself. 00165 * \param server 00166 * Server that owns this object. Held by MessageSocketHandler. 00167 * \param fd 00168 * TCP connection with client for MessageSocket. 00169 */ 00170 static std::shared_ptr<SocketWithHandler> 00171 make(OpaqueServer* server, int fd); 00172 00173 ~SocketWithHandler(); 00174 MessageSocketHandler handler; 00175 MessageSocket monitor; 00176 00177 private: 00178 SocketWithHandler(OpaqueServer* server, int fd); 00179 }; 00180 00181 /** 00182 * A socket that listens on a particular address. 00183 */ 00184 class BoundListener : public Event::File { 00185 public: 00186 /** 00187 * Constructor. 00188 * \param server 00189 * OpaqueServer that owns this object. 00190 * \param fd 00191 * The underlying socket that is listening on a particular 00192 * address. 00193 */ 00194 BoundListener(OpaqueServer& server, int fd); 00195 void handleFileEvent(uint32_t events); 00196 OpaqueServer& server; 00197 }; 00198 00199 /** 00200 * Couples a BoundListener with an Event::File::Monitor and destroys them 00201 * in the right order (monitor first). 00202 */ 00203 struct BoundListenerWithMonitor { 00204 /// Constructor. See BoundListener. 00205 BoundListenerWithMonitor(OpaqueServer& server, int fd); 00206 00207 /// Destructor. 00208 ~BoundListenerWithMonitor(); 00209 /** 00210 * This creates a new SocketWithHandler instance upon getting a new 00211 * connection. 00212 */ 00213 BoundListener handler; 00214 /** 00215 * This listens for incoming TCP connections and calls 'handler' with 00216 * them. 00217 */ 00218 Event::File::Monitor monitor; 00219 }; 00220 00221 /** 00222 * Deals with OpaqueServerRPC objects that this class creates when it 00223 * receives a request. 00224 */ 00225 Handler& rpcHandler; 00226 00227 /** 00228 * The event loop that is used for non-blocking I/O. 00229 */ 00230 Event::Loop& eventLoop; 00231 00232 /** 00233 * The maximum number of bytes to allow per request/response. 00234 */ 00235 const uint32_t maxMessageLength; 00236 00237 /** 00238 * Every open socket is referenced here so that it can be cleaned up when 00239 * this OpaqueServer is destroyed. These are reference-counted: the 00240 * lifetime of each socket may slightly exceed the lifetime of the 00241 * OpaqueServer if it is being actively used to send out a OpaqueServerRPC 00242 * response when the OpaqueServer is destroyed. 00243 * 00244 * This may only be accessed from the Event::Loop or while holding an 00245 * Event::Loop::Lock (it's almost entirely accessed from event handlers, so 00246 * it's convenient to rely on the Event::Loop::Lock for mutual exclusion 00247 * during OpauqeServer's destructor as well). 00248 */ 00249 std::unordered_set<std::shared_ptr<SocketWithHandler>> sockets; 00250 00251 /** 00252 * Lock to prevent concurrent modification of #boundListeners. 00253 */ 00254 Core::Mutex boundListenersMutex; 00255 00256 /** 00257 * A list of listening sockets that each listen on a particular address. 00258 * std::deque is used so that the handler objects have a stable memory 00259 * location (otherwise their monitors would blow up). 00260 */ 00261 std::deque<BoundListenerWithMonitor> boundListeners; 00262 00263 /** 00264 * OpaqueServerRPC keeps a std::weak_ptr back to its originating 00265 * ServerMessageSocket. 00266 */ 00267 friend class OpaqueServerRPC; 00268 00269 // OpaqueServer is non-copyable. 00270 OpaqueServer(const OpaqueServer&) = delete; 00271 OpaqueServer& operator=(const OpaqueServer&) = delete; 00272 }; // class OpaqueServer 00273 00274 } // namespace LogCabin::RPC 00275 } // namespace LogCabin 00276 00277 #endif /* LOGCABIN_RPC_OPAQUESERVER_H */