LogCabin
Storage/SegmentedLog.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2014 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #define _BSD_SOURCE
00018 #include <endian.h>
00019 
00020 #include <algorithm>
00021 #include <fcntl.h>
00022 #include <sys/stat.h>
00023 #include <unistd.h>
00024 
00025 #include "build/Protocol/Raft.pb.h"
00026 #include "Core/Checksum.h"
00027 #include "Core/Debug.h"
00028 #include "Core/ProtoBuf.h"
00029 #include "Core/StringUtil.h"
00030 #include "Core/ThreadId.h"
00031 #include "Core/Time.h"
00032 #include "Core/Util.h"
00033 #include "Storage/FilesystemUtil.h"
00034 #include "Storage/SegmentedLog.h"
00035 #include "Server/Globals.h"
00036 
00037 namespace LogCabin {
00038 namespace Storage {
00039 
00040 namespace FS = FilesystemUtil;
00041 using Core::StringUtil::format;
00042 
namespace {

/**
 * printf/scanf format for the filename of an open segment.
 * The single argument is an incrementing counter.
 */
#define OPEN_SEGMENT_FORMAT "open-%lu"

/**
 * printf/scanf format for the filename of a closed segment.
 * The first argument is the segment's start index and the second its end
 * index, both inclusive; each is zero-padded to 20 digits when printed.
 */
#define CLOSED_SEGMENT_FORMAT "%020lu-%020lu"

/**
 * Check whether a byte range holds nothing but zero bytes.
 * \param _start
 *      First byte of the range.
 * \param length
 *      Number of bytes to examine. An empty range counts as all zeros.
 * \return
 *      True if every byte in [_start, _start + length) is zero.
 */
bool
isAllZeros(const void* _start, size_t length)
{
    const uint8_t* begin = static_cast<const uint8_t*>(_start);
    return std::all_of(begin, begin + length,
                       [](uint8_t byte) { return byte == 0; });
}

} // anonymous namespace
00073 
00074 
00075 ////////// SegmentedLog::PreparedSegments //////////
00076 
00077 
00078 SegmentedLog::PreparedSegments::PreparedSegments(uint64_t queueSize)
00079     : quietForUnitTests(false)
00080     , mutex()
00081     , consumed()
00082     , produced()
00083     , exiting(false)
00084     , demanded(queueSize)
00085     , filenameCounter(0)
00086     , openSegments()
00087 {
00088 }
00089 
00090 SegmentedLog::PreparedSegments::~PreparedSegments()
00091 {
00092 }
00093 
00094 void
00095 SegmentedLog::PreparedSegments::exit()
00096 {
00097     std::lock_guard<Core::Mutex> lockGuard(mutex);
00098     exiting = true;
00099     consumed.notify_all();
00100     produced.notify_all();
00101 }
00102 
00103 void
00104 SegmentedLog::PreparedSegments::foundFile(uint64_t fileId)
00105 {
00106     std::lock_guard<Core::Mutex> lockGuard(mutex);
00107     if (filenameCounter < fileId)
00108         filenameCounter = fileId;
00109 }
00110 
00111 std::deque<SegmentedLog::PreparedSegments::OpenSegment>
00112 SegmentedLog::PreparedSegments::releaseAll()
00113 {
00114     std::lock_guard<Core::Mutex> lockGuard(mutex);
00115     std::deque<OpenSegment> ret;
00116     std::swap(openSegments, ret);
00117     return ret;
00118 }
00119 
00120 void
00121 SegmentedLog::PreparedSegments::submitOpenSegment(OpenSegment segment)
00122 {
00123     std::lock_guard<Core::Mutex> lockGuard(mutex);
00124     openSegments.push_back(std::move(segment));
00125     produced.notify_one();
00126 }
00127 
00128 uint64_t
00129 SegmentedLog::PreparedSegments::waitForDemand()
00130 {
00131     std::unique_lock<Core::Mutex> lockGuard(mutex);
00132     while (!exiting) {
00133         if (demanded > 0) {
00134             --demanded;
00135             ++filenameCounter;
00136             return filenameCounter;
00137         }
00138         consumed.wait(lockGuard);
00139     }
00140     throw Core::Util::ThreadInterruptedException();
00141 }
00142 
00143 SegmentedLog::PreparedSegments::OpenSegment
00144 SegmentedLog::PreparedSegments::waitForOpenSegment()
00145 {
00146     std::unique_lock<Core::Mutex> lockGuard(mutex);
00147     uint64_t numWaits = 0;
00148     while (true) {
00149         if (exiting) {
00150             NOTICE("Exiting");
00151             throw Core::Util::ThreadInterruptedException();
00152         }
00153         if (!openSegments.empty()) {
00154             break;
00155         }
00156         if (numWaits == 0 && !quietForUnitTests) {
00157             WARNING("Prepared segment not ready, having to wait on it. "
00158                     "This is perfectly safe but bad for performance. "
00159                     "Consider increasing storageOpenSegments in the config.");
00160         }
00161         ++numWaits;
00162         produced.wait(lockGuard);
00163     }
00164     if (numWaits > 0 && !quietForUnitTests) {
00165         WARNING("Done waiting: prepared segment now ready");
00166     }
00167     OpenSegment r = std::move(openSegments.front());
00168     openSegments.pop_front();
00169     consumed.notify_one();
00170     ++demanded;
00171     return r;
00172 }
00173 
00174 
00175 ////////// SegmentedLog::Sync //////////
00176 
00177 
00178 SegmentedLog::Sync::Sync(uint64_t lastIndex,
00179                          std::chrono::nanoseconds diskWriteDurationThreshold)
00180     : Log::Sync(lastIndex)
00181     , diskWriteDurationThreshold(diskWriteDurationThreshold)
00182     , ops()
00183     , waitStart(TimePoint::max())
00184     , waitEnd(TimePoint::max())
00185 {
00186 }
00187 
00188 SegmentedLog::Sync::~Sync()
00189 {
00190 }
00191 
00192 void
00193 SegmentedLog::Sync::optimize()
00194 {
00195     if (ops.size() < 3)
00196         return;
00197     auto prev = ops.begin();
00198     auto it = prev + 1;
00199     auto next = it + 1;
00200     while (next != ops.end()) {
00201         if (prev->opCode == Op::FDATASYNC &&
00202             it->opCode == Op::WRITE &&
00203             next->opCode == Op::FDATASYNC &&
00204             prev->fd == it->fd &&
00205             it->fd == next->fd) {
00206             prev->opCode = Op::NOOP;
00207         }
00208         prev = it;
00209         it = next;
00210         ++next;
00211     }
00212 }
00213 
/**
 * Execute every queued filesystem operation in order, emptying 'ops'.
 *
 * optimize() runs first to turn redundant fdatasyncs into NOOPs. A failed
 * write PANICs here; the other operations go through FS helpers, which
 * presumably handle their own failures. The time spent is recorded in
 * waitStart/waitEnd (see updateStats()). If it exceeds
 * diskWriteDurationThreshold, a warning with per-operation counts is logged.
 */
void
SegmentedLog::Sync::wait()
{
    optimize();

    waitStart = Clock::now();
    // Per-kind counters, used only for the slow-operation warning below.
    uint64_t writes = 0;
    uint64_t totalBytesWritten = 0;
    uint64_t truncates = 0;
    uint64_t renames = 0;
    uint64_t fdatasyncs = 0;
    uint64_t fsyncs = 0;
    uint64_t closes = 0;
    uint64_t unlinks = 0;

    while (!ops.empty()) {
        Op& op = ops.front();
        // Temporarily wrap the raw fd so the FS helpers can be used on it.
        // f.release() below gives the fd back (presumably so f's destructor
        // does not close it); only CLOSE actually closes it.
        FS::File f(op.fd, "-unknown-");
        switch (op.opCode) {
            case Op::WRITE: {
                ssize_t written = FS::write(op.fd,
                        op.writeData.getData(),
                        op.writeData.getLength());
                if (written < 0) {
                    PANIC("Failed to write to fd %d: %s",
                          op.fd,
                          strerror(errno));
                }
                ++writes;
                totalBytesWritten += op.writeData.getLength();
                break;
            }
            case Op::TRUNCATE: {
                FS::truncate(f, op.size);
                ++truncates;
                break;
            }
            case Op::RENAME: {
                // op.fd is the directory; both names are relative to it.
                FS::rename(f, op.filename1,
                           f, op.filename2);
                ++renames;
                break;
            }
            case Op::FDATASYNC: {
                FS::fdatasync(f);
                ++fdatasyncs;
                break;
            }
            case Op::FSYNC: {
                FS::fsync(f);
                ++fsyncs;
                break;
            }
            case Op::CLOSE: {
                f.close();
                ++closes;
                break;
            }
            case Op::UNLINKAT: {
                // op.fd is the directory; filename1 is relative to it.
                FS::removeFile(f, op.filename1);
                ++unlinks;
                break;
            }
            case Op::NOOP: {
                // Left behind by optimize().
                break;
            }
        }
        f.release();
        ops.pop_front();
    }

    waitEnd = Clock::now();
    std::chrono::nanoseconds elapsed = waitEnd - waitStart;
    if (elapsed > diskWriteDurationThreshold) {
        WARNING("Executing filesystem operations took longer than expected "
                "(%s for %lu writes totaling %lu bytes, %lu truncates, "
                "%lu renames, %lu fdatasyncs, %lu fsyncs, %lu closes, and "
                "%lu unlinks)",
                Core::StringUtil::toString(elapsed).c_str(),
                writes,
                totalBytesWritten,
                truncates,
                renames,
                fdatasyncs,
                fsyncs,
                closes,
                unlinks);
    }
}
00303 
00304 void
00305 SegmentedLog::Sync::updateStats(Core::RollingStat& nanos) const
00306 {
00307     std::chrono::nanoseconds elapsed = waitEnd - waitStart;
00308     nanos.push(uint64_t(elapsed.count()));
00309     if (elapsed > diskWriteDurationThreshold)
00310         nanos.noteExceptional(waitStart, uint64_t(elapsed.count()));
00311 }
00312 
00313 
00314 ////////// SegmentedLog::Segment::Record //////////
00315 
00316 
00317 SegmentedLog::Segment::Record::Record(uint64_t offset)
00318     : offset(offset)
00319     , entry()
00320 {
00321 }
00322 
00323 
00324 ////////// SegmentedLog::Segment //////////
00325 
00326 SegmentedLog::Segment::Segment()
00327     : isOpen(false)
00328     , startIndex(~0UL)
00329     , endIndex(~0UL - 1)
00330     , bytes(0)
00331     , filename("--invalid--")
00332     , entries()
00333 {
00334 }
00335 
00336 std::string
00337 SegmentedLog::Segment::makeClosedFilename() const
00338 {
00339     return format(CLOSED_SEGMENT_FORMAT,
00340                   startIndex, endIndex);
00341 }
00342 
00343 ////////// SegmentedLog public functions //////////
00344 
00345 
/**
 * Open the segmented log stored in a subdirectory of 'parentDir' and
 * recover its state.
 *
 * Steps:
 *  1. List the segment files.
 *  2. Load the newest readable metadata file. With no metadata at all,
 *     there must also be no segments.
 *  3. Rewrite both metadata files.
 *  4. Load every segment, closing any open ones.
 *  5. Verify that the kept segments cover one contiguous, non-overlapping
 *     range of indexes.
 *  6. Open a fresh segment for new entries and start the background
 *     segment preparer thread.
 *
 * \param parentDir
 *      Directory containing the "Segmented-Binary" or "Segmented-Text"
 *      subdirectory that holds this log.
 * \param encoding
 *      Selects the subdirectory name above (and, presumably, how entries
 *      are serialized).
 * \param config
 *      Settings read here: storageChecksum, storageSegmentBytes,
 *      storageDebug, electionTimeoutMilliseconds, storageOpenSegments,
 *      and unittest-quiet.
 */
SegmentedLog::SegmentedLog(const FS::File& parentDir,
                           Encoding encoding,
                           const Core::Config& config)
    : encoding(encoding)
    , checksumAlgorithm(config.read<std::string>("storageChecksum", "CRC32"))
    , MAX_SEGMENT_SIZE(config.read<uint64_t>("storageSegmentBytes",
                                             8 * 1024 * 1024))
    , shouldCheckInvariants(config.read<bool>("storageDebug", false))
      // A quarter of the election timeout (presumably in milliseconds,
      // going by the config key name).
    , diskWriteDurationThreshold(config.read<uint64_t>(
        "electionTimeoutMilliseconds", 500) / 4)
    , metadata()
    , dir(FS::openDir(parentDir,
                      (encoding == Encoding::BINARY
                        ? "Segmented-Binary"
                        : "Segmented-Text")))
    , openSegmentFile()
    , logStartIndex(1)
    , segmentsByStartIndex()
    , totalClosedSegmentBytes(0)
      // Always keep at least one segment prepared in advance.
    , preparedSegments(
        std::max(config.read<uint64_t>("storageOpenSegments", 3),
                 1UL))
    , currentSync(new SegmentedLog::Sync(0, diskWriteDurationThreshold))
    , metadataWriteNanos()
    , filesystemOpsNanos()
    , segmentPreparer()
{
    // This also reports existing open-segment counters to preparedSegments
    // (via foundFile()), so new open segments won't reuse their names.
    std::vector<Segment> segments = readSegmentFilenames();

    bool quiet = config.read<bool>("unittest-quiet", false);
    preparedSegments.quietForUnitTests = quiet;
    // Two metadata files are kept (see updateMetadata()). Prefer the one
    // with the higher version; either may be missing or unreadable.
    SegmentedLogMetadata::Metadata metadata1;
    SegmentedLogMetadata::Metadata metadata2;
    bool ok1 = readMetadata("metadata1", metadata1, quiet);
    bool ok2 = readMetadata("metadata2", metadata2, quiet);
    if (ok1 && ok2) {
        if (metadata1.version() > metadata2.version())
            metadata = metadata1;
        else
            metadata = metadata2;
    } else if (ok1) {
        metadata = metadata1;
    } else if (ok2) {
        metadata = metadata2;
    } else {
        // Brand new servers won't have metadata, and that's ok.
        if (!segments.empty()) {
            PANIC("No readable metadata file but found segments in %s",
                  dir.path.c_str());
        }
        metadata.set_entries_start(logStartIndex);
    }

    logStartIndex = metadata.entries_start();
    Log::metadata = metadata.raft_metadata();
    // Write both metadata files: updateMetadata() bumps the version and
    // alternates between metadata1 and metadata2, so two calls cover both.
    updateMetadata();
    updateMetadata();
    FS::fsync(dir); // in case metadata files didn't exist


    // Read data from segments, closing any open segments.
    for (auto it = segments.begin(); it != segments.end(); ++it) {
        Segment& segment = *it;
        bool keep = segment.isOpen ? loadOpenSegment(segment, logStartIndex)
                                   : loadClosedSegment(segment, logStartIndex);
        if (keep) {
            // Loading an open segment is expected to have closed it.
            assert(!segment.isOpen);
            uint64_t startIndex = segment.startIndex;
            std::string filename = segment.filename;
            auto result = segmentsByStartIndex.insert({startIndex,
                                                       std::move(segment)});
            if (!result.second) {
                // Another kept segment already starts at this index.
                Segment& other = result.first->second;
                PANIC("Two segments contain entry %lu: %s and %s",
                      startIndex,
                      other.filename.c_str(),
                      filename.c_str());
            }
        }
    }

    // Check to make sure no entry is present in more than one segment,
    // and that there's no gap in the numbering for entries we have.
    // Segments are visited in start-index order; each must begin exactly
    // one past where the previous one ended.
    if (!segmentsByStartIndex.empty()) {
        uint64_t nextIndex = segmentsByStartIndex.begin()->first;
        for (auto it = segmentsByStartIndex.begin();
             it != segmentsByStartIndex.end();
             ++it) {
            Segment& segment = it->second;
            if (nextIndex < segment.startIndex) {
                PANIC("Did not find segment containing entries "
                      "%lu to %lu (inclusive)",
                      nextIndex, segment.startIndex - 1);
            } else if (segment.startIndex < nextIndex) {
                PANIC("Segment %s contains duplicate entries "
                      "%lu to %lu (inclusive)",
                      segment.filename.c_str(),
                      segment.startIndex,
                      std::min(segment.endIndex,
                               nextIndex - 1));
            }
            nextIndex = segment.endIndex + 1;
        }
    }

    // Open a segment to write new entries into.
    // The preparer thread isn't running yet, so prepare this first segment
    // synchronously. Otherwise a wait in waitForOpenSegment() (presumably
    // reached through openNewSegment()) would never be satisfied.
    uint64_t fileId = preparedSegments.waitForDemand();
    preparedSegments.submitOpenSegment(
        prepareNewSegment(fileId));
    openNewSegment();

    // Launch the segment preparer thread so that we'll have a source for
    // additional new segments.
    segmentPreparer = std::thread(&SegmentedLog::segmentPreparerMain, this);

    checkInvariants();
}
00464 
00465 SegmentedLog::~SegmentedLog()
00466 {
00467     NOTICE("Closing open segment");
00468     closeSegment();
00469 
00470     // Stop preparing segments and delete the extras.
00471     preparedSegments.exit();
00472     if (segmentPreparer.joinable())
00473         segmentPreparer.join();
00474     auto prepared = preparedSegments.releaseAll();
00475     while (!prepared.empty()) {
00476         std::string filename = prepared.front().first;
00477         NOTICE("Removing unused open segment: %s",
00478                filename.c_str());
00479         FS::removeFile(dir, filename);
00480         prepared.pop_front();
00481     }
00482     FS::fsync(dir);
00483 
00484     // Keep assertion in Log.h happy. No need to "take" and "complete" this
00485     // sync since no operations were performed.
00486     if (currentSync->ops.empty())
00487         currentSync->completed = true;
00488 }
00489 
/**
 * Append entries to the end of the log.
 *
 * The in-memory segments are updated immediately. All file operations are
 * only queued into 'currentSync' and take effect when that sync's wait()
 * runs. These include the writes and, when the open segment would grow
 * past MAX_SEGMENT_SIZE, the truncate/fsync/close/rename that closes it
 * before a new open segment is started.
 *
 * \param entries
 *      Entries to append, in order. An entry that already carries an index
 *      must carry exactly the next index (checked only by assert).
 *      Otherwise its index is assigned here.
 * \return
 *      The inclusive range [first, last] of indexes now at the end of the
 *      log. If 'entries' is empty, first == last + 1.
 */
std::pair<uint64_t, uint64_t>
SegmentedLog::append(const std::vector<const Entry*>& entries)
{
    // Pointer, not reference: it is re-pointed if we roll over below.
    Segment* openSegment = &getOpenSegment();
    uint64_t startIndex = openSegment->endIndex + 1;
    uint64_t index = startIndex;
    for (auto it = entries.begin(); it != entries.end(); ++it) {
        Segment::Record record(openSegment->bytes);
        // Note that record.offset may change later, if this entry doesn't fit.
        record.entry = **it;
        if (record.entry.has_index()) {
            assert(index == record.entry.index());
        } else {
            record.entry.set_index(index);
        }
        Core::Buffer buf = serializeProto(record.entry);

        // See if we need to roll over to a new head segment. If someone is
        // writing an entry that is bigger than MAX_SEGMENT_SIZE, just put it
        // in its own segment. This duplicates some code from closeSegment(),
        // but queues up the operations into 'currentSync'.
        // A segment holding only its header is never rolled over, so an
        // oversized entry still lands in a fresh segment rather than looping.
        if (openSegment->bytes > sizeof(SegmentHeader) &&
            openSegment->bytes + buf.getLength() > MAX_SEGMENT_SIZE) {
            NOTICE("Rolling over to new head segment: trying to append new "
                   "entry that is %lu bytes long, but open segment is already "
                   "%lu of %lu bytes large",
                   buf.getLength(),
                   openSegment->bytes,
                   MAX_SEGMENT_SIZE);

            // Truncate away any extra 0 bytes at the end from when
            // MAX_SEGMENT_SIZE was allocated.
            currentSync->ops.emplace_back(openSegmentFile.fd,
                                          Sync::Op::TRUNCATE);
            currentSync->ops.back().size = openSegment->bytes;
            currentSync->ops.emplace_back(openSegmentFile.fd,
                                          Sync::Op::FSYNC);
            // Ownership of the fd moves to the queued CLOSE op.
            currentSync->ops.emplace_back(openSegmentFile.release(),
                                          Sync::Op::CLOSE);

            // Rename the file.
            std::string newFilename = openSegment->makeClosedFilename();
            NOTICE("Closing full segment (was %s, renaming to %s)",
                   openSegment->filename.c_str(),
                   newFilename.c_str());
            currentSync->ops.emplace_back(dir.fd, Sync::Op::RENAME);
            currentSync->ops.back().filename1 = openSegment->filename;
            currentSync->ops.back().filename2 = newFilename;
            currentSync->ops.emplace_back(dir.fd, Sync::Op::FSYNC);
            openSegment->filename = newFilename;

            // Bookkeeping.
            openSegment->isOpen = false;
            totalClosedSegmentBytes += openSegment->bytes;

            // Open new segment.
            openNewSegment();
            openSegment = &getOpenSegment();
            record.offset = openSegment->bytes;
        }

        if (buf.getLength() > MAX_SEGMENT_SIZE) {
            WARNING("Trying to append an entry of %lu bytes when the maximum "
                    "segment size is %lu bytes. Placing this entry in its own "
                    "segment. Consider adjusting 'storageSegmentBytes' in the "
                    "config.",
                    buf.getLength(),
                    MAX_SEGMENT_SIZE);
        }

        openSegment->entries.emplace_back(std::move(record));
        openSegment->bytes += buf.getLength();
        currentSync->ops.emplace_back(openSegmentFile.fd, Sync::Op::WRITE);
        currentSync->ops.back().writeData = std::move(buf);
        ++openSegment->endIndex;
        ++index;
    }

    // Make the appended data durable once the sync runs.
    currentSync->ops.emplace_back(openSegmentFile.fd, Sync::Op::FDATASYNC);
    currentSync->lastIndex = getLastLogIndex();
    checkInvariants();
    return {startIndex, getLastLogIndex()};
}
00573 
00574 const SegmentedLog::Entry&
00575 SegmentedLog::getEntry(uint64_t index) const
00576 {
00577     if (index < getLogStartIndex() ||
00578         index > getLastLogIndex()) {
00579         PANIC("Attempted to access entry %lu outside of log "
00580               "(start index is %lu, last index is %lu)",
00581               index, getLogStartIndex(), getLastLogIndex());
00582     }
00583     auto it = segmentsByStartIndex.upper_bound(index);
00584     --it;
00585     const Segment& segment = it->second;
00586     assert(segment.startIndex <= index);
00587     assert(index <= segment.endIndex);
00588     return segment.entries.at(index - segment.startIndex).entry;
00589 }
00590 
00591 uint64_t
00592 SegmentedLog::getLogStartIndex() const
00593 {
00594     return logStartIndex;
00595 }
00596 
00597 uint64_t
00598 SegmentedLog::getLastLogIndex() const
00599 {
00600     // Although it's a class invariant that there's always an open segment,
00601     // it's convenient to be able to call this as a helper function when there
00602     // are no segments.
00603     if (segmentsByStartIndex.empty())
00604         return logStartIndex - 1;
00605     else
00606         return getOpenSegment().endIndex;
00607 }
00608 
00609 std::string
00610 SegmentedLog::getName() const
00611 {
00612     if (encoding == Encoding::BINARY)
00613         return "Segmented-Binary";
00614     else
00615         return "Segmented-Text";
00616 }
00617 
00618 uint64_t
00619 SegmentedLog::getSizeBytes() const
00620 {
00621     return totalClosedSegmentBytes + getOpenSegment().bytes;
00622 }
00623 
00624 std::unique_ptr<Log::Sync>
00625 SegmentedLog::takeSync()
00626 {
00627     std::unique_ptr<SegmentedLog::Sync> other(
00628             new SegmentedLog::Sync(getLastLogIndex(),
00629                                    diskWriteDurationThreshold));
00630     std::swap(other, currentSync);
00631     return std::move(other);
00632 }
00633 
00634 void
00635 SegmentedLog::syncCompleteVirtual(std::unique_ptr<Log::Sync> sync)
00636 {
00637     static_cast<SegmentedLog::Sync*>(sync.get())->
00638         updateStats(filesystemOpsNanos);
00639 }
00640 
/**
 * Discard all entries before newStartIndex. This is a no-op unless it moves
 * the start of the log forward.
 *
 * The new start index is written to the metadata file first. Segments that
 * end before it are then dropped from memory, and their unlink (plus, for
 * the open segment, its close) is queued into currentSync rather than done
 * immediately.
 *
 * \param newStartIndex
 *      Index of the first entry to keep. It may exceed the last log index.
 *      In that case every segment is dropped and a new open segment is
 *      started.
 */
void
SegmentedLog::truncatePrefix(uint64_t newStartIndex)
{
    if (newStartIndex <= logStartIndex)
        return;

    NOTICE("Truncating log to start at index %lu (was %lu)",
           newStartIndex, logStartIndex);
    logStartIndex = newStartIndex;
    // update metadata before removing files in case of interruption
    updateMetadata();

    // Drop segments from the front while they end before the new start.
    while (!segmentsByStartIndex.empty()) {
        Segment& segment = segmentsByStartIndex.begin()->second;
        if (logStartIndex <= segment.endIndex)
            break;
        NOTICE("Deleting unneeded segment %s (its end index is %lu)",
               segment.filename.c_str(),
               segment.endIndex);
        currentSync->ops.emplace_back(dir.fd, Sync::Op::UNLINKAT);
        currentSync->ops.back().filename1 = segment.filename;
        if (segment.isOpen) {
            // Ownership of the open segment's fd moves to the queued CLOSE.
            currentSync->ops.emplace_back(openSegmentFile.release(),
                                          Sync::Op::CLOSE);
        } else {
            totalClosedSegmentBytes -= segment.bytes;
        }
        segmentsByStartIndex.erase(segmentsByStartIndex.begin());
    }

    // If even the open segment was dropped, start a new one (it will begin
    // at logStartIndex, since getLastLogIndex() is logStartIndex - 1 now).
    if (segmentsByStartIndex.empty())
        openNewSegment();
    // Raise the pending sync's lastIndex to at least logStartIndex - 1.
    if (currentSync->lastIndex < logStartIndex - 1)
        currentSync->lastIndex = logStartIndex - 1;
    checkInvariants();
}
00677 
/**
 * Discard all entries after newEndIndex. This is a no-op unless it removes
 * at least one entry.
 *
 * If newEndIndex falls in the open segment, that segment is trimmed and
 * closed and a new one is opened. Otherwise the open segment is emptied
 * and closed. Then, starting from the newest closed segment, segments lying
 * wholly after newEndIndex are deleted and the one containing newEndIndex
 * is renamed and truncated. These closed-segment file operations are done
 * directly (not queued into currentSync).
 *
 * \param newEndIndex
 *      Index of the last entry to keep.
 */
void
SegmentedLog::truncateSuffix(uint64_t newEndIndex)
{
    if (newEndIndex >= getLastLogIndex())
        return;

    NOTICE("Truncating log to end at index %lu (was %lu)",
           newEndIndex, getLastLogIndex());
    { // Check if the open segment has some entries we need. If so,
      // just truncate that segment, open a new one, and return.
        Segment& openSegment = getOpenSegment();
        if (newEndIndex >= openSegment.startIndex) {
            // Update in-memory segment
            // 'i' is the position of the first record to drop; its offset
            // is where the kept data ends.
            uint64_t i = newEndIndex + 1 - openSegment.startIndex;
            openSegment.bytes = openSegment.entries.at(i).offset;
            openSegment.entries.erase(
                openSegment.entries.begin() + int64_t(i),
                openSegment.entries.end());
            openSegment.endIndex = newEndIndex;
            // Truncate and close the open segment, and open a new one.
            closeSegment();
            openNewSegment();
            checkInvariants();
            return;
        }
    }

    { // Remove the open segment.
        // Mark it as holding no entries (end before start) before closing.
        Segment& openSegment = getOpenSegment();
        openSegment.endIndex = openSegment.startIndex - 1;
        openSegment.bytes = 0;
        closeSegment();
    }

    // Remove and/or truncate closed segments.
    // NOTE: if the newest segment ever ended before newEndIndex, neither
    // branch below would run and this loop would not terminate. That case
    // is ruled out by the no-gap invariant checked at startup.
    while (!segmentsByStartIndex.empty()) {
        auto it = segmentsByStartIndex.rbegin();
        Segment& segment = it->second;
        if (segment.endIndex == newEndIndex)
            break;
        if (segment.startIndex > newEndIndex) { // remove segment
            NOTICE("Removing closed segment %s", segment.filename.c_str());
            FS::removeFile(dir, segment.filename);
            FS::fsync(dir);
            totalClosedSegmentBytes -= segment.bytes;
            segmentsByStartIndex.erase(segment.startIndex);
        } else if (segment.endIndex > newEndIndex) { // truncate segment
            // Update in-memory segment
            uint64_t i = newEndIndex + 1 - segment.startIndex;
            uint64_t newBytes = segment.entries.at(i).offset;
            totalClosedSegmentBytes -= (segment.bytes - newBytes);
            segment.bytes = newBytes;
            segment.entries.erase(
                segment.entries.begin() + int64_t(i),
                segment.entries.end());
            segment.endIndex = newEndIndex;

            // Rename the file
            // (closed filenames encode the index range, so it must change)
            std::string newFilename = segment.makeClosedFilename();
            NOTICE("Truncating closed segment (was %s, renaming to %s)",
                   segment.filename.c_str(),
                   newFilename.c_str());
            FS::rename(dir, segment.filename,
                       dir, newFilename);
            FS::fsync(dir);
            segment.filename = newFilename;

            // Truncate the file
            FS::File f = FS::openFile(dir, segment.filename, O_WRONLY);
            FS::truncate(f, segment.bytes);
            FS::fsync(f);
        }
    }

    // Reopen a segment (so that we can write again)
    openNewSegment();
    checkInvariants();
}
00756 
00757 void
00758 SegmentedLog::updateMetadata()
00759 {
00760     if (Log::metadata.ByteSize() == 0)
00761         metadata.clear_raft_metadata();
00762     else
00763         *metadata.mutable_raft_metadata() = Log::metadata;
00764     metadata.set_format_version(1);
00765     metadata.set_entries_start(logStartIndex);
00766     metadata.set_version(metadata.version() + 1);
00767     std::string filename;
00768     if (metadata.version() % 2 == 1) {
00769         filename = "metadata1";
00770     } else {
00771         filename = "metadata2";
00772     }
00773 
00774     TimePoint start = Clock::now();
00775 
00776     NOTICE("Writing new storage metadata (version %lu) to %s",
00777            metadata.version(),
00778            filename.c_str());
00779     FS::File file = FS::openFile(dir, filename, O_CREAT|O_WRONLY|O_TRUNC);
00780     Core::Buffer record = serializeProto(metadata);
00781     ssize_t written = FS::write(file.fd,
00782                                 record.getData(),
00783                                 record.getLength());
00784     if (written == -1) {
00785         PANIC("Failed to write to %s: %s",
00786               file.path.c_str(), strerror(errno));
00787     }
00788     FS::fsync(file);
00789 
00790     TimePoint end = Clock::now();
00791     std::chrono::nanoseconds elapsed = end - start;
00792     metadataWriteNanos.push(uint64_t(elapsed.count()));
00793     if (elapsed > diskWriteDurationThreshold) {
00794         WARNING("Writing metadata file took longer than expected "
00795                 "(%s for %lu bytes)",
00796                 Core::StringUtil::toString(elapsed).c_str(),
00797                 record.getLength());
00798         metadataWriteNanos.noteExceptional(start, uint64_t(elapsed.count()));
00799     }
00800 }
00801 
00802 void
00803 SegmentedLog::updateServerStats(Protocol::ServerStats& serverStats) const
00804 {
00805     Protocol::ServerStats::Storage& stats = *serverStats.mutable_storage();
00806     stats.set_num_segments(segmentsByStartIndex.size());
00807     stats.set_open_segment_bytes(getOpenSegment().bytes);
00808     stats.set_metadata_version(metadata.version());
00809     metadataWriteNanos.updateProtoBuf(*stats.mutable_metadata_write_nanos());
00810     filesystemOpsNanos.updateProtoBuf(*stats.mutable_filesystem_ops_nanos());
00811 }
00812 
00813 
00814 ////////// SegmentedLog initialization helper functions //////////
00815 
00816 
00817 std::vector<SegmentedLog::Segment>
00818 SegmentedLog::readSegmentFilenames()
00819 {
00820     std::vector<Segment> segments;
00821     std::vector<std::string> filenames = FS::ls(dir);
00822     // sorting isn't strictly necessary, but it helps with unit tests
00823     std::sort(filenames.begin(), filenames.end());
00824     for (auto it = filenames.begin(); it != filenames.end(); ++it) {
00825         const std::string& filename = *it;
00826         if (filename == "metadata1" ||
00827             filename == "metadata2") {
00828             continue;
00829         }
00830         Segment segment;
00831         segment.filename = filename;
00832         segment.bytes = 0;
00833         { // Closed segment: xxx-yyy
00834             uint64_t startIndex = 1;
00835             uint64_t endIndex = 0;
00836             unsigned bytesConsumed;
00837             int matched = sscanf(filename.c_str(),
00838                                  CLOSED_SEGMENT_FORMAT "%n",
00839                                  &startIndex, &endIndex,
00840                                  &bytesConsumed);
00841             if (matched == 2 && bytesConsumed == filename.length()) {
00842                 segment.isOpen = false;
00843                 segment.startIndex = startIndex;
00844                 segment.endIndex = endIndex;
00845                 segments.push_back(segment);
00846                 continue;
00847             }
00848         }
00849 
00850         { // Open segment: open-xxx
00851             uint64_t counter;
00852             unsigned bytesConsumed;
00853             int matched = sscanf(filename.c_str(),
00854                                  OPEN_SEGMENT_FORMAT "%n",
00855                                  &counter,
00856                                  &bytesConsumed);
00857             if (matched == 1 && bytesConsumed == filename.length()) {
00858                 segment.isOpen = true;
00859                 segment.startIndex = ~0UL;
00860                 segment.endIndex = ~0UL - 1;
00861                 segments.push_back(segment);
00862                 preparedSegments.foundFile(counter);
00863                 continue;
00864             }
00865         }
00866 
00867         // Neither
00868         WARNING("%s doesn't look like a valid segment filename (from %s)",
00869                 filename.c_str(),
00870                 (dir.path + "/" + filename).c_str());
00871     }
00872     return segments;
00873 }
00874 
00875 bool
00876 SegmentedLog::readMetadata(const std::string& filename,
00877                            SegmentedLogMetadata::Metadata& metadata,
00878                            bool quiet) const
00879 {
00880     std::string error;
00881     FS::File file = FS::tryOpenFile(dir, filename, O_RDONLY);
00882     if (file.fd == -1) {
00883         error = format("Could not open %s/%s: %s",
00884                        dir.path.c_str(), filename.c_str(), strerror(errno));
00885     } else {
00886         FS::FileContents reader(file);
00887         uint64_t offset = 0;
00888         error = readProtoFromFile(file, reader, &offset, &metadata);
00889     }
00890     if (error.empty()) {
00891         if (metadata.format_version() > 1) {
00892             PANIC("The format version found in %s is %lu but this code "
00893                   "only understands version 1",
00894                   filename.c_str(),
00895                   metadata.format_version());
00896         }
00897         NOTICE("Read metadata version %lu from %s",
00898                metadata.version(), filename.c_str());
00899         return true;
00900     } else {
00901         if (!quiet) {
00902             WARNING("Error reading metadata from %s: %s",
00903                     filename.c_str(), error.c_str());
00904         }
00905         return false;
00906     }
00907 }
00908 
/**
 * Load a closed segment's entries from disk during initialization.
 *
 * The file's first byte must be segment format version 1. If it is not, or
 * if the file is completely empty, this PANICs. If the whole segment
 * precedes logStartIndex, the file is deleted instead of loaded. Otherwise
 * every entry from segment.startIndex through segment.endIndex is read into
 * segment.entries, PANICking if any is missing or unreadable. Any bytes
 * after the last entry are truncated from the file.
 *
 * \param segment
 *      A closed segment whose filename and start/end indexes are set.
 *      Its entries and bytes fields are filled in on success.
 * \param logStartIndex
 *      Entries with a lower index are no longer needed.
 * \return
 *      True if the segment was loaded and its size added to
 *      totalClosedSegmentBytes; false if its file was removed.
 */
bool
SegmentedLog::loadClosedSegment(Segment& segment, uint64_t logStartIndex)
{
    assert(!segment.isOpen);
    FS::File file = FS::openFile(dir, segment.filename, O_RDWR);
    FS::FileContents reader(file);
    uint64_t offset = 0;

    // Validate the one-byte version header; entries begin right after it.
    if (reader.getFileLength() < 1) {
        PANIC("Found completely empty segment file %s (it doesn't even have "
              "a version field)",
              segment.filename.c_str());
    } else {
        uint8_t version = *reader.get<uint8_t>(0, 1);
        offset += 1;
        if (version != 1) {
            PANIC("Segment version read from %s was %u, but this code can "
                  "only read version 1",
                  segment.filename.c_str(),
                  version);
        }
    }

    // Note: the version header above is validated even for segments that
    // are discarded here.
    if (segment.endIndex < logStartIndex) {
        NOTICE("Removing closed segment whose entries are no longer "
               "needed (last index is %lu but log start index is %lu): %s",
               segment.endIndex,
               logStartIndex,
               segment.filename.c_str());
        FS::removeFile(dir, segment.filename);
        FS::fsync(dir);
        return false;
    }

    // The filename promises entries [startIndex, endIndex]; every one must
    // be readable. readProtoFromFile advances offset past each record it
    // successfully reads.
    for (uint64_t index = segment.startIndex;
         index <= segment.endIndex;
         ++index) {
        std::string error;
        if (offset >= reader.getFileLength()) {
            error = "File too short";
        } else {
            segment.entries.emplace_back(offset);
            error = readProtoFromFile(file, reader, &offset,
                                      &segment.entries.back().entry);
        }
        if (!error.empty()) {
            PANIC("Could not read entry %lu in log segment %s "
                  "(offset %lu bytes). This indicates the file was "
                  "somehow corrupted. Error was: %s",
                  index,
                  segment.filename.c_str(),
                  offset,
                  error.c_str());
        }
    }
    // Anything past the last expected entry is leftover data; drop it so
    // the file size matches the segment's logical size.
    if (offset < reader.getFileLength()) {
        WARNING("Found an extra %lu bytes at the end of closed segment "
                "%s. This can happen if the server crashed while "
                "truncating the segment. Truncating these now.",
                reader.getFileLength() - offset,
                segment.filename.c_str());
        // TODO(ongaro): do we want to save these bytes somewhere?
        FS::truncate(file, offset);
        FS::fsync(file);
    }
    segment.bytes = offset;
    totalClosedSegmentBytes += segment.bytes;
    return true;
}
00978 
/**
 * Recover an open segment left behind by a previous run during
 * initialization, and convert it into a closed segment.
 *
 * Entries are read one after another, starting just after the one-byte
 * version header. Reading stops at the end of the file or at the first
 * record that cannot be read. That record and everything after it are
 * truncated from the file; the log message distinguishes an all-zero tail
 * from other leftover bytes. A completely empty file, or a version-0 file
 * whose remaining bytes are all zero, yields no entries. Any other version
 * besides 1 PANICs.
 *
 * If no entries remain, or the last one precedes logStartIndex, the file is
 * deleted. Otherwise the segment is marked closed, its start/end indexes are
 * taken from its first/last entries, its size is added to
 * totalClosedSegmentBytes, and the file is renamed to its closed filename.
 *
 * \param segment
 *      An open segment with its filename set; updated in place.
 * \param logStartIndex
 *      Entries with a lower index are no longer needed.
 * \return
 *      True if the segment was kept (and is now closed); false if its file
 *      was removed.
 */
bool
SegmentedLog::loadOpenSegment(Segment& segment, uint64_t logStartIndex)
{
    assert(segment.isOpen);
    FS::File file = FS::openFile(dir, segment.filename, O_RDWR);
    FS::FileContents reader(file);
    uint64_t offset = 0;

    if (reader.getFileLength() < 1) {
        WARNING("Found completely empty segment file %s (it doesn't even have "
                "a version field)",
                segment.filename.c_str());
    } else {
        uint8_t version = *reader.get<uint8_t>(0, 1);
        offset += 1;
        if (version != 1) {
            uint64_t remainingBytes = reader.getFileLength() - offset;
            if (version == 0 &&
                isAllZeros(reader.get(
                               offset, remainingBytes), remainingBytes)) {
                // move the offset to the end of the file. allow the
                // existing cleanup mechanism to remove this file.
                offset = reader.getFileLength();
            } else {
                PANIC("Segment version read from %s was %u, "
                      "but this code can only read version 1",
                      segment.filename.c_str(),
                      version);
            }
        }
    }

    // Index of the last entry read successfully; only used in log messages.
    uint64_t lastIndex = 0;
    while (offset < reader.getFileLength()) {
        segment.entries.emplace_back(offset);
        std::string error = readProtoFromFile(
                file,
                reader,
                &offset,
                &segment.entries.back().entry);
        if (!error.empty()) {
            // readProtoFromFile leaves offset at the start of the failed
            // record, so truncating at offset discards exactly the
            // unreadable tail of the file.
            segment.entries.pop_back();
            uint64_t remainingBytes = reader.getFileLength() - offset;
            if (isAllZeros(reader.get(offset, remainingBytes),
                           remainingBytes)) {
                WARNING("Truncating %lu zero bytes at the end of log "
                        "segment %s (%lu bytes into the segment, "
                        "following  entry %lu). This is most likely "
                        "because the server shutdown uncleanly.",
                        remainingBytes,
                        segment.filename.c_str(),
                        offset,
                        lastIndex);
            } else {
                // TODO(ongaro): do we want to save these bytes somewhere?
                WARNING("Could not read entry in log segment %s "
                        "(%lu bytes into the segment, following "
                        "entry %lu), probably because it was being "
                        "written when the server crashed. Discarding the "
                        "remainder of the file (%lu bytes). Error was: %s",
                        segment.filename.c_str(),
                        offset,
                        lastIndex,
                        remainingBytes,
                        error.c_str());
            }
            FS::truncate(file, offset);
            FS::fsync(file);
            break;
        }
        lastIndex = segment.entries.back().entry.index();
    }

    // Drop the file if nothing in it is worth keeping.
    bool remove = false;
    if (segment.entries.empty()) {
        NOTICE("Removing empty segment: %s", segment.filename.c_str());
        remove = true;
    } else if (segment.entries.back().entry.index() < logStartIndex) {
        NOTICE("Removing open segment whose entries are no longer "
               "needed (last index is %lu but log start index is %lu): %s",
               segment.entries.back().entry.index(),
               logStartIndex,
               segment.filename.c_str());
        remove = true;
    }
    if (remove) {
        FS::removeFile(dir, segment.filename);
        FS::fsync(dir);
        return false;
    } else {
        // Keep it as a closed segment: record its size and real index range,
        // then rename the file to the closed-segment naming scheme.
        segment.bytes = offset;
        totalClosedSegmentBytes += segment.bytes;
        segment.isOpen = false;
        segment.startIndex = segment.entries.front().entry.index();
        segment.endIndex = segment.entries.back().entry.index();
        std::string newFilename = segment.makeClosedFilename();
        NOTICE("Closing open segment %s, renaming to %s",
                segment.filename.c_str(),
                newFilename.c_str());
        FS::rename(dir, segment.filename,
                   dir, newFilename);
        FS::fsync(dir);
        segment.filename = newFilename;
        return true;
    }
}
01085 
01086 
01087 ////////// SegmentedLog normal operation helper functions //////////
01088 
01089 
01090 void
01091 SegmentedLog::checkInvariants()
01092 {
01093     if (!shouldCheckInvariants)
01094         return;
01095 #if DEBUG
01096     assert(openSegmentFile.fd >= 0);
01097     assert(!segmentsByStartIndex.empty());
01098     assert(logStartIndex >= segmentsByStartIndex.begin()->second.startIndex);
01099     assert(logStartIndex <= segmentsByStartIndex.begin()->second.endIndex + 1);
01100     assert(currentSync.get() != NULL);
01101     uint64_t closedBytes = 0;
01102     for (auto it = segmentsByStartIndex.begin();
01103          it != segmentsByStartIndex.end();
01104          ++it) {
01105         auto next = it;
01106         ++next;
01107         Segment& segment = it->second;
01108         assert(it->first == segment.startIndex);
01109         assert(segment.startIndex > 0);
01110         assert(segment.entries.size() ==
01111                segment.endIndex + 1 - segment.startIndex);
01112         uint64_t lastOffset = 0;
01113         for (uint64_t i = 0; i < segment.entries.size(); ++i) {
01114             assert(segment.entries.at(i).entry.index() ==
01115                    segment.startIndex + i);
01116             if (i == 0)
01117                 assert(segment.entries.at(0).offset == sizeof(SegmentHeader));
01118             else
01119                 assert(segment.entries.at(i).offset > lastOffset);
01120             lastOffset = segment.entries.at(i).offset;
01121         }
01122         if (next == segmentsByStartIndex.end()) {
01123             assert(segment.isOpen);
01124             assert(segment.endIndex >= segment.startIndex - 1);
01125             assert(Core::StringUtil::startsWith(segment.filename, "open-"));
01126             assert(segment.bytes >= sizeof(SegmentHeader));
01127         } else {
01128             assert(!segment.isOpen);
01129             assert(segment.endIndex >= segment.startIndex);
01130             assert(next->second.startIndex == segment.endIndex + 1);
01131             assert(segment.bytes > sizeof(SegmentHeader));
01132             closedBytes += segment.bytes;
01133             assert(segment.filename == segment.makeClosedFilename());
01134         }
01135     }
01136     assert(closedBytes == totalClosedSegmentBytes);
01137 #endif /* DEBUG */
01138 }
01139 
/**
 * Close the current open segment, if there is one.
 *
 * If the open segment holds no entries (startIndex > endIndex), its file is
 * closed and deleted, and the segment is dropped from segmentsByStartIndex.
 * Otherwise the file is truncated to the segment's logical size (bytes),
 * fsynced, closed, and renamed to its closed filename. The segment is then
 * marked closed and its size is added to totalClosedSegmentBytes.
 * This function does not open a replacement segment.
 */
void
SegmentedLog::closeSegment()
{
    // No open segment file: nothing to close.
    if (openSegmentFile.fd < 0)
        return;
    Segment& openSegment = getOpenSegment();
    if (openSegment.startIndex > openSegment.endIndex) {
        // Segment is empty; just remove it.
        NOTICE("Removing empty open segment (start index %lu): %s",
               openSegment.startIndex,
               openSegment.filename.c_str());
        openSegmentFile.close();
        FS::removeFile(dir, openSegment.filename);
        FS::fsync(dir);
        // NOTE: openSegment refers to the element erased here; it must not
        // be used after this line.
        segmentsByStartIndex.erase(openSegment.startIndex);
        return;
    }

    // Truncate away any extra 0 bytes at the end from when
    // MAX_SEGMENT_SIZE was allocated, or in the case of truncateSuffix,
    // truncate away actual entries that are no longer desired.
    FS::truncate(openSegmentFile, openSegment.bytes);
    FS::fsync(openSegmentFile);
    openSegmentFile.close();

    // Rename the file, then sync the directory after the rename.
    std::string newFilename = openSegment.makeClosedFilename();
    NOTICE("Closing segment (was %s, renaming to %s)",
           openSegment.filename.c_str(),
           newFilename.c_str());
    FS::rename(dir, openSegment.filename,
               dir, newFilename);
    FS::fsync(dir);
    openSegment.filename = newFilename;

    // From here on the segment counts toward the closed-segment total.
    openSegment.isOpen = false;
    totalClosedSegmentBytes += openSegment.bytes;
}
01178 
01179 SegmentedLog::Segment&
01180 SegmentedLog::getOpenSegment()
01181 {
01182     assert(!segmentsByStartIndex.empty());
01183     return segmentsByStartIndex.rbegin()->second;
01184 }
01185 
01186 const SegmentedLog::Segment&
01187 SegmentedLog::getOpenSegment() const
01188 {
01189     assert(!segmentsByStartIndex.empty());
01190     return segmentsByStartIndex.rbegin()->second;
01191 }
01192 
01193 void
01194 SegmentedLog::openNewSegment()
01195 {
01196     assert(openSegmentFile.fd < 0);
01197     assert(segmentsByStartIndex.empty() ||
01198            !segmentsByStartIndex.rbegin()->second.isOpen);
01199 
01200     Segment newSegment;
01201     newSegment.isOpen = true;
01202     newSegment.startIndex = getLastLogIndex() + 1;
01203     newSegment.endIndex = newSegment.startIndex - 1;
01204     newSegment.bytes = sizeof(SegmentHeader);
01205     // This can throw ThreadInterruptedException, but it shouldn't ever, since
01206     // this class shouldn't have been destroyed yet.
01207     auto s = preparedSegments.waitForOpenSegment();
01208     newSegment.filename = s.first;
01209     openSegmentFile = std::move(s.second);
01210     segmentsByStartIndex.insert({newSegment.startIndex, newSegment});
01211 }
01212 
01213 std::string
01214 SegmentedLog::readProtoFromFile(const FS::File& file,
01215                                 FS::FileContents& reader,
01216                                 uint64_t* offset,
01217                                 google::protobuf::Message* out) const
01218 {
01219     uint64_t loffset = *offset;
01220     char checksum[Core::Checksum::MAX_LENGTH];
01221     uint64_t bytesRead = reader.copyPartial(loffset, checksum,
01222                                             sizeof(checksum));
01223     uint32_t checksumBytes = Core::Checksum::length(checksum,
01224                                                     uint32_t(bytesRead));
01225     if (checksumBytes == 0)
01226         return format("Missing checksum in file %s", file.path.c_str());
01227     loffset += checksumBytes;
01228 
01229     uint64_t dataLen;
01230     if (reader.copyPartial(loffset, &dataLen, sizeof(dataLen)) <
01231         sizeof(dataLen)) {
01232         return format("Record length truncated in file %s", file.path.c_str());
01233     }
01234     dataLen = be64toh(dataLen);
01235     if (reader.getFileLength() < loffset + sizeof(dataLen) + dataLen) {
01236         return format("ProtoBuf truncated in file %s", file.path.c_str());
01237     }
01238 
01239     const void* checksumCoverage = reader.get(loffset,
01240                                               sizeof(dataLen) + dataLen);
01241     std::string error = Core::Checksum::verify(checksum, checksumCoverage,
01242                                                sizeof(dataLen) + dataLen);
01243     if (!error.empty()) {
01244         return format("Checksum verification failure on %s: %s",
01245                       file.path.c_str(), error.c_str());
01246     }
01247     loffset += sizeof(dataLen);
01248     const void* data = reader.get(loffset, dataLen);
01249     loffset += dataLen;
01250 
01251     switch (encoding) {
01252         case SegmentedLog::Encoding::BINARY: {
01253             Core::Buffer contents(const_cast<void*>(data),
01254                                   dataLen,
01255                                   NULL);
01256             if (!Core::ProtoBuf::parse(contents, *out)) {
01257                 return format("Failed to parse protobuf in %s",
01258                               file.path.c_str());
01259             }
01260             break;
01261         }
01262         case SegmentedLog::Encoding::TEXT: {
01263             std::string contents(static_cast<const char*>(data), dataLen);
01264             Core::ProtoBuf::Internal::fromString(contents, *out);
01265             break;
01266         }
01267     }
01268     *offset = loffset;
01269     return "";
01270 }
01271 
01272 Core::Buffer
01273 SegmentedLog::serializeProto(const google::protobuf::Message& in) const
01274 {
01275     // TODO(ongaro): can the intermediate buffer be avoided?
01276     const void* data = NULL;
01277     uint64_t len = 0;
01278     Core::Buffer binaryContents;
01279     std::string asciiContents;
01280     switch (encoding) {
01281         case SegmentedLog::Encoding::BINARY: {
01282             Core::ProtoBuf::serialize(in, binaryContents);
01283             data = binaryContents.getData();
01284             len = binaryContents.getLength();
01285             break;
01286         }
01287         case SegmentedLog::Encoding::TEXT: {
01288             asciiContents = Core::ProtoBuf::dumpString(in);
01289             data = asciiContents.data();
01290             len = asciiContents.length();
01291             break;
01292         }
01293     }
01294     uint64_t netLen = htobe64(len);
01295     char checksum[Core::Checksum::MAX_LENGTH];
01296     uint32_t checksumLen = Core::Checksum::calculate(
01297         checksumAlgorithm.c_str(), {
01298             {&netLen, sizeof(netLen)},
01299             {data, len},
01300         },
01301         checksum);
01302 
01303     uint64_t totalLen = checksumLen + sizeof(netLen) + len;
01304     char* buf = new char[totalLen];
01305     Core::Buffer record(
01306         buf,
01307         totalLen,
01308         Core::Buffer::deleteArrayFn<char>);
01309     Core::Util::memcpy(buf, {
01310         {checksum, checksumLen},
01311         {&netLen, sizeof(netLen)},
01312         {data, len},
01313     });
01314     return record;
01315 }
01316 
01317 
01318 ////////// SegmentedLog segment preparer thread functions //////////
01319 
01320 std::pair<std::string, FS::File>
01321 SegmentedLog::prepareNewSegment(uint64_t id)
01322 {
01323     TimePoint start = Clock::now();
01324 
01325     std::string filename = format(OPEN_SEGMENT_FORMAT, id);
01326     FS::File file = FS::openFile(dir, filename,
01327                                  O_CREAT|O_EXCL|O_RDWR);
01328     FS::allocate(file, 0, MAX_SEGMENT_SIZE);
01329     SegmentHeader header;
01330     header.version = 1;
01331     ssize_t written = FS::write(file.fd,
01332                                 &header,
01333                                 sizeof(header));
01334     if (written == -1) {
01335         PANIC("Failed to write header to %s: %s",
01336               file.path.c_str(), strerror(errno));
01337     }
01338     FS::fsync(file);
01339     FS::fsync(dir);
01340 
01341     TimePoint end = Clock::now();
01342     std::chrono::nanoseconds elapsed = end - start;
01343     // TODO(ongaro): record elapsed times into RollingStat in a thread-safe way
01344     if (elapsed > diskWriteDurationThreshold) {
01345         WARNING("Preparing open segment file took longer than expected (%s)",
01346                 Core::StringUtil::toString(elapsed).c_str());
01347     }
01348     return {std::move(filename), std::move(file)};
01349 }
01350 
01351 
01352 void
01353 SegmentedLog::segmentPreparerMain()
01354 {
01355     Core::ThreadId::setName("SegmentPreparer");
01356     while (true) {
01357         uint64_t fileId = 0;
01358         try {
01359             fileId = preparedSegments.waitForDemand();
01360         } catch (const Core::Util::ThreadInterruptedException&) {
01361             VERBOSE("Exiting");
01362             break;
01363         }
01364         preparedSegments.submitOpenSegment(
01365             prepareNewSegment(fileId));
01366     }
01367 }
01368 
01369 } // namespace LogCabin::Storage
01370 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines