LogCabin
RPC/MessageSocket.h
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
#include <sys/types.h>

#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

#include "Core/Buffer.h"
#include "Core/Mutex.h"
#include "Event/File.h"
00023 
00024 #ifndef LOGCABIN_RPC_MESSAGESOCKET_H
00025 #define LOGCABIN_RPC_MESSAGESOCKET_H
00026 
00027 namespace LogCabin {
00028 
// Forward declaration only: this header never needs Event::Loop's layout,
// because MessageSocket refers to it solely through a reference.
namespace Event { class Loop; }
00033 
00034 namespace RPC {
00035 
00036 /**
00037  * A MessageSocket is a message-oriented layer on top of a TCP connection.
00038  * It sends and receives discrete chunks of data identified by opaque IDs.
00039  * Higher layers can use this to build an RPC framework, both on the client
00040  * side and on the server side.
00041  *
00042  * On the wire, this adds a 16-byte header on all messages:
00043  *     | 0xdaf4 | version | length | messageId |
00044  * See Header for more details. Following the header, the data is sent as an
00045  * opaque binary string.
00046  */
00047 class MessageSocket {
00048   public:
00049     /**
00050      * An opaque identifier for a message.
00051      * For RPCs, clients can use this to pair up a response with its request,
00052      * and servers will want to reply with the same ID as the matching request.
00053      */
00054     typedef uint64_t MessageId;
00055 
00056     /**
00057      * Largest version of the framing protocol supported by this code.
00058      */
00059     enum { MAX_VERSION_SUPPORTED = 1 };
00060 
00061     /**
00062      * An interface for handling events generated by a MessageSocket.
00063      * The Handler's lifetime must outlive that of the MessageSocket.
00064      */
00065     class Handler {
00066       public:
00067         typedef MessageSocket::MessageId MessageId;
00068 
00069         /**
00070          * Destructor.
00071          */
00072         virtual ~Handler() {}
00073 
00074         /**
00075          * This method is overridden by a subclass and invoked when a new
00076          * message is received. This method will be invoked by the main event
00077          * loop on whatever thread is running the Event::Loop.
00078          * \param messageId
00079          *      An opaque identifier for the message set by the sender.
00080          * \param contents
00081          *      The data received.
00082          */
00083         virtual void handleReceivedMessage(MessageId messageId,
00084                                            Core::Buffer contents) = 0;
00085 
00086         /**
00087          * This method is overridden by a subclass and invoked when the socket
00088          * has been disconnected and will be closed soon. It is safe to destroy
00089          * the MessageSocket during this call. This method will be invoked by
00090          * the main event loop at any time on whatever thread is running the
00091          * Event::Loop.
00092          *
00093          * Note that MessageSocket::close() calls this method as well, even if
00094          * it's already been called.
00095          */
00096         virtual void handleDisconnect() = 0;
00097     };
00098 
00099     /**
00100      * Constructor.
00101      * \param handler
00102      *      Handles received messages and disconnect events.
00103      * \param eventLoop
00104      *      Event::Loop that will be used to find out when the socket is
00105      *      readable or writable.
00106      * \param fd
00107      *      Connected file descriptor for the socket. This object will close
00108      *      the file descriptor when it is disconnected.
00109      * \param maxMessageLength
00110      *      The maximum number of bytes of payload to allow per message. This
00111      *      exists to limit the amount of buffer space a single socket can use.
00112      *      Attempting to send longer messages will PANIC; attempting to
00113      *      receive longer messages will disconnect the socket.
00114      */
00115     MessageSocket(Handler& handler,
00116                   Event::Loop& eventLoop,
00117                   int fd,
00118                   uint32_t maxMessageLength);
00119 
00120     /**
00121      * Destructor.
00122      */
00123     ~MessageSocket();
00124 
00125     /**
00126      * Used when the server wishes to close this socket. It invokes
00127      * Handler::handleDisconnect(), as if the client has disconnected. After
00128      * this returns, the Handler will not be called again.
00129      */
00130     void close();
00131 
00132     /**
00133      * Queue a message to be sent to the other side of this socket.
00134      * This method is safe to call from any thread.
00135      * \param messageId
00136      *      An opaque identifier for the message.
00137      * \param contents
00138      *      The data to send. This must be shorter than the maxMessageLength
00139      *      argument given to the constructor.
00140      */
00141     void sendMessage(MessageId messageId, Core::Buffer contents);
00142 
00143   private:
00144 
00145     /**
00146      * This class is an Event::File monitor that calls writable() when the
00147      * socket can be written to without blocking. When there are messages to be
00148      * sent out, it is set to EPOLLOUT|EPOLLONESHOT.
00149      *
00150      * Since ReceiveSocket is more efficient when one-shot is not used, and
00151      * SendSocket is more efficient when one-shot is used, the two are
00152      * monitored as separate Event::File objects.
00153      */
00154     struct SendSocket : public Event::File {
00155       public:
00156         SendSocket(int fd, MessageSocket& messageSocket);
00157         ~SendSocket();
00158         void handleFileEvent(uint32_t events);
00159       private:
00160         MessageSocket& messageSocket;
00161     };
00162 
00163     /**
00164      * This class is an Event::File monitor that calls readable() when the
00165      * socket can be read from without blocking. This is always set for EPOLLIN
00166      * events in a non-one-shot (persistent) manner.
00167      */
00168     struct ReceiveSocket : public Event::File {
00169       public:
00170         ReceiveSocket(int fd, MessageSocket& messageSocket);
00171         ~ReceiveSocket();
00172         void handleFileEvent(uint32_t events);
00173       private:
00174         MessageSocket& messageSocket;
00175     };
00176 
00177     /**
00178      * This is the header that precedes every message across the TCP socket.
00179      */
00180     struct Header {
00181         /**
00182          * Convert the contents to host order from big endian (how this header
00183          * should be transferred on the network).
00184          */
00185         void fromBigEndian();
00186         /**
00187          * Convert the contents to big endian (how this header should be
00188          * transferred on the network) from host order.
00189          */
00190         void toBigEndian();
00191 
00192         /**
00193          * The value 0xdaf4 encoded in big endian.
00194          */
00195         uint16_t fixed;
00196 
00197         /**
00198          * Currently only version 1 is defined and supported. Big endian.
00199          */
00200         uint16_t version;
00201 
00202         /**
00203          * The length in bytes of the contents of the message, not including
00204          * this header. Big endian.
00205          */
00206         uint32_t payloadLength;
00207 
00208         /**
00209          * A unique message ID assigned by the sender. Big endian.
00210          */
00211         uint64_t messageId;
00212     } __attribute__((packed));
00213 
00214     /**
00215      * This class stages a message while it is being received.
00216      */
00217     struct Inbound {
00218         /// Constructor.
00219         Inbound();
00220         /**
00221          * The number of bytes read for the message, including the header.
00222          */
00223         size_t bytesRead;
00224         /**
00225          * If bytesRead >= sizeof(header), the header has been fully received
00226          * and its fields are in host order. Otherwise, the header is still
00227          * being received here.
00228          */
00229         Header header;
00230         /**
00231          * The contents of the message (after the header) are staged here.
00232          */
00233         Core::Buffer message;
00234     };
00235 
00236     /**
00237      * This class stages a message while it is being sent.
00238      */
00239     struct Outbound {
00240         /// Default constructor.
00241         Outbound();
00242         /// Move constructor.
00243         Outbound(Outbound&& other);
00244         /// Constructor.
00245         Outbound(MessageId messageId, Core::Buffer message);
00246         /// Move assignment.
00247         Outbound& operator=(Outbound&& other);
00248         /**
00249          * The number of bytes already sent for this message, including the
00250          * header.
00251          */
00252         size_t bytesSent;
00253         /**
00254          * The message header, in big endian.
00255          */
00256         Header header;
00257         /**
00258          * The contents of the message (after the header).
00259          */
00260         Core::Buffer message;
00261     };
00262 
00263     /**
00264      * Cleans up and calls onDisconnect() when the socket has an error.
00265      * Only called from event loop handlers.
00266      */
00267     void disconnect();
00268 
00269     /**
00270      * Called when the socket has data that can be read without blocking.
00271      */
00272     void readable();
00273 
00274     /**
00275      * Wrapper around recv(); used by readable().
00276      * \param buf
00277      *      Where to store the data received.
00278      * \param maxBytes
00279      *      The maximum number of bytes to receive and store into buf.
00280      * \return
00281      *      The number of bytes read (<= maxBytes), if successful.
00282      *      The value -1 indicates that the socket was disconnected, in which
00283      *      case the caller must be careful not to access this object and
00284      *      immediately return.
00285      */
00286     ssize_t read(void* buf, size_t maxBytes);
00287 
00288     /**
00289      * Called when the socket may be written to without blocking.
00290      */
00291     void writable();
00292 
00293     /**
00294      * The maximum number of bytes of payload to allow per message. This exists
00295      * to limit the amount of buffer space a single socket can use.
00296      */
00297     const uint32_t maxMessageLength;
00298 
00299     /**
00300      * Deals with received messages and disconnects.
00301      */
00302     Handler& handler;
00303 
00304     /**
00305      * Used to find out when the socket is readable or writable.
00306      */
00307     Event::Loop& eventLoop;
00308 
00309     /**
00310      * The current message that is being received.
00311      */
00312     Inbound inbound;
00313 
00314     /**
00315      * Protects #outboundQueue only from concurrent modification.
00316      */
00317     Core::Mutex outboundQueueMutex;
00318 
00319     /**
00320      * A queue of messages waiting to be sent. The first one may be in the
00321      * middle of transmission, while the others have not yet started. This
00322      * queue is protected from concurrent modifications by #outboundQueueMutex.
00323      *
00324      * It's important that this remains a std::deque (or std::queue) because
00325      * writable() holds a pointer to the first element without the lock, while
00326      * sendMessage() may concurrently push onto the queue. std::deques are
00327      * guaranteed not to invalidate pointers while elements are pushed and
00328      * popped from the ends.
00329      */
00330     std::deque<Outbound> outboundQueue;
00331 
00332     /**
00333      * Notifies MessageSocket when the socket can be read from without
00334      * blocking.
00335      */
00336     ReceiveSocket receiveSocket;
00337 
00338     /**
00339      * Notifies MessageSocket when the socket can be transmitted on without
00340      * blocking.
00341      */
00342     SendSocket sendSocket;
00343 
00344     /**
00345      * Registers receiveSocket with the event loop.
00346      */
00347     Event::File::Monitor receiveSocketMonitor;
00348 
00349     /**
00350      * Registers sendSocket with the event loop.
00351      */
00352     Event::File::Monitor sendSocketMonitor;
00353 
00354     // MessageSocket is non-copyable.
00355     MessageSocket(const MessageSocket&) = delete;
00356     MessageSocket& operator=(const MessageSocket&) = delete;
00357 
00358 }; // class MessageSocket
00359 
00360 } // namespace LogCabin::RPC
00361 } // namespace LogCabin
00362 
00363 #endif /* LOGCABIN_RPC_MESSAGESOCKET_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines