LogCabin
|
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * Copyright (c) 2014-2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <algorithm> 00018 00019 #include "Core/Debug.h" 00020 #include "Client/ClientImpl.h" 00021 #include "Core/ProtoBuf.h" 00022 #include "Core/StringUtil.h" 00023 #include "Protocol/Common.h" 00024 #include "RPC/Address.h" 00025 #include "RPC/ClientRPC.h" 00026 #include "RPC/ClientSession.h" 00027 00028 namespace LogCabin { 00029 namespace Client { 00030 00031 namespace { 00032 /** 00033 * Parse an error response out of a ProtoBuf and into a Result object. 00034 */ 00035 template<typename Message> 00036 Result 00037 treeError(const Message& response) 00038 { 00039 Result result; 00040 result.error = response.error(); 00041 switch (response.status()) { 00042 case Protocol::Client::Status::OK: 00043 result.status = Status::OK; 00044 break; 00045 case Protocol::Client::Status::INVALID_ARGUMENT: 00046 result.status = Status::INVALID_ARGUMENT; 00047 break; 00048 case Protocol::Client::Status::LOOKUP_ERROR: 00049 result.status = Status::LOOKUP_ERROR; 00050 break; 00051 case Protocol::Client::Status::TYPE_ERROR: 00052 result.status = Status::TYPE_ERROR; 00053 break; 00054 case Protocol::Client::Status::CONDITION_NOT_MET: 00055 result.status = Status::CONDITION_NOT_MET; 00056 break; 00057 case Protocol::Client::Status::TIMEOUT: 00058 result.status = Status::TIMEOUT; 00059 break; 00060 case Protocol::Client::Status::SESSION_EXPIRED: 00061 PANIC("The client's session to the cluster expired. This is a " 00062 "fatal error, since without a session the servers can't " 00063 "tell if retried requests were already applied or not."); 00064 break; 00065 default: 00066 result.status = Status::INVALID_ARGUMENT; 00067 result.error = Core::StringUtil::format( 00068 "Did not understand status code in response (%u). " 00069 "Original error was: %s", 00070 response.status(), 00071 response.error().c_str()); 00072 break; 00073 } 00074 return result; 00075 } 00076 00077 /** 00078 * If the client has specified a condition for the operation, serialize it into 00079 * the request message. 00080 */ 00081 template<typename Message> 00082 void 00083 setCondition(Message& request, const Condition& condition) 00084 { 00085 if (!condition.first.empty()) { 00086 request.mutable_condition()->set_path(condition.first); 00087 request.mutable_condition()->set_contents(condition.second); 00088 } 00089 } 00090 00091 /** 00092 * Split a path into its components. Helper for ClientImpl::canonicalize. 00093 * \param[in] path 00094 * Forward slash-delimited path (relative or absolute). 00095 * \param[out] components 00096 * The components of path are appended to this vector. 00097 */ 00098 void 00099 split(const std::string& path, std::vector<std::string>& components) 00100 { 00101 std::string word; 00102 for (auto it = path.begin(); it != path.end(); ++it) { 00103 if (*it == '/') { 00104 if (!word.empty()) { 00105 components.push_back(word); 00106 word.clear(); 00107 } 00108 } else { 00109 word += *it; 00110 } 00111 } 00112 if (!word.empty()) 00113 components.push_back(word); 00114 } 00115 00116 /** 00117 * Wrapper around LeaderRPC::call() that repackages a timeout as a 00118 * ReadOnlyTree status and error message. 00119 */ 00120 void 00121 treeCall(LeaderRPCBase& leaderRPC, 00122 const Protocol::Client::ReadOnlyTree::Request& request, 00123 Protocol::Client::ReadOnlyTree::Response& response, 00124 ClientImpl::TimePoint timeout) 00125 { 00126 VERBOSE("Calling read-only tree query with request:\n%s", 00127 Core::StringUtil::trim( 00128 Core::ProtoBuf::dumpString(request)).c_str()); 00129 LeaderRPC::Status status; 00130 Protocol::Client::StateMachineQuery::Request qrequest; 00131 Protocol::Client::StateMachineQuery::Response qresponse; 00132 *qrequest.mutable_tree() = request; 00133 status = leaderRPC.call(Protocol::Client::OpCode::STATE_MACHINE_QUERY, 00134 qrequest, qresponse, timeout); 00135 switch (status) { 00136 case LeaderRPC::Status::OK: 00137 response = *qresponse.mutable_tree(); 00138 VERBOSE("Reply to read-only tree query:\n%s", 00139 Core::StringUtil::trim( 00140 Core::ProtoBuf::dumpString(response)).c_str()); 00141 break; 00142 case LeaderRPC::Status::TIMEOUT: 00143 response.set_status(Protocol::Client::Status::TIMEOUT); 00144 response.set_error("Client-specified timeout elapsed"); 00145 VERBOSE("Timeout elapsed on read-only tree query"); 00146 break; 00147 case LeaderRPC::Status::INVALID_REQUEST: 00148 // TODO(ongaro): Once any new Tree request types are introduced, 00149 // this PANIC will need to move up the call stack, so that we can 00150 // try a new-style request and then ask for forgiveness if it 00151 // fails. Same for the read-write tree calls below. 00152 PANIC("The server and/or replicated state machine doesn't support " 00153 "the read-only tree query or claims the request is " 00154 "malformed. Request is: %s", 00155 Core::ProtoBuf::dumpString(request).c_str()); 00156 } 00157 } 00158 00159 /** 00160 * Wrapper around LeaderRPC::call() that repackages a timeout as a 00161 * ReadWriteTree status and error message. Also checks whether getRPCInfo 00162 * timed out. 00163 */ 00164 void 00165 treeCall(LeaderRPCBase& leaderRPC, 00166 const Protocol::Client::ReadWriteTree::Request& request, 00167 Protocol::Client::ReadWriteTree::Response& response, 00168 ClientImpl::TimePoint timeout) 00169 { 00170 VERBOSE("Calling read-write tree command with request:\n%s", 00171 Core::StringUtil::trim( 00172 Core::ProtoBuf::dumpString(request)).c_str()); 00173 Protocol::Client::StateMachineCommand::Request crequest; 00174 Protocol::Client::StateMachineCommand::Response cresponse; 00175 *crequest.mutable_tree() = request; 00176 LeaderRPC::Status status; 00177 if (request.exactly_once().client_id() == 0) { 00178 VERBOSE("Already timed out on establishing session for read-write " 00179 "tree command"); 00180 status = LeaderRPC::Status::TIMEOUT; 00181 } else { 00182 status = leaderRPC.call(Protocol::Client::OpCode::STATE_MACHINE_COMMAND, 00183 crequest, cresponse, timeout); 00184 } 00185 00186 switch (status) { 00187 case LeaderRPC::Status::OK: 00188 response = *cresponse.mutable_tree(); 00189 VERBOSE("Reply to read-write tree command:\n%s", 00190 Core::StringUtil::trim( 00191 Core::ProtoBuf::dumpString(response)).c_str()); 00192 break; 00193 case LeaderRPC::Status::TIMEOUT: 00194 response.set_status(Protocol::Client::Status::TIMEOUT); 00195 response.set_error("Client-specified timeout elapsed"); 00196 VERBOSE("Timeout elapsed on read-write tree command"); 00197 break; 00198 case LeaderRPC::Status::INVALID_REQUEST: 00199 PANIC("The server and/or replicated state machine doesn't support " 00200 "the read-write tree command or claims the request is " 00201 "malformed. Request is: %s", 00202 Core::ProtoBuf::dumpString(request).c_str()); 00203 } 00204 } 00205 00206 00207 } // anonymous namespace 00208 00209 using Protocol::Client::OpCode; 00210 00211 00212 ////////// class ClientImpl::ExactlyOnceRPCHelper ////////// 00213 00214 ClientImpl::ExactlyOnceRPCHelper::ExactlyOnceRPCHelper(ClientImpl* client) 00215 : client(client) 00216 , mutex() 00217 , outstandingRPCNumbers() 00218 , clientId(0) 00219 , nextRPCNumber(1) 00220 , keepAliveCV() 00221 , exiting(false) 00222 , lastKeepAliveStart(TimePoint::min()) 00223 // TODO(ongaro): set dynamically based on cluster configuration 00224 , keepAliveInterval(std::chrono::milliseconds(60 * 1000)) 00225 , sessionCloseTimeout(std::chrono::milliseconds( 00226 client->config.read<uint64_t>( 00227 "sessionCloseTimeoutMilliseconds", 00228 client->config.read<uint64_t>( 00229 "tcpConnectTimeoutMilliseconds", 00230 1000)))) 00231 , keepAliveCall() 00232 , keepAliveThread() 00233 { 00234 } 00235 00236 ClientImpl::ExactlyOnceRPCHelper::~ExactlyOnceRPCHelper() 00237 { 00238 } 00239 00240 void 00241 ClientImpl::ExactlyOnceRPCHelper::exit() 00242 { 00243 { 00244 std::lock_guard<Core::Mutex> lockGuard(mutex); 00245 exiting = true; 00246 keepAliveCV.notify_all(); 00247 if (keepAliveCall) 00248 keepAliveCall->cancel(); 00249 if (clientId > 0) { 00250 Protocol::Client::StateMachineCommand::Request request; 00251 Protocol::Client::StateMachineCommand::Response response; 00252 request.mutable_close_session()->set_client_id(clientId); 00253 LeaderRPC::Status status = client->leaderRPC->call( 00254 OpCode::STATE_MACHINE_COMMAND, 00255 request, 00256 response, 00257 ClientImpl::Clock::now() + sessionCloseTimeout); 00258 switch (status) { 00259 case LeaderRPC::Status::OK: 00260 break; 00261 case LeaderRPC::Status::TIMEOUT: 00262 using Core::StringUtil::toString; 00263 WARNING("Could not definitively close client session %lu " 00264 "within timeout (%s). It may remain open until it " 00265 "expires.", 00266 clientId, 00267 toString(sessionCloseTimeout).c_str()); 00268 break; 00269 case LeaderRPC::Status::INVALID_REQUEST: 00270 WARNING("The server and/or replicated state machine " 00271 "doesn't support the CloseSession command or " 00272 "claims the request is malformed. This client's " 00273 "session (%lu) will remain open until it expires. " 00274 "Consider upgrading your servers (this command " 00275 "was introduced in state machine version 2).", 00276 clientId); 00277 break; 00278 } 00279 } 00280 } 00281 if (keepAliveThread.joinable()) 00282 keepAliveThread.join(); 00283 } 00284 00285 Protocol::Client::ExactlyOnceRPCInfo 00286 ClientImpl::ExactlyOnceRPCHelper::getRPCInfo(TimePoint timeout) 00287 { 00288 std::lock_guard<Core::Mutex> lockGuard(mutex); 00289 return getRPCInfo(Core::HoldingMutex(lockGuard), timeout); 00290 } 00291 00292 void 00293 ClientImpl::ExactlyOnceRPCHelper::doneWithRPC( 00294 const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo) 00295 { 00296 std::lock_guard<Core::Mutex> lockGuard(mutex); 00297 doneWithRPC(rpcInfo, Core::HoldingMutex(lockGuard)); 00298 } 00299 00300 Protocol::Client::ExactlyOnceRPCInfo 00301 ClientImpl::ExactlyOnceRPCHelper::getRPCInfo( 00302 Core::HoldingMutex holdingMutex, 00303 TimePoint timeout) 00304 { 00305 Protocol::Client::ExactlyOnceRPCInfo rpcInfo; 00306 if (client == NULL) { 00307 // Filling in rpcInfo is disabled for some unit tests, since it's 00308 // easier if they treat rpcInfo opaquely. 00309 return rpcInfo; 00310 } 00311 if (clientId == 0) { 00312 lastKeepAliveStart = Clock::now(); 00313 Protocol::Client::StateMachineCommand::Request request; 00314 Protocol::Client::StateMachineCommand::Response response; 00315 request.mutable_open_session(); 00316 LeaderRPC::Status status = 00317 client->leaderRPC->call(OpCode::STATE_MACHINE_COMMAND, 00318 request, 00319 response, 00320 timeout); 00321 switch (status) { 00322 case LeaderRPC::Status::OK: 00323 break; 00324 case LeaderRPC::Status::TIMEOUT: 00325 rpcInfo.set_client_id(0); 00326 return rpcInfo; 00327 case LeaderRPC::Status::INVALID_REQUEST: 00328 PANIC("The server and/or replicated state machine doesn't " 00329 "support the OpenSession command or claims the request " 00330 "is malformed"); 00331 } 00332 clientId = response.open_session().client_id(); 00333 assert(clientId > 0); 00334 keepAliveThread = std::thread( 00335 &ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain, 00336 this); 00337 } 00338 00339 lastKeepAliveStart = Clock::now(); 00340 keepAliveCV.notify_all(); 00341 rpcInfo.set_client_id(clientId); 00342 uint64_t rpcNumber = nextRPCNumber; 00343 ++nextRPCNumber; 00344 rpcInfo.set_rpc_number(rpcNumber); 00345 outstandingRPCNumbers.insert(rpcNumber); 00346 rpcInfo.set_first_outstanding_rpc(*outstandingRPCNumbers.begin()); 00347 return rpcInfo; 00348 } 00349 00350 void 00351 ClientImpl::ExactlyOnceRPCHelper::doneWithRPC( 00352 const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo, 00353 Core::HoldingMutex holdingMutex) 00354 { 00355 outstandingRPCNumbers.erase(rpcInfo.rpc_number()); 00356 } 00357 00358 void 00359 ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain() 00360 { 00361 std::unique_lock<Core::Mutex> lockGuard(mutex); 00362 while (!exiting) { 00363 TimePoint nextKeepAlive; 00364 if (keepAliveInterval.count() > 0) { 00365 nextKeepAlive = lastKeepAliveStart + keepAliveInterval; 00366 } else { 00367 nextKeepAlive = TimePoint::max(); 00368 } 00369 if (Clock::now() > nextKeepAlive) { 00370 Protocol::Client::StateMachineCommand::Request request; 00371 Protocol::Client::ReadWriteTree::Request& trequest = 00372 *request.mutable_tree(); 00373 *trequest.mutable_exactly_once() = getRPCInfo( 00374 Core::HoldingMutex(lockGuard), 00375 TimePoint::max()); 00376 setCondition(trequest, 00377 {"keepalive", 00378 "this is just a no-op to keep the client's session active; " 00379 "the condition is expected to fail"}); 00380 trequest.mutable_write()->set_path("keepalive"); 00381 trequest.mutable_write()->set_contents("you shouldn't see this!"); 00382 Protocol::Client::StateMachineCommand::Response response; 00383 keepAliveCall = client->leaderRPC->makeCall(); 00384 keepAliveCall->start(OpCode::STATE_MACHINE_COMMAND, request, 00385 TimePoint::max()); 00386 LeaderRPCBase::Call::Status callStatus; 00387 { 00388 // release lock to allow concurrent cancellation 00389 Core::MutexUnlock<Core::Mutex> unlockGuard(lockGuard); 00390 callStatus = keepAliveCall->wait(response, TimePoint::max()); 00391 } 00392 keepAliveCall.reset(); 00393 switch (callStatus) { 00394 case LeaderRPCBase::Call::Status::OK: 00395 break; 00396 case LeaderRPCBase::Call::Status::RETRY: 00397 doneWithRPC(trequest.exactly_once(), 00398 Core::HoldingMutex(lockGuard)); 00399 continue; // retry outer loop 00400 case LeaderRPCBase::Call::Status::TIMEOUT: 00401 PANIC("Unexpected timeout for keep-alive"); 00402 case LeaderRPCBase::Call::Status::INVALID_REQUEST: 00403 PANIC("The server rejected our keep-alive request (Tree " 00404 "write with unmet condition) as invalid"); 00405 } 00406 doneWithRPC(trequest.exactly_once(), 00407 Core::HoldingMutex(lockGuard)); 00408 const Protocol::Client::ReadWriteTree::Response& tresponse = 00409 response.tree(); 00410 if (tresponse.status() != 00411 Protocol::Client::Status::CONDITION_NOT_MET) { 00412 WARNING("Keep-alive write should have failed its condition. " 00413 "Unexpected status was %d: %s", 00414 tresponse.status(), 00415 tresponse.error().c_str()); 00416 } 00417 continue; 00418 } 00419 keepAliveCV.wait_until(lockGuard, nextKeepAlive); 00420 } 00421 } 00422 00423 ////////// class ClientImpl ////////// 00424 00425 ClientImpl::TimePoint 00426 ClientImpl::absTimeout(uint64_t relTimeoutNanos) 00427 { 00428 if (relTimeoutNanos == 0) 00429 return ClientImpl::TimePoint::max(); 00430 ClientImpl::TimePoint now = ClientImpl::Clock::now(); 00431 ClientImpl::TimePoint then = 00432 now + std::chrono::nanoseconds(relTimeoutNanos); 00433 if (then < now) // overflow 00434 return ClientImpl::TimePoint::max(); 00435 else 00436 return then; 00437 } 00438 00439 ClientImpl::ClientImpl(const std::map<std::string, std::string>& options) 00440 : config(options) 00441 , eventLoop() 00442 , clusterUUID() 00443 , sessionManager(eventLoop, config) 00444 , sessionCreationBackoff(5, // 5 new connections per 00445 100UL * 1000 * 1000) // 100 ms 00446 , hosts() 00447 , leaderRPC() // set in init() 00448 , exactlyOnceRPCHelper(this) 00449 , eventLoopThread() 00450 { 00451 NOTICE("Configuration settings:\n" 00452 "# begin config\n" 00453 "%s" 00454 "# end config", 00455 Core::StringUtil::toString(config).c_str()); 00456 std::string uuid = config.read("clusterUUID", std::string("")); 00457 if (!uuid.empty()) 00458 clusterUUID.set(uuid); 00459 } 00460 00461 ClientImpl::~ClientImpl() 00462 { 00463 exactlyOnceRPCHelper.exit(); 00464 eventLoop.exit(); 00465 if (eventLoopThread.joinable()) 00466 eventLoopThread.join(); 00467 } 00468 00469 void 00470 ClientImpl::init(const std::string& hosts) 00471 { 00472 this->hosts = hosts; 00473 eventLoopThread = std::thread(&Event::Loop::runForever, &eventLoop); 00474 initDerived(); 00475 } 00476 00477 void 00478 ClientImpl::initDerived() 00479 { 00480 if (!leaderRPC) { // sometimes set in unit tests 00481 NOTICE("Using server list: %s", hosts.c_str()); 00482 leaderRPC.reset(new LeaderRPC( 00483 RPC::Address(hosts, Protocol::Common::DEFAULT_PORT), 00484 clusterUUID, 00485 sessionCreationBackoff, 00486 sessionManager)); 00487 } 00488 } 00489 00490 GetConfigurationResult 00491 ClientImpl::getConfiguration(TimePoint timeout) 00492 { 00493 Protocol::Client::GetConfiguration::Request request; 00494 Protocol::Client::GetConfiguration::Response response; 00495 typedef LeaderRPCBase::Status RPCStatus; 00496 RPCStatus status = leaderRPC->call( 00497 OpCode::GET_CONFIGURATION, request, response, timeout); 00498 Configuration configuration; 00499 for (auto it = response.servers().begin(); 00500 it != response.servers().end(); 00501 ++it) { 00502 configuration.push_back({it->server_id(), it->addresses()}); 00503 } 00504 GetConfigurationResult result; 00505 if (status == RPCStatus::TIMEOUT) { 00506 result.status = GetConfigurationResult::Status::TIMEOUT; 00507 result.error = "Client-specified timeout elapsed"; 00508 return result; 00509 } else { 00510 GetConfigurationResult configurationResult; 00511 result.status = GetConfigurationResult::Status::OK; 00512 result.configuration = response.id(); 00513 result.servers = configuration; 00514 return result; 00515 } 00516 } 00517 00518 ConfigurationResult 00519 ClientImpl::setConfiguration(uint64_t oldId, 00520 const Configuration& newConfiguration, 00521 TimePoint timeout) 00522 { 00523 Protocol::Client::SetConfiguration::Request request; 00524 request.set_old_id(oldId); 00525 for (auto it = newConfiguration.begin(); 00526 it != newConfiguration.end(); 00527 ++it) { 00528 Protocol::Client::Server* s = request.add_new_servers(); 00529 s->set_server_id(it->serverId); 00530 s->set_addresses(it->addresses); 00531 } 00532 Protocol::Client::SetConfiguration::Response response; 00533 typedef LeaderRPCBase::Status RPCStatus; 00534 RPCStatus status = leaderRPC->call( 00535 OpCode::SET_CONFIGURATION, request, response, timeout); 00536 ConfigurationResult result; 00537 if (status == RPCStatus::TIMEOUT) { 00538 result.status = ConfigurationResult::Status::TIMEOUT; 00539 result.error = "Client-specified timeout elapsed"; 00540 return result; 00541 } 00542 if (response.has_ok()) { 00543 result.status = ConfigurationResult::OK; 00544 return result; 00545 } 00546 if (response.has_configuration_changed()) { 00547 result.status = ConfigurationResult::CHANGED; 00548 result.error = ("configuration changed: " + 00549 response.configuration_changed().error()); 00550 return result; 00551 } 00552 if (response.has_configuration_bad()) { 00553 result.status = ConfigurationResult::BAD; 00554 for (auto it = response.configuration_bad().bad_servers().begin(); 00555 it != response.configuration_bad().bad_servers().end(); 00556 ++it) { 00557 result.badServers.emplace_back(it->server_id(), it->addresses()); 00558 } 00559 result.error = "servers slow or unavailable"; 00560 return result; 00561 } 00562 PANIC("Did not understand server response to setConfiguration RPC:\n%s", 00563 Core::ProtoBuf::dumpString(response).c_str()); 00564 } 00565 00566 Result 00567 ClientImpl::getServerInfo(const std::string& host, 00568 TimePoint timeout, 00569 Server& info) 00570 { 00571 Result timeoutResult; 00572 timeoutResult.status = Client::Status::TIMEOUT; 00573 timeoutResult.error = "Client-specified timeout elapsed"; 00574 00575 while (true) { 00576 sessionCreationBackoff.delayAndBegin(timeout); 00577 00578 RPC::Address address(host, Protocol::Common::DEFAULT_PORT); 00579 address.refresh(timeout); 00580 00581 std::shared_ptr<RPC::ClientSession> session = 00582 sessionManager.createSession(address, timeout, &clusterUUID); 00583 00584 Protocol::Client::GetServerInfo::Request request; 00585 RPC::ClientRPC rpc(session, 00586 Protocol::Common::ServiceId::CLIENT_SERVICE, 00587 1, 00588 OpCode::GET_SERVER_INFO, 00589 request); 00590 00591 typedef RPC::ClientRPC::Status RPCStatus; 00592 Protocol::Client::GetServerInfo::Response response; 00593 Protocol::Client::Error error; 00594 RPCStatus status = rpc.waitForReply(&response, &error, timeout); 00595 00596 // Decode the response 00597 switch (status) { 00598 case RPCStatus::OK: 00599 info.serverId = response.server_info().server_id(); 00600 info.addresses = response.server_info().addresses(); 00601 return Result(); 00602 case RPCStatus::RPC_FAILED: 00603 break; 00604 case RPCStatus::TIMEOUT: 00605 return timeoutResult; 00606 case RPCStatus::SERVICE_SPECIFIC_ERROR: 00607 // Hmm, we don't know what this server is trying to tell us, 00608 // but something is wrong. The server shouldn't reply back with 00609 // error codes we don't understand. That's why we gave it a 00610 // serverSpecificErrorVersion number in the request header. 00611 PANIC("Unknown error code %u returned in service-specific " 00612 "error. This probably indicates a bug in the server", 00613 error.error_code()); 00614 break; 00615 case RPCStatus::RPC_CANCELED: 00616 PANIC("RPC canceled unexpectedly"); 00617 case RPCStatus::INVALID_SERVICE: 00618 PANIC("The server isn't running the ClientService"); 00619 case RPCStatus::INVALID_REQUEST: 00620 PANIC("The server's ClientService doesn't support the " 00621 "GetServerInfo RPC or claims the request is malformed"); 00622 } 00623 if (timeout < Clock::now()) 00624 return timeoutResult; 00625 else 00626 continue; 00627 } 00628 } 00629 00630 Result 00631 ClientImpl::canonicalize(const std::string& path, 00632 const std::string& workingDirectory, 00633 std::string& canonical) 00634 { 00635 canonical = ""; 00636 std::vector<std::string> components; 00637 if (!path.empty() && *path.begin() != '/') { 00638 if (workingDirectory.empty() || *workingDirectory.begin() != '/') { 00639 Result result; 00640 result.status = Status::INVALID_ARGUMENT; 00641 result.error = Core::StringUtil::format( 00642 "Can't use relative path '%s' from working directory " 00643 "'%s' (working directory should be an absolute path)", 00644 path.c_str(), 00645 workingDirectory.c_str()); 00646 return result; 00647 00648 } 00649 split(workingDirectory, components); 00650 } 00651 split(path, components); 00652 // Iron out any ".", ".." 00653 size_t i = 0; 00654 while (i < components.size()) { 00655 if (components.at(i) == "..") { 00656 if (i > 0) { 00657 // erase previous and ".." components 00658 components.erase(components.begin() + ssize_t(i) - 1, 00659 components.begin() + ssize_t(i) + 1); 00660 --i; 00661 } else { 00662 Result result; 00663 result.status = Status::INVALID_ARGUMENT; 00664 result.error = Core::StringUtil::format( 00665 "Path '%s' from working directory '%s' attempts " 00666 "to look up directory above root ('/')", 00667 path.c_str(), 00668 workingDirectory.c_str()); 00669 return result; 00670 } 00671 } else if (components.at(i) == ".") { 00672 components.erase(components.begin() + ssize_t(i)); 00673 } else { 00674 ++i; 00675 } 00676 } 00677 if (components.empty()) { 00678 canonical = "/"; 00679 } else { 00680 for (auto it = components.begin(); it != components.end(); ++it) 00681 canonical += "/" + *it; 00682 } 00683 return Result(); 00684 } 00685 00686 Result 00687 ClientImpl::makeDirectory(const std::string& path, 00688 const std::string& workingDirectory, 00689 const Condition& condition, 00690 TimePoint timeout) 00691 { 00692 std::string realPath; 00693 Result result = canonicalize(path, workingDirectory, realPath); 00694 if (result.status != Status::OK) 00695 return result; 00696 Protocol::Client::ReadWriteTree::Request request; 00697 *request.mutable_exactly_once() = 00698 exactlyOnceRPCHelper.getRPCInfo(timeout); 00699 setCondition(request, condition); 00700 request.mutable_make_directory()->set_path(realPath); 00701 Protocol::Client::ReadWriteTree::Response response; 00702 treeCall(*leaderRPC, 00703 request, response, timeout); 00704 exactlyOnceRPCHelper.doneWithRPC(request.exactly_once()); 00705 if (response.status() != Protocol::Client::Status::OK) 00706 return treeError(response); 00707 return Result(); 00708 } 00709 00710 Result 00711 ClientImpl::listDirectory(const std::string& path, 00712 const std::string& workingDirectory, 00713 const Condition& condition, 00714 TimePoint timeout, 00715 std::vector<std::string>& children) 00716 { 00717 children.clear(); 00718 std::string realPath; 00719 Result result = canonicalize(path, workingDirectory, realPath); 00720 if (result.status != Status::OK) 00721 return result; 00722 Protocol::Client::ReadOnlyTree::Request request; 00723 setCondition(request, condition); 00724 request.mutable_list_directory()->set_path(realPath); 00725 Protocol::Client::ReadOnlyTree::Response response; 00726 treeCall(*leaderRPC, 00727 request, response, timeout); 00728 if (response.status() != Protocol::Client::Status::OK) 00729 return treeError(response); 00730 children = std::vector<std::string>( 00731 response.list_directory().child().begin(), 00732 response.list_directory().child().end()); 00733 return Result(); 00734 } 00735 00736 Result 00737 ClientImpl::removeDirectory(const std::string& path, 00738 const std::string& workingDirectory, 00739 const Condition& condition, 00740 TimePoint timeout) 00741 { 00742 std::string realPath; 00743 Result result = canonicalize(path, workingDirectory, realPath); 00744 if (result.status != Status::OK) 00745 return result; 00746 Protocol::Client::ReadWriteTree::Request request; 00747 *request.mutable_exactly_once() = 00748 exactlyOnceRPCHelper.getRPCInfo(timeout); 00749 setCondition(request, condition); 00750 request.mutable_remove_directory()->set_path(realPath); 00751 Protocol::Client::ReadWriteTree::Response response; 00752 treeCall(*leaderRPC, 00753 request, response, timeout); 00754 exactlyOnceRPCHelper.doneWithRPC(request.exactly_once()); 00755 if (response.status() != Protocol::Client::Status::OK) 00756 return treeError(response); 00757 return Result(); 00758 } 00759 00760 Result 00761 ClientImpl::write(const std::string& path, 00762 const std::string& workingDirectory, 00763 const std::string& contents, 00764 const Condition& condition, 00765 TimePoint timeout) 00766 { 00767 std::string realPath; 00768 Result result = canonicalize(path, workingDirectory, realPath); 00769 if (result.status != Status::OK) 00770 return result; 00771 Protocol::Client::ReadWriteTree::Request request; 00772 *request.mutable_exactly_once() = 00773 exactlyOnceRPCHelper.getRPCInfo(timeout); 00774 setCondition(request, condition); 00775 request.mutable_write()->set_path(realPath); 00776 request.mutable_write()->set_contents(contents); 00777 Protocol::Client::ReadWriteTree::Response response; 00778 treeCall(*leaderRPC, 00779 request, response, timeout); 00780 exactlyOnceRPCHelper.doneWithRPC(request.exactly_once()); 00781 if (response.status() != Protocol::Client::Status::OK) 00782 return treeError(response); 00783 return Result(); 00784 } 00785 00786 Result 00787 ClientImpl::read(const std::string& path, 00788 const std::string& workingDirectory, 00789 const Condition& condition, 00790 TimePoint timeout, 00791 std::string& contents) 00792 { 00793 contents = ""; 00794 std::string realPath; 00795 Result result = canonicalize(path, workingDirectory, realPath); 00796 if (result.status != Status::OK) 00797 return result; 00798 Protocol::Client::ReadOnlyTree::Request request; 00799 setCondition(request, condition); 00800 request.mutable_read()->set_path(realPath); 00801 Protocol::Client::ReadOnlyTree::Response response; 00802 treeCall(*leaderRPC, 00803 request, response, timeout); 00804 if (response.status() != Protocol::Client::Status::OK) 00805 return treeError(response); 00806 contents = response.read().contents(); 00807 return Result(); 00808 } 00809 00810 Result 00811 ClientImpl::removeFile(const std::string& path, 00812 const std::string& workingDirectory, 00813 const Condition& condition, 00814 TimePoint timeout) 00815 { 00816 std::string realPath; 00817 Result result = canonicalize(path, workingDirectory, realPath); 00818 if (result.status != Status::OK) 00819 return result; 00820 Protocol::Client::ReadWriteTree::Request request; 00821 *request.mutable_exactly_once() = 00822 exactlyOnceRPCHelper.getRPCInfo(timeout); 00823 setCondition(request, condition); 00824 request.mutable_remove_file()->set_path(realPath); 00825 Protocol::Client::ReadWriteTree::Response response; 00826 treeCall(*leaderRPC, 00827 request, response, timeout); 00828 exactlyOnceRPCHelper.doneWithRPC(request.exactly_once()); 00829 if (response.status() != Protocol::Client::Status::OK) 00830 return treeError(response); 00831 return Result(); 00832 } 00833 00834 Result 00835 ClientImpl::serverControl(const std::string& host, 00836 TimePoint timeout, 00837 Protocol::ServerControl::OpCode opCode, 00838 const google::protobuf::Message& request, 00839 google::protobuf::Message& response) 00840 { 00841 Result timeoutResult; 00842 timeoutResult.status = Client::Status::TIMEOUT; 00843 timeoutResult.error = "Client-specified timeout elapsed"; 00844 00845 while (true) { 00846 sessionCreationBackoff.delayAndBegin(timeout); 00847 00848 RPC::Address address(host, Protocol::Common::DEFAULT_PORT); 00849 address.refresh(timeout); 00850 00851 // TODO(ongaro): Ideally we'd learn the serverID the same way we learn 00852 // the cluster UUID and then assert that in future calls. In practice, 00853 // we're only making one call for now, so it doesn't matter. 00854 std::shared_ptr<RPC::ClientSession> session = 00855 sessionManager.createSession(address, timeout, &clusterUUID); 00856 00857 RPC::ClientRPC rpc(session, 00858 Protocol::Common::ServiceId::CONTROL_SERVICE, 00859 1, 00860 opCode, 00861 request); 00862 00863 typedef RPC::ClientRPC::Status RPCStatus; 00864 Protocol::Client::Error error; 00865 RPCStatus status = rpc.waitForReply(&response, &error, timeout); 00866 00867 // Decode the response 00868 switch (status) { 00869 case RPCStatus::OK: 00870 return Result(); 00871 case RPCStatus::RPC_FAILED: 00872 break; 00873 case RPCStatus::TIMEOUT: 00874 return timeoutResult; 00875 case RPCStatus::SERVICE_SPECIFIC_ERROR: 00876 // Hmm, we don't know what this server is trying to tell us, 00877 // but something is wrong. The server shouldn't reply back with 00878 // error codes we don't understand. That's why we gave it a 00879 // serverSpecificErrorVersion number in the request header. 00880 PANIC("Unknown error code %u returned in service-specific " 00881 "error. This probably indicates a bug in the server", 00882 error.error_code()); 00883 break; 00884 case RPCStatus::RPC_CANCELED: 00885 PANIC("RPC canceled unexpectedly"); 00886 case RPCStatus::INVALID_SERVICE: 00887 PANIC("The server isn't running the ControlService"); 00888 case RPCStatus::INVALID_REQUEST: 00889 // ControlService was added in v1.1.0. 00890 EXIT("The server's ControlService doesn't support the " 00891 "RPC or claims the request is malformed"); 00892 } 00893 if (timeout < Clock::now()) 00894 return timeoutResult; 00895 else 00896 continue; 00897 } 00898 } 00899 00900 00901 } // namespace LogCabin::Client 00902 } // namespace LogCabin