LogCabin
Storage/SnapshotFile.cc
Go to the documentation of this file.
00001 /* Copyright (c) 2013 Stanford University
00002  * Copyright (c) 2015 Diego Ongaro
00003  *
00004  * Permission to use, copy, modify, and distribute this software for any
00005  * purpose with or without fee is hereby granted, provided that the above
00006  * copyright notice and this permission notice appear in all copies.
00007  *
00008  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
00009  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00010  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
00011  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00012  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00013  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00014  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00015  */
00016 
00017 #include <fcntl.h>
00018 #include <sys/mman.h>
00019 #include <sys/stat.h>
00020 #include <sys/types.h>
00021 #include <unistd.h>
00022 
00023 #include "Core/Debug.h"
00024 #include "Core/StringUtil.h"
00025 #include "Core/Time.h"
00026 #include "Core/Util.h"
00027 #include "Storage/Layout.h"
00028 #include "Storage/SnapshotFile.h"
00029 
00030 namespace LogCabin {
00031 namespace Storage {
00032 namespace SnapshotFile {
00033 
00034 namespace FilesystemUtil = Storage::FilesystemUtil;
00035 using Core::StringUtil::format;
00036 
00037 void
00038 discardPartialSnapshots(const Storage::Layout& layout)
00039 {
00040     std::vector<std::string> files = FilesystemUtil::ls(layout.snapshotDir);
00041     for (auto it = files.begin(); it != files.end(); ++it) {
00042         const std::string& filename = *it;
00043         if (Core::StringUtil::startsWith(filename, "partial")) {
00044             NOTICE("Removing incomplete snapshot %s. This was probably being "
00045                    "written when the server crashed.",
00046                    filename.c_str());
00047             FilesystemUtil::removeFile(layout.snapshotDir, filename);
00048         }
00049     }
00050 }
00051 
00052 Reader::Reader(const Storage::Layout& storageLayout)
00053     : file()
00054     , contents()
00055     , bytesRead(0)
00056 {
00057     file = FilesystemUtil::tryOpenFile(storageLayout.snapshotDir,
00058                                        "snapshot",
00059                                        O_RDONLY);
00060     if (file.fd < 0) {
00061         throw std::runtime_error(format(
00062                 "Snapshot file not found in %s",
00063                 storageLayout.snapshotDir.path.c_str()));
00064     }
00065     contents.reset(new FilesystemUtil::FileContents(file));
00066 }
00067 
00068 Reader::~Reader()
00069 {
00070 }
00071 
00072 uint64_t
00073 Reader::getSizeBytes()
00074 {
00075     return contents->getFileLength();
00076 }
00077 
00078 
00079 uint64_t
00080 Reader::getBytesRead() const
00081 {
00082     return bytesRead;
00083 }
00084 
00085 std::string
00086 Reader::readMessage(google::protobuf::Message& message)
00087 {
00088     uint32_t length = 0;
00089     uint64_t r = readRaw(&length, sizeof(length));
00090     if (r < sizeof(length)) {
00091         return format("Could only read %lu bytes of %lu-byte length field in "
00092                       "file %s (at offset %lu of %lu-byte file)",
00093                       r,
00094                       sizeof(length),
00095                       file.path.c_str(),
00096                       bytesRead - r,
00097                       getSizeBytes());
00098     }
00099     length = be32toh(length);
00100     if (getSizeBytes() - bytesRead < length) {
00101         return format("ProtoBuf is %u bytes long but there are only %lu "
00102                       "bytes remaining in file %s (at offset %lu)",
00103                       length,
00104                       getSizeBytes() - bytesRead,
00105                       file.path.c_str(),
00106                       bytesRead);
00107     }
00108     const Core::Buffer buf(const_cast<void*>(contents->get(bytesRead, length)),
00109                            length,
00110                            NULL);
00111     std::string error;
00112     if (!Core::ProtoBuf::parse(buf, message)) {
00113         error = format("Could not parse ProtoBuf at bytes %lu-%lu (inclusive) "
00114                        "in file %s of length %lu",
00115                        bytesRead,
00116                        bytesRead + length -1,
00117                        file.path.c_str(),
00118                        getSizeBytes());
00119     }
00120     bytesRead += length;
00121     if (getSizeBytes() > 1024 && // minimum to keep quiet during unit tests
00122         10 * bytesRead / getSizeBytes() !=
00123         10 * (bytesRead - length) / getSizeBytes()) {
00124         NOTICE("Read %lu%% of snapshot",
00125                100 * bytesRead / getSizeBytes());
00126     }
00127     return error;
00128 }
00129 
00130 uint64_t
00131 Reader::readRaw(void* data, uint64_t length)
00132 {
00133     uint64_t r = contents->copyPartial(bytesRead, data, length);
00134     bytesRead += r;
00135     return r;
00136 }
00137 
00138 template<typename T>
00139 Writer::SharedMMap<T>::SharedMMap()
00140     : value(NULL)
00141 {
00142     void* addr = mmap(NULL,
00143                       sizeof(*value),
00144                       PROT_READ|PROT_WRITE,
00145                       MAP_SHARED|MAP_ANONYMOUS,
00146                       -1, 0);
00147     if (addr == MAP_FAILED) {
00148         PANIC("Could not mmap anonymous shared page: %s",
00149               strerror(errno));
00150     }
00151     value = new(addr) T();
00152 }
00153 
00154 template<typename T>
00155 Writer::SharedMMap<T>::~SharedMMap()
00156 {
00157     if (munmap(value, sizeof(*value)) != 0) {
00158         PANIC("Failed to munmap shared anonymous page: %s",
00159               strerror(errno));
00160     }
00161 }
00162 
00163 Writer::Writer(const Storage::Layout& storageLayout)
00164     : parentDir(FilesystemUtil::dup(storageLayout.snapshotDir))
00165     , stagingName()
00166     , file()
00167     , bytesWritten(0)
00168     , sharedBytesWritten()
00169 {
00170     struct timespec now =
00171         Core::Time::makeTimeSpec(Core::Time::SystemClock::now());
00172     stagingName = format("partial.%010lu.%06lu",
00173                          now.tv_sec, now.tv_nsec / 1000);
00174     file = FilesystemUtil::openFile(parentDir, stagingName,
00175                                     O_WRONLY|O_CREAT|O_EXCL);
00176 }
00177 
00178 Writer::~Writer()
00179 {
00180     if (file.fd >= 0) {
00181         WARNING("Discarding partial snapshot %s", file.path.c_str());
00182         discard();
00183     }
00184 }
00185 
00186 void
00187 Writer::discard()
00188 {
00189     if (file.fd < 0)
00190         PANIC("File already closed");
00191     FilesystemUtil::removeFile(parentDir, stagingName);
00192     file.close();
00193 }
00194 
00195 void
00196 Writer::flushToOS()
00197 {
00198     // Nothing to do.
00199 }
00200 
00201 void
00202 Writer::seekToEnd()
00203 {
00204     off64_t r = lseek64(file.fd, 0, SEEK_END);
00205     if (r < 0)
00206         PANIC("lseek failed: %s", strerror(errno));
00207     bytesWritten = Core::Util::downCast<uint64_t>(r);
00208 }
00209 
00210 uint64_t
00211 Writer::save()
00212 {
00213     if (file.fd < 0)
00214         PANIC("File already closed");
00215     FilesystemUtil::fsync(file);
00216     uint64_t fileSize = FilesystemUtil::getSize(file);
00217     file.close();
00218     FilesystemUtil::rename(parentDir, stagingName,
00219                            parentDir, "snapshot");
00220     FilesystemUtil::fsync(parentDir);
00221     return fileSize;
00222 }
00223 
00224 uint64_t
00225 Writer::getBytesWritten() const
00226 {
00227     return bytesWritten;
00228 }
00229 
00230 void
00231 Writer::writeMessage(const google::protobuf::Message& message)
00232 {
00233     Core::Buffer buf;
00234     Core::ProtoBuf::serialize(message, buf);
00235     uint32_t beSize = htobe32(uint32_t(buf.getLength()));
00236     ssize_t r = FilesystemUtil::write(file.fd, {
00237                                           {&beSize, sizeof(beSize)},
00238                                           {buf.getData(), buf.getLength()},
00239                                       });
00240     if (r < 0) {
00241         PANIC("Could not write ProtoBuf into %s: %s",
00242               file.path.c_str(),
00243               strerror(errno));
00244     }
00245     bytesWritten += Core::Util::downCast<uint64_t>(r);
00246     *sharedBytesWritten.value += Core::Util::downCast<uint64_t>(r);
00247 }
00248 
00249 void
00250 Writer::writeRaw(const void* data, uint64_t length)
00251 {
00252     ssize_t r = FilesystemUtil::write(file.fd, data, length);
00253     if (r < 0) {
00254         PANIC("Could not write ProtoBuf into %s: %s",
00255               file.path.c_str(),
00256               strerror(errno));
00257     }
00258     bytesWritten += Core::Util::downCast<uint64_t>(r);
00259     *sharedBytesWritten.value += Core::Util::downCast<uint64_t>(r);
00260 }
00261 
00262 } // namespace LogCabin::Storage::SnapshotFile
00263 } // namespace LogCabin::Storage
00264 } // namespace LogCabin
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines