LogCabin
Storage/SegmentedLog.h
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <deque>
00018 #include <thread>
00019 #include <vector>
00020 
00021 #include "build/Storage/SegmentedLog.pb.h"
00022 #include "Core/Buffer.h"
00023 #include "Core/ConditionVariable.h"
00024 #include "Core/Mutex.h"
00025 #include "Core/RollingStat.h"
00026 #include "Storage/FilesystemUtil.h"
00027 #include "Storage/Log.h"
00028 
00029 #ifndef LOGCABIN_STORAGE_SEGMENTEDLOG_H
00030 #define LOGCABIN_STORAGE_SEGMENTEDLOG_H
00031 
00032 namespace LogCabin {
00033 
00034 // forward declaration
00035 namespace Core {
00036 class Config;
00037 }
00038 
00039 namespace Storage {
00040 
00041 /**
00042  * This class persists a log on the filesystem efficiently.
00043  *
00044  * The log entries on disk are stored in a series of files called segments, and
00045  * each segment is about 8MB in size. Thus, most small appends do not need to
00046  * update filesystem metadata and can proceed with a single consecutive disk
00047  * write.
00048  *
00049  * The disk files consist of metadata files, closed segments, and open
00050  * segments. Metadata files are used to track Raft metadata, such as the
00051  * server's current term, and also the log's start index. Segments contain
00052  * contiguous entries that are part of the log. Closed segments are never
00053  * written to again (but may be renamed and truncated if a suffix of the log is
00054  * truncated). Open segments are where newly appended entries go. Once an open
00055  * segment reaches MAX_SEGMENT_SIZE, it is closed and a new one is used.
00056  *
00057  * Metadata files are named "metadata1" and "metadata2". The code alternates
00058  * between these so that there is always at least one readable metadata file.
00059  * On boot, the readable metadata file with the higher version number is used.
00060  *
00061  * Closed segments are named by the format string "%020lu-%020lu" with their
00062  * start and end indexes, both inclusive. Closed segments always contain at
00063  * least one entry; the end index is always at least as large as the start
00064  * index. Closed segment files may occasionally include data past their
00065  * filename's end index (these are ignored but a WARNING is issued). This can
00066  * happen if the suffix of the segment is truncated and a crash occurs at an
00067  * inopportune time (the segment file is first renamed, then truncated, and a
00068  * crash occurs in between).
00069  *
00070  * Open segments are named by the format string "open-%lu" with a unique
00071  * number. These should not exist when the server shuts down cleanly, but they
00072  * exist while the server is running and may be left around during a crash.
00073  * Open segments either contain entries which come after the last closed
00074  * segment or are full of zeros. When the server crashes while appending to an
00075  * open segment, the end of that file may be corrupt. We can't distinguish
00076  * between a corrupt file and a partially written entry. The code assumes it's
00077  * a partially written entry, issues a WARNING, and ignores it.
00078  *
00079  * Truncating a suffix of the log will remove all entries that are no longer
00080  * part of the log. Truncating a prefix of the log will only remove complete
00081  * segments that are before the new log start index. For example, if a
00082  * segment has entries 10 through 20 and the prefix of the log is truncated to
00083  * start at entry 15, that entire segment will be retained.
00084  *
00085  * Each segment file starts with a segment header, which currently contains
00086  * just a one-byte version number for the format of that segment. The current
00087  * format (version 1) is just a concatenation of serialized entry records.
00088  */
00089 class SegmentedLog : public Log {
00090     /**
00091      * Clock used for measuring disk performance.
00092      */
00093     typedef Core::Time::SteadyClock Clock;
00094     /**
00095      * Time point for measuring disk performance.
00096      */
00097     typedef Clock::time_point TimePoint;
00098 
00099   public:
00100 
00101     /**
00102      * Specifies how individual records are serialized.
00103      */
00104     enum class Encoding {
00105       /// ProtoBuf human-readable text format.
00106       TEXT,
00107       /// ProtoBuf binary format.
00108       BINARY,
00109     };
00110 
00111     /**
00112      * Constructor.
00113      * \param parentDir
00114      *      A filesystem directory in which all the files for this storage
00115      *      module are kept.
00116      * \param encoding
00117      *      Specifies how individual records are stored.
00118      * \param config
00119      *      Settings.
00120      */
00121     SegmentedLog(const FilesystemUtil::File& parentDir,
00122                  Encoding encoding,
00123                  const Core::Config& config);
00124     ~SegmentedLog();
00125 
00126     // Methods implemented from Log interface
00127     std::pair<uint64_t, uint64_t>
00128     append(const std::vector<const Entry*>& entries);
00129     const Entry& getEntry(uint64_t) const;
00130     uint64_t getLogStartIndex() const;
00131     uint64_t getLastLogIndex() const;
00132     std::string getName() const;
00133     uint64_t getSizeBytes() const;
00134     std::unique_ptr<Log::Sync> takeSync();
00135     void syncCompleteVirtual(std::unique_ptr<Log::Sync> sync);
00136     void truncatePrefix(uint64_t firstIndex);
00137     void truncateSuffix(uint64_t lastIndex);
00138     void updateMetadata();
00139     void updateServerStats(Protocol::ServerStats& serverStats) const;
00140 
00141   private:
00142 
00143     /**
00144      * A producer/consumer monitor for a queue of files to use for open
00145      * segments. These are created asynchronously, hopefully ahead of the log
00146      * appends.
00147      *
00148      * This class is written in a monitor style; each public method acquires
00149      * #mutex.
00150      */
00151     class PreparedSegments {
00152       public:
00153         /**
00154          * The type of element that is queued in #openSegments.
00155          * The first element of each pair is its filename relative to #dir. The
00156          * second element is its open file descriptor.
00157          */
00158         typedef std::pair<std::string, FilesystemUtil::File> OpenSegment;
00159 
00160         /**
00161          * Constructor.
00162          * \param queueSize
00163          *      The maximum number of prepared segments to hold in the queue
00164          *      at a time.
00165          */
00166         explicit PreparedSegments(uint64_t queueSize);
00167 
00168         /**
00169          * Destructor.
00170          */
00171         ~PreparedSegments();
00172 
00173         /**
00174          * Do not block any more waiting threads, and return immediately.
00175          */
00176         void exit();
00177 
00178         /**
00179          * Ensure that future filenames will be larger than this one.
00180          * This should generally be invoked before any producers are started,
00181          * since once they're started, there's no stopping them.
00182          * \param fileId
00183          *      A lower bound on future IDs.
00184          */
00185         void foundFile(uint64_t fileId);
00186 
00187         /**
00188          * Immediately return all currently prepared segments.
00189          */
00190         std::deque<OpenSegment> releaseAll();
00191 
00192         /**
00193          * Producers call this when they're done creating a new file. This must
00194          * be called once after each call to waitForDemand(); otherwise, the
00195          * internal bookkeeping won't work.
00196          * \param segment
00197          *      File to queue up for a consumer.
00198          */
00199         void submitOpenSegment(OpenSegment segment);
00200 
00201         /**
00202          * Producers call this first to block until work becomes needed.
00203          * \return
00204          *      ID for new filename.
00205          * \throw Core::Util::ThreadInterruptedException
00206          *      If exit() has been called.
00207          */
00208         uint64_t waitForDemand();
00209 
00210         /**
00211          * Consumers call this when they need a prepared segment file.
00212          * \throw Core::Util::ThreadInterruptedException
00213          *      If exit() has been called.
00214          */
00215         OpenSegment waitForOpenSegment();
00216 
00217         /**
00218          * Reduce log message verbosity for unit tests.
00219          */
00220         bool quietForUnitTests;
00221 
00222       private:
00223         /**
00224          * Mutual exclusion for all of the members of this class.
00225          */
00226         Core::Mutex mutex;
00227         /**
00228          * Notified when #openSegments shrinks in size or when #exiting becomes
00229          * true.
00230          */
00231         Core::ConditionVariable consumed;
00232         /**
00233          * Notified when #openSegments grows in size or when #exiting becomes
00234          * true.
00235          */
00236         Core::ConditionVariable produced;
00237         /**
00238          * Set to true when waiters should exit.
00239          */
00240         bool exiting;
00241         /**
00242          * The number of producers that may be started to fulfill demand.
00243          */
00244         uint64_t demanded;
00245         /**
00246          * Used to assign filenames to open segments (which is done before the
00247          * indexes they contain are known). This number is the largest of all
00248          * previously known numbers so that it can be incremented then assigned
00249          * to a new file. It's possible that numbers are reused across reboots.
00250          */
00251         uint64_t filenameCounter;
00252         /**
00253          * The queue where open segments sit before they're consumed. These are
00254          * available for the log to use as future open segments.
00255          */
00256         std::deque<OpenSegment> openSegments;
00257     };
00258 
00259     /**
00260      * Queues various operations on files, such as writes and fsyncs, to be
00261      * executed later.
00262      */
00263     class Sync : public Log::Sync {
00264       public:
00265         struct Op { // One queued file operation; instances are held in #ops.
00266             enum OpCode {
00267                 WRITE,
00268                 TRUNCATE,
00269                 RENAME,
00270                 FDATASYNC,
00271                 FSYNC,
00272                 CLOSE,
00273                 UNLINKAT,
00274                 NOOP,
00275             };
00276             Op(int fd, OpCode opCode)
00277                 : fd(fd)
00278                 , opCode(opCode)
00279                 , writeData()
00280                 , filename1()
00281                 , filename2()
00282                 , size(0)
00283             {
00284             }
00285             int fd;
00286             OpCode opCode;
00287             Core::Buffer writeData;
00288             std::string filename1;
00289             std::string filename2;
00290             uint64_t size;
00291         };
00292 
00293         explicit Sync(uint64_t lastIndex,
00294                       std::chrono::nanoseconds diskWriteDurationThreshold);
00295         ~Sync();
00296         /**
00297          * Add how long the filesystem ops took to 'nanos'. This is invoked
00298          * from syncCompleteVirtual so that it is thread-safe with respect to
00299          * the 'nanos' variable. We can't do it in 'wait' directly since that
00300          * can execute concurrently with someone reading 'nanos'.
00301          */
00302         void updateStats(Core::RollingStat& nanos) const;
00303         /**
00304          * Called at the start of wait to avoid some redundant disk flushes.
00305          */
00306         void optimize();
00307         void wait(); // Performs the operations queued in #ops; see #waitStart and #waitEnd.
00308         /// If a wait() exceeds this time, log a warning.
00309         const std::chrono::nanoseconds diskWriteDurationThreshold;
00310         /// List of operations to perform during wait().
00311         std::deque<Op> ops;
00312         /// Time at start of wait() call.
00313         TimePoint waitStart;
00314         /// Time at end of wait() call.
00315         TimePoint waitEnd;
00316     };
00317 
00318     /**
00319      * An open or closed segment. These are stored in #segmentsByStartIndex.
00320      */
00321     struct Segment {
00322         /**
00323          * Describes a log entry record within a segment.
00324          */
00325         struct Record {
00326             /**
00327              * Constructor.
00328              */
00329             explicit Record(uint64_t offset);
00330 
00331             /**
00332              * Byte offset in the file where the entry begins.
00333              * This is used when truncating a segment.
00334              */
00335             uint64_t offset;
00336 
00337             /**
00338              * The entry itself.
00339              */
00340             Log::Entry entry;
00341         };
00342 
00343         /**
00344          * Default constructor. Sets the segment to invalid.
00345          */
00346         Segment();
00347 
00348         /**
00349          * Return a filename of the right form for a closed segment.
00350          * See also #filename.
00351          */
00352         std::string makeClosedFilename() const;
00353 
00354         /**
00355          * True for the open segment, false for closed segments.
00356          */
00357         bool isOpen;
00358         /**
00359          * The index of the first entry in the segment. If the segment is open
00360          * and empty, this may not exist yet and will be #logStartIndex.
00361          */
00362         uint64_t startIndex;
00363         /**
00364          * The index of the last entry in the segment, or #startIndex - 1 if
00365          * the segment is open and empty.
00366          */
00367         uint64_t endIndex;
00368         /**
00369          * Size in bytes of the valid entries stored in the file plus
00370          * the version number at the start of the file.
00371          */
00372         uint64_t bytes;
00373         /**
00374          * The name of the file within #dir containing this segment.
00375          */
00376         std::string filename;
00377         /**
00378          * The entries in this segment, from startIndex to endIndex, inclusive.
00379          */
00380         std::deque<Record> entries;
00381 
00382     };
00383 
00384     /**
00385      * This goes at the start of every segment.
00386      */
00387     struct SegmentHeader {
00388         /**
00389          * Always set to 1 for now.
00390          */
00391         uint8_t version;
00392     } __attribute__((packed));
00393 
00394     ////////// initialization helper functions //////////
00395 
00396     /**
00397      * List the files in #dir and create Segment objects for any of them that
00398      * look like segments. This is only used during initialization. These
00399      * segments are passed through #loadClosedSegment() and #loadOpenSegment()
00400      * next.
00401      * Also updates PreparedSegments::filenameCounter.
00402      * \return
00403      *      Partially initialized Segment objects, one per discovered filename.
00404      */
00405     std::vector<Segment> readSegmentFilenames();
00406 
00407     /**
00408      * Read a metadata file from disk. This is only used during initialization.
00409      * \param filename
00410      *      Filename within #dir to attempt to open and read.
00411      * \param[out] metadata
00412      *      Where the contents of the file end up.
00413      * \param quiet
00414      *      Set to true to avoid warnings when the file can't be read; used in
00415      *      unit tests.
00416      * \return
00417      *      True if the file was read successfully, false otherwise.
00418      */
00419     bool readMetadata(const std::string& filename,
00420                              SegmentedLogMetadata::Metadata& metadata,
00421                              bool quiet) const;
00422 
00423     /**
00424      * Read the given closed segment from disk, issuing PANICs and WARNINGs
00425      * appropriately. This is only used during initialization.
00426      *
00427      * Deletes segment if its last index is below logStartIndex.
00428      *
00429      * Reads every entry described in the filename, and PANICs if any of those
00430      * can't be read.
00431      *
00432      * \param[in,out] segment
00433      *      Closed segment to read from disk.
00434      * \param logStartIndex
00435      *      The index of the first entry in the log, according to the log
00436      *      metadata.
00437      * \return
00438      *      True if the segment is valid; false if it has been removed entirely
00439      *      from disk.
00440      */
00441     bool loadClosedSegment(Segment& segment, uint64_t logStartIndex);
00442 
00443     /**
00444      * Read the given open segment from disk, issuing PANICs and WARNINGs
00445      * appropriately, and closing the segment. This is only used during
00446      * initialization.
00447      *
00448      * Reads up through the end of the file or the last
00449      * entry with a valid checksum. If any valid entries are read, the segment
00450      * is truncated and closed. Otherwise, it is removed.
00451      *
00452      * Deletes segment if its last index is below logStartIndex.
00453      *
00454      * \param[in,out] segment
00455      *      Open segment to read from disk.
00456      * \param logStartIndex
00457      *      The index of the first entry in the log, according to the log
00458      *      metadata.
00459      * \return
00460      *      True if the segment is valid; false if it has been removed entirely
00461      *      from disk.
00462      */
00463     bool loadOpenSegment(Segment& segment, uint64_t logStartIndex);
00464 
00465 
00466     ////////// normal operation helper functions //////////
00467 
00468     /**
00469      * Run through a bunch of assertions of class invariants (for debugging).
00470      * For example, there should always be one open segment. See
00471      * #shouldCheckInvariants, controlled by the config option 'storageDebug',
00472      * and the BUILDTYPE.
00473      */
00474     void checkInvariants();
00475 
00476     /**
00477      * Close the open segment if one is open. This removes the open segment if
00478      * it is empty, or closes it otherwise. Since it's a class invariant that
00479      * there is always an open segment, the caller should open a new segment
00480      * after calling this (unless it's shutting down).
00481      */
00482     void closeSegment();
00483 
00484     /**
00485      * Return a reference to the current open segment (the one that new writes
00486      * should go into). Crashes if there is no open segment (but it's an
00487      * invariant of this class to maintain one).
00488      */
00489     Segment& getOpenSegment();
00490     const Segment& getOpenSegment() const;
00491 
00492     /**
00493      * Set up a new open segment for the log head.
00494      * This is called when #append() needs more space but also when the end of
00495      * the log is truncated with #truncatePrefix() or #truncateSuffix().
00496      * \pre
00497      *      There is no currently open segment.
00498      */
00499     void openNewSegment();
00500 
00501     /**
00502      * Read the next ProtoBuf record out of 'file'.
00503      * \param file
00504      *      The open file, useful for error messages.
00505      * \param reader
00506      *      A reader for 'file'.
00507      * \param[in,out] offset
00508      *      The byte offset in the file at which to start reading as input.
00509      *      The byte just after the last byte of data as output if successful,
00510      *      otherwise unmodified.
00511      * \param[out] out
00512      *      An empty ProtoBuf to fill in.
00513      * \return
00514      *      Empty string if successful, otherwise error message.
00515      *
00516      * Format:
00517      *
00518      * |checksum|dataLen|data|
00519      *
00520      * The checksum is up to Core::Checksum::MAX_LENGTH bytes and is terminated
00521      * by a null character. It covers both dataLen and data.
00522      *
00523      * dataLen is an unsigned integer (8 bytes, big-endian byte order) that
00524      * specifies the length in bytes of data.
00525      *
00526      * data is a protobuf encoded as binary or text, depending on encoding.
00527      */
00528     std::string readProtoFromFile(const FilesystemUtil::File& file,
00529                                   FilesystemUtil::FileContents& reader,
00530                                   uint64_t* offset,
00531                                   google::protobuf::Message* out) const;
00532 
00533     /**
00534      * Prepare a ProtoBuf record to be written to disk.
00535      * \param in
00536      *      ProtoBuf to be serialized.
00537      * \return
00538      *      Buffer containing serialized record.
00539      */
00540     Core::Buffer serializeProto(const google::protobuf::Message& in) const;
00541 
00542     ////////// segment preparer thread functions //////////
00543 
00544     /**
00545      * Opens a file for a new segment and allocates its space on disk.
00546      * \param fileId
00547      *      ID to use to generate filename; see
00548      *      PreparedSegments::filenameCounter.
00549      * \return
00550      *      Filename and writable OS-level file.
00551      */
00552     std::pair<std::string, FilesystemUtil::File>
00553     prepareNewSegment(uint64_t fileId);
00554 
00555     /**
00556      * The main function for the #segmentPreparer thread.
00557      */
00558     void segmentPreparerMain();
00559 
00560     ////////// member variables //////////
00561 
00562     /**
00563      * Specifies how individual records are stored.
00564      */
00565     const Encoding encoding;
00566 
00567     /**
00568      * The algorithm to use when writing new records. When reading records, any
00569      * available checksum is used.
00570      */
00571     const std::string checksumAlgorithm;
00572 
00573     /**
00574      * The maximum size in bytes for newly written segments. Controlled by the
00575      * 'storageSegmentBytes' config option.
00576      */
00577     const uint64_t MAX_SEGMENT_SIZE;
00578 
00579     /**
00580      * Set to true if checkInvariants() should do its job, or set to false for
00581      * performance.
00582      */
00583     const bool shouldCheckInvariants;
00584 
00585     /**
00586      * If a disk operation exceeds this much time, log a warning.
00587      */
00588     const std::chrono::milliseconds diskWriteDurationThreshold;
00589 
00590     /**
00591      * The metadata this class maintains. This should be combined with the
00592      * superclass's metadata when being written out to disk.
00593      */
00594     SegmentedLogMetadata::Metadata metadata;
00595 
00596     /**
00597      * The directory containing every file this log creates.
00598      */
00599     FilesystemUtil::File dir;
00600 
00601     /**
00602      * A writable OS-level file that contains the entries for the current open
00603      * segment. It is a class invariant that this is always a valid file.
00604      */
00605     FilesystemUtil::File openSegmentFile;
00606 
00607     /**
00608      * The index of the first entry in the log, see getLogStartIndex().
00609      */
00610     uint64_t logStartIndex;
00611 
00612     /**
00613      * Ordered map of all closed segments and the open segment, indexed by the
00614      * startIndex of each segment. This is used to support all the key
00615      * operations, such as looking up an entry and truncation.
00616      */
00617     std::map<uint64_t, Segment> segmentsByStartIndex;
00618 
00619     /**
00620      * The total number of bytes occupied by the closed segments on disk.
00621      * Used to calculate getSizeBytes() efficiently.
00622      */
00623     uint64_t totalClosedSegmentBytes;
00624 
00625     /**
00626      * See PreparedSegments.
00627      */
00628     PreparedSegments preparedSegments;
00629 
00630     /**
00631      * Accumulates deferred filesystem operations for append() and
00632      * truncatePrefix().
00633      */
00634     std::unique_ptr<SegmentedLog::Sync> currentSync;
00635 
00636     /**
00637      * Tracks the time it takes to write a metadata file.
00638      */
00639     Core::RollingStat metadataWriteNanos;
00640 
00641     /**
00642      * Tracks the time it takes to execute wait() on a Sync object.
00643      */
00644     Core::RollingStat filesystemOpsNanos;
00645 
00646     /**
00647      * Opens files, allocates them to full size, and places them on
00648      * #preparedSegments for the log to use.
00649      */
00650     std::thread segmentPreparer;
00651 };
00652 
00653 } // namespace LogCabin::Storage
00654 } // namespace LogCabin
00655 
00656 #endif /* LOGCABIN_STORAGE_SEGMENTEDLOG_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines