LogCabin
Client/ClientImpl.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2014-2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <algorithm>
00018 
00019 #include "Core/Debug.h"
00020 #include "Client/ClientImpl.h"
00021 #include "Core/ProtoBuf.h"
00022 #include "Core/StringUtil.h"
00023 #include "Protocol/Common.h"
00024 #include "RPC/Address.h"
00025 #include "RPC/ClientRPC.h"
00026 #include "RPC/ClientSession.h"
00027 
00028 namespace LogCabin {
00029 namespace Client {
00030 
00031 namespace {
00032 /**
00033  * Parse an error response out of a ProtoBuf and into a Result object.
00034  */
00035 template<typename Message>
00036 Result
00037 treeError(const Message& response)
00038 {
00039     Result result;
00040     result.error = response.error();
00041     switch (response.status()) {
00042         case Protocol::Client::Status::OK:
00043             result.status = Status::OK;
00044             break;
00045         case Protocol::Client::Status::INVALID_ARGUMENT:
00046             result.status = Status::INVALID_ARGUMENT;
00047             break;
00048         case Protocol::Client::Status::LOOKUP_ERROR:
00049             result.status = Status::LOOKUP_ERROR;
00050             break;
00051         case Protocol::Client::Status::TYPE_ERROR:
00052             result.status = Status::TYPE_ERROR;
00053             break;
00054         case Protocol::Client::Status::CONDITION_NOT_MET:
00055             result.status = Status::CONDITION_NOT_MET;
00056             break;
00057         case Protocol::Client::Status::TIMEOUT:
00058             result.status = Status::TIMEOUT;
00059             break;
00060         case Protocol::Client::Status::SESSION_EXPIRED:
00061             PANIC("The client's session to the cluster expired. This is a "
00062                   "fatal error, since without a session the servers can't "
00063                   "tell if retried requests were already applied or not.");
00064             break;
00065         default:
00066             result.status = Status::INVALID_ARGUMENT;
00067             result.error = Core::StringUtil::format(
00068                 "Did not understand status code in response (%u). "
00069                 "Original error was: %s",
00070                 response.status(),
00071                 response.error().c_str());
00072             break;
00073     }
00074     return result;
00075 }
00076 
00077 /**
00078  * If the client has specified a condition for the operation, serialize it into
00079  * the request message.
00080  */
00081 template<typename Message>
00082 void
00083 setCondition(Message& request, const Condition& condition)
00084 {
00085     if (!condition.first.empty()) {
00086         request.mutable_condition()->set_path(condition.first);
00087         request.mutable_condition()->set_contents(condition.second);
00088     }
00089 }
00090 
/**
 * Break a slash-delimited path into its non-empty components. Helper for
 * ClientImpl::canonicalize.
 * \param[in] path
 *      Forward slash-delimited path (relative or absolute). Leading,
 *      trailing, and repeated slashes produce no components.
 * \param[out] components
 *      Each component of path is appended to this vector, in order; any
 *      existing entries are left in place.
 */
void
split(const std::string& path, std::vector<std::string>& components)
{
    std::string::size_type start = 0;
    while (start < path.size()) {
        // Locate the end of the current run of non-slash characters.
        std::string::size_type stop = path.find('/', start);
        if (stop == std::string::npos)
            stop = path.size();
        if (stop > start)
            components.push_back(path.substr(start, stop - start));
        start = stop + 1;
    }
}
00115 
00116 /**
00117  * Wrapper around LeaderRPC::call() that repackages a timeout as a
00118  * ReadOnlyTree status and error message.
00119  */
00120 void
00121 treeCall(LeaderRPCBase& leaderRPC,
00122          const Protocol::Client::ReadOnlyTree::Request& request,
00123          Protocol::Client::ReadOnlyTree::Response& response,
00124          ClientImpl::TimePoint timeout)
00125 {
00126     VERBOSE("Calling read-only tree query with request:\n%s",
00127             Core::StringUtil::trim(
00128                 Core::ProtoBuf::dumpString(request)).c_str());
00129     LeaderRPC::Status status;
00130     Protocol::Client::StateMachineQuery::Request qrequest;
00131     Protocol::Client::StateMachineQuery::Response qresponse;
00132     *qrequest.mutable_tree() = request;
00133     status = leaderRPC.call(Protocol::Client::OpCode::STATE_MACHINE_QUERY,
00134                             qrequest, qresponse, timeout);
00135     switch (status) {
00136         case LeaderRPC::Status::OK:
00137             response = *qresponse.mutable_tree();
00138             VERBOSE("Reply to read-only tree query:\n%s",
00139                     Core::StringUtil::trim(
00140                         Core::ProtoBuf::dumpString(response)).c_str());
00141             break;
00142         case LeaderRPC::Status::TIMEOUT:
00143             response.set_status(Protocol::Client::Status::TIMEOUT);
00144             response.set_error("Client-specified timeout elapsed");
00145             VERBOSE("Timeout elapsed on read-only tree query");
00146             break;
00147         case LeaderRPC::Status::INVALID_REQUEST:
00148             // TODO(ongaro): Once any new Tree request types are introduced,
00149             // this PANIC will need to move up the call stack, so that we can
00150             // try a new-style request and then ask for forgiveness if it
00151             // fails. Same for the read-write tree calls below.
00152             PANIC("The server and/or replicated state machine doesn't support "
00153                   "the read-only tree query or claims the request is "
00154                   "malformed. Request is: %s",
00155                   Core::ProtoBuf::dumpString(request).c_str());
00156     }
00157 }
00158 
00159 /**
00160  * Wrapper around LeaderRPC::call() that repackages a timeout as a
00161  * ReadWriteTree status and error message. Also checks whether getRPCInfo
00162  * timed out.
00163  */
00164 void
00165 treeCall(LeaderRPCBase& leaderRPC,
00166          const Protocol::Client::ReadWriteTree::Request& request,
00167          Protocol::Client::ReadWriteTree::Response& response,
00168          ClientImpl::TimePoint timeout)
00169 {
00170     VERBOSE("Calling read-write tree command with request:\n%s",
00171             Core::StringUtil::trim(
00172                 Core::ProtoBuf::dumpString(request)).c_str());
00173     Protocol::Client::StateMachineCommand::Request crequest;
00174     Protocol::Client::StateMachineCommand::Response cresponse;
00175     *crequest.mutable_tree() = request;
00176     LeaderRPC::Status status;
00177     if (request.exactly_once().client_id() == 0) {
00178         VERBOSE("Already timed out on establishing session for read-write "
00179                 "tree command");
00180         status = LeaderRPC::Status::TIMEOUT;
00181     } else {
00182         status = leaderRPC.call(Protocol::Client::OpCode::STATE_MACHINE_COMMAND,
00183                                 crequest, cresponse, timeout);
00184     }
00185 
00186     switch (status) {
00187         case LeaderRPC::Status::OK:
00188             response = *cresponse.mutable_tree();
00189             VERBOSE("Reply to read-write tree command:\n%s",
00190                     Core::StringUtil::trim(
00191                         Core::ProtoBuf::dumpString(response)).c_str());
00192             break;
00193         case LeaderRPC::Status::TIMEOUT:
00194             response.set_status(Protocol::Client::Status::TIMEOUT);
00195             response.set_error("Client-specified timeout elapsed");
00196             VERBOSE("Timeout elapsed on read-write tree command");
00197             break;
00198         case LeaderRPC::Status::INVALID_REQUEST:
00199             PANIC("The server and/or replicated state machine doesn't support "
00200                   "the read-write tree command or claims the request is "
00201                   "malformed. Request is: %s",
00202                   Core::ProtoBuf::dumpString(request).c_str());
00203     }
00204 }
00205 
00206 
00207 } // anonymous namespace
00208 
00209 using Protocol::Client::OpCode;
00210 
00211 
00212 ////////// class ClientImpl::ExactlyOnceRPCHelper //////////
00213 
00214 ClientImpl::ExactlyOnceRPCHelper::ExactlyOnceRPCHelper(ClientImpl* client)
00215     : client(client)
00216     , mutex()
00217     , outstandingRPCNumbers()
00218     , clientId(0)
00219     , nextRPCNumber(1)
00220     , keepAliveCV()
00221     , exiting(false)
00222     , lastKeepAliveStart(TimePoint::min())
00223       // TODO(ongaro): set dynamically based on cluster configuration
00224     , keepAliveInterval(std::chrono::milliseconds(60 * 1000))
00225     , sessionCloseTimeout(std::chrono::milliseconds(
00226         client->config.read<uint64_t>(
00227             "sessionCloseTimeoutMilliseconds",
00228             client->config.read<uint64_t>(
00229                 "tcpConnectTimeoutMilliseconds",
00230                 1000))))
00231     , keepAliveCall()
00232     , keepAliveThread()
00233 {
00234 }
00235 
00236 ClientImpl::ExactlyOnceRPCHelper::~ExactlyOnceRPCHelper()
00237 {
00238 }
00239 
00240 void
00241 ClientImpl::ExactlyOnceRPCHelper::exit()
00242 {
00243     {
00244         std::lock_guard<Core::Mutex> lockGuard(mutex);
00245         exiting = true;
00246         keepAliveCV.notify_all();
00247         if (keepAliveCall)
00248             keepAliveCall->cancel();
00249         if (clientId > 0) {
00250             Protocol::Client::StateMachineCommand::Request request;
00251             Protocol::Client::StateMachineCommand::Response response;
00252             request.mutable_close_session()->set_client_id(clientId);
00253             LeaderRPC::Status status = client->leaderRPC->call(
00254                     OpCode::STATE_MACHINE_COMMAND,
00255                     request,
00256                     response,
00257                     ClientImpl::Clock::now() + sessionCloseTimeout);
00258             switch (status) {
00259                 case LeaderRPC::Status::OK:
00260                     break;
00261                 case LeaderRPC::Status::TIMEOUT:
00262                     using Core::StringUtil::toString;
00263                     WARNING("Could not definitively close client session %lu "
00264                             "within timeout (%s). It may remain open until it "
00265                             "expires.",
00266                             clientId,
00267                             toString(sessionCloseTimeout).c_str());
00268                     break;
00269                 case LeaderRPC::Status::INVALID_REQUEST:
00270                     WARNING("The server and/or replicated state machine "
00271                             "doesn't support the CloseSession command or "
00272                             "claims the request is malformed. This client's "
00273                             "session (%lu) will remain open until it expires. "
00274                             "Consider upgrading your servers (this command "
00275                             "was introduced in state machine version 2).",
00276                             clientId);
00277                     break;
00278             }
00279         }
00280     }
00281     if (keepAliveThread.joinable())
00282         keepAliveThread.join();
00283 }
00284 
00285 Protocol::Client::ExactlyOnceRPCInfo
00286 ClientImpl::ExactlyOnceRPCHelper::getRPCInfo(TimePoint timeout)
00287 {
00288     std::lock_guard<Core::Mutex> lockGuard(mutex);
00289     return getRPCInfo(Core::HoldingMutex(lockGuard), timeout);
00290 }
00291 
00292 void
00293 ClientImpl::ExactlyOnceRPCHelper::doneWithRPC(
00294         const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo)
00295 {
00296     std::lock_guard<Core::Mutex> lockGuard(mutex);
00297     doneWithRPC(rpcInfo, Core::HoldingMutex(lockGuard));
00298 }
00299 
00300 Protocol::Client::ExactlyOnceRPCInfo
00301 ClientImpl::ExactlyOnceRPCHelper::getRPCInfo(
00302         Core::HoldingMutex holdingMutex,
00303         TimePoint timeout)
00304 {
00305     Protocol::Client::ExactlyOnceRPCInfo rpcInfo;
00306     if (client == NULL) {
00307         // Filling in rpcInfo is disabled for some unit tests, since it's
00308         // easier if they treat rpcInfo opaquely.
00309         return rpcInfo;
00310     }
00311     if (clientId == 0) {
00312         lastKeepAliveStart = Clock::now();
00313         Protocol::Client::StateMachineCommand::Request request;
00314         Protocol::Client::StateMachineCommand::Response response;
00315         request.mutable_open_session();
00316         LeaderRPC::Status status =
00317             client->leaderRPC->call(OpCode::STATE_MACHINE_COMMAND,
00318                                     request,
00319                                     response,
00320                                     timeout);
00321         switch (status) {
00322             case LeaderRPC::Status::OK:
00323                 break;
00324             case LeaderRPC::Status::TIMEOUT:
00325                 rpcInfo.set_client_id(0);
00326                 return rpcInfo;
00327             case LeaderRPC::Status::INVALID_REQUEST:
00328                 PANIC("The server and/or replicated state machine doesn't "
00329                       "support the OpenSession command or claims the request "
00330                       "is malformed");
00331         }
00332         clientId = response.open_session().client_id();
00333         assert(clientId > 0);
00334         keepAliveThread = std::thread(
00335             &ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain,
00336             this);
00337     }
00338 
00339     lastKeepAliveStart = Clock::now();
00340     keepAliveCV.notify_all();
00341     rpcInfo.set_client_id(clientId);
00342     uint64_t rpcNumber = nextRPCNumber;
00343     ++nextRPCNumber;
00344     rpcInfo.set_rpc_number(rpcNumber);
00345     outstandingRPCNumbers.insert(rpcNumber);
00346     rpcInfo.set_first_outstanding_rpc(*outstandingRPCNumbers.begin());
00347     return rpcInfo;
00348 }
00349 
00350 void
00351 ClientImpl::ExactlyOnceRPCHelper::doneWithRPC(
00352                     const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo,
00353                     Core::HoldingMutex holdingMutex)
00354 {
00355     outstandingRPCNumbers.erase(rpcInfo.rpc_number());
00356 }
00357 
00358 void
00359 ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain()
00360 {
00361     std::unique_lock<Core::Mutex> lockGuard(mutex);
00362     while (!exiting) {
00363         TimePoint nextKeepAlive;
00364         if (keepAliveInterval.count() > 0) {
00365             nextKeepAlive = lastKeepAliveStart + keepAliveInterval;
00366         } else {
00367             nextKeepAlive = TimePoint::max();
00368         }
00369         if (Clock::now() > nextKeepAlive) {
00370             Protocol::Client::StateMachineCommand::Request request;
00371             Protocol::Client::ReadWriteTree::Request& trequest =
00372                 *request.mutable_tree();
00373             *trequest.mutable_exactly_once() = getRPCInfo(
00374                 Core::HoldingMutex(lockGuard),
00375                 TimePoint::max());
00376             setCondition(trequest,
00377                  {"keepalive",
00378                  "this is just a no-op to keep the client's session active; "
00379                  "the condition is expected to fail"});
00380             trequest.mutable_write()->set_path("keepalive");
00381             trequest.mutable_write()->set_contents("you shouldn't see this!");
00382             Protocol::Client::StateMachineCommand::Response response;
00383             keepAliveCall = client->leaderRPC->makeCall();
00384             keepAliveCall->start(OpCode::STATE_MACHINE_COMMAND, request,
00385                                  TimePoint::max());
00386             LeaderRPCBase::Call::Status callStatus;
00387             {
00388                 // release lock to allow concurrent cancellation
00389                 Core::MutexUnlock<Core::Mutex> unlockGuard(lockGuard);
00390                 callStatus = keepAliveCall->wait(response, TimePoint::max());
00391             }
00392             keepAliveCall.reset();
00393             switch (callStatus) {
00394                 case LeaderRPCBase::Call::Status::OK:
00395                     break;
00396                 case LeaderRPCBase::Call::Status::RETRY:
00397                     doneWithRPC(trequest.exactly_once(),
00398                                 Core::HoldingMutex(lockGuard));
00399                     continue; // retry outer loop
00400                 case LeaderRPCBase::Call::Status::TIMEOUT:
00401                     PANIC("Unexpected timeout for keep-alive");
00402                 case LeaderRPCBase::Call::Status::INVALID_REQUEST:
00403                     PANIC("The server rejected our keep-alive request (Tree "
00404                           "write with unmet condition) as invalid");
00405             }
00406             doneWithRPC(trequest.exactly_once(),
00407                         Core::HoldingMutex(lockGuard));
00408             const Protocol::Client::ReadWriteTree::Response& tresponse =
00409                 response.tree();
00410             if (tresponse.status() !=
00411                 Protocol::Client::Status::CONDITION_NOT_MET) {
00412                 WARNING("Keep-alive write should have failed its condition. "
00413                         "Unexpected status was %d: %s",
00414                         tresponse.status(),
00415                         tresponse.error().c_str());
00416             }
00417             continue;
00418         }
00419         keepAliveCV.wait_until(lockGuard, nextKeepAlive);
00420     }
00421 }
00422 
00423 ////////// class ClientImpl //////////
00424 
00425 ClientImpl::TimePoint
00426 ClientImpl::absTimeout(uint64_t relTimeoutNanos)
00427 {
00428     if (relTimeoutNanos == 0)
00429         return ClientImpl::TimePoint::max();
00430     ClientImpl::TimePoint now = ClientImpl::Clock::now();
00431     ClientImpl::TimePoint then =
00432         now + std::chrono::nanoseconds(relTimeoutNanos);
00433     if (then < now) // overflow
00434         return ClientImpl::TimePoint::max();
00435     else
00436         return then;
00437 }
00438 
00439 ClientImpl::ClientImpl(const std::map<std::string, std::string>& options)
00440     : config(options)
00441     , eventLoop()
00442     , clusterUUID()
00443     , sessionManager(eventLoop, config)
00444     , sessionCreationBackoff(5,                   // 5 new connections per
00445                              100UL * 1000 * 1000) // 100 ms
00446     , hosts()
00447     , leaderRPC()             // set in init()
00448     , exactlyOnceRPCHelper(this)
00449     , eventLoopThread()
00450 {
00451     NOTICE("Configuration settings:\n"
00452            "# begin config\n"
00453            "%s"
00454            "# end config",
00455            Core::StringUtil::toString(config).c_str());
00456     std::string uuid = config.read("clusterUUID", std::string(""));
00457     if (!uuid.empty())
00458         clusterUUID.set(uuid);
00459 }
00460 
00461 ClientImpl::~ClientImpl()
00462 {
00463     exactlyOnceRPCHelper.exit();
00464     eventLoop.exit();
00465     if (eventLoopThread.joinable())
00466         eventLoopThread.join();
00467 }
00468 
00469 void
00470 ClientImpl::init(const std::string& hosts)
00471 {
00472     this->hosts = hosts;
00473     eventLoopThread = std::thread(&Event::Loop::runForever, &eventLoop);
00474     initDerived();
00475 }
00476 
00477 void
00478 ClientImpl::initDerived()
00479 {
00480     if (!leaderRPC) { // sometimes set in unit tests
00481         NOTICE("Using server list: %s", hosts.c_str());
00482         leaderRPC.reset(new LeaderRPC(
00483             RPC::Address(hosts, Protocol::Common::DEFAULT_PORT),
00484             clusterUUID,
00485             sessionCreationBackoff,
00486             sessionManager));
00487     }
00488 }
00489 
00490 GetConfigurationResult
00491 ClientImpl::getConfiguration(TimePoint timeout)
00492 {
00493     Protocol::Client::GetConfiguration::Request request;
00494     Protocol::Client::GetConfiguration::Response response;
00495     typedef LeaderRPCBase::Status RPCStatus;
00496     RPCStatus status = leaderRPC->call(
00497         OpCode::GET_CONFIGURATION, request, response, timeout);
00498     Configuration configuration;
00499     for (auto it = response.servers().begin();
00500          it != response.servers().end();
00501          ++it) {
00502         configuration.push_back({it->server_id(), it->addresses()});
00503     }
00504     GetConfigurationResult result;
00505     if (status == RPCStatus::TIMEOUT) {
00506         result.status = GetConfigurationResult::Status::TIMEOUT;
00507         result.error = "Client-specified timeout elapsed";
00508         return result;
00509     } else {
00510         GetConfigurationResult configurationResult;
00511         result.status = GetConfigurationResult::Status::OK;
00512         result.configuration = response.id();
00513         result.servers = configuration;
00514         return result;
00515     }
00516 }
00517 
00518 ConfigurationResult
00519 ClientImpl::setConfiguration(uint64_t oldId,
00520                              const Configuration& newConfiguration,
00521                              TimePoint timeout)
00522 {
00523     Protocol::Client::SetConfiguration::Request request;
00524     request.set_old_id(oldId);
00525     for (auto it = newConfiguration.begin();
00526          it != newConfiguration.end();
00527          ++it) {
00528         Protocol::Client::Server* s = request.add_new_servers();
00529         s->set_server_id(it->serverId);
00530         s->set_addresses(it->addresses);
00531     }
00532     Protocol::Client::SetConfiguration::Response response;
00533     typedef LeaderRPCBase::Status RPCStatus;
00534     RPCStatus status = leaderRPC->call(
00535         OpCode::SET_CONFIGURATION, request, response, timeout);
00536     ConfigurationResult result;
00537     if (status == RPCStatus::TIMEOUT) {
00538         result.status = ConfigurationResult::Status::TIMEOUT;
00539         result.error = "Client-specified timeout elapsed";
00540         return result;
00541     }
00542     if (response.has_ok()) {
00543         result.status = ConfigurationResult::OK;
00544         return result;
00545     }
00546     if (response.has_configuration_changed()) {
00547         result.status = ConfigurationResult::CHANGED;
00548         result.error = ("configuration changed: " +
00549                         response.configuration_changed().error());
00550         return result;
00551     }
00552     if (response.has_configuration_bad()) {
00553         result.status = ConfigurationResult::BAD;
00554         for (auto it = response.configuration_bad().bad_servers().begin();
00555              it != response.configuration_bad().bad_servers().end();
00556              ++it) {
00557             result.badServers.emplace_back(it->server_id(), it->addresses());
00558         }
00559         result.error = "servers slow or unavailable";
00560         return result;
00561     }
00562     PANIC("Did not understand server response to setConfiguration RPC:\n%s",
00563           Core::ProtoBuf::dumpString(response).c_str());
00564 }
00565 
00566 Result
00567 ClientImpl::getServerInfo(const std::string& host,
00568                           TimePoint timeout,
00569                           Server& info)
00570 {
00571     Result timeoutResult;
00572     timeoutResult.status = Client::Status::TIMEOUT;
00573     timeoutResult.error = "Client-specified timeout elapsed";
00574 
00575     while (true) {
00576         sessionCreationBackoff.delayAndBegin(timeout);
00577 
00578         RPC::Address address(host, Protocol::Common::DEFAULT_PORT);
00579         address.refresh(timeout);
00580 
00581         std::shared_ptr<RPC::ClientSession> session =
00582             sessionManager.createSession(address, timeout, &clusterUUID);
00583 
00584         Protocol::Client::GetServerInfo::Request request;
00585         RPC::ClientRPC rpc(session,
00586                            Protocol::Common::ServiceId::CLIENT_SERVICE,
00587                            1,
00588                            OpCode::GET_SERVER_INFO,
00589                            request);
00590 
00591         typedef RPC::ClientRPC::Status RPCStatus;
00592         Protocol::Client::GetServerInfo::Response response;
00593         Protocol::Client::Error error;
00594         RPCStatus status = rpc.waitForReply(&response, &error, timeout);
00595 
00596         // Decode the response
00597         switch (status) {
00598             case RPCStatus::OK:
00599                 info.serverId = response.server_info().server_id();
00600                 info.addresses = response.server_info().addresses();
00601                 return Result();
00602             case RPCStatus::RPC_FAILED:
00603                 break;
00604             case RPCStatus::TIMEOUT:
00605                 return timeoutResult;
00606             case RPCStatus::SERVICE_SPECIFIC_ERROR:
00607                 // Hmm, we don't know what this server is trying to tell us,
00608                 // but something is wrong. The server shouldn't reply back with
00609                 // error codes we don't understand. That's why we gave it a
00610                 // serverSpecificErrorVersion number in the request header.
00611                 PANIC("Unknown error code %u returned in service-specific "
00612                       "error. This probably indicates a bug in the server",
00613                       error.error_code());
00614                 break;
00615             case RPCStatus::RPC_CANCELED:
00616                 PANIC("RPC canceled unexpectedly");
00617             case RPCStatus::INVALID_SERVICE:
00618                 PANIC("The server isn't running the ClientService");
00619             case RPCStatus::INVALID_REQUEST:
00620                 PANIC("The server's ClientService doesn't support the "
00621                       "GetServerInfo RPC or claims the request is malformed");
00622         }
00623         if (timeout < Clock::now())
00624             return timeoutResult;
00625         else
00626             continue;
00627     }
00628 }
00629 
00630 Result
00631 ClientImpl::canonicalize(const std::string& path,
00632                          const std::string& workingDirectory,
00633                          std::string& canonical)
00634 {
00635     canonical = "";
00636     std::vector<std::string> components;
00637     if (!path.empty() && *path.begin() != '/') {
00638         if (workingDirectory.empty() || *workingDirectory.begin() != '/') {
00639             Result result;
00640             result.status = Status::INVALID_ARGUMENT;
00641             result.error = Core::StringUtil::format(
00642                         "Can't use relative path '%s' from working directory "
00643                         "'%s' (working directory should be an absolute path)",
00644                         path.c_str(),
00645                         workingDirectory.c_str());
00646             return result;
00647 
00648         }
00649         split(workingDirectory, components);
00650     }
00651     split(path, components);
00652     // Iron out any ".", ".."
00653     size_t i = 0;
00654     while (i < components.size()) {
00655         if (components.at(i) == "..") {
00656             if (i > 0) {
00657                 // erase previous and ".." components
00658                 components.erase(components.begin() + ssize_t(i) - 1,
00659                                  components.begin() + ssize_t(i) + 1);
00660                 --i;
00661             } else {
00662                 Result result;
00663                 result.status = Status::INVALID_ARGUMENT;
00664                 result.error = Core::StringUtil::format(
00665                             "Path '%s' from working directory '%s' attempts "
00666                             "to look up directory above root ('/')",
00667                             path.c_str(),
00668                             workingDirectory.c_str());
00669                 return result;
00670             }
00671         } else if (components.at(i) == ".") {
00672             components.erase(components.begin() + ssize_t(i));
00673         } else {
00674             ++i;
00675         }
00676     }
00677     if (components.empty()) {
00678         canonical = "/";
00679     } else {
00680         for (auto it = components.begin(); it != components.end(); ++it)
00681             canonical += "/" + *it;
00682     }
00683     return Result();
00684 }
00685 
00686 Result
00687 ClientImpl::makeDirectory(const std::string& path,
00688                           const std::string& workingDirectory,
00689                           const Condition& condition,
00690                           TimePoint timeout)
00691 {
00692     std::string realPath;
00693     Result result = canonicalize(path, workingDirectory, realPath);
00694     if (result.status != Status::OK)
00695         return result;
00696     Protocol::Client::ReadWriteTree::Request request;
00697     *request.mutable_exactly_once() =
00698         exactlyOnceRPCHelper.getRPCInfo(timeout);
00699     setCondition(request, condition);
00700     request.mutable_make_directory()->set_path(realPath);
00701     Protocol::Client::ReadWriteTree::Response response;
00702     treeCall(*leaderRPC,
00703              request, response, timeout);
00704     exactlyOnceRPCHelper.doneWithRPC(request.exactly_once());
00705     if (response.status() != Protocol::Client::Status::OK)
00706         return treeError(response);
00707     return Result();
00708 }
00709 
00710 Result
00711 ClientImpl::listDirectory(const std::string& path,
00712                           const std::string& workingDirectory,
00713                           const Condition& condition,
00714                           TimePoint timeout,
00715                           std::vector<std::string>& children)
00716 {
00717     children.clear();
00718     std::string realPath;
00719     Result result = canonicalize(path, workingDirectory, realPath);
00720     if (result.status != Status::OK)
00721         return result;
00722     Protocol::Client::ReadOnlyTree::Request request;
00723     setCondition(request, condition);
00724     request.mutable_list_directory()->set_path(realPath);
00725     Protocol::Client::ReadOnlyTree::Response response;
00726     treeCall(*leaderRPC,
00727              request, response, timeout);
00728     if (response.status() != Protocol::Client::Status::OK)
00729         return treeError(response);
00730     children = std::vector<std::string>(
00731                     response.list_directory().child().begin(),
00732                     response.list_directory().child().end());
00733     return Result();
00734 }
00735 
00736 Result
00737 ClientImpl::removeDirectory(const std::string& path,
00738                             const std::string& workingDirectory,
00739                             const Condition& condition,
00740                             TimePoint timeout)
00741 {
00742     std::string realPath;
00743     Result result = canonicalize(path, workingDirectory, realPath);
00744     if (result.status != Status::OK)
00745         return result;
00746     Protocol::Client::ReadWriteTree::Request request;
00747     *request.mutable_exactly_once() =
00748         exactlyOnceRPCHelper.getRPCInfo(timeout);
00749     setCondition(request, condition);
00750     request.mutable_remove_directory()->set_path(realPath);
00751     Protocol::Client::ReadWriteTree::Response response;
00752     treeCall(*leaderRPC,
00753              request, response, timeout);
00754     exactlyOnceRPCHelper.doneWithRPC(request.exactly_once());
00755     if (response.status() != Protocol::Client::Status::OK)
00756         return treeError(response);
00757     return Result();
00758 }
00759 
00760 Result
00761 ClientImpl::write(const std::string& path,
00762                   const std::string& workingDirectory,
00763                   const std::string& contents,
00764                   const Condition& condition,
00765                   TimePoint timeout)
00766 {
00767     std::string realPath;
00768     Result result = canonicalize(path, workingDirectory, realPath);
00769     if (result.status != Status::OK)
00770         return result;
00771     Protocol::Client::ReadWriteTree::Request request;
00772     *request.mutable_exactly_once() =
00773         exactlyOnceRPCHelper.getRPCInfo(timeout);
00774     setCondition(request, condition);
00775     request.mutable_write()->set_path(realPath);
00776     request.mutable_write()->set_contents(contents);
00777     Protocol::Client::ReadWriteTree::Response response;
00778     treeCall(*leaderRPC,
00779              request, response, timeout);
00780     exactlyOnceRPCHelper.doneWithRPC(request.exactly_once());
00781     if (response.status() != Protocol::Client::Status::OK)
00782         return treeError(response);
00783     return Result();
00784 }
00785 
00786 Result
00787 ClientImpl::read(const std::string& path,
00788                  const std::string& workingDirectory,
00789                  const Condition& condition,
00790                  TimePoint timeout,
00791                  std::string& contents)
00792 {
00793     contents = "";
00794     std::string realPath;
00795     Result result = canonicalize(path, workingDirectory, realPath);
00796     if (result.status != Status::OK)
00797         return result;
00798     Protocol::Client::ReadOnlyTree::Request request;
00799     setCondition(request, condition);
00800     request.mutable_read()->set_path(realPath);
00801     Protocol::Client::ReadOnlyTree::Response response;
00802     treeCall(*leaderRPC,
00803              request, response, timeout);
00804     if (response.status() != Protocol::Client::Status::OK)
00805         return treeError(response);
00806     contents = response.read().contents();
00807     return Result();
00808 }
00809 
00810 Result
00811 ClientImpl::removeFile(const std::string& path,
00812                        const std::string& workingDirectory,
00813                        const Condition& condition,
00814                        TimePoint timeout)
00815 {
00816     std::string realPath;
00817     Result result = canonicalize(path, workingDirectory, realPath);
00818     if (result.status != Status::OK)
00819         return result;
00820     Protocol::Client::ReadWriteTree::Request request;
00821     *request.mutable_exactly_once() =
00822         exactlyOnceRPCHelper.getRPCInfo(timeout);
00823     setCondition(request, condition);
00824     request.mutable_remove_file()->set_path(realPath);
00825     Protocol::Client::ReadWriteTree::Response response;
00826     treeCall(*leaderRPC,
00827              request, response, timeout);
00828     exactlyOnceRPCHelper.doneWithRPC(request.exactly_once());
00829     if (response.status() != Protocol::Client::Status::OK)
00830         return treeError(response);
00831     return Result();
00832 }
00833 
00834 Result
00835 ClientImpl::serverControl(const std::string& host,
00836                           TimePoint timeout,
00837                           Protocol::ServerControl::OpCode opCode,
00838                           const google::protobuf::Message& request,
00839                           google::protobuf::Message& response)
00840 {
00841     Result timeoutResult;
00842     timeoutResult.status = Client::Status::TIMEOUT;
00843     timeoutResult.error = "Client-specified timeout elapsed";
00844 
00845     while (true) {
00846         sessionCreationBackoff.delayAndBegin(timeout);
00847 
00848         RPC::Address address(host, Protocol::Common::DEFAULT_PORT);
00849         address.refresh(timeout);
00850 
00851         // TODO(ongaro): Ideally we'd learn the serverID the same way we learn
00852         // the cluster UUID and then assert that in future calls. In practice,
00853         // we're only making one call for now, so it doesn't matter.
00854         std::shared_ptr<RPC::ClientSession> session =
00855             sessionManager.createSession(address, timeout, &clusterUUID);
00856 
00857         RPC::ClientRPC rpc(session,
00858                            Protocol::Common::ServiceId::CONTROL_SERVICE,
00859                            1,
00860                            opCode,
00861                            request);
00862 
00863         typedef RPC::ClientRPC::Status RPCStatus;
00864         Protocol::Client::Error error;
00865         RPCStatus status = rpc.waitForReply(&response, &error, timeout);
00866 
00867         // Decode the response
00868         switch (status) {
00869             case RPCStatus::OK:
00870                 return Result();
00871             case RPCStatus::RPC_FAILED:
00872                 break;
00873             case RPCStatus::TIMEOUT:
00874                 return timeoutResult;
00875             case RPCStatus::SERVICE_SPECIFIC_ERROR:
00876                 // Hmm, we don't know what this server is trying to tell us,
00877                 // but something is wrong. The server shouldn't reply back with
00878                 // error codes we don't understand. That's why we gave it a
00879                 // serverSpecificErrorVersion number in the request header.
00880                 PANIC("Unknown error code %u returned in service-specific "
00881                       "error. This probably indicates a bug in the server",
00882                       error.error_code());
00883                 break;
00884             case RPCStatus::RPC_CANCELED:
00885                 PANIC("RPC canceled unexpectedly");
00886             case RPCStatus::INVALID_SERVICE:
00887                 PANIC("The server isn't running the ControlService");
00888             case RPCStatus::INVALID_REQUEST:
00889                 // ControlService was added in v1.1.0.
00890                 EXIT("The server's ControlService doesn't support the "
00891                      "RPC or claims the request is malformed");
00892         }
00893         if (timeout < Clock::now())
00894             return timeoutResult;
00895         else
00896             continue;
00897     }
00898 }
00899 
00900 
00901 } // namespace LogCabin::Client
00902 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines