LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * 00003 * Permission to use, copy, modify, and distribute this software for any 00004 * purpose with or without fee is hereby granted, provided that the above 00005 * copyright notice and this permission notice appear in all copies. 00006 * 00007 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00008 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00009 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00010 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00011 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00012 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00013 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00014 */ 00015 00016 #include <assert.h> 00017 00018 #include "Core/StringUtil.h" 00019 #include "Core/ThreadId.h" 00020 #include "RPC/ThreadDispatchService.h" 00021 00022 namespace LogCabin { 00023 namespace RPC { 00024 00025 ThreadDispatchService::ThreadDispatchService( 00026 std::shared_ptr<Service> threadSafeService, 00027 uint32_t minThreads, 00028 uint32_t maxThreads) 00029 : threadSafeService(threadSafeService) 00030 , maxThreads(maxThreads) 00031 , mutex() 00032 , threads() 00033 , numFreeWorkers(0) 00034 , conditionVariable() 00035 , exit(false) 00036 , rpcQueue() 00037 { 00038 assert(minThreads <= maxThreads); 00039 assert(0 < maxThreads); 00040 for (uint32_t i = 0; i < minThreads; ++i) 00041 threads.emplace_back(&ThreadDispatchService::workerMain, this); 00042 } 00043 00044 ThreadDispatchService::~ThreadDispatchService() 00045 { 00046 // Signal the threads to exit. 00047 { 00048 std::lock_guard<std::mutex> lockGuard(mutex); 00049 exit = true; 00050 conditionVariable.notify_all(); 00051 } 00052 00053 // Join the threads. 00054 while (!threads.empty()) { 00055 threads.back().join(); 00056 threads.pop_back(); 00057 } 00058 00059 // Close the sessions of any remaining RPCs that didn't get processed. 00060 while (!rpcQueue.empty()) { 00061 rpcQueue.front().closeSession(); 00062 rpcQueue.pop(); 00063 } 00064 } 00065 00066 void 00067 ThreadDispatchService::handleRPC(ServerRPC serverRPC) 00068 { 00069 std::lock_guard<std::mutex> lockGuard(mutex); 00070 assert(!exit); 00071 rpcQueue.push(std::move(serverRPC)); 00072 if (numFreeWorkers == 0 && threads.size() < maxThreads) 00073 threads.emplace_back(&ThreadDispatchService::workerMain, this); 00074 conditionVariable.notify_one(); 00075 } 00076 00077 std::string 00078 ThreadDispatchService::getName() const 00079 { 00080 return threadSafeService->getName(); 00081 } 00082 00083 void 00084 ThreadDispatchService::workerMain() 00085 { 00086 Core::ThreadId::setName( 00087 Core::StringUtil::format("%s(%lu)", 00088 threadSafeService->getName().c_str(), 00089 Core::ThreadId::getId())); 00090 while (true) { 00091 ServerRPC rpc; 00092 { // find an RPC to process 00093 std::unique_lock<std::mutex> lockGuard(mutex); 00094 ++numFreeWorkers; 00095 while (!exit && rpcQueue.empty()) 00096 conditionVariable.wait(lockGuard); 00097 --numFreeWorkers; 00098 if (exit) 00099 return; 00100 rpc = std::move(rpcQueue.front()); 00101 rpcQueue.pop(); 00102 } 00103 // execute RPC handler 00104 threadSafeService->handleRPC(std::move(rpc)); 00105 } 00106 } 00107 00108 } // namespace LogCabin::RPC 00109 } // namespace LogCabin