LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <cinttypes> 00018 #include <memory> 00019 #include <mutex> 00020 #include <unordered_map> 00021 00022 #include "RPC/OpaqueServer.h" 00023 #include "RPC/Service.h" 00024 00025 #ifndef LOGCABIN_RPC_SERVER_H 00026 #define LOGCABIN_RPC_SERVER_H 00027 00028 namespace LogCabin { 00029 namespace RPC { 00030 00031 /** 00032 * A Server listens for incoming RPCs over TCP connections and dispatches these 00033 * to Services. 00034 * Servers can be created from any thread, but they will always run on 00035 * the thread running the Event::Loop. Services will always run on a thread 00036 * pool. 00037 */ 00038 class Server { 00039 public: 00040 /** 00041 * Constructor. This object won't actually do anything until bind() is 00042 * called. 00043 * \param eventLoop 00044 * Event::Loop that will be used to find out when the underlying 00045 * socket may be read from or written to without blocking. 00046 * \param maxMessageLength 00047 * The maximum number of bytes to allow per request/response. This 00048 * exists to limit the amount of buffer space a single RPC can use. 00049 * Attempting to send longer responses will PANIC; attempting to 00050 * receive longer requests will disconnect the underlying socket. 00051 */ 00052 Server(Event::Loop& eventLoop, uint32_t maxMessageLength); 00053 00054 /** 00055 * Destructor. ServerRPC objects originating from this Server may be kept 00056 * around after this destructor returns; however, they won't actually send 00057 * replies anymore. 00058 */ 00059 ~Server(); 00060 00061 /** 00062 * See OpaqueServer::bind(). 00063 */ 00064 std::string bind(const Address& listenAddress); 00065 00066 /** 00067 * Register a Service to receive RPCs from clients. If a service has 00068 * already been registered for this service ID, this will replace it. This 00069 * may be called from any thread. 00070 * \param serviceId 00071 * A unique ID for the service. See Protocol::Common::ServiceId. 00072 * \param service 00073 * The service to invoke when RPCs arrive with the given serviceId. 00074 * This service will always be invoked on a thread pool. 00075 * \param maxThreads 00076 * The maximum number of threads to execute RPCs concurrently inside 00077 * the service. 00078 */ 00079 void registerService(uint16_t serviceId, 00080 std::shared_ptr<Service> service, 00081 uint32_t maxThreads); 00082 00083 private: 00084 /** 00085 * Services RPCs. 00086 */ 00087 class RPCHandler : public OpaqueServer::Handler { 00088 public: 00089 explicit RPCHandler(Server& server); 00090 ~RPCHandler(); 00091 /** 00092 * This is called by the base class, OpaqueServer::Handler, when an RPC 00093 * arrives. 00094 */ 00095 void handleRPC(OpaqueServerRPC opaqueRPC); 00096 Server& server; 00097 }; 00098 00099 /** 00100 * Protects #services from concurrent modification. 00101 */ 00102 std::mutex mutex; 00103 00104 /** 00105 * Maps from service IDs to ThreadDispatchService instances. 00106 * Protected by #mutex. 00107 */ 00108 std::unordered_map<uint16_t, std::shared_ptr<Service>> services; 00109 00110 /** 00111 * Deals with RPCs created by #opaqueServer. 00112 */ 00113 RPCHandler rpcHandler; 00114 00115 /** 00116 * Listens for new RPCs on TCP connections and invokes #rpcHandler with 00117 * them. 00118 */ 00119 OpaqueServer opaqueServer; 00120 00121 // Server is non-copyable. 00122 Server(const Server&) = delete; 00123 Server& operator=(const Server&) = delete; 00124 }; // class Server 00125 00126 } // namespace LogCabin::RPC 00127 } // namespace LogCabin 00128 00129 #endif /* LOGCABIN_RPC_SERVER_H */