LogCabin
|
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * 00003 * Permission to use, copy, modify, and distribute this software for any 00004 * purpose with or without fee is hereby granted, provided that the above 00005 * copyright notice and this permission notice appear in all copies. 00006 * 00007 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00008 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00009 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00010 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00011 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00012 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00013 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00014 */ 00015 00016 /** 00017 * \file 00018 * This is a basic latency/bandwidth benchmark of LogCabin. 00019 */ 00020 00021 // std::atomic header file renamed in gcc 4.5. 00022 // Clang uses <atomic> but has defines like gcc 4.2. 00023 #if __GNUC__ == 4 && __GNUC_MINOR__ < 5 && !__clang__ 00024 #include <cstdatomic> 00025 #else 00026 #include <atomic> 00027 #endif 00028 #include <cassert> 00029 #include <ctime> 00030 #include <getopt.h> 00031 #include <iostream> 00032 #include <thread> 00033 #include <unistd.h> 00034 00035 #include <LogCabin/Client.h> 00036 #include <LogCabin/Debug.h> 00037 #include <LogCabin/Util.h> 00038 00039 namespace { 00040 00041 using LogCabin::Client::Cluster; 00042 using LogCabin::Client::Result; 00043 using LogCabin::Client::Status; 00044 using LogCabin::Client::Tree; 00045 using LogCabin::Client::Util::parseNonNegativeDuration; 00046 00047 /** 00048 * Parses argv for the main function. 00049 */ 00050 class OptionParser { 00051 public: 00052 OptionParser(int& argc, char**& argv) 00053 : argc(argc) 00054 , argv(argv) 00055 , cluster("logcabin:5254") 00056 , logPolicy("") 00057 , size(1024) 00058 , writers(1) 00059 , totalWrites(1000) 00060 , timeout(parseNonNegativeDuration("30s")) 00061 { 00062 while (true) { 00063 static struct option longOptions[] = { 00064 {"cluster", required_argument, NULL, 'c'}, 00065 {"help", no_argument, NULL, 'h'}, 00066 {"size", required_argument, NULL, 's'}, 00067 {"threads", required_argument, NULL, 't'}, 00068 {"timeout", required_argument, NULL, 'd'}, 00069 {"writes", required_argument, NULL, 'w'}, 00070 {"verbose", no_argument, NULL, 'v'}, 00071 {"verbosity", required_argument, NULL, 256}, 00072 {0, 0, 0, 0} 00073 }; 00074 int c = getopt_long(argc, argv, "c:hs:t:w:v", longOptions, NULL); 00075 00076 // Detect the end of the options. 00077 if (c == -1) 00078 break; 00079 00080 switch (c) { 00081 case 'c': 00082 cluster = optarg; 00083 break; 00084 case 'd': 00085 timeout = parseNonNegativeDuration(optarg); 00086 break; 00087 case 'h': 00088 usage(); 00089 exit(0); 00090 case 's': 00091 size = uint64_t(atol(optarg)); 00092 break; 00093 case 't': 00094 writers = uint64_t(atol(optarg)); 00095 break; 00096 case 'w': 00097 totalWrites = uint64_t(atol(optarg)); 00098 break; 00099 case 'v': 00100 logPolicy = "VERBOSE"; 00101 break; 00102 case 256: 00103 logPolicy = optarg; 00104 break; 00105 case '?': 00106 default: 00107 // getopt_long already printed an error message. 00108 usage(); 00109 exit(1); 00110 } 00111 } 00112 } 00113 00114 void usage() { 00115 std::cout 00116 << "Writes repeatedly to LogCabin. Stops once it reaches " 00117 << "the given number of" 00118 << std::endl 00119 << "writes or the timeout, whichever comes first." 00120 << std::endl 00121 << std::endl 00122 << "This program is subject to change (it is not part of " 00123 << "LogCabin's stable API)." 00124 << std::endl 00125 << std::endl 00126 00127 << "Usage: " << argv[0] << " [options]" 00128 << std::endl 00129 << std::endl 00130 00131 << "Options:" 00132 << std::endl 00133 00134 << " -c <addresses>, --cluster=<addresses> " 00135 << "Network addresses of the LogCabin" 00136 << std::endl 00137 << " " 00138 << "servers, comma-separated" 00139 << std::endl 00140 << " " 00141 << "[default: logcabin:5254]" 00142 << std::endl 00143 00144 << " -h, --help " 00145 << "Print this usage information" 00146 << std::endl 00147 00148 << " --size <bytes> " 00149 << "Size of value in each write [default: 1024]" 00150 << std::endl 00151 00152 << " --threads <num> " 00153 << "Number of concurrent writers [default: 1]" 00154 << std::endl 00155 00156 << " --timeout <time> " 00157 << "Time after which to exit [default: 30s]" 00158 << std::endl 00159 00160 << " --writes <num> " 00161 << "Number of total writes [default: 1000]" 00162 << std::endl 00163 00164 << " -v, --verbose " 00165 << "Same as --verbosity=VERBOSE" 00166 << std::endl 00167 00168 << " --verbosity=<policy> " 00169 << "Set which log messages are shown." 00170 << std::endl 00171 << " " 00172 << "Comma-separated LEVEL or PATTERN@LEVEL rules." 00173 << std::endl 00174 << " " 00175 << "Levels: SILENT, ERROR, WARNING, NOTICE, VERBOSE." 00176 << std::endl 00177 << " " 00178 << "Patterns match filename prefixes or suffixes." 00179 << std::endl 00180 << " " 00181 << "Example: Client@NOTICE,Test.cc@SILENT,VERBOSE." 00182 << std::endl; 00183 } 00184 00185 int& argc; 00186 char**& argv; 00187 std::string cluster; 00188 std::string logPolicy; 00189 uint64_t size; 00190 uint64_t writers; 00191 uint64_t totalWrites; 00192 uint64_t timeout; 00193 }; 00194 00195 /** 00196 * The main function for a single client thread. 00197 * \param id 00198 * Unique ID for this thread, counting from 0. 00199 * \param options 00200 * Arguments describing benchmark. 00201 * \param tree 00202 * Interface to LogCabin. 00203 * \param key 00204 * Key to write repeatedly. 00205 * \param value 00206 * Value to write at key repeatedly. 00207 * \param exit 00208 * When this becomes true, this thread should exit. 00209 * \param[out] writesDone 00210 * The number of writes this thread has completed. 00211 */ 00212 void 00213 writeThreadMain(uint64_t id, 00214 const OptionParser& options, 00215 Tree tree, 00216 const std::string& key, 00217 const std::string& value, 00218 std::atomic<bool>& exit, 00219 uint64_t& writesDone) 00220 { 00221 uint64_t numWrites = options.totalWrites / options.writers; 00222 // assign any odd leftover writes in a balanced way 00223 if (options.totalWrites - numWrites * options.writers > id) 00224 numWrites += 1; 00225 for (uint64_t i = 0; i < numWrites; ++i) { 00226 if (exit) 00227 break; 00228 tree.writeEx(key, value); 00229 writesDone = i + 1; 00230 } 00231 } 00232 00233 /** 00234 * Return the time since the Unix epoch in nanoseconds. 00235 */ 00236 uint64_t timeNanos() 00237 { 00238 struct timespec now; 00239 int r = clock_gettime(CLOCK_REALTIME, &now); 00240 assert(r == 0); 00241 return uint64_t(now.tv_sec) * 1000 * 1000 * 1000 + uint64_t(now.tv_nsec); 00242 } 00243 00244 /** 00245 * Main function for the timer thread, whose job is to wait until a particular 00246 * timeout elapses and then set 'exit' to true. 00247 * \param timeout 00248 * Seconds to wait before setting exit to true. 00249 * \param[in,out] exit 00250 * If this is set to true from another thread, the timer thread will exit 00251 * soonish. Also, if the timeout elapses, the timer thread will set this 00252 * to true and exit. 00253 */ 00254 void 00255 timerThreadMain(uint64_t timeout, std::atomic<bool>& exit) 00256 { 00257 uint64_t start = timeNanos(); 00258 while (!exit) { 00259 usleep(50 * 1000); 00260 if ((timeNanos() - start) > timeout) { 00261 exit = true; 00262 } 00263 } 00264 } 00265 00266 } // anonymous namespace 00267 00268 int 00269 main(int argc, char** argv) 00270 { 00271 try { 00272 00273 OptionParser options(argc, argv); 00274 LogCabin::Client::Debug::setLogPolicy( 00275 LogCabin::Client::Debug::logPolicyFromString( 00276 options.logPolicy)); 00277 Cluster cluster = Cluster(options.cluster); 00278 Tree tree = cluster.getTree(); 00279 00280 std::string key("/bench"); 00281 std::string value(options.size, 'v'); 00282 00283 uint64_t startNanos = timeNanos(); 00284 std::atomic<bool> exit(false); 00285 std::vector<uint64_t> writesDonePerThread(options.writers); 00286 uint64_t totalWritesDone = 0; 00287 std::vector<std::thread> threads; 00288 std::thread timer(timerThreadMain, options.timeout, std::ref(exit)); 00289 for (uint64_t i = 0; i < options.writers; ++i) { 00290 threads.emplace_back(writeThreadMain, i, std::ref(options), 00291 tree, std::ref(key), std::ref(value), 00292 std::ref(exit), 00293 std::ref(writesDonePerThread.at(i))); 00294 } 00295 for (uint64_t i = 0; i < options.writers; ++i) { 00296 threads.at(i).join(); 00297 totalWritesDone += writesDonePerThread.at(i); 00298 } 00299 uint64_t endNanos = timeNanos(); 00300 exit = true; 00301 timer.join(); 00302 00303 tree.removeFile(key); 00304 std::cout << "Benchmark took " 00305 << static_cast<double>(endNanos - startNanos) / 1e6 00306 << " ms to write " 00307 << totalWritesDone 00308 << " objects" 00309 << std::endl; 00310 return 0; 00311 00312 } catch (const LogCabin::Client::Exception& e) { 00313 std::cerr << "Exiting due to LogCabin::Client::Exception: " 00314 << e.what() 00315 << std::endl; 00316 exit(1); 00317 } 00318 }