LogCabin
RPC/ThreadDispatchService.h
Go to the documentation of this file.
00001 /* Copyright (c) 2012 Stanford University
00002  *
00003  * Permission to use, copy, modify, and distribute this software for any
00004  * purpose with or without fee is hereby granted, provided that the above
00005  * copyright notice and this permission notice appear in all copies.
00006  *
00007  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00008  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00009  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00010  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00011  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00012  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00013  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00014  */
00015 
00016 #include <cinttypes>
00017 #include <mutex>
00018 #include <queue>
00019 #include <thread>
00020 #include <vector>
00021 
00022 #include "Core/ConditionVariable.h"
00023 #include "RPC/ServerRPC.h"
00024 #include "RPC/Service.h"
00025 
00026 #ifndef LOGCABIN_RPC_THREADDISPATCHSERVICE_H
00027 #define LOGCABIN_RPC_THREADDISPATCHSERVICE_H
00028 
00029 namespace LogCabin {
00030 namespace RPC {
00031 
00032 /**
00033  * Adaptor that lets a thread-safe Service handle RPCs on multiple threads.
00034  * An instance is intended to be plugged into a Server and to run directly
00035  * on the Event::Loop thread. It wraps the Service given to the constructor
00036  * and manages a pool of worker threads (between minThreads and maxThreads)
00037  * on which that Service's handleRPC() method is called. Non-copyable.
00038  */
00039 class ThreadDispatchService : public Service {
00040   public:
00041     /**
00042      * Constructor. Starts the initial worker threads.
00043      * \param threadSafeService
00044      *      The underlying service that will handle RPCs inside of the worker
00045      *      threads spawned by this class, so it is expected to be thread-safe.
00046      * \param minThreads
00047      *      The number of threads with which to start the thread pool.
00048      *      These are created in the constructor.
00049      * \param maxThreads
00050      *      The maximum number of threads this class is allowed to use for its
00051      *      thread pool. The pool grows dynamically as needed up to this
00052      *      limit. Callers should pass a value that is at least 'minThreads'
00053      *      and greater than 0.
00054      */
00055     ThreadDispatchService(std::shared_ptr<Service> threadSafeService,
00056                           uint32_t minThreads, uint32_t maxThreads);
00057 
00058     /**
00059      * Destructor. This will attempt to join all worker threads and will
00060      * close sessions on RPCs that have not been serviced.
00061      */
00062     ~ThreadDispatchService();
00063     // NOTE(review): presumably the Service overrides; bodies not in this header.
00064     void handleRPC(ServerRPC serverRPC);
00065     std::string getName() const;
00066 
00067   private:
00068     /**
00069      * The main loop executed by each worker thread in the pool.
00070      */
00071     void workerMain();
00072 
00073     /**
00074      * The service that handles RPCs inside of the worker threads spawned by
00075      * this class. Declared before #mutex (see the comment on #mutex).
00076      */
00077     std::shared_ptr<Service> threadSafeService;
00078 
00079     /**
00080      * The maximum number of threads this class is allowed to use for its
00081      * thread pool. Constant after construction.
00082      */
00083     const uint32_t maxThreads;
00084 
00085     /**
00086      * This mutex protects all of the members of this class declared below
00087      * this point, so the declaration order of members is significant.
00088      */
00089     std::mutex mutex;
00090 
00091     /**
00092      * The thread pool of workers that process RPCs.
00093      */
00094     std::vector<std::thread> threads;
00095 
00096     /**
00097      * The number of workers that are currently waiting for work (on
00098      * #conditionVariable). This is used to dynamically launch new workers
00099      * when necessary.
00100      */
00101     uint32_t numFreeWorkers;
00102 
00103     /**
00104      * Notifies workers that there are RPCs available to process or that
00105      * #exit has been set. #mutex must be held in order to wait on this.
00106      */
00107     Core::ConditionVariable conditionVariable;
00108 
00109     /**
00110      * A flag that tells workers that they should exit.
00111      */
00112     bool exit;
00113 
00114     /**
00115      * The queue of pending RPCs that worker threads pull their work from.
00116      */
00117     std::queue<ServerRPC> rpcQueue;
00118 
00119     // ThreadDispatchService is non-copyable.
00120     ThreadDispatchService(const ThreadDispatchService&) = delete;
00121     ThreadDispatchService& operator=(const ThreadDispatchService&) = delete;
00122 }; // class ThreadDispatchService
00123 
00124 } // namespace LogCabin::RPC
00125 } // namespace LogCabin
00126 
00127 #endif /* LOGCABIN_RPC_THREADDISPATCHSERVICE_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines