LogCabin
RPC/Server.h
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <cinttypes>
00018 #include <memory>
00019 #include <mutex>
00020 #include <unordered_map>
00021 
00022 #include "RPC/OpaqueServer.h"
00023 #include "RPC/Service.h"
00024 
00025 #ifndef LOGCABIN_RPC_SERVER_H
00026 #define LOGCABIN_RPC_SERVER_H
00027 
00028 namespace LogCabin {
00029 namespace RPC {
00030 
00031 /**
00032  * A Server listens for incoming RPCs over TCP connections and dispatches them
00033  * to the Services registered through registerService().
00034  * Servers can be created from any thread, but they will always run on
00035  * the thread running the Event::Loop. Services will always run on a thread
00036  * pool.
00037  */
00038 class Server {
00039   public:
00040     /**
00041      * Constructor. This object won't actually do anything until bind() is
00042      * called.
00043      * \param eventLoop
00044      *      Event::Loop that will be used to find out when the underlying
00045      *      socket may be read from or written to without blocking.
00046      * \param maxMessageLength
00047      *      The maximum number of bytes to allow per request/response. This
00048      *      exists to limit the amount of buffer space a single RPC can use.
00049      *      Attempting to send longer responses will PANIC; attempting to
00050      *      receive longer requests will disconnect the underlying socket.
00051      */
00052     Server(Event::Loop& eventLoop, uint32_t maxMessageLength);
00053 
00054     /**
00055      * Destructor. ServerRPC objects originating from this Server may be kept
00056      * around after this destructor returns; however, they won't actually send
00057      * replies anymore.
00058      */
00059     ~Server();
00060 
00061     /**
00062      * See OpaqueServer::bind() for listenAddress and the returned string.
00063      */
00064     std::string bind(const Address& listenAddress);
00065 
00066     /**
00067      * Register a Service to receive RPCs from clients. If a service has
00068      * already been registered for this service ID, this will replace it. This
00069      * may be called from any thread.
00070      * \param serviceId
00071      *      A unique ID for the service. See Protocol::Common::ServiceId.
00072      * \param service
00073      *      The service to invoke when RPCs arrive with the given serviceId.
00074      *      This service will always be invoked on a thread pool.
00075      * \param maxThreads
00076      *      The maximum number of threads to execute RPCs concurrently inside
00077      *      the service.
00078      */
00079     void registerService(uint16_t serviceId,
00080                          std::shared_ptr<Service> service,
00081                          uint32_t maxThreads);
00082 
00083   private:
00084     /**
00085      * Handler that receives the RPCs created by #opaqueServer; see handleRPC().
00086      */
00087     class RPCHandler : public OpaqueServer::Handler {
00088       public:
00089         explicit RPCHandler(Server& server);
00090         ~RPCHandler();
00091         /**
00092          * This is called by the base class, OpaqueServer::Handler, when an RPC
00093          * arrives.
00094          */
00095         void handleRPC(OpaqueServerRPC opaqueRPC);
00096         Server& server; ///< Presumably the Server given to the constructor.
00097     };
00098 
00099     /**
00100      * Protects #services from concurrent modification.
00101      */
00102     std::mutex mutex;
00103 
00104     /**
00105      * Maps from service IDs to ThreadDispatchService instances (held here as
00106      * shared Service pointers). Protected by #mutex.
00107      */
00108     std::unordered_map<uint16_t, std::shared_ptr<Service>> services;
00109 
00110     /**
00111      * Deals with RPCs created by #opaqueServer. Declared first: it outlives it.
00112      */
00113     RPCHandler rpcHandler;
00114 
00115     /**
00116      * Listens for new RPCs on TCP connections and invokes #rpcHandler with
00117      * them.
00118      */
00119     OpaqueServer opaqueServer;
00120 
00121     // Server is non-copyable: copy construction and assignment are deleted.
00122     Server(const Server&) = delete;
00123     Server& operator=(const Server&) = delete;
00124 }; // class Server
00125 
00126 } // namespace LogCabin::RPC
00127 } // namespace LogCabin
00128 
00129 #endif /* LOGCABIN_RPC_SERVER_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines