LogCabin
RPC/OpaqueServer.h
Go to the documentation of this file.
00001 /* Copyright (c) 2011-2014 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <deque>
00018 #include <memory>
00019 #include <string>
00020 #include <unordered_set>
00021 
00022 #include "Core/CompatHash.h"
00023 #include "RPC/MessageSocket.h"
00024 
00025 #ifndef LOGCABIN_RPC_OPAQUESERVER_H
00026 #define LOGCABIN_RPC_OPAQUESERVER_H
00027 
00028 namespace LogCabin {
00029 
00030 // forward declaration
00031 namespace Core {
00032 class Buffer;
00033 };
00034 
00035 // forward declaration
00036 namespace Event {
00037 class Loop;
00038 };
00039 
00040 namespace RPC {
00041 
00042 // forward declarations
00043 class Address;
00044 class OpaqueServerRPC;
00045 
00046 /**
00047  * An OpaqueServer listens for incoming RPCs over TCP connections.
00048  * OpaqueServers can be created from any thread, but they will always run on
00049  * the thread running the Event::Loop.
00050  */
00051 class OpaqueServer {
00052   public:
00053 
00054     /**
00055      * An interface for handling events generated by an OpaqueServer.
00056      * The Handler's lifetime must outlive that of the OpaqueServer.
00057      */
00058     class Handler {
00059       public:
00060         /**
00061          * Destructor.
00062          */
00063         virtual ~Handler() {}
00064 
00065         /**
00066          * This method is overridden by a subclass and invoked when a new RPC
00067          * arrives. This will be called from the Event::Loop thread, so it must
00068          * return quickly. It should call OpaqueServerRPC::sendReply() if and
00069          * when it wants to respond to the RPC request.
00070          */
00071         virtual void handleRPC(OpaqueServerRPC serverRPC) = 0;
00072     };
00073 
00074     /**
00075      * Constructor. This object won't actually do anything until bind() is
00076      * called.
00077      * \param handler
00078      *      Handles inbound RPCs.
00079      * \param eventLoop
00080      *      Event::Loop that will be used to find out when the underlying
00081      *      socket may be read from or written to without blocking.
00082      * \param maxMessageLength
00083      *      The maximum number of bytes to allow per request/response. This
00084      *      exists to limit the amount of buffer space a single RPC can use.
00085      *      Attempting to send longer responses will PANIC; attempting to
00086      *      receive longer requests will disconnect the underlying socket.
00087      */
00088     OpaqueServer(Handler& handler,
00089                  Event::Loop& eventLoop,
00090                  uint32_t maxMessageLength);
00091 
00092     /**
00093      * Destructor. OpaqueServerRPC objects originating from this OpaqueServer
00094      * may be kept around after this destructor returns; however, they won't
00095      * actually send replies anymore.
00096      */
00097     ~OpaqueServer();
00098 
00099     /**
00100      * Listen on an address for new client connections. You can call this
00101      * multiple times to listen on multiple addresses. (But if you call this
00102      * twice with the same address, the second time will always throw an
00103      * error.)
00104      * This method is thread-safe.
00105      * \param listenAddress
00106      *      The TCP address on listen for new client connections.
00107      * \return
00108      *      An error message if this was not able to listen on the given
00109      *      address; the empty string otherwise.
00110      */
00111     std::string bind(const Address& listenAddress);
00112 
00113   private:
00114 
00115     // forward declaration
00116     struct SocketWithHandler;
00117 
00118     /**
00119      * Receives events from a MessageSocket.
00120      */
00121     class MessageSocketHandler : public MessageSocket::Handler {
00122       public:
00123         explicit MessageSocketHandler(OpaqueServer* server);
00124         void handleReceivedMessage(MessageId messageId, Core::Buffer message);
00125         void handleDisconnect();
00126 
00127         /**
00128          * The OpaqueServer which keeps a strong reference to this object, or
00129          * NULL if the server has been/is being destroyed. Used to invoke the
00130          * server's rpcHandler when receiving an RPC request, or to drop the
00131          * server's reference to this socket when disconnecting.
00132          *
00133          * May only be accessed with an Event::Loop::Lock or from the event
00134          * loop, since the OpaqueServer may set this to NULL under the same
00135          * rules.
00136          */
00137         OpaqueServer* server;
00138 
00139         /**
00140          * A weak reference to this object, used to give OpaqueServerRPCs a way
00141          * to send their replies back on their originating socket.
00142          * This may be empty when the SocketWithHandler is shutting down.
00143          */
00144         std::weak_ptr<SocketWithHandler> self;
00145 
00146         // MessageSocketHandler is not copyable.
00147         MessageSocketHandler(const MessageSocketHandler&) = delete;
00148         MessageSocketHandler& operator=(const MessageSocketHandler&) = delete;
00149     };
00150 
00151     /**
00152      * Couples a MessageSocketHandler with a MessageSocket (monitor) and
00153      * destroys them in the right order (monitor first).
00154      *
00155      * This class is reference-counted with std::shared_ptr. Usually, one
00156      * strong reference exists in OpaqueServer::sockets, which keeps this
00157      * object alive. Weak references exist OpaqueServerRPC objects and in
00158      * MessageSocketHandler::self (to copy into OpaqueServerRPC objects).
00159      */
00160     struct SocketWithHandler {
00161       public:
00162         /**
00163          * Return a newly constructed SocketWithHandler, with the handler's
00164          * self field pointing to itself.
00165          * \param server
00166          *      Server that owns this object. Held by MessageSocketHandler.
00167          * \param fd
00168          *      TCP connection with client for MessageSocket.
00169          */
00170         static std::shared_ptr<SocketWithHandler>
00171         make(OpaqueServer* server, int fd);
00172 
00173         ~SocketWithHandler();
00174         MessageSocketHandler handler;
00175         MessageSocket monitor;
00176 
00177       private:
00178         SocketWithHandler(OpaqueServer* server, int fd);
00179     };
00180 
00181     /**
00182      * A socket that listens on a particular address.
00183      */
00184     class BoundListener : public Event::File {
00185       public:
00186         /**
00187          * Constructor.
00188          * \param server
00189          *      OpaqueServer that owns this object.
00190          * \param fd
00191          *      The underlying socket that is listening on a particular
00192          *      address.
00193          */
00194         BoundListener(OpaqueServer& server, int fd);
00195         void handleFileEvent(uint32_t events);
00196         OpaqueServer& server;
00197     };
00198 
00199     /**
00200      * Couples a BoundListener with an Event::File::Monitor and destroys them
00201      * in the right order (monitor first).
00202      */
00203     struct BoundListenerWithMonitor {
00204         /// Constructor. See BoundListener.
00205         BoundListenerWithMonitor(OpaqueServer& server, int fd);
00206 
00207         /// Destructor.
00208         ~BoundListenerWithMonitor();
00209         /**
00210          * This creates a new SocketWithHandler instance upon getting a new
00211          * connection.
00212          */
00213         BoundListener handler;
00214         /**
00215          * This listens for incoming TCP connections and calls 'handler' with
00216          * them.
00217          */
00218         Event::File::Monitor monitor;
00219     };
00220 
00221     /**
00222      * Deals with OpaqueServerRPC objects that this class creates when it
00223      * receives a request.
00224      */
00225     Handler& rpcHandler;
00226 
00227     /**
00228      * The event loop that is used for non-blocking I/O.
00229      */
00230     Event::Loop& eventLoop;
00231 
00232     /**
00233      * The maximum number of bytes to allow per request/response.
00234      */
00235     const uint32_t maxMessageLength;
00236 
00237     /**
00238      * Every open socket is referenced here so that it can be cleaned up when
00239      * this OpaqueServer is destroyed. These are reference-counted: the
00240      * lifetime of each socket may slightly exceed the lifetime of the
00241      * OpaqueServer if it is being actively used to send out a OpaqueServerRPC
00242      * response when the OpaqueServer is destroyed.
00243      *
00244      * This may only be accessed from the Event::Loop or while holding an
00245      * Event::Loop::Lock (it's almost entirely accessed from event handlers, so
00246      * it's convenient to rely on the Event::Loop::Lock for mutual exclusion
00247      * during OpauqeServer's destructor as well).
00248      */
00249     std::unordered_set<std::shared_ptr<SocketWithHandler>> sockets;
00250 
00251     /**
00252      * Lock to prevent concurrent modification of #boundListeners.
00253      */
00254     Core::Mutex boundListenersMutex;
00255 
00256     /**
00257      * A list of listening sockets that each listen on a particular address.
00258      * std::deque is used so that the handler objects have a stable memory
00259      * location (otherwise their monitors would blow up).
00260      */
00261     std::deque<BoundListenerWithMonitor> boundListeners;
00262 
00263     /**
00264      * OpaqueServerRPC keeps a std::weak_ptr back to its originating
00265      * ServerMessageSocket.
00266      */
00267     friend class OpaqueServerRPC;
00268 
00269     // OpaqueServer is non-copyable.
00270     OpaqueServer(const OpaqueServer&) = delete;
00271     OpaqueServer& operator=(const OpaqueServer&) = delete;
00272 }; // class OpaqueServer
00273 
00274 } // namespace LogCabin::RPC
00275 } // namespace LogCabin
00276 
00277 #endif /* LOGCABIN_RPC_OPAQUESERVER_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines