LogCabin
RPC/Server.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  *
00003  * Permission to use, copy, modify, and distribute this software for any
00004  * purpose with or without fee is hereby granted, provided that the above
00005  * copyright notice and this permission notice appear in all copies.
00006  *
00007  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00008  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00009  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00010  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00011  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00012  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00013  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00014  */
00015 
00016 #include "RPC/OpaqueServerRPC.h"
00017 #include "RPC/Server.h"
00018 #include "RPC/ServerRPC.h"
00019 #include "RPC/ThreadDispatchService.h"
00020 
00021 namespace LogCabin {
00022 namespace RPC {
00023 
00024 
00025 ////////// Server::RPCHandler //////////
00026 
00027 Server::RPCHandler::RPCHandler(Server& server)
00028     : server(server)
00029 {
00030 }
00031 
00032 Server::RPCHandler::~RPCHandler()
00033 {
00034 }
00035 
00036 void
00037 Server::RPCHandler::handleRPC(OpaqueServerRPC opaqueRPC)
00038 {
00039     ServerRPC rpc(std::move(opaqueRPC));
00040     if (!rpc.needsReply()) {
00041         // The RPC may have had an invalid header, in which case it needs no
00042         // further action.
00043         return;
00044     }
00045     std::shared_ptr<Service> service;
00046     {
00047         std::lock_guard<std::mutex> lockGuard(server.mutex);
00048         auto it = server.services.find(rpc.getService());
00049         if (it != server.services.end())
00050             service = it->second;
00051     }
00052     if (service)
00053         service->handleRPC(std::move(rpc));
00054     else
00055         rpc.rejectInvalidService();
00056 }
00057 
00058 ////////// Server //////////
00059 
00060 Server::Server(Event::Loop& eventLoop, uint32_t maxMessageLength)
00061     : mutex()
00062     , services()
00063     , rpcHandler(*this)
00064     , opaqueServer(rpcHandler, eventLoop, maxMessageLength)
00065 {
00066 }
00067 
00068 Server::~Server()
00069 {
00070 }
00071 
00072 std::string
00073 Server::bind(const Address& listenAddress)
00074 {
00075     return opaqueServer.bind(listenAddress);
00076 }
00077 
00078 void
00079 Server::registerService(uint16_t serviceId,
00080                         std::shared_ptr<Service> service,
00081                         uint32_t maxThreads)
00082 {
00083     std::lock_guard<std::mutex> lockGuard(mutex);
00084     services[serviceId] =
00085         std::make_shared<ThreadDispatchService>(service, 0, maxThreads);
00086 }
00087 
00088 } // namespace LogCabin::RPC
00089 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines