LogCabin
|
00001 /* Copyright (c) 2011-2014 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <errno.h> 00018 #include <string.h> 00019 #include <sys/epoll.h> 00020 #include <sys/socket.h> 00021 #include <sys/types.h> 00022 #include <unistd.h> 00023 00024 #include "Core/Debug.h" 00025 #include "Event/Loop.h" 00026 #include "Protocol/Common.h" 00027 #include "RPC/Address.h" 00028 #include "RPC/OpaqueServer.h" 00029 #include "RPC/OpaqueServerRPC.h" 00030 00031 namespace LogCabin { 00032 namespace RPC { 00033 00034 00035 ////////// OpaqueServer::MessageSocketHandler ////////// 00036 00037 OpaqueServer::MessageSocketHandler::MessageSocketHandler(OpaqueServer* server) 00038 : server(server) 00039 , self() 00040 { 00041 } 00042 00043 void 00044 OpaqueServer::MessageSocketHandler::handleReceivedMessage( 00045 MessageId messageId, 00046 Core::Buffer message) 00047 { 00048 if (server == NULL) 00049 return; 00050 switch (messageId) { 00051 case Protocol::Common::PING_MESSAGE_ID: { 00052 std::shared_ptr<SocketWithHandler> socketRef = self.lock(); 00053 if (socketRef) { // expect so, since we're receiving messages 00054 VERBOSE("Responding to ping"); 00055 socketRef->monitor.sendMessage(messageId, 00056 Core::Buffer()); 00057 } 00058 break; 00059 } 00060 case Protocol::Common::VERSION_MESSAGE_ID: { 00061 std::shared_ptr<SocketWithHandler> socketRef = self.lock(); 00062 if (socketRef) { // expect so, since we're receiving messages 00063 VERBOSE("Responding to version request " 00064 "(this server supports max version %u)", 00065 MessageSocket::MAX_VERSION_SUPPORTED); 00066 using Protocol::Common::VersionMessage::Response; 00067 Response* response = new Response(); 00068 response->maxVersionSupported = 00069 htobe16(MessageSocket::MAX_VERSION_SUPPORTED); 00070 socketRef->monitor.sendMessage( 00071 messageId, 00072 Core::Buffer(response, sizeof(*response), 00073 Core::Buffer::deleteObjectFn<Response*>)); 00074 } 00075 break; 00076 } 00077 default: { // normal RPC request 00078 VERBOSE("Handling RPC"); 00079 OpaqueServerRPC rpc(self, messageId, std::move(message)); 00080 server->rpcHandler.handleRPC(std::move(rpc)); 00081 } 00082 } 00083 } 00084 00085 void 00086 OpaqueServer::MessageSocketHandler::handleDisconnect() 00087 { 00088 VERBOSE("Disconnected from client"); 00089 std::shared_ptr<SocketWithHandler> socketRef = self.lock(); 00090 if (server != NULL && socketRef) { 00091 // This drops the reference count on the socket. It may cause the 00092 // SocketWithHandler object (which includes this object) to be 00093 // destroyed when 'socketRef' goes out of scope. 00094 server->sockets.erase(socketRef); 00095 server = NULL; 00096 } 00097 } 00098 00099 00100 ////////// OpaqueServer::SocketWithHandler ////////// 00101 00102 std::shared_ptr<OpaqueServer::SocketWithHandler> 00103 OpaqueServer::SocketWithHandler::make(OpaqueServer* server, int fd) 00104 { 00105 std::shared_ptr<SocketWithHandler> socket( 00106 new SocketWithHandler(server, fd)); 00107 socket->handler.self = socket; 00108 return socket; 00109 } 00110 00111 OpaqueServer::SocketWithHandler::SocketWithHandler( 00112 OpaqueServer* server, 00113 int fd) 00114 : handler(server) 00115 , monitor(handler, server->eventLoop, fd, server->maxMessageLength) 00116 { 00117 } 00118 00119 OpaqueServer::SocketWithHandler::~SocketWithHandler() { 00120 // 'handler' shouldn't have access to 'monitor' while/after 'monitor' is 00121 // destroyed. Clear the reference here, then C++ will destroy 'monitor', 00122 // then 'handler'. 00123 handler.self.reset(); 00124 } 00125 00126 00127 ////////// OpaqueServer::BoundListener ////////// 00128 00129 OpaqueServer::BoundListener::BoundListener( 00130 OpaqueServer& server, 00131 int fd) 00132 : Event::File(fd) 00133 , server(server) 00134 { 00135 } 00136 00137 void 00138 OpaqueServer::BoundListener::handleFileEvent(uint32_t events) 00139 { 00140 int clientfd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC); 00141 if (clientfd < 0) { 00142 PANIC("Could not accept connection on fd %d: %s", 00143 fd, strerror(errno)); 00144 } 00145 00146 server.sockets.insert(SocketWithHandler::make(&server, clientfd)); 00147 } 00148 00149 00150 ////////// OpaqueServer::BoundListenerWithMonitor ////////// 00151 00152 OpaqueServer::BoundListenerWithMonitor::BoundListenerWithMonitor( 00153 OpaqueServer& server, 00154 int fd) 00155 : handler(server, fd) 00156 , monitor(server.eventLoop, handler, EPOLLIN) 00157 { 00158 } 00159 00160 OpaqueServer::BoundListenerWithMonitor::~BoundListenerWithMonitor() 00161 { 00162 } 00163 00164 00165 ////////// OpaqueServer ////////// 00166 00167 OpaqueServer::OpaqueServer(Handler& handler, 00168 Event::Loop& eventLoop, 00169 uint32_t maxMessageLength) 00170 : rpcHandler(handler) 00171 , eventLoop(eventLoop) 00172 , maxMessageLength(maxMessageLength) 00173 , sockets() 00174 , boundListenersMutex() 00175 , boundListeners() 00176 { 00177 } 00178 00179 OpaqueServer::~OpaqueServer() 00180 { 00181 // Stop accepting new connections. 00182 { 00183 std::lock_guard<Core::Mutex> lock(boundListenersMutex); 00184 boundListeners.clear(); 00185 } 00186 00187 // Stop the socket objects from handling new RPCs and accessing the 00188 // 'sockets' set. They may continue to process existing RPCs, though 00189 // idle sockets will be destroyed here. 00190 { 00191 // Block the event loop to operate on 'sockets' safely. 00192 Event::Loop::Lock lockGuard(eventLoop); 00193 for (auto it = sockets.begin(); it != sockets.end(); ++it) { 00194 std::shared_ptr<SocketWithHandler> socket = *it; 00195 socket->handler.server = NULL; 00196 } 00197 sockets.clear(); 00198 } 00199 } 00200 00201 std::string 00202 OpaqueServer::bind(const Address& listenAddress) 00203 { 00204 using Core::StringUtil::format; 00205 00206 if (!listenAddress.isValid()) { 00207 return format("Can't listen on invalid address: %s", 00208 listenAddress.toString().c_str()); 00209 } 00210 00211 int fd = socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0); 00212 if (fd < 0) 00213 PANIC("Could not create new TCP socket"); 00214 00215 int flag = 1; 00216 int r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 00217 &flag, sizeof(flag)); 00218 if (r < 0) { 00219 PANIC("Could not set SO_REUSEADDR on socket: %s", 00220 strerror(errno)); 00221 } 00222 00223 00224 r = ::bind(fd, listenAddress.getSockAddr(), 00225 listenAddress.getSockAddrLen()); 00226 if (r != 0) { 00227 std::string msg = 00228 format("Could not bind to address %s: %s%s", 00229 listenAddress.toString().c_str(), 00230 strerror(errno), 00231 errno == EINVAL ? " (is the port in use?)" : ""); 00232 r = close(fd); 00233 if (r != 0) { 00234 WARNING("Could not close socket that failed to bind: %s", 00235 strerror(errno)); 00236 } 00237 return msg; 00238 } 00239 00240 // Why 128? No clue. It's what libevent was setting it to. 00241 r = listen(fd, 128); 00242 if (r != 0) { 00243 PANIC("Could not invoke listen() on address %s: %s", 00244 listenAddress.toString().c_str(), 00245 strerror(errno)); 00246 } 00247 00248 std::lock_guard<Core::Mutex> lock(boundListenersMutex); 00249 boundListeners.emplace_back(*this, fd); 00250 return ""; 00251 } 00252 00253 } // namespace LogCabin::RPC 00254 } // namespace LogCabin