LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * 00003 * Permission to use, copy, modify, and distribute this software for any 00004 * purpose with or without fee is hereby granted, provided that the above 00005 * copyright notice and this permission notice appear in all copies. 00006 * 00007 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00008 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00009 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00010 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00011 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00012 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00013 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00014 */ 00015 00016 #include "RPC/OpaqueServerRPC.h" 00017 00018 namespace LogCabin { 00019 namespace RPC { 00020 00021 OpaqueServerRPC::OpaqueServerRPC() 00022 : request() 00023 , response() 00024 , socket() 00025 , messageId(~0UL) 00026 , responseTarget(NULL) 00027 { 00028 } 00029 00030 OpaqueServerRPC::OpaqueServerRPC( 00031 std::weak_ptr<OpaqueServer::SocketWithHandler> socket, 00032 MessageSocket::MessageId messageId, 00033 Core::Buffer request) 00034 : request(std::move(request)) 00035 , response() 00036 , socket(socket) 00037 , messageId(messageId) 00038 , responseTarget(NULL) 00039 { 00040 } 00041 00042 OpaqueServerRPC::OpaqueServerRPC(OpaqueServerRPC&& other) 00043 : request(std::move(other.request)) 00044 , response(std::move(other.response)) 00045 , socket(std::move(other.socket)) 00046 , messageId(std::move(other.messageId)) 00047 , responseTarget(std::move(other.responseTarget)) 00048 { 00049 } 00050 00051 OpaqueServerRPC::~OpaqueServerRPC() 00052 { 00053 } 00054 00055 OpaqueServerRPC& 00056 OpaqueServerRPC::operator=(OpaqueServerRPC&& other) 00057 { 00058 request = std::move(other.request); 00059 response = std::move(other.response); 00060 socket = std::move(other.socket); 00061 messageId = std::move(other.messageId); 00062 responseTarget = std::move(other.responseTarget); 00063 return *this; 00064 } 00065 00066 void 00067 OpaqueServerRPC::closeSession() 00068 { 00069 std::shared_ptr<OpaqueServer::SocketWithHandler> socketRef = socket.lock(); 00070 if (socketRef) 00071 socketRef->monitor.close(); 00072 socket.reset(); 00073 responseTarget = NULL; 00074 } 00075 00076 void 00077 OpaqueServerRPC::sendReply() 00078 { 00079 std::shared_ptr<OpaqueServer::SocketWithHandler> socketRef = socket.lock(); 00080 if (socketRef) { 00081 socketRef->monitor.sendMessage(messageId, std::move(response)); 00082 } else { 00083 // During normal operation, this indicates that either the socket has 00084 // been disconnected or the reply has already been sent. 00085 00086 // For unit testing only, we can store replies from mock RPCs 00087 // that have no sessions. 00088 if (responseTarget != NULL) { 00089 *responseTarget = std::move(response); 00090 responseTarget = NULL; 00091 } 00092 00093 // Drop the reply on the floor. 00094 response.reset(); 00095 } 00096 // Prevent the server from replying again. 00097 socket.reset(); 00098 } 00099 00100 } // namespace LogCabin::RPC 00101 } // namespace LogCabin