LogCabin
|
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <deque> 00018 #include <thread> 00019 #include <vector> 00020 00021 #include "build/Storage/SegmentedLog.pb.h" 00022 #include "Core/Buffer.h" 00023 #include "Core/ConditionVariable.h" 00024 #include "Core/Mutex.h" 00025 #include "Core/RollingStat.h" 00026 #include "Storage/FilesystemUtil.h" 00027 #include "Storage/Log.h" 00028 00029 #ifndef LOGCABIN_STORAGE_SEGMENTEDLOG_H 00030 #define LOGCABIN_STORAGE_SEGMENTEDLOG_H 00031 00032 namespace LogCabin { 00033 00034 // forward declaration 00035 namespace Core { 00036 class Config; 00037 } 00038 00039 namespace Storage { 00040 00041 /** 00042 * This class persists a log on the filesystem efficiently. 00043 * 00044 * The log entries on disk are stored in a series of files called segments, and 00045 * each segment is about 8MB in size. Thus, most small appends do not need to 00046 * update filesystem metadata and can proceed with a single consecutive disk 00047 * write. 00048 * 00049 * The disk files consist of metadata files, closed segments, and open 00050 * segments. Metadata files are used to track Raft metadata, such as the 00051 * server's current term, and also the log's start index. Segments contain 00052 * contiguous entries that are part of the log. Closed segments are never 00053 * written to again (but may be renamed and truncated if a suffix of the log is 00054 * truncated). Open segments are where newly appended entries go. Once an open 00055 * segment reaches MAX_SEGMENT_SIZE, it is closed and a new one is used. 00056 * 00057 * Metadata files are named "metadata1" and "metadata2". The code alternates 00058 * between these so that there is always at least one readable metadata file. 00059 * On boot, the readable metadata file with the higher version number is used. 00060 * 00061 * Closed segments are named by the format string "%020lu-%020lu" with their 00062 * start and end indexes, both inclusive. Closed segments always contain at 00063 * least one entry; the end index is always at least as large as the start 00064 * index. Closed segment files may occasionally include data past their 00065 * filename's end index (these are ignored but a WARNING is issued). This can 00066 * happen if the suffix of the segment is truncated and a crash occurs at an 00067 * inopportune time (the segment file is first renamed, then truncated, and a 00068 * crash occurs in between). 00069 * 00070 * Open segments are named by the format string "open-%lu" with a unique 00071 * number. These should not exist when the server shuts down cleanly, but they 00072 * exist while the server is running and may be left around during a crash. 00073 * Open segments either contain entries which come after the last closed 00074 * segment or are full of zeros. When the server crashes while appending to an 00075 * open segment, the end of that file may be corrupt. We can't distinguish 00076 * between a corrupt file and a partially written entry. The code assumes it's 00077 * a partially written entry, issues a WARNING, and ignores it. 00078 * 00079 * Truncating a suffix of the log will remove all entries that are no longer 00080 * part of the log. Truncating a prefix of the log will only remove complete 00081 * segments that are before the new log start index. For example, if a 00082 * segment has entries 10 through 20 and the prefix of the log is truncated to 00083 * start at entry 15, that entire segment will be retained. 00084 * 00085 * Each segment file starts with a segment header, which currently contains 00086 * just a one-byte version number for the format of that segment. The current 00087 * format (version 1) is just a concatenation of serialized entry records. 00088 */ 00089 class SegmentedLog : public Log { 00090 /** 00091 * Clock used for measuring disk performance. 00092 */ 00093 typedef Core::Time::SteadyClock Clock; 00094 /** 00095 * Time point for measuring disk performance. 00096 */ 00097 typedef Clock::time_point TimePoint; 00098 00099 public: 00100 00101 /** 00102 * Specifies how individual records are serialized. 00103 */ 00104 enum class Encoding { 00105 /// ProtoBuf human-readable text format. 00106 TEXT, 00107 /// ProtoBuf binary format. 00108 BINARY, 00109 }; 00110 00111 /** 00112 * Constructor. 00113 * \param parentDir 00114 * A filesystem directory in which all the files for this storage 00115 * module are kept. 00116 * \param encoding 00117 * Specifies how individual records are stored. 00118 * \param config 00119 * Settings. 00120 */ 00121 SegmentedLog(const FilesystemUtil::File& parentDir, 00122 Encoding encoding, 00123 const Core::Config& config); 00124 ~SegmentedLog(); 00125 00126 // Methods implemented from Log interface 00127 std::pair<uint64_t, uint64_t> 00128 append(const std::vector<const Entry*>& entries); 00129 const Entry& getEntry(uint64_t) const; 00130 uint64_t getLogStartIndex() const; 00131 uint64_t getLastLogIndex() const; 00132 std::string getName() const; 00133 uint64_t getSizeBytes() const; 00134 std::unique_ptr<Log::Sync> takeSync(); 00135 void syncCompleteVirtual(std::unique_ptr<Log::Sync> sync); 00136 void truncatePrefix(uint64_t firstIndex); 00137 void truncateSuffix(uint64_t lastIndex); 00138 void updateMetadata(); 00139 void updateServerStats(Protocol::ServerStats& serverStats) const; 00140 00141 private: 00142 00143 /** 00144 * A producer/consumer monitor for a queue of files to use for open 00145 * segments. These are created asynchronously, hopefully ahead of the log 00146 * appends. 00147 * 00148 * This class is written in a monitor style; each public method acquires 00149 * #mutex. 00150 */ 00151 class PreparedSegments { 00152 public: 00153 /** 00154 * The type of element that is queued in #openSegments. 00155 * The first element of each pair is its filename relative to #dir. The 00156 * second element is its open file descriptor. 00157 */ 00158 typedef std::pair<std::string, FilesystemUtil::File> OpenSegment; 00159 00160 /** 00161 * Constructor. 00162 * \param queueSize 00163 * The maximum number of prepared segments to hold in the queue 00164 * at a time. 00165 */ 00166 explicit PreparedSegments(uint64_t queueSize); 00167 00168 /** 00169 * Destructor. 00170 */ 00171 ~PreparedSegments(); 00172 00173 /** 00174 * Do not block any more waiting threads, and return immediately. 00175 */ 00176 void exit(); 00177 00178 /** 00179 * Ensure that future filenames will be larger than this one. 00180 * This should generally be invoked before any producers are started, 00181 * since once they're started, there's no stopping them. 00182 * \param fileId 00183 * A lower bound on future IDs. 00184 */ 00185 void foundFile(uint64_t fileId); 00186 00187 /** 00188 * Immediately return all currently prepared segments. 00189 */ 00190 std::deque<OpenSegment> releaseAll(); 00191 00192 /** 00193 * Producers call this when they're done creating a new file. This must 00194 * be called once after each call waitForDemand(); otherwise, the 00195 * internal bookkeeping won't work. 00196 * \param segment 00197 * File to queue up for a consumer. 00198 */ 00199 void submitOpenSegment(OpenSegment segment); 00200 00201 /** 00202 * Producers call this first to block until work becomes needed. 00203 * \return 00204 * ID for new filename. 00205 * \throw Core::Util::ThreadInterruptedException 00206 * If exit() has been called. 00207 */ 00208 uint64_t waitForDemand(); 00209 00210 /** 00211 * Consumers call this when they need a prepared segment file. 00212 * \throw Core::Util::ThreadInterruptedException 00213 * If exit() has been called. 00214 */ 00215 OpenSegment waitForOpenSegment(); 00216 00217 /** 00218 * Reduce log message verbosity for unit tests. 00219 */ 00220 bool quietForUnitTests; 00221 00222 private: 00223 /** 00224 * Mutual exclusion for all of the members of this class. 00225 */ 00226 Core::Mutex mutex; 00227 /** 00228 * Notified when #openSegments shrinks in size or when #exiting becomes 00229 * true. 00230 */ 00231 Core::ConditionVariable consumed; 00232 /** 00233 * Notified when #openSegments grows in size or when #exiting becomes 00234 * true. 00235 */ 00236 Core::ConditionVariable produced; 00237 /** 00238 * Set to true when waiters should exit. 00239 */ 00240 bool exiting; 00241 /** 00242 * The number of producers that may be started to fulfill demand. 00243 */ 00244 uint64_t demanded; 00245 /** 00246 * Used to assign filenames to open segments (which is done before the 00247 * indexes they contain is known). This number is the largest of all 00248 * previously known numbers so that it can be incremented then assigned 00249 * to a new file. It's possible that numbers are reused across reboots. 00250 */ 00251 uint64_t filenameCounter; 00252 /** 00253 * The queue where open segments sit before they're consumed. These are 00254 * available for the log to use as future open segments. 00255 */ 00256 std::deque<OpenSegment> openSegments; 00257 }; 00258 00259 /** 00260 * Queues various operations on files, such as writes and fsyncs, to be 00261 * executed later. 00262 */ 00263 class Sync : public Log::Sync { 00264 public: 00265 struct Op { 00266 enum OpCode { 00267 WRITE, 00268 TRUNCATE, 00269 RENAME, 00270 FDATASYNC, 00271 FSYNC, 00272 CLOSE, 00273 UNLINKAT, 00274 NOOP, 00275 }; 00276 Op(int fd, OpCode opCode) 00277 : fd(fd) 00278 , opCode(opCode) 00279 , writeData() 00280 , filename1() 00281 , filename2() 00282 , size(0) 00283 { 00284 } 00285 int fd; 00286 OpCode opCode; 00287 Core::Buffer writeData; 00288 std::string filename1; 00289 std::string filename2; 00290 uint64_t size; 00291 }; 00292 00293 explicit Sync(uint64_t lastIndex, 00294 std::chrono::nanoseconds diskWriteDurationThreshold); 00295 ~Sync(); 00296 /** 00297 * Add how long the filesystem ops took to 'nanos'. This is invoked 00298 * from syncCompleteVirtual so that it is thread-safe with respect to 00299 * the 'nanos' variable. We can't do it in 'wait' directly since that 00300 * can execute concurrently with someone reading 'nanos'. 00301 */ 00302 void updateStats(Core::RollingStat& nanos) const; 00303 /** 00304 * Called at the start of wait to avoid some redundant disk flushes. 00305 */ 00306 void optimize(); 00307 void wait(); 00308 /// If a wait() exceeds this time, log a warning. 00309 const std::chrono::nanoseconds diskWriteDurationThreshold; 00310 /// List of operations to perform during wait(). 00311 std::deque<Op> ops; 00312 /// Time at start of wait() call. 00313 TimePoint waitStart; 00314 /// Time at end of wait() call. 00315 TimePoint waitEnd; 00316 }; 00317 00318 /** 00319 * An open or closed segment. These are stored in #segmentsByStartIndex. 00320 */ 00321 struct Segment { 00322 /** 00323 * Describes a log entry record within a segment. 00324 */ 00325 struct Record { 00326 /** 00327 * Constructor. 00328 */ 00329 explicit Record(uint64_t offset); 00330 00331 /** 00332 * Byte offset in the file where the entry begins. 00333 * This is used when truncating a segment. 00334 */ 00335 uint64_t offset; 00336 00337 /** 00338 * The entry itself. 00339 */ 00340 Log::Entry entry; 00341 }; 00342 00343 /** 00344 * Default constructor. Sets the segment to invalid. 00345 */ 00346 Segment(); 00347 00348 /** 00349 * Return a filename of the right form for a closed segment. 00350 * See also #filename. 00351 */ 00352 std::string makeClosedFilename() const; 00353 00354 /** 00355 * True for the open segment, false for closed segments. 00356 */ 00357 bool isOpen; 00358 /** 00359 * The index of the first entry in the segment. If the segment is open 00360 * and empty, this may not exist yet and will be #logStartIndex. 00361 */ 00362 uint64_t startIndex; 00363 /** 00364 * The index of the last entry in the segment, or #startIndex - 1 if 00365 * the segment is open and empty. 00366 */ 00367 uint64_t endIndex; 00368 /** 00369 * Size in bytes of the valid entries stored in the file plus 00370 * the version number at the start of the file. 00371 */ 00372 uint64_t bytes; 00373 /** 00374 * The name of the file within #dir containing this segment. 00375 */ 00376 std::string filename; 00377 /** 00378 * The entries in this segment, from startIndex to endIndex, inclusive. 00379 */ 00380 std::deque<Record> entries; 00381 00382 }; 00383 00384 /** 00385 * This goes at the start of every segment. 00386 */ 00387 struct SegmentHeader { 00388 /** 00389 * Always set to 1 for now. 00390 */ 00391 uint8_t version; 00392 } __attribute__((packed)); 00393 00394 ////////// initialization helper functions ////////// 00395 00396 /** 00397 * List the files in #dir and create Segment objects for any of them that 00398 * look like segments. This is only used during initialization. These 00399 * segments are passed through #loadClosedSegment() and #loadOpenSegment() 00400 * next. 00401 * Also updates SegmentPreparer::filenameCounter. 00402 * \return 00403 * Partially initialized Segment objects, one per discovered filename. 00404 */ 00405 std::vector<Segment> readSegmentFilenames(); 00406 00407 /** 00408 * Read a metadata file from disk. This is only used during initialization. 00409 * \param filename 00410 * Filename within #dir to attempt to open and read. 00411 * \param[out] metadata 00412 * Where the contents of the file end up. 00413 * \param quiet 00414 * Set to true to avoid warnings when the file can't be read; used in 00415 * unit tests. 00416 * \return 00417 * True if the file was read successfully, false otherwise. 00418 */ 00419 bool readMetadata(const std::string& filename, 00420 SegmentedLogMetadata::Metadata& metadata, 00421 bool quiet) const; 00422 00423 /** 00424 * Read the given closed segment from disk, issuing PANICs and WARNINGs 00425 * appropriately. This is only used during initialization. 00426 * 00427 * Deletes segment if its last index is below logStartIndex. 00428 * 00429 * Reads every entry described in the filename, and PANICs if any of those 00430 * can't be read. 00431 * 00432 * \param[in,out] segment 00433 * Closed segment to read from disk. 00434 * \param logStartIndex 00435 * The index of the first entry in the log, according to the log 00436 * metadata. 00437 * \return 00438 * True if the segment is valid; false if it has been removed entirely 00439 * from disk. 00440 */ 00441 bool loadClosedSegment(Segment& segment, uint64_t logStartIndex); 00442 00443 /** 00444 * Read the given open segment from disk, issuing PANICs and WARNINGs 00445 * appropriately, and closing the segment. This is only used during 00446 * initialization. 00447 * 00448 * Reads up through the end of the file or the last 00449 * entry with a valid checksum. If any valid entries are read, the segment 00450 * is truncated and closed. Otherwise, it is removed. 00451 * 00452 * Deletes segment if its last index is below logStartIndex. 00453 * 00454 * \param[in,out] segment 00455 * Open segment to read from disk. 00456 * \param logStartIndex 00457 * The index of the first entry in the log, according to the log 00458 * metadata. 00459 * \return 00460 * True if the segment is valid; false if it has been removed entirely 00461 * from disk. 00462 */ 00463 bool loadOpenSegment(Segment& segment, uint64_t logStartIndex); 00464 00465 00466 ////////// normal operation helper functions ////////// 00467 00468 /** 00469 * Run through a bunch of assertions of class invariants (for debugging). 00470 * For example, there should always be one open segment. See 00471 * #shouldCheckInvariants, controlled by the config option 'storageDebug', 00472 * and the BUILDTYPE. 00473 */ 00474 void checkInvariants(); 00475 00476 /** 00477 * Close the open segment if one is open. This removes the open segment if 00478 * it is empty, or closes it otherwise. Since it's a class invariant that 00479 * there is always an open segment, the caller should open a new segment 00480 * after calling this (unless it's shutting down). 00481 */ 00482 void closeSegment(); 00483 00484 /** 00485 * Return a reference to the current open segment (the one that new writes 00486 * should go into). Crashes if there is no open segment (but it's an 00487 * invariant of this class to maintain one). 00488 */ 00489 Segment& getOpenSegment(); 00490 const Segment& getOpenSegment() const; 00491 00492 /** 00493 * Set up a new open segment for the log head. 00494 * This is called when #append() needs more space but also when the end of 00495 * the log is truncated with #truncatePrefix() or #truncateSuffix(). 00496 * \pre 00497 * There is no currently open segment. 00498 */ 00499 void openNewSegment(); 00500 00501 /** 00502 * Read the next ProtoBuf record out of 'file'. 00503 * \param file 00504 * The open file, useful for error messages. 00505 * \param reader 00506 * A reader for 'file'. 00507 * \param[in,out] offset 00508 * The byte offset in the file at which to start reading as input. 00509 * The byte just after the last byte of data as output if successful, 00510 * otherwise unmodified. 00511 * \param[out] out 00512 * An empty ProtoBuf to fill in. 00513 * \return 00514 * Empty string if successful, otherwise error message. 00515 * 00516 * Format: 00517 * 00518 * |checksum|dataLen|data| 00519 * 00520 * The checksum is up to Core::Checksum::MAX_LENGTH bytes and is terminated 00521 * by a null character. It covers both dataLen and data. 00522 * 00523 * dataLen is an unsigned integer (8 bytes, big-endian byte order) that 00524 * specifies the length in bytes of data. 00525 * 00526 * data is a protobuf encoded as binary or text, depending on encoding. 00527 */ 00528 std::string readProtoFromFile(const FilesystemUtil::File& file, 00529 FilesystemUtil::FileContents& reader, 00530 uint64_t* offset, 00531 google::protobuf::Message* out) const; 00532 00533 /** 00534 * Prepare a ProtoBuf record to be written to disk. 00535 * \param in 00536 * ProtoBuf to be serialized. 00537 * \return 00538 * Buffer containing serialized record. 00539 */ 00540 Core::Buffer serializeProto(const google::protobuf::Message& in) const; 00541 00542 ////////// segment preparer thread functions ////////// 00543 00544 /** 00545 * Opens a file for a new segment and allocates its space on disk. 00546 * \param fileId 00547 * ID to use to generate filename; see 00548 * SegmentPreparer::filenameCounter. 00549 * \return 00550 * Filename and writable OS-level file. 00551 */ 00552 std::pair<std::string, FilesystemUtil::File> 00553 prepareNewSegment(uint64_t fileId); 00554 00555 /** 00556 * The main function for the #segmentPreparer thread. 00557 */ 00558 void segmentPreparerMain(); 00559 00560 ////////// member variables ////////// 00561 00562 /** 00563 * Specifies how individual records are stored. 00564 */ 00565 const Encoding encoding; 00566 00567 /** 00568 * The algorithm to use when writing new records. When reading records, any 00569 * available checksum is used. 00570 */ 00571 const std::string checksumAlgorithm; 00572 00573 /** 00574 * The maximum size in bytes for newly written segments. Controlled by the 00575 * 'storageSegmentBytes' config option. 00576 */ 00577 const uint64_t MAX_SEGMENT_SIZE; 00578 00579 /** 00580 * Set to true if checkInvariants() should do its job, or set to false for 00581 * performance. 00582 */ 00583 const bool shouldCheckInvariants; 00584 00585 /** 00586 * If a disk operation exceeds this much time, log a warning. 00587 */ 00588 const std::chrono::milliseconds diskWriteDurationThreshold; 00589 00590 /** 00591 * The metadata this class mintains. This should be combined with the 00592 * superclass's metadata when being written out to disk. 00593 */ 00594 SegmentedLogMetadata::Metadata metadata; 00595 00596 /** 00597 * The directory containing every file this log creates. 00598 */ 00599 FilesystemUtil::File dir; 00600 00601 /** 00602 * A writable OS-level file that contains the entries for the current open 00603 * segment. It is a class invariant that this is always a valid file. 00604 */ 00605 FilesystemUtil::File openSegmentFile; 00606 00607 /** 00608 * The index of the first entry in the log, see getLogStartIndex(). 00609 */ 00610 uint64_t logStartIndex; 00611 00612 /** 00613 * Ordered map of all closed segments and the open segment, indexed by the 00614 * startIndex of each segment. This is used to support all the key 00615 * operations, such as looking up an entry and truncation. 00616 */ 00617 std::map<uint64_t, Segment> segmentsByStartIndex; 00618 00619 /** 00620 * The total number of bytes occupied by the closed segments on disk. 00621 * Used to calculate getSizeBytes() efficiently. 00622 */ 00623 uint64_t totalClosedSegmentBytes; 00624 00625 /** 00626 * See PreparedSegments. 00627 */ 00628 PreparedSegments preparedSegments; 00629 00630 /** 00631 * Accumulates deferred filesystem operations for append() and 00632 * truncatePrefix(). 00633 */ 00634 std::unique_ptr<SegmentedLog::Sync> currentSync; 00635 00636 /** 00637 * Tracks the time it takes to write a metadata file. 00638 */ 00639 Core::RollingStat metadataWriteNanos; 00640 00641 /** 00642 * Tracks the time it takes to execute wait() on a Sync object. 00643 */ 00644 Core::RollingStat filesystemOpsNanos; 00645 00646 /** 00647 * Opens files, allocates the to full size, and places them on 00648 * #preparedSegments for the log to use. 00649 */ 00650 std::thread segmentPreparer; 00651 }; 00652 00653 } // namespace LogCabin::Storage 00654 } // namespace LogCabin 00655 00656 #endif /* LOGCABIN_STORAGE_SEGMENTEDLOG_H */