LogCabin
|
00001 /* Copyright (c) 2012 Stanford University 00002 * 00003 * Permission to use, copy, modify, and distribute this software for any 00004 * purpose with or without fee is hereby granted, provided that the above 00005 * copyright notice and this permission notice appear in all copies. 00006 * 00007 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00008 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00009 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00010 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00011 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00012 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00013 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00014 */ 00015 00016 #include <cinttypes> 00017 #include <mutex> 00018 #include <queue> 00019 #include <thread> 00020 #include <vector> 00021 00022 #include "Core/ConditionVariable.h" 00023 #include "RPC/ServerRPC.h" 00024 #include "RPC/Service.h" 00025 00026 #ifndef LOGCABIN_RPC_THREADDISPATCHSERVICE_H 00027 #define LOGCABIN_RPC_THREADDISPATCHSERVICE_H 00028 00029 namespace LogCabin { 00030 namespace RPC { 00031 00032 /** 00033 * This class is an adaptor to enable multi-threaded services. 00034 * This Service is intended to plug into a Server and run directly on the 00035 * Event::Loop thread. You provide it with another Service on the constructor, 00036 * and the job of this class is to manage a thread pool on which to call 00037 * your Service's handleRPC() method. 00038 */ 00039 class ThreadDispatchService : public Service { 00040 public: 00041 /** 00042 * Constructor. 00043 * \param threadSafeService 00044 * The underlying service that will handle RPCs inside of worker 00045 * threads spawned by this class. 00046 * \param minThreads 00047 * The number of threads with which to start the thread pool. 00048 * These will be created in the constructor. 00049 * \param maxThreads 00050 * The maximum number of threads this class is allowed to use for its 00051 * thread pool. The thread pool dynamically grows as needed up until 00052 * this limit. This should be set to at least 'minThreads' and more 00053 * than 0. 00054 */ 00055 ThreadDispatchService(std::shared_ptr<Service> threadSafeService, 00056 uint32_t minThreads, uint32_t maxThreads); 00057 00058 /** 00059 * Destructor. This will attempt to join all threads and will close 00060 * sessions on RPCs that have not been serviced. 00061 */ 00062 ~ThreadDispatchService(); 00063 00064 void handleRPC(ServerRPC serverRPC); 00065 std::string getName() const; 00066 00067 private: 00068 /** 00069 * The main loop executed in workers. 00070 */ 00071 void workerMain(); 00072 00073 /** 00074 * The service that will handle RPCs inside of worker thread spawned by 00075 * this class. 00076 */ 00077 std::shared_ptr<Service> threadSafeService; 00078 00079 /** 00080 * The maximum number of threads this class is allowed to use for its 00081 * thread pool. 00082 */ 00083 const uint32_t maxThreads; 00084 00085 /** 00086 * This mutex protects all of the members of this class defined below this 00087 * point. 00088 */ 00089 std::mutex mutex; 00090 00091 /** 00092 * The thread pool of workers that process RPCs. 00093 */ 00094 std::vector<std::thread> threads; 00095 00096 /** 00097 * The number of workers that are waiting for work (on the condition 00098 * variable). This is used to dynamically launch new workers when 00099 * necessary. 00100 */ 00101 uint32_t numFreeWorkers; 00102 00103 /** 00104 * Notifies workers that there are available RPCs to process or #exit has 00105 * been set. To wait on this, one needs to hold #mutex. 00106 */ 00107 Core::ConditionVariable conditionVariable; 00108 00109 /** 00110 * A flag to tell workers that they should exit. 00111 */ 00112 bool exit; 00113 00114 /** 00115 * The queue of work that worker threads pull from. 00116 */ 00117 std::queue<ServerRPC> rpcQueue; 00118 00119 // ThreadDispatchService is non-copyable. 00120 ThreadDispatchService(const ThreadDispatchService&) = delete; 00121 ThreadDispatchService& operator=(const ThreadDispatchService&) = delete; 00122 }; // class ThreadDispatchService 00123 00124 } // namespace LogCabin::RPC 00125 } // namespace LogCabin 00126 00127 #endif /* LOGCABIN_RPC_THREADDISPATCHSERVICE_H */