LogCabin
|
00001 /* Copyright (c) 2012-2014 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #define _BSD_SOURCE 00018 #include <endian.h> 00019 00020 #include <algorithm> 00021 #include <fcntl.h> 00022 #include <sys/stat.h> 00023 #include <unistd.h> 00024 00025 #include "build/Protocol/Raft.pb.h" 00026 #include "Core/Checksum.h" 00027 #include "Core/Debug.h" 00028 #include "Core/ProtoBuf.h" 00029 #include "Core/StringUtil.h" 00030 #include "Core/ThreadId.h" 00031 #include "Core/Time.h" 00032 #include "Core/Util.h" 00033 #include "Storage/FilesystemUtil.h" 00034 #include "Storage/SegmentedLog.h" 00035 #include "Server/Globals.h" 00036 00037 namespace LogCabin { 00038 namespace Storage { 00039 00040 namespace FS = FilesystemUtil; 00041 using Core::StringUtil::format; 00042 00043 namespace { 00044 00045 /** 00046 * Format string for open segment filenames. 00047 * First param: incrementing counter. 00048 */ 00049 #define OPEN_SEGMENT_FORMAT "open-%lu" 00050 00051 /** 00052 * Format string for closed segment filenames. 00053 * First param: start index, inclusive. 00054 * Second param: end index, inclusive. 00055 */ 00056 #define CLOSED_SEGMENT_FORMAT "%020lu-%020lu" 00057 00058 /** 00059 * Return true if all the bytes in range [start, start + length) are zero. 00060 */ 00061 bool 00062 isAllZeros(const void* _start, size_t length) 00063 { 00064 const uint8_t* start = static_cast<const uint8_t*>(_start); 00065 for (size_t offset = 0; offset < length; ++offset) { 00066 if (start[offset] != 0) 00067 return false; 00068 } 00069 return true; 00070 } 00071 00072 } // anonymous namespace 00073 00074 00075 ////////// SegmentedLog::PreparedSegments ////////// 00076 00077 00078 SegmentedLog::PreparedSegments::PreparedSegments(uint64_t queueSize) 00079 : quietForUnitTests(false) 00080 , mutex() 00081 , consumed() 00082 , produced() 00083 , exiting(false) 00084 , demanded(queueSize) 00085 , filenameCounter(0) 00086 , openSegments() 00087 { 00088 } 00089 00090 SegmentedLog::PreparedSegments::~PreparedSegments() 00091 { 00092 } 00093 00094 void 00095 SegmentedLog::PreparedSegments::exit() 00096 { 00097 std::lock_guard<Core::Mutex> lockGuard(mutex); 00098 exiting = true; 00099 consumed.notify_all(); 00100 produced.notify_all(); 00101 } 00102 00103 void 00104 SegmentedLog::PreparedSegments::foundFile(uint64_t fileId) 00105 { 00106 std::lock_guard<Core::Mutex> lockGuard(mutex); 00107 if (filenameCounter < fileId) 00108 filenameCounter = fileId; 00109 } 00110 00111 std::deque<SegmentedLog::PreparedSegments::OpenSegment> 00112 SegmentedLog::PreparedSegments::releaseAll() 00113 { 00114 std::lock_guard<Core::Mutex> lockGuard(mutex); 00115 std::deque<OpenSegment> ret; 00116 std::swap(openSegments, ret); 00117 return ret; 00118 } 00119 00120 void 00121 SegmentedLog::PreparedSegments::submitOpenSegment(OpenSegment segment) 00122 { 00123 std::lock_guard<Core::Mutex> lockGuard(mutex); 00124 openSegments.push_back(std::move(segment)); 00125 produced.notify_one(); 00126 } 00127 00128 uint64_t 00129 SegmentedLog::PreparedSegments::waitForDemand() 00130 { 00131 std::unique_lock<Core::Mutex> lockGuard(mutex); 00132 while (!exiting) { 00133 if (demanded > 0) { 00134 --demanded; 00135 ++filenameCounter; 00136 return filenameCounter; 00137 } 00138 consumed.wait(lockGuard); 00139 } 00140 throw Core::Util::ThreadInterruptedException(); 00141 } 00142 00143 SegmentedLog::PreparedSegments::OpenSegment 00144 SegmentedLog::PreparedSegments::waitForOpenSegment() 00145 { 00146 std::unique_lock<Core::Mutex> lockGuard(mutex); 00147 uint64_t numWaits = 0; 00148 while (true) { 00149 if (exiting) { 00150 NOTICE("Exiting"); 00151 throw Core::Util::ThreadInterruptedException(); 00152 } 00153 if (!openSegments.empty()) { 00154 break; 00155 } 00156 if (numWaits == 0 && !quietForUnitTests) { 00157 WARNING("Prepared segment not ready, having to wait on it. " 00158 "This is perfectly safe but bad for performance. " 00159 "Consider increasing storageOpenSegments in the config."); 00160 } 00161 ++numWaits; 00162 produced.wait(lockGuard); 00163 } 00164 if (numWaits > 0 && !quietForUnitTests) { 00165 WARNING("Done waiting: prepared segment now ready"); 00166 } 00167 OpenSegment r = std::move(openSegments.front()); 00168 openSegments.pop_front(); 00169 consumed.notify_one(); 00170 ++demanded; 00171 return r; 00172 } 00173 00174 00175 ////////// SegmentedLog::Sync ////////// 00176 00177 00178 SegmentedLog::Sync::Sync(uint64_t lastIndex, 00179 std::chrono::nanoseconds diskWriteDurationThreshold) 00180 : Log::Sync(lastIndex) 00181 , diskWriteDurationThreshold(diskWriteDurationThreshold) 00182 , ops() 00183 , waitStart(TimePoint::max()) 00184 , waitEnd(TimePoint::max()) 00185 { 00186 } 00187 00188 SegmentedLog::Sync::~Sync() 00189 { 00190 } 00191 00192 void 00193 SegmentedLog::Sync::optimize() 00194 { 00195 if (ops.size() < 3) 00196 return; 00197 auto prev = ops.begin(); 00198 auto it = prev + 1; 00199 auto next = it + 1; 00200 while (next != ops.end()) { 00201 if (prev->opCode == Op::FDATASYNC && 00202 it->opCode == Op::WRITE && 00203 next->opCode == Op::FDATASYNC && 00204 prev->fd == it->fd && 00205 it->fd == next->fd) { 00206 prev->opCode = Op::NOOP; 00207 } 00208 prev = it; 00209 it = next; 00210 ++next; 00211 } 00212 } 00213 00214 void 00215 SegmentedLog::Sync::wait() 00216 { 00217 optimize(); 00218 00219 waitStart = Clock::now(); 00220 uint64_t writes = 0; 00221 uint64_t totalBytesWritten = 0; 00222 uint64_t truncates = 0; 00223 uint64_t renames = 0; 00224 uint64_t fdatasyncs = 0; 00225 uint64_t fsyncs = 0; 00226 uint64_t closes = 0; 00227 uint64_t unlinks = 0; 00228 00229 while (!ops.empty()) { 00230 Op& op = ops.front(); 00231 FS::File f(op.fd, "-unknown-"); 00232 switch (op.opCode) { 00233 case Op::WRITE: { 00234 ssize_t written = FS::write(op.fd, 00235 op.writeData.getData(), 00236 op.writeData.getLength()); 00237 if (written < 0) { 00238 PANIC("Failed to write to fd %d: %s", 00239 op.fd, 00240 strerror(errno)); 00241 } 00242 ++writes; 00243 totalBytesWritten += op.writeData.getLength(); 00244 break; 00245 } 00246 case Op::TRUNCATE: { 00247 FS::truncate(f, op.size); 00248 ++truncates; 00249 break; 00250 } 00251 case Op::RENAME: { 00252 FS::rename(f, op.filename1, 00253 f, op.filename2); 00254 ++renames; 00255 break; 00256 } 00257 case Op::FDATASYNC: { 00258 FS::fdatasync(f); 00259 ++fdatasyncs; 00260 break; 00261 } 00262 case Op::FSYNC: { 00263 FS::fsync(f); 00264 ++fsyncs; 00265 break; 00266 } 00267 case Op::CLOSE: { 00268 f.close(); 00269 ++closes; 00270 break; 00271 } 00272 case Op::UNLINKAT: { 00273 FS::removeFile(f, op.filename1); 00274 ++unlinks; 00275 break; 00276 } 00277 case Op::NOOP: { 00278 break; 00279 } 00280 } 00281 f.release(); 00282 ops.pop_front(); 00283 } 00284 00285 waitEnd = Clock::now(); 00286 std::chrono::nanoseconds elapsed = waitEnd - waitStart; 00287 if (elapsed > diskWriteDurationThreshold) { 00288 WARNING("Executing filesystem operations took longer than expected " 00289 "(%s for %lu writes totaling %lu bytes, %lu truncates, " 00290 "%lu renames, %lu fdatasyncs, %lu fsyncs, %lu closes, and " 00291 "%lu unlinks)", 00292 Core::StringUtil::toString(elapsed).c_str(), 00293 writes, 00294 totalBytesWritten, 00295 truncates, 00296 renames, 00297 fdatasyncs, 00298 fsyncs, 00299 closes, 00300 unlinks); 00301 } 00302 } 00303 00304 void 00305 SegmentedLog::Sync::updateStats(Core::RollingStat& nanos) const 00306 { 00307 std::chrono::nanoseconds elapsed = waitEnd - waitStart; 00308 nanos.push(uint64_t(elapsed.count())); 00309 if (elapsed > diskWriteDurationThreshold) 00310 nanos.noteExceptional(waitStart, uint64_t(elapsed.count())); 00311 } 00312 00313 00314 ////////// SegmentedLog::Segment::Record ////////// 00315 00316 00317 SegmentedLog::Segment::Record::Record(uint64_t offset) 00318 : offset(offset) 00319 , entry() 00320 { 00321 } 00322 00323 00324 ////////// SegmentedLog::Segment ////////// 00325 00326 SegmentedLog::Segment::Segment() 00327 : isOpen(false) 00328 , startIndex(~0UL) 00329 , endIndex(~0UL - 1) 00330 , bytes(0) 00331 , filename("--invalid--") 00332 , entries() 00333 { 00334 } 00335 00336 std::string 00337 SegmentedLog::Segment::makeClosedFilename() const 00338 { 00339 return format(CLOSED_SEGMENT_FORMAT, 00340 startIndex, endIndex); 00341 } 00342 00343 ////////// SegmentedLog public functions ////////// 00344 00345 00346 SegmentedLog::SegmentedLog(const FS::File& parentDir, 00347 Encoding encoding, 00348 const Core::Config& config) 00349 : encoding(encoding) 00350 , checksumAlgorithm(config.read<std::string>("storageChecksum", "CRC32")) 00351 , MAX_SEGMENT_SIZE(config.read<uint64_t>("storageSegmentBytes", 00352 8 * 1024 * 1024)) 00353 , shouldCheckInvariants(config.read<bool>("storageDebug", false)) 00354 , diskWriteDurationThreshold(config.read<uint64_t>( 00355 "electionTimeoutMilliseconds", 500) / 4) 00356 , metadata() 00357 , dir(FS::openDir(parentDir, 00358 (encoding == Encoding::BINARY 00359 ? "Segmented-Binary" 00360 : "Segmented-Text"))) 00361 , openSegmentFile() 00362 , logStartIndex(1) 00363 , segmentsByStartIndex() 00364 , totalClosedSegmentBytes(0) 00365 , preparedSegments( 00366 std::max(config.read<uint64_t>("storageOpenSegments", 3), 00367 1UL)) 00368 , currentSync(new SegmentedLog::Sync(0, diskWriteDurationThreshold)) 00369 , metadataWriteNanos() 00370 , filesystemOpsNanos() 00371 , segmentPreparer() 00372 { 00373 std::vector<Segment> segments = readSegmentFilenames(); 00374 00375 bool quiet = config.read<bool>("unittest-quiet", false); 00376 preparedSegments.quietForUnitTests = quiet; 00377 SegmentedLogMetadata::Metadata metadata1; 00378 SegmentedLogMetadata::Metadata metadata2; 00379 bool ok1 = readMetadata("metadata1", metadata1, quiet); 00380 bool ok2 = readMetadata("metadata2", metadata2, quiet); 00381 if (ok1 && ok2) { 00382 if (metadata1.version() > metadata2.version()) 00383 metadata = metadata1; 00384 else 00385 metadata = metadata2; 00386 } else if (ok1) { 00387 metadata = metadata1; 00388 } else if (ok2) { 00389 metadata = metadata2; 00390 } else { 00391 // Brand new servers won't have metadata, and that's ok. 00392 if (!segments.empty()) { 00393 PANIC("No readable metadata file but found segments in %s", 00394 dir.path.c_str()); 00395 } 00396 metadata.set_entries_start(logStartIndex); 00397 } 00398 00399 logStartIndex = metadata.entries_start(); 00400 Log::metadata = metadata.raft_metadata(); 00401 // Write both metadata files 00402 updateMetadata(); 00403 updateMetadata(); 00404 FS::fsync(dir); // in case metadata files didn't exist 00405 00406 00407 // Read data from segments, closing any open segments. 00408 for (auto it = segments.begin(); it != segments.end(); ++it) { 00409 Segment& segment = *it; 00410 bool keep = segment.isOpen ? loadOpenSegment(segment, logStartIndex) 00411 : loadClosedSegment(segment, logStartIndex); 00412 if (keep) { 00413 assert(!segment.isOpen); 00414 uint64_t startIndex = segment.startIndex; 00415 std::string filename = segment.filename; 00416 auto result = segmentsByStartIndex.insert({startIndex, 00417 std::move(segment)}); 00418 if (!result.second) { 00419 Segment& other = result.first->second; 00420 PANIC("Two segments contain entry %lu: %s and %s", 00421 startIndex, 00422 other.filename.c_str(), 00423 filename.c_str()); 00424 } 00425 } 00426 } 00427 00428 // Check to make sure no entry is present in more than one segment, 00429 // and that there's no gap in the numbering for entries we have. 00430 if (!segmentsByStartIndex.empty()) { 00431 uint64_t nextIndex = segmentsByStartIndex.begin()->first; 00432 for (auto it = segmentsByStartIndex.begin(); 00433 it != segmentsByStartIndex.end(); 00434 ++it) { 00435 Segment& segment = it->second; 00436 if (nextIndex < segment.startIndex) { 00437 PANIC("Did not find segment containing entries " 00438 "%lu to %lu (inclusive)", 00439 nextIndex, segment.startIndex - 1); 00440 } else if (segment.startIndex < nextIndex) { 00441 PANIC("Segment %s contains duplicate entries " 00442 "%lu to %lu (inclusive)", 00443 segment.filename.c_str(), 00444 segment.startIndex, 00445 std::min(segment.endIndex, 00446 nextIndex - 1)); 00447 } 00448 nextIndex = segment.endIndex + 1; 00449 } 00450 } 00451 00452 // Open a segment to write new entries into. 00453 uint64_t fileId = preparedSegments.waitForDemand(); 00454 preparedSegments.submitOpenSegment( 00455 prepareNewSegment(fileId)); 00456 openNewSegment(); 00457 00458 // Launch the segment preparer thread so that we'll have a source for 00459 // additional new segments. 00460 segmentPreparer = std::thread(&SegmentedLog::segmentPreparerMain, this); 00461 00462 checkInvariants(); 00463 } 00464 00465 SegmentedLog::~SegmentedLog() 00466 { 00467 NOTICE("Closing open segment"); 00468 closeSegment(); 00469 00470 // Stop preparing segments and delete the extras. 00471 preparedSegments.exit(); 00472 if (segmentPreparer.joinable()) 00473 segmentPreparer.join(); 00474 auto prepared = preparedSegments.releaseAll(); 00475 while (!prepared.empty()) { 00476 std::string filename = prepared.front().first; 00477 NOTICE("Removing unused open segment: %s", 00478 filename.c_str()); 00479 FS::removeFile(dir, filename); 00480 prepared.pop_front(); 00481 } 00482 FS::fsync(dir); 00483 00484 // Keep assertion in Log.h happy. No need to "take" and "complete" this 00485 // sync since no operations were performed. 00486 if (currentSync->ops.empty()) 00487 currentSync->completed = true; 00488 } 00489 00490 std::pair<uint64_t, uint64_t> 00491 SegmentedLog::append(const std::vector<const Entry*>& entries) 00492 { 00493 Segment* openSegment = &getOpenSegment(); 00494 uint64_t startIndex = openSegment->endIndex + 1; 00495 uint64_t index = startIndex; 00496 for (auto it = entries.begin(); it != entries.end(); ++it) { 00497 Segment::Record record(openSegment->bytes); 00498 // Note that record.offset may change later, if this entry doesn't fit. 00499 record.entry = **it; 00500 if (record.entry.has_index()) { 00501 assert(index == record.entry.index()); 00502 } else { 00503 record.entry.set_index(index); 00504 } 00505 Core::Buffer buf = serializeProto(record.entry); 00506 00507 // See if we need to roll over to a new head segment. If someone is 00508 // writing an entry that is bigger than MAX_SEGMENT_SIZE, just put it 00509 // in its own segment. This duplicates some code from closeSegment(), 00510 // but queues up the operations into 'currentSync'. 00511 if (openSegment->bytes > sizeof(SegmentHeader) && 00512 openSegment->bytes + buf.getLength() > MAX_SEGMENT_SIZE) { 00513 NOTICE("Rolling over to new head segment: trying to append new " 00514 "entry that is %lu bytes long, but open segment is already " 00515 "%lu of %lu bytes large", 00516 buf.getLength(), 00517 openSegment->bytes, 00518 MAX_SEGMENT_SIZE); 00519 00520 // Truncate away any extra 0 bytes at the end from when 00521 // MAX_SEGMENT_SIZE was allocated. 00522 currentSync->ops.emplace_back(openSegmentFile.fd, 00523 Sync::Op::TRUNCATE); 00524 currentSync->ops.back().size = openSegment->bytes; 00525 currentSync->ops.emplace_back(openSegmentFile.fd, 00526 Sync::Op::FSYNC); 00527 currentSync->ops.emplace_back(openSegmentFile.release(), 00528 Sync::Op::CLOSE); 00529 00530 // Rename the file. 00531 std::string newFilename = openSegment->makeClosedFilename(); 00532 NOTICE("Closing full segment (was %s, renaming to %s)", 00533 openSegment->filename.c_str(), 00534 newFilename.c_str()); 00535 currentSync->ops.emplace_back(dir.fd, Sync::Op::RENAME); 00536 currentSync->ops.back().filename1 = openSegment->filename; 00537 currentSync->ops.back().filename2 = newFilename; 00538 currentSync->ops.emplace_back(dir.fd, Sync::Op::FSYNC); 00539 openSegment->filename = newFilename; 00540 00541 // Bookkeeping. 00542 openSegment->isOpen = false; 00543 totalClosedSegmentBytes += openSegment->bytes; 00544 00545 // Open new segment. 00546 openNewSegment(); 00547 openSegment = &getOpenSegment(); 00548 record.offset = openSegment->bytes; 00549 } 00550 00551 if (buf.getLength() > MAX_SEGMENT_SIZE) { 00552 WARNING("Trying to append an entry of %lu bytes when the maximum " 00553 "segment size is %lu bytes. Placing this entry in its own " 00554 "segment. Consider adjusting 'storageSegmentBytes' in the " 00555 "config.", 00556 buf.getLength(), 00557 MAX_SEGMENT_SIZE); 00558 } 00559 00560 openSegment->entries.emplace_back(std::move(record)); 00561 openSegment->bytes += buf.getLength(); 00562 currentSync->ops.emplace_back(openSegmentFile.fd, Sync::Op::WRITE); 00563 currentSync->ops.back().writeData = std::move(buf); 00564 ++openSegment->endIndex; 00565 ++index; 00566 } 00567 00568 currentSync->ops.emplace_back(openSegmentFile.fd, Sync::Op::FDATASYNC); 00569 currentSync->lastIndex = getLastLogIndex(); 00570 checkInvariants(); 00571 return {startIndex, getLastLogIndex()}; 00572 } 00573 00574 const SegmentedLog::Entry& 00575 SegmentedLog::getEntry(uint64_t index) const 00576 { 00577 if (index < getLogStartIndex() || 00578 index > getLastLogIndex()) { 00579 PANIC("Attempted to access entry %lu outside of log " 00580 "(start index is %lu, last index is %lu)", 00581 index, getLogStartIndex(), getLastLogIndex()); 00582 } 00583 auto it = segmentsByStartIndex.upper_bound(index); 00584 --it; 00585 const Segment& segment = it->second; 00586 assert(segment.startIndex <= index); 00587 assert(index <= segment.endIndex); 00588 return segment.entries.at(index - segment.startIndex).entry; 00589 } 00590 00591 uint64_t 00592 SegmentedLog::getLogStartIndex() const 00593 { 00594 return logStartIndex; 00595 } 00596 00597 uint64_t 00598 SegmentedLog::getLastLogIndex() const 00599 { 00600 // Although it's a class invariant that there's always an open segment, 00601 // it's convenient to be able to call this as a helper function when there 00602 // are no segments. 00603 if (segmentsByStartIndex.empty()) 00604 return logStartIndex - 1; 00605 else 00606 return getOpenSegment().endIndex; 00607 } 00608 00609 std::string 00610 SegmentedLog::getName() const 00611 { 00612 if (encoding == Encoding::BINARY) 00613 return "Segmented-Binary"; 00614 else 00615 return "Segmented-Text"; 00616 } 00617 00618 uint64_t 00619 SegmentedLog::getSizeBytes() const 00620 { 00621 return totalClosedSegmentBytes + getOpenSegment().bytes; 00622 } 00623 00624 std::unique_ptr<Log::Sync> 00625 SegmentedLog::takeSync() 00626 { 00627 std::unique_ptr<SegmentedLog::Sync> other( 00628 new SegmentedLog::Sync(getLastLogIndex(), 00629 diskWriteDurationThreshold)); 00630 std::swap(other, currentSync); 00631 return std::move(other); 00632 } 00633 00634 void 00635 SegmentedLog::syncCompleteVirtual(std::unique_ptr<Log::Sync> sync) 00636 { 00637 static_cast<SegmentedLog::Sync*>(sync.get())-> 00638 updateStats(filesystemOpsNanos); 00639 } 00640 00641 void 00642 SegmentedLog::truncatePrefix(uint64_t newStartIndex) 00643 { 00644 if (newStartIndex <= logStartIndex) 00645 return; 00646 00647 NOTICE("Truncating log to start at index %lu (was %lu)", 00648 newStartIndex, logStartIndex); 00649 logStartIndex = newStartIndex; 00650 // update metadata before removing files in case of interruption 00651 updateMetadata(); 00652 00653 while (!segmentsByStartIndex.empty()) { 00654 Segment& segment = segmentsByStartIndex.begin()->second; 00655 if (logStartIndex <= segment.endIndex) 00656 break; 00657 NOTICE("Deleting unneeded segment %s (its end index is %lu)", 00658 segment.filename.c_str(), 00659 segment.endIndex); 00660 currentSync->ops.emplace_back(dir.fd, Sync::Op::UNLINKAT); 00661 currentSync->ops.back().filename1 = segment.filename; 00662 if (segment.isOpen) { 00663 currentSync->ops.emplace_back(openSegmentFile.release(), 00664 Sync::Op::CLOSE); 00665 } else { 00666 totalClosedSegmentBytes -= segment.bytes; 00667 } 00668 segmentsByStartIndex.erase(segmentsByStartIndex.begin()); 00669 } 00670 00671 if (segmentsByStartIndex.empty()) 00672 openNewSegment(); 00673 if (currentSync->lastIndex < logStartIndex - 1) 00674 currentSync->lastIndex = logStartIndex - 1; 00675 checkInvariants(); 00676 } 00677 00678 void 00679 SegmentedLog::truncateSuffix(uint64_t newEndIndex) 00680 { 00681 if (newEndIndex >= getLastLogIndex()) 00682 return; 00683 00684 NOTICE("Truncating log to end at index %lu (was %lu)", 00685 newEndIndex, getLastLogIndex()); 00686 { // Check if the open segment has some entries we need. If so, 00687 // just truncate that segment, open a new one, and return. 00688 Segment& openSegment = getOpenSegment(); 00689 if (newEndIndex >= openSegment.startIndex) { 00690 // Update in-memory segment 00691 uint64_t i = newEndIndex + 1 - openSegment.startIndex; 00692 openSegment.bytes = openSegment.entries.at(i).offset; 00693 openSegment.entries.erase( 00694 openSegment.entries.begin() + int64_t(i), 00695 openSegment.entries.end()); 00696 openSegment.endIndex = newEndIndex; 00697 // Truncate and close the open segment, and open a new one. 00698 closeSegment(); 00699 openNewSegment(); 00700 checkInvariants(); 00701 return; 00702 } 00703 } 00704 00705 { // Remove the open segment. 00706 Segment& openSegment = getOpenSegment(); 00707 openSegment.endIndex = openSegment.startIndex - 1; 00708 openSegment.bytes = 0; 00709 closeSegment(); 00710 } 00711 00712 // Remove and/or truncate closed segments. 00713 while (!segmentsByStartIndex.empty()) { 00714 auto it = segmentsByStartIndex.rbegin(); 00715 Segment& segment = it->second; 00716 if (segment.endIndex == newEndIndex) 00717 break; 00718 if (segment.startIndex > newEndIndex) { // remove segment 00719 NOTICE("Removing closed segment %s", segment.filename.c_str()); 00720 FS::removeFile(dir, segment.filename); 00721 FS::fsync(dir); 00722 totalClosedSegmentBytes -= segment.bytes; 00723 segmentsByStartIndex.erase(segment.startIndex); 00724 } else if (segment.endIndex > newEndIndex) { // truncate segment 00725 // Update in-memory segment 00726 uint64_t i = newEndIndex + 1 - segment.startIndex; 00727 uint64_t newBytes = segment.entries.at(i).offset; 00728 totalClosedSegmentBytes -= (segment.bytes - newBytes); 00729 segment.bytes = newBytes; 00730 segment.entries.erase( 00731 segment.entries.begin() + int64_t(i), 00732 segment.entries.end()); 00733 segment.endIndex = newEndIndex; 00734 00735 // Rename the file 00736 std::string newFilename = segment.makeClosedFilename(); 00737 NOTICE("Truncating closed segment (was %s, renaming to %s)", 00738 segment.filename.c_str(), 00739 newFilename.c_str()); 00740 FS::rename(dir, segment.filename, 00741 dir, newFilename); 00742 FS::fsync(dir); 00743 segment.filename = newFilename; 00744 00745 // Truncate the file 00746 FS::File f = FS::openFile(dir, segment.filename, O_WRONLY); 00747 FS::truncate(f, segment.bytes); 00748 FS::fsync(f); 00749 } 00750 } 00751 00752 // Reopen a segment (so that we can write again) 00753 openNewSegment(); 00754 checkInvariants(); 00755 } 00756 00757 void 00758 SegmentedLog::updateMetadata() 00759 { 00760 if (Log::metadata.ByteSize() == 0) 00761 metadata.clear_raft_metadata(); 00762 else 00763 *metadata.mutable_raft_metadata() = Log::metadata; 00764 metadata.set_format_version(1); 00765 metadata.set_entries_start(logStartIndex); 00766 metadata.set_version(metadata.version() + 1); 00767 std::string filename; 00768 if (metadata.version() % 2 == 1) { 00769 filename = "metadata1"; 00770 } else { 00771 filename = "metadata2"; 00772 } 00773 00774 TimePoint start = Clock::now(); 00775 00776 NOTICE("Writing new storage metadata (version %lu) to %s", 00777 metadata.version(), 00778 filename.c_str()); 00779 FS::File file = FS::openFile(dir, filename, O_CREAT|O_WRONLY|O_TRUNC); 00780 Core::Buffer record = serializeProto(metadata); 00781 ssize_t written = FS::write(file.fd, 00782 record.getData(), 00783 record.getLength()); 00784 if (written == -1) { 00785 PANIC("Failed to write to %s: %s", 00786 file.path.c_str(), strerror(errno)); 00787 } 00788 FS::fsync(file); 00789 00790 TimePoint end = Clock::now(); 00791 std::chrono::nanoseconds elapsed = end - start; 00792 metadataWriteNanos.push(uint64_t(elapsed.count())); 00793 if (elapsed > diskWriteDurationThreshold) { 00794 WARNING("Writing metadata file took longer than expected " 00795 "(%s for %lu bytes)", 00796 Core::StringUtil::toString(elapsed).c_str(), 00797 record.getLength()); 00798 metadataWriteNanos.noteExceptional(start, uint64_t(elapsed.count())); 00799 } 00800 } 00801 00802 void 00803 SegmentedLog::updateServerStats(Protocol::ServerStats& serverStats) const 00804 { 00805 Protocol::ServerStats::Storage& stats = *serverStats.mutable_storage(); 00806 stats.set_num_segments(segmentsByStartIndex.size()); 00807 stats.set_open_segment_bytes(getOpenSegment().bytes); 00808 stats.set_metadata_version(metadata.version()); 00809 metadataWriteNanos.updateProtoBuf(*stats.mutable_metadata_write_nanos()); 00810 filesystemOpsNanos.updateProtoBuf(*stats.mutable_filesystem_ops_nanos()); 00811 } 00812 00813 00814 ////////// SegmentedLog initialization helper functions ////////// 00815 00816 00817 std::vector<SegmentedLog::Segment> 00818 SegmentedLog::readSegmentFilenames() 00819 { 00820 std::vector<Segment> segments; 00821 std::vector<std::string> filenames = FS::ls(dir); 00822 // sorting isn't strictly necessary, but it helps with unit tests 00823 std::sort(filenames.begin(), filenames.end()); 00824 for (auto it = filenames.begin(); it != filenames.end(); ++it) { 00825 const std::string& filename = *it; 00826 if (filename == "metadata1" || 00827 filename == "metadata2") { 00828 continue; 00829 } 00830 Segment segment; 00831 segment.filename = filename; 00832 segment.bytes = 0; 00833 { // Closed segment: xxx-yyy 00834 uint64_t startIndex = 1; 00835 uint64_t endIndex = 0; 00836 unsigned bytesConsumed; 00837 int matched = sscanf(filename.c_str(), 00838 CLOSED_SEGMENT_FORMAT "%n", 00839 &startIndex, &endIndex, 00840 &bytesConsumed); 00841 if (matched == 2 && bytesConsumed == filename.length()) { 00842 segment.isOpen = false; 00843 segment.startIndex = startIndex; 00844 segment.endIndex = endIndex; 00845 segments.push_back(segment); 00846 continue; 00847 } 00848 } 00849 00850 { // Open segment: open-xxx 00851 uint64_t counter; 00852 unsigned bytesConsumed; 00853 int matched = sscanf(filename.c_str(), 00854 OPEN_SEGMENT_FORMAT "%n", 00855 &counter, 00856 &bytesConsumed); 00857 if (matched == 1 && bytesConsumed == filename.length()) { 00858 segment.isOpen = true; 00859 segment.startIndex = ~0UL; 00860 segment.endIndex = ~0UL - 1; 00861 segments.push_back(segment); 00862 preparedSegments.foundFile(counter); 00863 continue; 00864 } 00865 } 00866 00867 // Neither 00868 WARNING("%s doesn't look like a valid segment filename (from %s)", 00869 filename.c_str(), 00870 (dir.path + "/" + filename).c_str()); 00871 } 00872 return segments; 00873 } 00874 00875 bool 00876 SegmentedLog::readMetadata(const std::string& filename, 00877 SegmentedLogMetadata::Metadata& metadata, 00878 bool quiet) const 00879 { 00880 std::string error; 00881 FS::File file = FS::tryOpenFile(dir, filename, O_RDONLY); 00882 if (file.fd == -1) { 00883 error = format("Could not open %s/%s: %s", 00884 dir.path.c_str(), filename.c_str(), strerror(errno)); 00885 } else { 00886 FS::FileContents reader(file); 00887 uint64_t offset = 0; 00888 error = readProtoFromFile(file, reader, &offset, &metadata); 00889 } 00890 if (error.empty()) { 00891 if (metadata.format_version() > 1) { 00892 PANIC("The format version found in %s is %lu but this code " 00893 "only understands version 1", 00894 filename.c_str(), 00895 metadata.format_version()); 00896 } 00897 NOTICE("Read metadata version %lu from %s", 00898 metadata.version(), filename.c_str()); 00899 return true; 00900 } else { 00901 if (!quiet) { 00902 WARNING("Error reading metadata from %s: %s", 00903 filename.c_str(), error.c_str()); 00904 } 00905 return false; 00906 } 00907 } 00908 00909 bool 00910 SegmentedLog::loadClosedSegment(Segment& segment, uint64_t logStartIndex) 00911 { 00912 assert(!segment.isOpen); 00913 FS::File file = FS::openFile(dir, segment.filename, O_RDWR); 00914 FS::FileContents reader(file); 00915 uint64_t offset = 0; 00916 00917 if (reader.getFileLength() < 1) { 00918 PANIC("Found completely empty segment file %s (it doesn't even have " 00919 "a version field)", 00920 segment.filename.c_str()); 00921 } else { 00922 uint8_t version = *reader.get<uint8_t>(0, 1); 00923 offset += 1; 00924 if (version != 1) { 00925 PANIC("Segment version read from %s was %u, but this code can " 00926 "only read version 1", 00927 segment.filename.c_str(), 00928 version); 00929 } 00930 } 00931 00932 if (segment.endIndex < logStartIndex) { 00933 NOTICE("Removing closed segment whose entries are no longer " 00934 "needed (last index is %lu but log start index is %lu): %s", 00935 segment.endIndex, 00936 logStartIndex, 00937 segment.filename.c_str()); 00938 FS::removeFile(dir, segment.filename); 00939 FS::fsync(dir); 00940 return false; 00941 } 00942 00943 for (uint64_t index = segment.startIndex; 00944 index <= segment.endIndex; 00945 ++index) { 00946 std::string error; 00947 if (offset >= reader.getFileLength()) { 00948 error = "File too short"; 00949 } else { 00950 segment.entries.emplace_back(offset); 00951 error = readProtoFromFile(file, reader, &offset, 00952 &segment.entries.back().entry); 00953 } 00954 if (!error.empty()) { 00955 PANIC("Could not read entry %lu in log segment %s " 00956 "(offset %lu bytes). This indicates the file was " 00957 "somehow corrupted. Error was: %s", 00958 index, 00959 segment.filename.c_str(), 00960 offset, 00961 error.c_str()); 00962 } 00963 } 00964 if (offset < reader.getFileLength()) { 00965 WARNING("Found an extra %lu bytes at the end of closed segment " 00966 "%s. This can happen if the server crashed while " 00967 "truncating the segment. Truncating these now.", 00968 reader.getFileLength() - offset, 00969 segment.filename.c_str()); 00970 // TODO(ongaro): do we want to save these bytes somewhere? 00971 FS::truncate(file, offset); 00972 FS::fsync(file); 00973 } 00974 segment.bytes = offset; 00975 totalClosedSegmentBytes += segment.bytes; 00976 return true; 00977 } 00978 00979 bool 00980 SegmentedLog::loadOpenSegment(Segment& segment, uint64_t logStartIndex) 00981 { 00982 assert(segment.isOpen); 00983 FS::File file = FS::openFile(dir, segment.filename, O_RDWR); 00984 FS::FileContents reader(file); 00985 uint64_t offset = 0; 00986 00987 if (reader.getFileLength() < 1) { 00988 WARNING("Found completely empty segment file %s (it doesn't even have " 00989 "a version field)", 00990 segment.filename.c_str()); 00991 } else { 00992 uint8_t version = *reader.get<uint8_t>(0, 1); 00993 offset += 1; 00994 if (version != 1) { 00995 uint64_t remainingBytes = reader.getFileLength() - offset; 00996 if (version == 0 && 00997 isAllZeros(reader.get( 00998 offset, remainingBytes), remainingBytes)) { 00999 // move the offset to the end of the file. allow the 01000 // existing cleanup mechanism to remove this file. 01001 offset = reader.getFileLength(); 01002 } else { 01003 PANIC("Segment version read from %s was %u, " 01004 "but this code can only read version 1", 01005 segment.filename.c_str(), 01006 version); 01007 } 01008 } 01009 } 01010 01011 uint64_t lastIndex = 0; 01012 while (offset < reader.getFileLength()) { 01013 segment.entries.emplace_back(offset); 01014 std::string error = readProtoFromFile( 01015 file, 01016 reader, 01017 &offset, 01018 &segment.entries.back().entry); 01019 if (!error.empty()) { 01020 segment.entries.pop_back(); 01021 uint64_t remainingBytes = reader.getFileLength() - offset; 01022 if (isAllZeros(reader.get(offset, remainingBytes), 01023 remainingBytes)) { 01024 WARNING("Truncating %lu zero bytes at the end of log " 01025 "segment %s (%lu bytes into the segment, " 01026 "following entry %lu). This is most likely " 01027 "because the server shutdown uncleanly.", 01028 remainingBytes, 01029 segment.filename.c_str(), 01030 offset, 01031 lastIndex); 01032 } else { 01033 // TODO(ongaro): do we want to save these bytes somewhere? 01034 WARNING("Could not read entry in log segment %s " 01035 "(%lu bytes into the segment, following " 01036 "entry %lu), probably because it was being " 01037 "written when the server crashed. Discarding the " 01038 "remainder of the file (%lu bytes). Error was: %s", 01039 segment.filename.c_str(), 01040 offset, 01041 lastIndex, 01042 remainingBytes, 01043 error.c_str()); 01044 } 01045 FS::truncate(file, offset); 01046 FS::fsync(file); 01047 break; 01048 } 01049 lastIndex = segment.entries.back().entry.index(); 01050 } 01051 01052 bool remove = false; 01053 if (segment.entries.empty()) { 01054 NOTICE("Removing empty segment: %s", segment.filename.c_str()); 01055 remove = true; 01056 } else if (segment.entries.back().entry.index() < logStartIndex) { 01057 NOTICE("Removing open segment whose entries are no longer " 01058 "needed (last index is %lu but log start index is %lu): %s", 01059 segment.entries.back().entry.index(), 01060 logStartIndex, 01061 segment.filename.c_str()); 01062 remove = true; 01063 } 01064 if (remove) { 01065 FS::removeFile(dir, segment.filename); 01066 FS::fsync(dir); 01067 return false; 01068 } else { 01069 segment.bytes = offset; 01070 totalClosedSegmentBytes += segment.bytes; 01071 segment.isOpen = false; 01072 segment.startIndex = segment.entries.front().entry.index(); 01073 segment.endIndex = segment.entries.back().entry.index(); 01074 std::string newFilename = segment.makeClosedFilename(); 01075 NOTICE("Closing open segment %s, renaming to %s", 01076 segment.filename.c_str(), 01077 newFilename.c_str()); 01078 FS::rename(dir, segment.filename, 01079 dir, newFilename); 01080 FS::fsync(dir); 01081 segment.filename = newFilename; 01082 return true; 01083 } 01084 } 01085 01086 01087 ////////// SegmentedLog normal operation helper functions ////////// 01088 01089 01090 void 01091 SegmentedLog::checkInvariants() 01092 { 01093 if (!shouldCheckInvariants) 01094 return; 01095 #if DEBUG 01096 assert(openSegmentFile.fd >= 0); 01097 assert(!segmentsByStartIndex.empty()); 01098 assert(logStartIndex >= segmentsByStartIndex.begin()->second.startIndex); 01099 assert(logStartIndex <= segmentsByStartIndex.begin()->second.endIndex + 1); 01100 assert(currentSync.get() != NULL); 01101 uint64_t closedBytes = 0; 01102 for (auto it = segmentsByStartIndex.begin(); 01103 it != segmentsByStartIndex.end(); 01104 ++it) { 01105 auto next = it; 01106 ++next; 01107 Segment& segment = it->second; 01108 assert(it->first == segment.startIndex); 01109 assert(segment.startIndex > 0); 01110 assert(segment.entries.size() == 01111 segment.endIndex + 1 - segment.startIndex); 01112 uint64_t lastOffset = 0; 01113 for (uint64_t i = 0; i < segment.entries.size(); ++i) { 01114 assert(segment.entries.at(i).entry.index() == 01115 segment.startIndex + i); 01116 if (i == 0) 01117 assert(segment.entries.at(0).offset == sizeof(SegmentHeader)); 01118 else 01119 assert(segment.entries.at(i).offset > lastOffset); 01120 lastOffset = segment.entries.at(i).offset; 01121 } 01122 if (next == segmentsByStartIndex.end()) { 01123 assert(segment.isOpen); 01124 assert(segment.endIndex >= segment.startIndex - 1); 01125 assert(Core::StringUtil::startsWith(segment.filename, "open-")); 01126 assert(segment.bytes >= sizeof(SegmentHeader)); 01127 } else { 01128 assert(!segment.isOpen); 01129 assert(segment.endIndex >= segment.startIndex); 01130 assert(next->second.startIndex == segment.endIndex + 1); 01131 assert(segment.bytes > sizeof(SegmentHeader)); 01132 closedBytes += segment.bytes; 01133 assert(segment.filename == segment.makeClosedFilename()); 01134 } 01135 } 01136 assert(closedBytes == totalClosedSegmentBytes); 01137 #endif /* DEBUG */ 01138 } 01139 01140 void 01141 SegmentedLog::closeSegment() 01142 { 01143 if (openSegmentFile.fd < 0) 01144 return; 01145 Segment& openSegment = getOpenSegment(); 01146 if (openSegment.startIndex > openSegment.endIndex) { 01147 // Segment is empty; just remove it. 01148 NOTICE("Removing empty open segment (start index %lu): %s", 01149 openSegment.startIndex, 01150 openSegment.filename.c_str()); 01151 openSegmentFile.close(); 01152 FS::removeFile(dir, openSegment.filename); 01153 FS::fsync(dir); 01154 segmentsByStartIndex.erase(openSegment.startIndex); 01155 return; 01156 } 01157 01158 // Truncate away any extra 0 bytes at the end from when 01159 // MAX_SEGMENT_SIZE was allocated, or in the case of truncateSuffix, 01160 // truncate away actual entries that are no longer desired. 01161 FS::truncate(openSegmentFile, openSegment.bytes); 01162 FS::fsync(openSegmentFile); 01163 openSegmentFile.close(); 01164 01165 // Rename the file. 01166 std::string newFilename = openSegment.makeClosedFilename(); 01167 NOTICE("Closing segment (was %s, renaming to %s)", 01168 openSegment.filename.c_str(), 01169 newFilename.c_str()); 01170 FS::rename(dir, openSegment.filename, 01171 dir, newFilename); 01172 FS::fsync(dir); 01173 openSegment.filename = newFilename; 01174 01175 openSegment.isOpen = false; 01176 totalClosedSegmentBytes += openSegment.bytes; 01177 } 01178 01179 SegmentedLog::Segment& 01180 SegmentedLog::getOpenSegment() 01181 { 01182 assert(!segmentsByStartIndex.empty()); 01183 return segmentsByStartIndex.rbegin()->second; 01184 } 01185 01186 const SegmentedLog::Segment& 01187 SegmentedLog::getOpenSegment() const 01188 { 01189 assert(!segmentsByStartIndex.empty()); 01190 return segmentsByStartIndex.rbegin()->second; 01191 } 01192 01193 void 01194 SegmentedLog::openNewSegment() 01195 { 01196 assert(openSegmentFile.fd < 0); 01197 assert(segmentsByStartIndex.empty() || 01198 !segmentsByStartIndex.rbegin()->second.isOpen); 01199 01200 Segment newSegment; 01201 newSegment.isOpen = true; 01202 newSegment.startIndex = getLastLogIndex() + 1; 01203 newSegment.endIndex = newSegment.startIndex - 1; 01204 newSegment.bytes = sizeof(SegmentHeader); 01205 // This can throw ThreadInterruptedException, but it shouldn't ever, since 01206 // this class shouldn't have been destroyed yet. 01207 auto s = preparedSegments.waitForOpenSegment(); 01208 newSegment.filename = s.first; 01209 openSegmentFile = std::move(s.second); 01210 segmentsByStartIndex.insert({newSegment.startIndex, newSegment}); 01211 } 01212 01213 std::string 01214 SegmentedLog::readProtoFromFile(const FS::File& file, 01215 FS::FileContents& reader, 01216 uint64_t* offset, 01217 google::protobuf::Message* out) const 01218 { 01219 uint64_t loffset = *offset; 01220 char checksum[Core::Checksum::MAX_LENGTH]; 01221 uint64_t bytesRead = reader.copyPartial(loffset, checksum, 01222 sizeof(checksum)); 01223 uint32_t checksumBytes = Core::Checksum::length(checksum, 01224 uint32_t(bytesRead)); 01225 if (checksumBytes == 0) 01226 return format("Missing checksum in file %s", file.path.c_str()); 01227 loffset += checksumBytes; 01228 01229 uint64_t dataLen; 01230 if (reader.copyPartial(loffset, &dataLen, sizeof(dataLen)) < 01231 sizeof(dataLen)) { 01232 return format("Record length truncated in file %s", file.path.c_str()); 01233 } 01234 dataLen = be64toh(dataLen); 01235 if (reader.getFileLength() < loffset + sizeof(dataLen) + dataLen) { 01236 return format("ProtoBuf truncated in file %s", file.path.c_str()); 01237 } 01238 01239 const void* checksumCoverage = reader.get(loffset, 01240 sizeof(dataLen) + dataLen); 01241 std::string error = Core::Checksum::verify(checksum, checksumCoverage, 01242 sizeof(dataLen) + dataLen); 01243 if (!error.empty()) { 01244 return format("Checksum verification failure on %s: %s", 01245 file.path.c_str(), error.c_str()); 01246 } 01247 loffset += sizeof(dataLen); 01248 const void* data = reader.get(loffset, dataLen); 01249 loffset += dataLen; 01250 01251 switch (encoding) { 01252 case SegmentedLog::Encoding::BINARY: { 01253 Core::Buffer contents(const_cast<void*>(data), 01254 dataLen, 01255 NULL); 01256 if (!Core::ProtoBuf::parse(contents, *out)) { 01257 return format("Failed to parse protobuf in %s", 01258 file.path.c_str()); 01259 } 01260 break; 01261 } 01262 case SegmentedLog::Encoding::TEXT: { 01263 std::string contents(static_cast<const char*>(data), dataLen); 01264 Core::ProtoBuf::Internal::fromString(contents, *out); 01265 break; 01266 } 01267 } 01268 *offset = loffset; 01269 return ""; 01270 } 01271 01272 Core::Buffer 01273 SegmentedLog::serializeProto(const google::protobuf::Message& in) const 01274 { 01275 // TODO(ongaro): can the intermediate buffer be avoided? 01276 const void* data = NULL; 01277 uint64_t len = 0; 01278 Core::Buffer binaryContents; 01279 std::string asciiContents; 01280 switch (encoding) { 01281 case SegmentedLog::Encoding::BINARY: { 01282 Core::ProtoBuf::serialize(in, binaryContents); 01283 data = binaryContents.getData(); 01284 len = binaryContents.getLength(); 01285 break; 01286 } 01287 case SegmentedLog::Encoding::TEXT: { 01288 asciiContents = Core::ProtoBuf::dumpString(in); 01289 data = asciiContents.data(); 01290 len = asciiContents.length(); 01291 break; 01292 } 01293 } 01294 uint64_t netLen = htobe64(len); 01295 char checksum[Core::Checksum::MAX_LENGTH]; 01296 uint32_t checksumLen = Core::Checksum::calculate( 01297 checksumAlgorithm.c_str(), { 01298 {&netLen, sizeof(netLen)}, 01299 {data, len}, 01300 }, 01301 checksum); 01302 01303 uint64_t totalLen = checksumLen + sizeof(netLen) + len; 01304 char* buf = new char[totalLen]; 01305 Core::Buffer record( 01306 buf, 01307 totalLen, 01308 Core::Buffer::deleteArrayFn<char>); 01309 Core::Util::memcpy(buf, { 01310 {checksum, checksumLen}, 01311 {&netLen, sizeof(netLen)}, 01312 {data, len}, 01313 }); 01314 return record; 01315 } 01316 01317 01318 ////////// SegmentedLog segment preparer thread functions ////////// 01319 01320 std::pair<std::string, FS::File> 01321 SegmentedLog::prepareNewSegment(uint64_t id) 01322 { 01323 TimePoint start = Clock::now(); 01324 01325 std::string filename = format(OPEN_SEGMENT_FORMAT, id); 01326 FS::File file = FS::openFile(dir, filename, 01327 O_CREAT|O_EXCL|O_RDWR); 01328 FS::allocate(file, 0, MAX_SEGMENT_SIZE); 01329 SegmentHeader header; 01330 header.version = 1; 01331 ssize_t written = FS::write(file.fd, 01332 &header, 01333 sizeof(header)); 01334 if (written == -1) { 01335 PANIC("Failed to write header to %s: %s", 01336 file.path.c_str(), strerror(errno)); 01337 } 01338 FS::fsync(file); 01339 FS::fsync(dir); 01340 01341 TimePoint end = Clock::now(); 01342 std::chrono::nanoseconds elapsed = end - start; 01343 // TODO(ongaro): record elapsed times into RollingStat in a thread-safe way 01344 if (elapsed > diskWriteDurationThreshold) { 01345 WARNING("Preparing open segment file took longer than expected (%s)", 01346 Core::StringUtil::toString(elapsed).c_str()); 01347 } 01348 return {std::move(filename), std::move(file)}; 01349 } 01350 01351 01352 void 01353 SegmentedLog::segmentPreparerMain() 01354 { 01355 Core::ThreadId::setName("SegmentPreparer"); 01356 while (true) { 01357 uint64_t fileId = 0; 01358 try { 01359 fileId = preparedSegments.waitForDemand(); 01360 } catch (const Core::Util::ThreadInterruptedException&) { 01361 VERBOSE("Exiting"); 01362 break; 01363 } 01364 preparedSegments.submitOpenSegment( 01365 prepareNewSegment(fileId)); 01366 } 01367 } 01368 01369 } // namespace LogCabin::Storage 01370 } // namespace LogCabin