LogCabin
RPC/ThreadDispatchService.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  *
00003  * Permission to use, copy, modify, and distribute this software for any
00004  * purpose with or without fee is hereby granted, provided that the above
00005  * copyright notice and this permission notice appear in all copies.
00006  *
00007  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00008  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00009  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00010  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00011  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00012  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00013  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00014  */
00015 
00016 #include <assert.h>
00017 
00018 #include "Core/StringUtil.h"
00019 #include "Core/ThreadId.h"
00020 #include "RPC/ThreadDispatchService.h"
00021 
00022 namespace LogCabin {
00023 namespace RPC {
00024 
00025 ThreadDispatchService::ThreadDispatchService(
00026         std::shared_ptr<Service> threadSafeService,
00027         uint32_t minThreads,
00028         uint32_t maxThreads)
00029     : threadSafeService(threadSafeService)
00030     , maxThreads(maxThreads)
00031     , mutex()
00032     , threads()
00033     , numFreeWorkers(0)
00034     , conditionVariable()
00035     , exit(false)
00036     , rpcQueue()
00037 {
00038     assert(minThreads <= maxThreads);
00039     assert(0 < maxThreads);
00040     for (uint32_t i = 0; i < minThreads; ++i)
00041         threads.emplace_back(&ThreadDispatchService::workerMain, this);
00042 }
00043 
00044 ThreadDispatchService::~ThreadDispatchService()
00045 {
00046     // Signal the threads to exit.
00047     {
00048         std::lock_guard<std::mutex> lockGuard(mutex);
00049         exit = true;
00050         conditionVariable.notify_all();
00051     }
00052 
00053     // Join the threads.
00054     while (!threads.empty()) {
00055         threads.back().join();
00056         threads.pop_back();
00057     }
00058 
00059     // Close the sessions of any remaining RPCs that didn't get processed.
00060     while (!rpcQueue.empty()) {
00061         rpcQueue.front().closeSession();
00062         rpcQueue.pop();
00063     }
00064 }
00065 
00066 void
00067 ThreadDispatchService::handleRPC(ServerRPC serverRPC)
00068 {
00069     std::lock_guard<std::mutex> lockGuard(mutex);
00070     assert(!exit);
00071     rpcQueue.push(std::move(serverRPC));
00072     if (numFreeWorkers == 0 && threads.size() < maxThreads)
00073         threads.emplace_back(&ThreadDispatchService::workerMain, this);
00074     conditionVariable.notify_one();
00075 }
00076 
00077 std::string
00078 ThreadDispatchService::getName() const
00079 {
00080     return threadSafeService->getName();
00081 }
00082 
00083 void
00084 ThreadDispatchService::workerMain()
00085 {
00086     Core::ThreadId::setName(
00087         Core::StringUtil::format("%s(%lu)",
00088                                  threadSafeService->getName().c_str(),
00089                                  Core::ThreadId::getId()));
00090     while (true) {
00091         ServerRPC rpc;
00092         { // find an RPC to process
00093             std::unique_lock<std::mutex> lockGuard(mutex);
00094             ++numFreeWorkers;
00095             while (!exit && rpcQueue.empty())
00096                 conditionVariable.wait(lockGuard);
00097             --numFreeWorkers;
00098             if (exit)
00099                 return;
00100             rpc = std::move(rpcQueue.front());
00101             rpcQueue.pop();
00102         }
00103         // execute RPC handler
00104         threadSafeService->handleRPC(std::move(rpc));
00105     }
00106 }
00107 
00108 } // namespace LogCabin::RPC
00109 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines