LogCabin
Client/ClientImpl.h
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2014-2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <memory>
00018 #include <set>
00019 #include <string>
00020 #include <thread>
00021 
00022 #include "build/Protocol/ServerControl.pb.h"
00023 #include "include/LogCabin/Client.h"
00024 #include "Client/Backoff.h"
00025 #include "Client/LeaderRPC.h"
00026 #include "Client/SessionManager.h"
00027 #include "Core/ConditionVariable.h"
00028 #include "Core/Config.h"
00029 #include "Core/Mutex.h"
00030 #include "Core/Time.h"
00031 #include "Event/Loop.h"
00032 
00033 #ifndef LOGCABIN_CLIENT_CLIENTIMPL_H
00034 #define LOGCABIN_CLIENT_CLIENTIMPL_H
00035 
00036 namespace LogCabin {
00037 namespace Client {
00038 
00039 /**
00040  * A predicate on tree operations.
00041  * First component: the absolute path corresponding to the 'path'
00042  * argument of setCondition(), or empty if no condition is set.
00043  * Second component: the file contents given as the 'value' argument of
00044  * setCondition().
00045  */
00046 typedef std::pair<std::string, std::string> Condition;
00047 
00048 /**
00049  * The implementation of the client library.
00050  * This is wrapped by Client::Cluster and Client::Log for usability.
00051  */
00052 class ClientImpl {
00053   public:
00054     /// Clock used for timeouts.
00055     typedef LeaderRPC::Clock Clock;
00056     /// Type for absolute time values used for timeouts.
00057     typedef LeaderRPC::TimePoint TimePoint;
00058 
00059     /**
00060      * Return the absolute time when the calling operation should timeout.
00061      * \param relTimeoutNanos
00062      *      The number of nanoseconds from now to time out, or 0 for no
00063      *      timeout.
00064      */
00065     static TimePoint absTimeout(uint64_t relTimeoutNanos);
00066 
00067     /// Constructor.
00068     explicit ClientImpl(const std::map<std::string, std::string>& options =
00069                             std::map<std::string, std::string>());
00070     /// Destructor.
00071     virtual ~ClientImpl();
00072 
00073     /**
00074      * Initialize this object. This must be called directly after the
00075      * constructor.
00076      * \param hosts
00077      *      A string describing the hosts in the cluster. This should be of the
00078      *      form host:port, where host is usually a DNS name that resolves to
00079      *      multiple IP addresses, or a comma-delimited list.
00080      */
00081     void init(const std::string& hosts);
00082 
00083     /**
00084      * Called by init() to do any necessary initialization of the derived
00085      * class.
00086      */
00087     virtual void initDerived();
00088 
00089     GetConfigurationResult getConfiguration(TimePoint timeout);
00090     ConfigurationResult setConfiguration(
00091                             uint64_t oldId,
00092                             const Configuration& newConfiguration,
00093                             TimePoint timeout);
00094 
00095     /// See Cluster::getServerInfo.
00096     Result getServerInfo(const std::string& host,
00097                          TimePoint timeout,
00098                          Server& info);
00099 
00100     /**
00101      * Return the canonicalized path name resulting from accessing path
00102      * relative to workingDirectory.
00103      * \return
00104      *      Status and error message. Possible errors are:
00105      *       - INVALID_ARGUMENT if path is relative and workingDirectory is not
00106      *         an absolute path.
00107      *       - INVALID_ARGUMENT if path attempts to access above the root
00108      *         directory.
00109      */
00110     Result canonicalize(const std::string& path,
00111                         const std::string& workingDirectory,
00112                         std::string& canonical);
00113 
00114     /// See Tree::makeDirectory.
00115     Result makeDirectory(const std::string& path,
00116                          const std::string& workingDirectory,
00117                          const Condition& condition,
00118                          TimePoint timeout);
00119 
00120     /// See Tree::listDirectory.
00121     Result listDirectory(const std::string& path,
00122                          const std::string& workingDirectory,
00123                          const Condition& condition,
00124                          TimePoint timeout,
00125                          std::vector<std::string>& children);
00126 
00127     /// See Tree::removeDirectory.
00128     Result removeDirectory(const std::string& path,
00129                            const std::string& workingDirectory,
00130                            const Condition& condition,
00131                            TimePoint timeout);
00132 
00133     /// See Tree::write.
00134     Result write(const std::string& path,
00135                  const std::string& workingDirectory,
00136                  const std::string& contents,
00137                  const Condition& condition,
00138                  TimePoint timeout);
00139 
00140     /// See Tree::read.
00141     Result read(const std::string& path,
00142                 const std::string& workingDirectory,
00143                 const Condition& condition,
00144                 TimePoint timeout,
00145                 std::string& contents);
00146 
00147     /// See Tree::removeFile.
00148     Result removeFile(const std::string& path,
00149                       const std::string& workingDirectory,
00150                       const Condition& condition,
00151                       TimePoint timeout);
00152 
00153     /**
00154      * Low-level interface to ServerControl service used by
00155      * Client/ServerControl.cc.
00156      */
00157     Result serverControl(const std::string& host,
00158                          TimePoint timeout,
00159                          Protocol::ServerControl::OpCode opCode,
00160                          const google::protobuf::Message& request,
00161                          google::protobuf::Message& response);
00162 
00163   protected:
00164 
00165     /**
00166      * Options/settings.
00167      */
00168     const Core::Config config;
00169 
00170     /**
00171      * The Event::Loop used to drive the underlying RPC mechanism.
00172      */
00173     Event::Loop eventLoop;
00174 
00175     /**
00176      * A unique ID for the cluster that this client may connect to. This is
00177      * initialized to a value from the options map passed to the
00178      * Client::Cluster constructor. If it's not set then, it may be set later
00179      * as a result of learning a UUID from some server.
00180      */
00181     SessionManager::ClusterUUID clusterUUID;
00182 
00183     /**
00184      * Used to create new sessions.
00185      */
00186     SessionManager sessionManager;
00187 
00188     /**
00189      * Used to rate-limit the creation of ClientSession objects (TCP
00190      * connections).
00191      */
00192     Backoff sessionCreationBackoff;
00193 
00194     /**
00195      * Describes the hosts in the cluster.
00196      */
00197     std::string hosts;
00198 
00199     /**
00200      * Used to send RPCs to the leader of the LogCabin cluster.
00201      */
00202     std::unique_ptr<LeaderRPCBase> leaderRPC;
00203 
00204     /**
00205      * This class helps with providing exactly-once semantics for read-write
00206      * RPCs. For example, it assigns sequence numbers to RPCs, which servers
00207      * then use to prevent duplicate processing of duplicate requests.
00208      *
00209      * This class is implemented in a monitor style.
00210      */
00211     class ExactlyOnceRPCHelper {
00212       public:
00213         /**
00214          * Constructor.
00215          * \param client
00216          *     Used to open a session with the cluster. Should not be NULL.
00217          */
00218         explicit ExactlyOnceRPCHelper(ClientImpl* client);
00219         /**
00220          * Destructor.
00221          */
00222         ~ExactlyOnceRPCHelper();
00223         /**
00224          * Prepare to shut down (join with thread).
00225          */
00226         void exit();
00227         /**
00228          * Call this before sending an RPC.
00229          * \param timeout
00230          *      If this timeout elapses before a session can be opened with the
00231          *      cluster, this method will return early and the returned
00232          *      information will have a client_id set to 0, which is not a
00233          *      valid ID.
00234          * \return
00235          *      Info to be used with read-write RPCs, or if the timeout
00236          *      elapsed, a client_id set to 0.
00237          */
00238         Protocol::Client::ExactlyOnceRPCInfo getRPCInfo(TimePoint timeout);
00239         /**
00240          * Call this after receiving an RPCs response.
00241          */
00242         void doneWithRPC(const Protocol::Client::ExactlyOnceRPCInfo&);
00243 
00244       private:
00245 
00246         /**
00247          * Internal version of getRPCInfo() to avoid deadlock with self.
00248          */
00249         Protocol::Client::ExactlyOnceRPCInfo getRPCInfo(
00250             Core::HoldingMutex holdingMutex,
00251             TimePoint timeout);
00252         /**
00253          * Internal version of doneWithRPC() to avoid deadlock with self.
00254          */
00255         void doneWithRPC(const Protocol::Client::ExactlyOnceRPCInfo&,
00256                          Core::HoldingMutex holdingMutex);
00257         /**
00258          * Main function for keep-alive thread. Periodically makes
00259          * requests to the cluster to keep the client's session active.
00260          */
00261         void keepAliveThreadMain();
00262 
00263         /**
00264          * Used to open a session with the cluster.
00265          * const and non-NULL except for unit tests.
00266          */
00267         ClientImpl* client;
00268         /**
00269          * Protects all the members of this class.
00270          */
00271         mutable Core::Mutex mutex;
00272         /**
00273          * The numbers of the RPCs for which this client is still awaiting a
00274          * response.
00275          */
00276         std::set<uint64_t> outstandingRPCNumbers;
00277         /**
00278          * The client's session ID as returned by the open session RPC, or 0 if
00279          * one has not yet been assigned.
00280          */
00281         uint64_t clientId;
00282         /**
00283          * The number to assign to the next RPC.
00284          */
00285         uint64_t nextRPCNumber;
00286         /**
00287          * keepAliveThread blocks on this. Notified when lastKeepAliveStart,
00288          * keepAliveIntervalMs, or exiting changes.
00289          */
00290         Core::ConditionVariable keepAliveCV;
00291         /**
00292          * Flag to keepAliveThread that it should shut down.
00293          */
00294         bool exiting;
00295         /**
00296          * Time just before the last keep-alive or read-write request to the
00297          * cluster was made. The next keep-alive request will be invoked
00298          * keepAliveIntervalMs after this, if no intervening requests are made.
00299          */
00300         TimePoint lastKeepAliveStart;
00301         /**
00302          * How often session keep-alive requests are sent during periods of
00303          * inactivity.
00304          */
00305         std::chrono::milliseconds keepAliveInterval;
00306         /**
00307          * How long to wait for the CloseSession RPC before giving up.
00308          */
00309         std::chrono::milliseconds sessionCloseTimeout;
00310 
00311         /**
00312          * If set, this is an ongoing keep-alive RPC. This call is canceled to
00313          * interrupt #keepAliveThread when exiting.
00314          */
00315         std::unique_ptr<LeaderRPCBase::Call> keepAliveCall;
00316 
00317         /**
00318          * Runs keepAliveThreadMain().
00319          * Since this thread would be unexpected/wasteful for clients that only
00320          * issue read-only requests (or no requests at all), it is spawned
00321          * lazily, if/when the client opens its session with the cluster (upon
00322          * its first read-write request).
00323          */
00324         std::thread keepAliveThread;
00325 
00326         // ExactlyOnceRPCHelper is not copyable.
00327         ExactlyOnceRPCHelper(const ExactlyOnceRPCHelper&) = delete;
00328         ExactlyOnceRPCHelper& operator=(const ExactlyOnceRPCHelper&) = delete;
00329     } exactlyOnceRPCHelper;
00330 
00331     /**
00332      * A thread that runs the Event::Loop.
00333      */
00334     std::thread eventLoopThread;
00335 
00336     // ClientImpl is not copyable
00337     ClientImpl(const ClientImpl&) = delete;
00338     ClientImpl& operator=(const ClientImpl&) = delete;
00339 };
00340 
00341 } // namespace LogCabin::Client
00342 } // namespace LogCabin
00343 
00344 #endif /* LOGCABIN_CLIENT_CLIENTIMPL_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines