LogCabin
Storage/SimpleFileLog.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2012-2013 Stanford University
00002  *
00003  * Permission to use, copy, modify, and distribute this software for any
00004  * purpose with or without fee is hereby granted, provided that the above
00005  * copyright notice and this permission notice appear in all copies.
00006  *
00007  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00008  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00009  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00010  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00011  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00012  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00013  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00014  */
00015 
00016 #include <algorithm>
00017 #include <fcntl.h>
00018 #include <sys/stat.h>
00019 #include <unistd.h>
00020 
00021 #include "build/Protocol/Raft.pb.h"
00022 #include "Core/Buffer.h"
00023 #include "Core/Checksum.h"
00024 #include "Core/Debug.h"
00025 #include "Core/ProtoBuf.h"
00026 #include "Core/StringUtil.h"
00027 #include "Core/Time.h"
00028 #include "Core/Util.h"
00029 #include "Storage/FilesystemUtil.h"
00030 #include "Storage/SimpleFileLog.h"
00031 
00032 namespace LogCabin {
00033 namespace Storage {
00034 
00035 using FilesystemUtil::File;
00036 using Core::StringUtil::format;
00037 
00038 namespace {
00039 std::string
00040 fileToProto(const File& dir, const std::string& path,
00041             google::protobuf::Message& out)
00042 {
00043     FilesystemUtil::File file =
00044         FilesystemUtil::tryOpenFile(dir, path, O_RDONLY);
00045     if (file.fd == -1) {
00046         return format("Could not open %s/%s: %s",
00047                       dir.path.c_str(), path.c_str(), strerror(errno));
00048     }
00049     FilesystemUtil::FileContents reader(file);
00050 
00051     char checksum[Core::Checksum::MAX_LENGTH];
00052     uint64_t bytesRead = reader.copyPartial(0, checksum, sizeof(checksum));
00053     uint32_t checksumBytes = Core::Checksum::length(checksum,
00054                                                     uint32_t(bytesRead));
00055     if (checksumBytes == 0)
00056         return format("File %s missing checksum", file.path.c_str());
00057 
00058     uint64_t dataLen = reader.getFileLength() - checksumBytes;
00059     const void* data = reader.get(checksumBytes, dataLen);
00060     std::string error = Core::Checksum::verify(checksum, data, dataLen);
00061     if (!error.empty()) {
00062         return format("Checksum verification failure on %s: %s",
00063                       file.path.c_str(), error.c_str());
00064     }
00065 
00066 #if BINARY_FORMAT
00067     RPC::Buffer contents(const_cast<void*>(data), dataLen, NULL);
00068     if (!RPC::ProtoBuf::parse(contents, out))
00069         return format("Failed to parse protobuf in %s", file.path.c_str());
00070 #else
00071     std::string contents(static_cast<const char*>(data), dataLen);
00072     Core::ProtoBuf::Internal::fromString(contents, out);
00073 #endif
00074     return "";
00075 }
00076 
00077 FilesystemUtil::File
00078 protoToFile(const google::protobuf::Message& in,
00079             const File& dir, const std::string& path)
00080 {
00081     FilesystemUtil::File file =
00082         FilesystemUtil::openFile(dir, path, O_CREAT|O_WRONLY|O_TRUNC);
00083     const void* data = NULL;
00084     uint64_t len = 0;
00085 #if BINARY_FORMAT
00086     RPC::Buffer contents;
00087     RPC::ProtoBuf::serialize(in, contents);
00088     data = contents.getData();
00089     len = contents.getLenhgt();
00090 #else
00091     std::string contents(Core::ProtoBuf::dumpString(in));
00092     contents = "\n" + contents;
00093     data = contents.data();
00094     len = uint64_t(contents.length());
00095 #endif
00096     char checksum[Core::Checksum::MAX_LENGTH];
00097     uint32_t checksumLen = Core::Checksum::calculate("SHA-1",
00098                                                      data, len,
00099                                                      checksum);
00100 
00101     ssize_t written = FilesystemUtil::write(file.fd, {
00102         {checksum, checksumLen},
00103         {data, len},
00104     });
00105     if (written == -1) {
00106         PANIC("Failed to write to %s: %s",
00107               file.path.c_str(), strerror(errno));
00108     }
00109 
00110     return file;
00111 }
00112 }
00113 
00114 ////////// SimpleFileLog::Sync //////////
00115 SimpleFileLog::Sync::Sync(uint64_t lastIndex)
00116     : Log::Sync(lastIndex)
00117     , fds()
00118 {
00119 }
00120 
00121 void
00122 SimpleFileLog::Sync::wait()
00123 {
00124     for (auto it = fds.begin(); it != fds.end(); ++it) {
00125         FilesystemUtil::File f(it->first, "-unknown-");
00126         FilesystemUtil::fsync(f);
00127         if (it->second)
00128             f.close();
00129         else
00130             f.release();
00131     }
00132 }
00133 
00134 ////////// SimpleFileLog //////////
00135 
00136 std::string
00137 SimpleFileLog::readMetadata(const std::string& filename,
00138                             SimpleFileLogMetadata::Metadata& metadata) const
00139 {
00140     std::string error = fileToProto(dir, filename, metadata);
00141     if (!error.empty())
00142         return error;
00143     return "";
00144 }
00145 
00146 SimpleFileLog::SimpleFileLog(const FilesystemUtil::File& parentDir)
00147     : memoryLog()
00148     , metadata()
00149     , dir(FilesystemUtil::openDir(parentDir, "SimpleFile"))
00150     , lostAndFound(FilesystemUtil::openDir(dir, "SimpleFile-unknown"))
00151     , currentSync(new Sync(0))
00152 {
00153     std::vector<uint64_t> fsEntryIds = getEntryIds();
00154 
00155     SimpleFileLogMetadata::Metadata metadata1;
00156     SimpleFileLogMetadata::Metadata metadata2;
00157     std::string error1 = readMetadata("metadata1", metadata1);
00158     std::string error2 = readMetadata("metadata2", metadata2);
00159     if (error1.empty() && error2.empty()) {
00160         if (metadata1.version() > metadata2.version())
00161             metadata = metadata1;
00162         else
00163             metadata = metadata2;
00164     } else if (error1.empty()) {
00165         metadata = metadata1;
00166     } else if (error2.empty()) {
00167         metadata = metadata2;
00168     } else {
00169         // Brand new servers won't have metadata.
00170         WARNING("Error reading metadata1: %s", error1.c_str());
00171         WARNING("Error reading metadata2: %s", error2.c_str());
00172         if (!fsEntryIds.empty()) {
00173             PANIC("No readable metadata file but found entries in %s",
00174                   dir.path.c_str());
00175         }
00176         metadata.set_entries_start(1);
00177         metadata.set_entries_end(0);
00178     }
00179 
00180     std::vector<uint64_t> found;
00181     for (auto it = fsEntryIds.begin(); it != fsEntryIds.end(); ++it) {
00182         if (*it < metadata.entries_start() || *it > metadata.entries_end())
00183             found.push_back(*it);
00184     }
00185 
00186     std::string time;
00187     {
00188         struct timespec now =
00189             Core::Time::makeTimeSpec(Core::Time::SystemClock::now());
00190         time = format("%010lu.%06lu", now.tv_sec, now.tv_nsec / 1000);
00191     }
00192     for (auto it = found.begin(); it != found.end(); ++it) {
00193         uint64_t entryId = *it;
00194         std::string oldName = format("%016lx", entryId);
00195         std::string newName = format("%s-%016lx", time.c_str(), entryId);
00196         WARNING("Moving extraneous file %s/%s to %s/%s",
00197                 dir.path.c_str(), oldName.c_str(),
00198                 lostAndFound.path.c_str(), newName.c_str());
00199         FilesystemUtil::rename(dir, oldName,
00200                                lostAndFound, newName);
00201         FilesystemUtil::fsync(lostAndFound);
00202         FilesystemUtil::fsync(dir);
00203     }
00204 
00205     memoryLog.truncatePrefix(metadata.entries_start());
00206     for (uint64_t id = metadata.entries_start();
00207          id <= metadata.entries_end();
00208          ++id) {
00209         Log::Entry e = read(format("%016lx", id));
00210         memoryLog.append({&e});
00211     }
00212 
00213     Log::metadata = metadata.raft_metadata();
00214     // Write both metadata files
00215     updateMetadata();
00216     updateMetadata();
00217 }
00218 
00219 SimpleFileLog::~SimpleFileLog()
00220 {
00221     if (currentSync->fds.empty())
00222         currentSync->completed = true;
00223 }
00224 
00225 std::pair<uint64_t, uint64_t>
00226 SimpleFileLog::append(const std::vector<const Entry*>& entries)
00227 {
00228     std::pair<uint64_t, uint64_t> range = memoryLog.append(entries);
00229     for (uint64_t index = range.first; index <= range.second; ++index) {
00230         FilesystemUtil::File file = protoToFile(memoryLog.getEntry(index),
00231                                                 dir, format("%016lx", index));
00232         currentSync->fds.push_back({file.release(), true});
00233     }
00234     FilesystemUtil::File mdfile = updateMetadataCallerSync();
00235     currentSync->fds.push_back({dir.fd, false});
00236     currentSync->fds.push_back({mdfile.release(), true});
00237     currentSync->lastIndex = range.second;
00238     return range;
00239 }
00240 
00241 
00242 std::string
00243 SimpleFileLog::getName() const
00244 {
00245     return "SimpleFile";
00246 }
00247 
00248 std::unique_ptr<Log::Sync>
00249 SimpleFileLog::takeSync()
00250 {
00251     std::unique_ptr<Sync> other(new Sync(getLastLogIndex()));
00252     std::swap(other, currentSync);
00253     return std::move(other);
00254 }
00255 
00256 void
00257 SimpleFileLog::truncatePrefix(uint64_t firstEntryId)
00258 {
00259     uint64_t old = getLogStartIndex();
00260     memoryLog.truncatePrefix(firstEntryId);
00261     // update metadata before removing files in case of interruption
00262     updateMetadata();
00263     for (uint64_t entryId = old; entryId < getLogStartIndex(); ++entryId)
00264         FilesystemUtil::removeFile(dir, format("%016lx", entryId));
00265     // fsync(dir) not needed because of metadata
00266 }
00267 
00268 void
00269 SimpleFileLog::truncateSuffix(uint64_t lastEntryId)
00270 {
00271     uint64_t old = getLastLogIndex();
00272     memoryLog.truncateSuffix(lastEntryId);
00273     // update metadata before removing files in case of interruption
00274     updateMetadata();
00275     for (uint64_t entryId = old; entryId > getLastLogIndex(); --entryId)
00276         FilesystemUtil::removeFile(dir, format("%016lx", entryId));
00277     // fsync(dir) not needed because of metadata
00278 }
00279 
00280 const SimpleFileLog::Entry&
00281 SimpleFileLog::getEntry(uint64_t i) const
00282 {
00283     return memoryLog.getEntry(i);
00284 }
00285 
00286 uint64_t
00287 SimpleFileLog::getLogStartIndex() const
00288 {
00289     return memoryLog.getLogStartIndex();
00290 }
00291 
00292 uint64_t
00293 SimpleFileLog::getLastLogIndex() const
00294 {
00295     return memoryLog.getLastLogIndex();
00296 }
00297 
00298 uint64_t
00299 SimpleFileLog::getSizeBytes() const
00300 {
00301     return memoryLog.getSizeBytes();
00302 }
00303 
00304 void
00305 SimpleFileLog::updateMetadata()
00306 {
00307     // sync file to disk
00308     FilesystemUtil::fsync(updateMetadataCallerSync());
00309     // sync directory entry to disk (needed if we created file)
00310     FilesystemUtil::fsync(dir);
00311 }
00312 
00313 FilesystemUtil::File
00314 SimpleFileLog::updateMetadataCallerSync()
00315 {
00316     *metadata.mutable_raft_metadata() = Log::metadata;
00317     metadata.set_entries_start(memoryLog.getLogStartIndex());
00318     metadata.set_entries_end(memoryLog.getLastLogIndex());
00319     metadata.set_version(metadata.version() + 1);
00320     if (metadata.version() % 2 == 1) {
00321         return protoToFile(metadata, dir, "metadata1");
00322     } else {
00323         return protoToFile(metadata, dir, "metadata2");
00324     }
00325 }
00326 
00327 std::vector<uint64_t>
00328 SimpleFileLog::getEntryIds() const
00329 {
00330     std::vector<std::string> filenames = FilesystemUtil::ls(dir);
00331     std::vector<uint64_t> entryIds;
00332     for (auto it = filenames.begin(); it != filenames.end(); ++it) {
00333         const std::string& filename = *it;
00334         if (filename == "metadata1" ||
00335             filename == "metadata2" ||
00336             filename == "unknown") {
00337             continue;
00338         }
00339         uint64_t entryId;
00340         unsigned bytesConsumed;
00341         int matched = sscanf(filename.c_str(), "%016lx%n", // NOLINT
00342                              &entryId, &bytesConsumed);
00343         if (matched != 1 || bytesConsumed != filename.length()) {
00344             WARNING("%s doesn't look like a valid entry ID (from %s)",
00345                     filename.c_str(),
00346                     (dir.path + "/" + filename).c_str());
00347             continue;
00348         }
00349         entryIds.push_back(entryId);
00350     }
00351     return entryIds;
00352 }
00353 
00354 Log::Entry
00355 SimpleFileLog::read(const std::string& entryPath) const
00356 {
00357     Protocol::Raft::Entry entry;
00358     std::string error = fileToProto(dir, entryPath, entry);
00359     if (!error.empty())
00360         PANIC("Could not parse file: %s", error.c_str());
00361     return entry;
00362 }
00363 
00364 } // namespace LogCabin::Storage
00365 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines