// LogCabin
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <deque> 00018 #include <vector> 00019 00020 #include "Core/Buffer.h" 00021 #include "Core/Mutex.h" 00022 #include "Event/File.h" 00023 00024 #ifndef LOGCABIN_RPC_MESSAGESOCKET_H 00025 #define LOGCABIN_RPC_MESSAGESOCKET_H 00026 00027 namespace LogCabin { 00028 00029 // forward declaration 00030 namespace Event { 00031 class Loop; 00032 } 00033 00034 namespace RPC { 00035 00036 /** 00037 * A MessageSocket is a message-oriented layer on top of a TCP connection. 00038 * It sends and receives discrete chunks of data identified by opaque IDs. 00039 * Higher layers can use this to build an RPC framework, both on the client 00040 * side and on the server side. 00041 * 00042 * On the wire, this adds a 16-byte header on all messages: 00043 * | 0xdaf4 | version | length | messageId | 00044 * See Header for more details. Following the header, the data is sent as an 00045 * opaque binary string. 00046 */ 00047 class MessageSocket { 00048 public: 00049 /** 00050 * An opaque identifier for a message. 
00051 * For RPCs, clients can use this to pair up a response with its request, 00052 * and servers will want to reply with the same ID as the matching request. 00053 */ 00054 typedef uint64_t MessageId; 00055 00056 /** 00057 * Largest version of the framing protocol supported by this code. 00058 */ 00059 enum { MAX_VERSION_SUPPORTED = 1 }; 00060 00061 /** 00062 * An interface for handling events generated by a MessageSocket. 00063 * The Handler's lifetime must outlive that of the MessageSocket. 00064 */ 00065 class Handler { 00066 public: 00067 typedef MessageSocket::MessageId MessageId; 00068 00069 /** 00070 * Destructor. 00071 */ 00072 virtual ~Handler() {} 00073 00074 /** 00075 * This method is overridden by a subclass and invoked when a new 00076 * message is received. This method will be invoked by the main event 00077 * loop on whatever thread is running the Event::Loop. 00078 * \param messageId 00079 * An opaque identifier for the message set by the sender. 00080 * \param contents 00081 * The data received. 00082 */ 00083 virtual void handleReceivedMessage(MessageId messageId, 00084 Core::Buffer contents) = 0; 00085 00086 /** 00087 * This method is overridden by a subclass and invoked when the socket 00088 * has been disconnected and will be closed soon. It is safe to destroy 00089 * the MessageSocket during this call. This method will be invoked by 00090 * the main event loop at any time on whatever thread is running the 00091 * Event::Loop. 00092 * 00093 * Note that MessageSocket::close() calls this method as well, even if 00094 * it's already been called. 00095 */ 00096 virtual void handleDisconnect() = 0; 00097 }; 00098 00099 /** 00100 * Constructor. 00101 * \param handler 00102 * Handles received messages and disconnect events. 00103 * \param eventLoop 00104 * Event::Loop that will be used to find out when the socket is 00105 * readable or writable. 00106 * \param fd 00107 * Connected file descriptor for the socket. 
This object will close 00108 * the file descriptor when it is disconnected. 00109 * \param maxMessageLength 00110 * The maximum number of bytes of payload to allow per message. This 00111 * exists to limit the amount of buffer space a single socket can use. 00112 * Attempting to send longer messages will PANIC; attempting to 00113 * receive longer messages will disconnect the socket. 00114 */ 00115 MessageSocket(Handler& handler, 00116 Event::Loop& eventLoop, 00117 int fd, 00118 uint32_t maxMessageLength); 00119 00120 /** 00121 * Destructor. 00122 */ 00123 ~MessageSocket(); 00124 00125 /** 00126 * Used when the server wishes to close this socket. It invokes 00127 * Handler::handleDisconnect(), as if the client has disconnected. After 00128 * this returns, the Handler will not be called again. 00129 */ 00130 void close(); 00131 00132 /** 00133 * Queue a message to be sent to the other side of this socket. 00134 * This method is safe to call from any thread. 00135 * \param messageId 00136 * An opaque identifier for the message. 00137 * \param contents 00138 * The data to send. This must be shorter than the maxMessageLength 00139 * argument given to the constructor. 00140 */ 00141 void sendMessage(MessageId messageId, Core::Buffer contents); 00142 00143 private: 00144 00145 /** 00146 * This class is an Event::File monitor that calls writable() when the 00147 * socket can be written to without blocking. When there are messages to be 00148 * sent out, it is set to EPOLLOUT|EPOLLONESHOT. 00149 * 00150 * Since ReceiveSocket is more efficient when one-shot is not used, and 00151 * SendSocket is more efficient when one-shot is used, the two are 00152 * monitored as separate Event::File objects. 
00153 */ 00154 struct SendSocket : public Event::File { 00155 public: 00156 SendSocket(int fd, MessageSocket& messageSocket); 00157 ~SendSocket(); 00158 void handleFileEvent(uint32_t events); 00159 private: 00160 MessageSocket& messageSocket; 00161 }; 00162 00163 /** 00164 * This class is an Event::File monitor that calls readable() when the 00165 * socket can be read from without blocking. This is always set for EPOLLIN 00166 * events in a non-one-shot (persistent) manner. 00167 */ 00168 struct ReceiveSocket : public Event::File { 00169 public: 00170 ReceiveSocket(int fd, MessageSocket& messageSocket); 00171 ~ReceiveSocket(); 00172 void handleFileEvent(uint32_t events); 00173 private: 00174 MessageSocket& messageSocket; 00175 }; 00176 00177 /** 00178 * This is the header that precedes every message across the TCP socket. 00179 */ 00180 struct Header { 00181 /** 00182 * Convert the contents to host order from big endian (how this header 00183 * should be transferred on the network). 00184 */ 00185 void fromBigEndian(); 00186 /** 00187 * Convert the contents to big endian (how this header should be 00188 * transferred on the network) from host order. 00189 */ 00190 void toBigEndian(); 00191 00192 /** 00193 * The value 0xdaf4 encoded in big endian. 00194 */ 00195 uint16_t fixed; 00196 00197 /** 00198 * Currently only version 1 is defined and supported. Big endian. 00199 */ 00200 uint16_t version; 00201 00202 /** 00203 * The length in bytes of the contents of the message, not including 00204 * this header. Big endian. 00205 */ 00206 uint32_t payloadLength; 00207 00208 /** 00209 * A unique message ID assigned by the sender. Big endian. 00210 */ 00211 uint64_t messageId; 00212 } __attribute__((packed)); 00213 00214 /** 00215 * This class stages a message while it is being received. 00216 */ 00217 struct Inbound { 00218 /// Constructor. 00219 Inbound(); 00220 /** 00221 * The number of bytes read for the message, including the header. 
00222 */ 00223 size_t bytesRead; 00224 /** 00225 * If bytesRead >= sizeof(header), the header has been fully received 00226 * and its fields are in host order. Otherwise, the header is still 00227 * being received here. 00228 */ 00229 Header header; 00230 /** 00231 * The contents of the message (after the header) are staged here. 00232 */ 00233 Core::Buffer message; 00234 }; 00235 00236 /** 00237 * This class stages a message while it is being sent. 00238 */ 00239 struct Outbound { 00240 /// Default constructor. 00241 Outbound(); 00242 /// Move constructor. 00243 Outbound(Outbound&& other); 00244 /// Constructor. 00245 Outbound(MessageId messageId, Core::Buffer message); 00246 /// Move assignment. 00247 Outbound& operator=(Outbound&& other); 00248 /** 00249 * The number of bytes already sent for this message, including the 00250 * header. 00251 */ 00252 size_t bytesSent; 00253 /** 00254 * The message header, in big endian. 00255 */ 00256 Header header; 00257 /** 00258 * The contents of the message (after the header). 00259 */ 00260 Core::Buffer message; 00261 }; 00262 00263 /** 00264 * Cleans up and calls onDisconnect() when the socket has an error. 00265 * Only called from event loop handlers. 00266 */ 00267 void disconnect(); 00268 00269 /** 00270 * Called when the socket has data that can be read without blocking. 00271 */ 00272 void readable(); 00273 00274 /** 00275 * Wrapper around recv(); used by readable(). 00276 * \param buf 00277 * Where to store the data received. 00278 * \param maxBytes 00279 * The maximum number of bytes to receive and store into buf. 00280 * \return 00281 * The number of bytes read (<= maxBytes), if successful. 00282 * The value -1 indicates that the socket was disconnected, in which 00283 * case the caller must be careful not to access this object and 00284 * immediately return. 00285 */ 00286 ssize_t read(void* buf, size_t maxBytes); 00287 00288 /** 00289 * Called when the socket may be written to without blocking. 
00290 */ 00291 void writable(); 00292 00293 /** 00294 * The maximum number of bytes of payload to allow per message. This exists 00295 * to limit the amount of buffer space a single socket can use. 00296 */ 00297 const uint32_t maxMessageLength; 00298 00299 /** 00300 * Deals with received messages and disconnects. 00301 */ 00302 Handler& handler; 00303 00304 /** 00305 * Used to find out when the socket is readable or writable. 00306 */ 00307 Event::Loop& eventLoop; 00308 00309 /** 00310 * The current message that is being received. 00311 */ 00312 Inbound inbound; 00313 00314 /** 00315 * Protects #outboundQueue only from concurrent modification. 00316 */ 00317 Core::Mutex outboundQueueMutex; 00318 00319 /** 00320 * A queue of messages waiting to be sent. The first one may be in the 00321 * middle of transmission, while the others have not yet started. This 00322 * queue is protected from concurrent modifications by #outboundQueueMutex. 00323 * 00324 * It's important that this remains a std::deque (or std::queue) because 00325 * writable() holds a pointer to the first element without the lock, while 00326 * sendMessage() may concurrently push onto the queue. std::deques are 00327 * guaranteed not to invalidate pointers while elements are pushed and 00328 * popped from the ends. 00329 */ 00330 std::deque<Outbound> outboundQueue; 00331 00332 /** 00333 * Notifies MessageSocket when the socket can be read from without 00334 * blocking. 00335 */ 00336 ReceiveSocket receiveSocket; 00337 00338 /** 00339 * Notifies MessageSocket when the socket can be transmitted on without 00340 * blocking. 00341 */ 00342 SendSocket sendSocket; 00343 00344 /** 00345 * Registers receiveSocket with the event loop. 00346 */ 00347 Event::File::Monitor receiveSocketMonitor; 00348 00349 /** 00350 * Registers sendSocket with the event loop. 00351 */ 00352 Event::File::Monitor sendSocketMonitor; 00353 00354 // MessageSocket is non-copyable. 
00355 MessageSocket(const MessageSocket&) = delete; 00356 MessageSocket& operator=(const MessageSocket&) = delete; 00357 00358 }; // class MessageSocket 00359 00360 } // namespace LogCabin::RPC 00361 } // namespace LogCabin 00362 00363 #endif /* LOGCABIN_RPC_MESSAGESOCKET_H */