LogCabin
|
00001 /* Copyright (c) 2013 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <fcntl.h> 00018 #include <sys/mman.h> 00019 #include <sys/stat.h> 00020 #include <sys/types.h> 00021 #include <unistd.h> 00022 00023 #include "Core/Debug.h" 00024 #include "Core/StringUtil.h" 00025 #include "Core/Time.h" 00026 #include "Core/Util.h" 00027 #include "Storage/Layout.h" 00028 #include "Storage/SnapshotFile.h" 00029 00030 namespace LogCabin { 00031 namespace Storage { 00032 namespace SnapshotFile { 00033 00034 namespace FilesystemUtil = Storage::FilesystemUtil; 00035 using Core::StringUtil::format; 00036 00037 void 00038 discardPartialSnapshots(const Storage::Layout& layout) 00039 { 00040 std::vector<std::string> files = FilesystemUtil::ls(layout.snapshotDir); 00041 for (auto it = files.begin(); it != files.end(); ++it) { 00042 const std::string& filename = *it; 00043 if (Core::StringUtil::startsWith(filename, "partial")) { 00044 NOTICE("Removing incomplete snapshot %s. This was probably being " 00045 "written when the server crashed.", 00046 filename.c_str()); 00047 FilesystemUtil::removeFile(layout.snapshotDir, filename); 00048 } 00049 } 00050 } 00051 00052 Reader::Reader(const Storage::Layout& storageLayout) 00053 : file() 00054 , contents() 00055 , bytesRead(0) 00056 { 00057 file = FilesystemUtil::tryOpenFile(storageLayout.snapshotDir, 00058 "snapshot", 00059 O_RDONLY); 00060 if (file.fd < 0) { 00061 throw std::runtime_error(format( 00062 "Snapshot file not found in %s", 00063 storageLayout.snapshotDir.path.c_str())); 00064 } 00065 contents.reset(new FilesystemUtil::FileContents(file)); 00066 } 00067 00068 Reader::~Reader() 00069 { 00070 } 00071 00072 uint64_t 00073 Reader::getSizeBytes() 00074 { 00075 return contents->getFileLength(); 00076 } 00077 00078 00079 uint64_t 00080 Reader::getBytesRead() const 00081 { 00082 return bytesRead; 00083 } 00084 00085 std::string 00086 Reader::readMessage(google::protobuf::Message& message) 00087 { 00088 uint32_t length = 0; 00089 uint64_t r = readRaw(&length, sizeof(length)); 00090 if (r < sizeof(length)) { 00091 return format("Could only read %lu bytes of %lu-byte length field in " 00092 "file %s (at offset %lu of %lu-byte file)", 00093 r, 00094 sizeof(length), 00095 file.path.c_str(), 00096 bytesRead - r, 00097 getSizeBytes()); 00098 } 00099 length = be32toh(length); 00100 if (getSizeBytes() - bytesRead < length) { 00101 return format("ProtoBuf is %u bytes long but there are only %lu " 00102 "bytes remaining in file %s (at offset %lu)", 00103 length, 00104 getSizeBytes() - bytesRead, 00105 file.path.c_str(), 00106 bytesRead); 00107 } 00108 const Core::Buffer buf(const_cast<void*>(contents->get(bytesRead, length)), 00109 length, 00110 NULL); 00111 std::string error; 00112 if (!Core::ProtoBuf::parse(buf, message)) { 00113 error = format("Could not parse ProtoBuf at bytes %lu-%lu (inclusive) " 00114 "in file %s of length %lu", 00115 bytesRead, 00116 bytesRead + length -1, 00117 file.path.c_str(), 00118 getSizeBytes()); 00119 } 00120 bytesRead += length; 00121 if (getSizeBytes() > 1024 && // minimum to keep quiet during unit tests 00122 10 * bytesRead / getSizeBytes() != 00123 10 * (bytesRead - length) / getSizeBytes()) { 00124 NOTICE("Read %lu%% of snapshot", 00125 100 * bytesRead / getSizeBytes()); 00126 } 00127 return error; 00128 } 00129 00130 uint64_t 00131 Reader::readRaw(void* data, uint64_t length) 00132 { 00133 uint64_t r = contents->copyPartial(bytesRead, data, length); 00134 bytesRead += r; 00135 return r; 00136 } 00137 00138 template<typename T> 00139 Writer::SharedMMap<T>::SharedMMap() 00140 : value(NULL) 00141 { 00142 void* addr = mmap(NULL, 00143 sizeof(*value), 00144 PROT_READ|PROT_WRITE, 00145 MAP_SHARED|MAP_ANONYMOUS, 00146 -1, 0); 00147 if (addr == MAP_FAILED) { 00148 PANIC("Could not mmap anonymous shared page: %s", 00149 strerror(errno)); 00150 } 00151 value = new(addr) T(); 00152 } 00153 00154 template<typename T> 00155 Writer::SharedMMap<T>::~SharedMMap() 00156 { 00157 if (munmap(value, sizeof(*value)) != 0) { 00158 PANIC("Failed to munmap shared anonymous page: %s", 00159 strerror(errno)); 00160 } 00161 } 00162 00163 Writer::Writer(const Storage::Layout& storageLayout) 00164 : parentDir(FilesystemUtil::dup(storageLayout.snapshotDir)) 00165 , stagingName() 00166 , file() 00167 , bytesWritten(0) 00168 , sharedBytesWritten() 00169 { 00170 struct timespec now = 00171 Core::Time::makeTimeSpec(Core::Time::SystemClock::now()); 00172 stagingName = format("partial.%010lu.%06lu", 00173 now.tv_sec, now.tv_nsec / 1000); 00174 file = FilesystemUtil::openFile(parentDir, stagingName, 00175 O_WRONLY|O_CREAT|O_EXCL); 00176 } 00177 00178 Writer::~Writer() 00179 { 00180 if (file.fd >= 0) { 00181 WARNING("Discarding partial snapshot %s", file.path.c_str()); 00182 discard(); 00183 } 00184 } 00185 00186 void 00187 Writer::discard() 00188 { 00189 if (file.fd < 0) 00190 PANIC("File already closed"); 00191 FilesystemUtil::removeFile(parentDir, stagingName); 00192 file.close(); 00193 } 00194 00195 void 00196 Writer::flushToOS() 00197 { 00198 // Nothing to do. 00199 } 00200 00201 void 00202 Writer::seekToEnd() 00203 { 00204 off64_t r = lseek64(file.fd, 0, SEEK_END); 00205 if (r < 0) 00206 PANIC("lseek failed: %s", strerror(errno)); 00207 bytesWritten = Core::Util::downCast<uint64_t>(r); 00208 } 00209 00210 uint64_t 00211 Writer::save() 00212 { 00213 if (file.fd < 0) 00214 PANIC("File already closed"); 00215 FilesystemUtil::fsync(file); 00216 uint64_t fileSize = FilesystemUtil::getSize(file); 00217 file.close(); 00218 FilesystemUtil::rename(parentDir, stagingName, 00219 parentDir, "snapshot"); 00220 FilesystemUtil::fsync(parentDir); 00221 return fileSize; 00222 } 00223 00224 uint64_t 00225 Writer::getBytesWritten() const 00226 { 00227 return bytesWritten; 00228 } 00229 00230 void 00231 Writer::writeMessage(const google::protobuf::Message& message) 00232 { 00233 Core::Buffer buf; 00234 Core::ProtoBuf::serialize(message, buf); 00235 uint32_t beSize = htobe32(uint32_t(buf.getLength())); 00236 ssize_t r = FilesystemUtil::write(file.fd, { 00237 {&beSize, sizeof(beSize)}, 00238 {buf.getData(), buf.getLength()}, 00239 }); 00240 if (r < 0) { 00241 PANIC("Could not write ProtoBuf into %s: %s", 00242 file.path.c_str(), 00243 strerror(errno)); 00244 } 00245 bytesWritten += Core::Util::downCast<uint64_t>(r); 00246 *sharedBytesWritten.value += Core::Util::downCast<uint64_t>(r); 00247 } 00248 00249 void 00250 Writer::writeRaw(const void* data, uint64_t length) 00251 { 00252 ssize_t r = FilesystemUtil::write(file.fd, data, length); 00253 if (r < 0) { 00254 PANIC("Could not write ProtoBuf into %s: %s", 00255 file.path.c_str(), 00256 strerror(errno)); 00257 } 00258 bytesWritten += Core::Util::downCast<uint64_t>(r); 00259 *sharedBytesWritten.value += Core::Util::downCast<uint64_t>(r); 00260 } 00261 00262 } // namespace LogCabin::Storage::SnapshotFile 00263 } // namespace LogCabin::Storage 00264 } // namespace LogCabin