LogCabin
RPC/OpaqueServer.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2011-2014 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <errno.h>
00018 #include <string.h>
00019 #include <sys/epoll.h>
00020 #include <sys/socket.h>
00021 #include <sys/types.h>
00022 #include <unistd.h>
00023 
00024 #include "Core/Debug.h"
00025 #include "Event/Loop.h"
00026 #include "Protocol/Common.h"
00027 #include "RPC/Address.h"
00028 #include "RPC/OpaqueServer.h"
00029 #include "RPC/OpaqueServerRPC.h"
00030 
00031 namespace LogCabin {
00032 namespace RPC {
00033 
00034 
00035 ////////// OpaqueServer::MessageSocketHandler //////////
00036 
00037 OpaqueServer::MessageSocketHandler::MessageSocketHandler(OpaqueServer* server)
00038     : server(server)
00039     , self()
00040 {
00041 }
00042 
00043 void
00044 OpaqueServer::MessageSocketHandler::handleReceivedMessage(
00045         MessageId messageId,
00046         Core::Buffer message)
00047 {
00048     if (server == NULL)
00049         return;
00050     switch (messageId) {
00051         case Protocol::Common::PING_MESSAGE_ID: {
00052             std::shared_ptr<SocketWithHandler> socketRef = self.lock();
00053             if (socketRef) { // expect so, since we're receiving messages
00054                 VERBOSE("Responding to ping");
00055                 socketRef->monitor.sendMessage(messageId,
00056                                                Core::Buffer());
00057             }
00058             break;
00059         }
00060         case Protocol::Common::VERSION_MESSAGE_ID: {
00061             std::shared_ptr<SocketWithHandler> socketRef = self.lock();
00062             if (socketRef) { // expect so, since we're receiving messages
00063                 VERBOSE("Responding to version request "
00064                         "(this server supports max version %u)",
00065                         MessageSocket::MAX_VERSION_SUPPORTED);
00066                 using Protocol::Common::VersionMessage::Response;
00067                 Response* response = new Response();
00068                 response->maxVersionSupported =
00069                     htobe16(MessageSocket::MAX_VERSION_SUPPORTED);
00070                 socketRef->monitor.sendMessage(
00071                     messageId,
00072                     Core::Buffer(response, sizeof(*response),
00073                                  Core::Buffer::deleteObjectFn<Response*>));
00074             }
00075             break;
00076         }
00077         default: { // normal RPC request
00078             VERBOSE("Handling RPC");
00079             OpaqueServerRPC rpc(self, messageId, std::move(message));
00080             server->rpcHandler.handleRPC(std::move(rpc));
00081         }
00082     }
00083 }
00084 
00085 void
00086 OpaqueServer::MessageSocketHandler::handleDisconnect()
00087 {
00088     VERBOSE("Disconnected from client");
00089     std::shared_ptr<SocketWithHandler> socketRef = self.lock();
00090     if (server != NULL && socketRef) {
00091         // This drops the reference count on the socket. It may cause the
00092         // SocketWithHandler object (which includes this object) to be
00093         // destroyed when 'socketRef' goes out of scope.
00094         server->sockets.erase(socketRef);
00095         server = NULL;
00096     }
00097 }
00098 
00099 
00100 ////////// OpaqueServer::SocketWithHandler //////////
00101 
00102 std::shared_ptr<OpaqueServer::SocketWithHandler>
00103 OpaqueServer::SocketWithHandler::make(OpaqueServer* server, int fd)
00104 {
00105     std::shared_ptr<SocketWithHandler> socket(
00106         new SocketWithHandler(server, fd));
00107     socket->handler.self = socket;
00108     return socket;
00109 }
00110 
00111 OpaqueServer::SocketWithHandler::SocketWithHandler(
00112         OpaqueServer* server,
00113         int fd)
00114     : handler(server)
00115     , monitor(handler, server->eventLoop, fd, server->maxMessageLength)
00116 {
00117 }
00118 
00119 OpaqueServer::SocketWithHandler::~SocketWithHandler() {
00120     // 'handler' shouldn't have access to 'monitor' while/after 'monitor' is
00121     // destroyed. Clear the reference here, then C++ will destroy 'monitor',
00122     // then 'handler'.
00123     handler.self.reset();
00124 }
00125 
00126 
00127 ////////// OpaqueServer::BoundListener //////////
00128 
00129 OpaqueServer::BoundListener::BoundListener(
00130         OpaqueServer& server,
00131         int fd)
00132     : Event::File(fd)
00133     , server(server)
00134 {
00135 }
00136 
00137 void
00138 OpaqueServer::BoundListener::handleFileEvent(uint32_t events)
00139 {
00140     int clientfd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
00141     if (clientfd < 0) {
00142         PANIC("Could not accept connection on fd %d: %s",
00143               fd, strerror(errno));
00144     }
00145 
00146     server.sockets.insert(SocketWithHandler::make(&server, clientfd));
00147 }
00148 
00149 
00150 ////////// OpaqueServer::BoundListenerWithMonitor //////////
00151 
00152 OpaqueServer::BoundListenerWithMonitor::BoundListenerWithMonitor(
00153         OpaqueServer& server,
00154         int fd)
00155     : handler(server, fd)
00156     , monitor(server.eventLoop, handler, EPOLLIN)
00157 {
00158 }
00159 
00160 OpaqueServer::BoundListenerWithMonitor::~BoundListenerWithMonitor()
00161 {
00162 }
00163 
00164 
00165 ////////// OpaqueServer //////////
00166 
00167 OpaqueServer::OpaqueServer(Handler& handler,
00168                            Event::Loop& eventLoop,
00169                            uint32_t maxMessageLength)
00170     : rpcHandler(handler)
00171     , eventLoop(eventLoop)
00172     , maxMessageLength(maxMessageLength)
00173     , sockets()
00174     , boundListenersMutex()
00175     , boundListeners()
00176 {
00177 }
00178 
00179 OpaqueServer::~OpaqueServer()
00180 {
00181     // Stop accepting new connections.
00182     {
00183         std::lock_guard<Core::Mutex> lock(boundListenersMutex);
00184         boundListeners.clear();
00185     }
00186 
00187     // Stop the socket objects from handling new RPCs and accessing the
00188     // 'sockets' set. They may continue to process existing RPCs, though
00189     // idle sockets will be destroyed here.
00190     {
00191         // Block the event loop to operate on 'sockets' safely.
00192         Event::Loop::Lock lockGuard(eventLoop);
00193         for (auto it = sockets.begin(); it != sockets.end(); ++it) {
00194             std::shared_ptr<SocketWithHandler> socket = *it;
00195             socket->handler.server = NULL;
00196         }
00197         sockets.clear();
00198     }
00199 }
00200 
00201 std::string
00202 OpaqueServer::bind(const Address& listenAddress)
00203 {
00204     using Core::StringUtil::format;
00205 
00206     if (!listenAddress.isValid()) {
00207         return format("Can't listen on invalid address: %s",
00208                       listenAddress.toString().c_str());
00209     }
00210 
00211     int fd = socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0);
00212     if (fd < 0)
00213         PANIC("Could not create new TCP socket");
00214 
00215     int flag = 1;
00216     int r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
00217                        &flag, sizeof(flag));
00218     if (r < 0) {
00219         PANIC("Could not set SO_REUSEADDR on socket: %s",
00220               strerror(errno));
00221     }
00222 
00223 
00224     r = ::bind(fd, listenAddress.getSockAddr(),
00225                    listenAddress.getSockAddrLen());
00226     if (r != 0) {
00227         std::string msg =
00228             format("Could not bind to address %s: %s%s",
00229                    listenAddress.toString().c_str(),
00230                    strerror(errno),
00231                    errno == EINVAL ? " (is the port in use?)" : "");
00232         r = close(fd);
00233         if (r != 0) {
00234             WARNING("Could not close socket that failed to bind: %s",
00235                     strerror(errno));
00236         }
00237         return msg;
00238     }
00239 
00240     // Why 128? No clue. It's what libevent was setting it to.
00241     r = listen(fd, 128);
00242     if (r != 0) {
00243         PANIC("Could not invoke listen() on address %s: %s",
00244               listenAddress.toString().c_str(),
00245               strerror(errno));
00246     }
00247 
00248     std::lock_guard<Core::Mutex> lock(boundListenersMutex);
00249     boundListeners.emplace_back(*this, fd);
00250     return "";
00251 }
00252 
00253 } // namespace LogCabin::RPC
00254 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines