LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * 00003 * Permission to use, copy, modify, and distribute this software for any 00004 * purpose with or without fee is hereby granted, provided that the above 00005 * copyright notice and this permission notice appear in all copies. 00006 * 00007 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00008 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00009 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00010 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00011 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00012 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00013 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00014 */ 00015 00016 #include "RPC/OpaqueServerRPC.h" 00017 #include "RPC/Server.h" 00018 #include "RPC/ServerRPC.h" 00019 #include "RPC/ThreadDispatchService.h" 00020 00021 namespace LogCabin { 00022 namespace RPC { 00023 00024 00025 ////////// Server::RPCHandler ////////// 00026 00027 Server::RPCHandler::RPCHandler(Server& server) 00028 : server(server) 00029 { 00030 } 00031 00032 Server::RPCHandler::~RPCHandler() 00033 { 00034 } 00035 00036 void 00037 Server::RPCHandler::handleRPC(OpaqueServerRPC opaqueRPC) 00038 { 00039 ServerRPC rpc(std::move(opaqueRPC)); 00040 if (!rpc.needsReply()) { 00041 // The RPC may have had an invalid header, in which case it needs no 00042 // further action. 00043 return; 00044 } 00045 std::shared_ptr<Service> service; 00046 { 00047 std::lock_guard<std::mutex> lockGuard(server.mutex); 00048 auto it = server.services.find(rpc.getService()); 00049 if (it != server.services.end()) 00050 service = it->second; 00051 } 00052 if (service) 00053 service->handleRPC(std::move(rpc)); 00054 else 00055 rpc.rejectInvalidService(); 00056 } 00057 00058 ////////// Server ////////// 00059 00060 Server::Server(Event::Loop& eventLoop, uint32_t maxMessageLength) 00061 : mutex() 00062 , services() 00063 , rpcHandler(*this) 00064 , opaqueServer(rpcHandler, eventLoop, maxMessageLength) 00065 { 00066 } 00067 00068 Server::~Server() 00069 { 00070 } 00071 00072 std::string 00073 Server::bind(const Address& listenAddress) 00074 { 00075 return opaqueServer.bind(listenAddress); 00076 } 00077 00078 void 00079 Server::registerService(uint16_t serviceId, 00080 std::shared_ptr<Service> service, 00081 uint32_t maxThreads) 00082 { 00083 std::lock_guard<std::mutex> lockGuard(mutex); 00084 services[serviceId] = 00085 std::make_shared<ThreadDispatchService>(service, 0, maxThreads); 00086 } 00087 00088 } // namespace LogCabin::RPC 00089 } // namespace LogCabin