LogCabin
|
00001 /* Copyright (c) 2012-2013 Stanford University 00002 * 00003 * Permission to use, copy, modify, and distribute this software for any 00004 * purpose with or without fee is hereby granted, provided that the above 00005 * copyright notice and this permission notice appear in all copies. 00006 * 00007 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00008 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00009 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00010 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00011 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00012 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00013 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00014 */ 00015 00016 #include <algorithm> 00017 #include <fcntl.h> 00018 #include <sys/stat.h> 00019 #include <unistd.h> 00020 00021 #include "build/Protocol/Raft.pb.h" 00022 #include "Core/Buffer.h" 00023 #include "Core/Checksum.h" 00024 #include "Core/Debug.h" 00025 #include "Core/ProtoBuf.h" 00026 #include "Core/StringUtil.h" 00027 #include "Core/Time.h" 00028 #include "Core/Util.h" 00029 #include "Storage/FilesystemUtil.h" 00030 #include "Storage/SimpleFileLog.h" 00031 00032 namespace LogCabin { 00033 namespace Storage { 00034 00035 using FilesystemUtil::File; 00036 using Core::StringUtil::format; 00037 00038 namespace { 00039 std::string 00040 fileToProto(const File& dir, const std::string& path, 00041 google::protobuf::Message& out) 00042 { 00043 FilesystemUtil::File file = 00044 FilesystemUtil::tryOpenFile(dir, path, O_RDONLY); 00045 if (file.fd == -1) { 00046 return format("Could not open %s/%s: %s", 00047 dir.path.c_str(), path.c_str(), strerror(errno)); 00048 } 00049 FilesystemUtil::FileContents reader(file); 00050 00051 char checksum[Core::Checksum::MAX_LENGTH]; 00052 uint64_t bytesRead = reader.copyPartial(0, checksum, sizeof(checksum)); 00053 uint32_t checksumBytes = Core::Checksum::length(checksum, 00054 uint32_t(bytesRead)); 00055 if (checksumBytes == 0) 00056 return format("File %s missing checksum", file.path.c_str()); 00057 00058 uint64_t dataLen = reader.getFileLength() - checksumBytes; 00059 const void* data = reader.get(checksumBytes, dataLen); 00060 std::string error = Core::Checksum::verify(checksum, data, dataLen); 00061 if (!error.empty()) { 00062 return format("Checksum verification failure on %s: %s", 00063 file.path.c_str(), error.c_str()); 00064 } 00065 00066 #if BINARY_FORMAT 00067 RPC::Buffer contents(const_cast<void*>(data), dataLen, NULL); 00068 if (!RPC::ProtoBuf::parse(contents, out)) 00069 return format("Failed to parse protobuf in %s", file.path.c_str()); 00070 #else 00071 std::string contents(static_cast<const char*>(data), dataLen); 00072 Core::ProtoBuf::Internal::fromString(contents, out); 00073 #endif 00074 return ""; 00075 } 00076 00077 FilesystemUtil::File 00078 protoToFile(const google::protobuf::Message& in, 00079 const File& dir, const std::string& path) 00080 { 00081 FilesystemUtil::File file = 00082 FilesystemUtil::openFile(dir, path, O_CREAT|O_WRONLY|O_TRUNC); 00083 const void* data = NULL; 00084 uint64_t len = 0; 00085 #if BINARY_FORMAT 00086 RPC::Buffer contents; 00087 RPC::ProtoBuf::serialize(in, contents); 00088 data = contents.getData(); 00089 len = contents.getLenhgt(); 00090 #else 00091 std::string contents(Core::ProtoBuf::dumpString(in)); 00092 contents = "\n" + contents; 00093 data = contents.data(); 00094 len = uint64_t(contents.length()); 00095 #endif 00096 char checksum[Core::Checksum::MAX_LENGTH]; 00097 uint32_t checksumLen = Core::Checksum::calculate("SHA-1", 00098 data, len, 00099 checksum); 00100 00101 ssize_t written = FilesystemUtil::write(file.fd, { 00102 {checksum, checksumLen}, 00103 {data, len}, 00104 }); 00105 if (written == -1) { 00106 PANIC("Failed to write to %s: %s", 00107 file.path.c_str(), strerror(errno)); 00108 } 00109 00110 return file; 00111 } 00112 } 00113 00114 ////////// SimpleFileLog::Sync ////////// 00115 SimpleFileLog::Sync::Sync(uint64_t lastIndex) 00116 : Log::Sync(lastIndex) 00117 , fds() 00118 { 00119 } 00120 00121 void 00122 SimpleFileLog::Sync::wait() 00123 { 00124 for (auto it = fds.begin(); it != fds.end(); ++it) { 00125 FilesystemUtil::File f(it->first, "-unknown-"); 00126 FilesystemUtil::fsync(f); 00127 if (it->second) 00128 f.close(); 00129 else 00130 f.release(); 00131 } 00132 } 00133 00134 ////////// SimpleFileLog ////////// 00135 00136 std::string 00137 SimpleFileLog::readMetadata(const std::string& filename, 00138 SimpleFileLogMetadata::Metadata& metadata) const 00139 { 00140 std::string error = fileToProto(dir, filename, metadata); 00141 if (!error.empty()) 00142 return error; 00143 return ""; 00144 } 00145 00146 SimpleFileLog::SimpleFileLog(const FilesystemUtil::File& parentDir) 00147 : memoryLog() 00148 , metadata() 00149 , dir(FilesystemUtil::openDir(parentDir, "SimpleFile")) 00150 , lostAndFound(FilesystemUtil::openDir(dir, "SimpleFile-unknown")) 00151 , currentSync(new Sync(0)) 00152 { 00153 std::vector<uint64_t> fsEntryIds = getEntryIds(); 00154 00155 SimpleFileLogMetadata::Metadata metadata1; 00156 SimpleFileLogMetadata::Metadata metadata2; 00157 std::string error1 = readMetadata("metadata1", metadata1); 00158 std::string error2 = readMetadata("metadata2", metadata2); 00159 if (error1.empty() && error2.empty()) { 00160 if (metadata1.version() > metadata2.version()) 00161 metadata = metadata1; 00162 else 00163 metadata = metadata2; 00164 } else if (error1.empty()) { 00165 metadata = metadata1; 00166 } else if (error2.empty()) { 00167 metadata = metadata2; 00168 } else { 00169 // Brand new servers won't have metadata. 00170 WARNING("Error reading metadata1: %s", error1.c_str()); 00171 WARNING("Error reading metadata2: %s", error2.c_str()); 00172 if (!fsEntryIds.empty()) { 00173 PANIC("No readable metadata file but found entries in %s", 00174 dir.path.c_str()); 00175 } 00176 metadata.set_entries_start(1); 00177 metadata.set_entries_end(0); 00178 } 00179 00180 std::vector<uint64_t> found; 00181 for (auto it = fsEntryIds.begin(); it != fsEntryIds.end(); ++it) { 00182 if (*it < metadata.entries_start() || *it > metadata.entries_end()) 00183 found.push_back(*it); 00184 } 00185 00186 std::string time; 00187 { 00188 struct timespec now = 00189 Core::Time::makeTimeSpec(Core::Time::SystemClock::now()); 00190 time = format("%010lu.%06lu", now.tv_sec, now.tv_nsec / 1000); 00191 } 00192 for (auto it = found.begin(); it != found.end(); ++it) { 00193 uint64_t entryId = *it; 00194 std::string oldName = format("%016lx", entryId); 00195 std::string newName = format("%s-%016lx", time.c_str(), entryId); 00196 WARNING("Moving extraneous file %s/%s to %s/%s", 00197 dir.path.c_str(), oldName.c_str(), 00198 lostAndFound.path.c_str(), newName.c_str()); 00199 FilesystemUtil::rename(dir, oldName, 00200 lostAndFound, newName); 00201 FilesystemUtil::fsync(lostAndFound); 00202 FilesystemUtil::fsync(dir); 00203 } 00204 00205 memoryLog.truncatePrefix(metadata.entries_start()); 00206 for (uint64_t id = metadata.entries_start(); 00207 id <= metadata.entries_end(); 00208 ++id) { 00209 Log::Entry e = read(format("%016lx", id)); 00210 memoryLog.append({&e}); 00211 } 00212 00213 Log::metadata = metadata.raft_metadata(); 00214 // Write both metadata files 00215 updateMetadata(); 00216 updateMetadata(); 00217 } 00218 00219 SimpleFileLog::~SimpleFileLog() 00220 { 00221 if (currentSync->fds.empty()) 00222 currentSync->completed = true; 00223 } 00224 00225 std::pair<uint64_t, uint64_t> 00226 SimpleFileLog::append(const std::vector<const Entry*>& entries) 00227 { 00228 std::pair<uint64_t, uint64_t> range = memoryLog.append(entries); 00229 for (uint64_t index = range.first; index <= range.second; ++index) { 00230 FilesystemUtil::File file = protoToFile(memoryLog.getEntry(index), 00231 dir, format("%016lx", index)); 00232 currentSync->fds.push_back({file.release(), true}); 00233 } 00234 FilesystemUtil::File mdfile = updateMetadataCallerSync(); 00235 currentSync->fds.push_back({dir.fd, false}); 00236 currentSync->fds.push_back({mdfile.release(), true}); 00237 currentSync->lastIndex = range.second; 00238 return range; 00239 } 00240 00241 00242 std::string 00243 SimpleFileLog::getName() const 00244 { 00245 return "SimpleFile"; 00246 } 00247 00248 std::unique_ptr<Log::Sync> 00249 SimpleFileLog::takeSync() 00250 { 00251 std::unique_ptr<Sync> other(new Sync(getLastLogIndex())); 00252 std::swap(other, currentSync); 00253 return std::move(other); 00254 } 00255 00256 void 00257 SimpleFileLog::truncatePrefix(uint64_t firstEntryId) 00258 { 00259 uint64_t old = getLogStartIndex(); 00260 memoryLog.truncatePrefix(firstEntryId); 00261 // update metadata before removing files in case of interruption 00262 updateMetadata(); 00263 for (uint64_t entryId = old; entryId < getLogStartIndex(); ++entryId) 00264 FilesystemUtil::removeFile(dir, format("%016lx", entryId)); 00265 // fsync(dir) not needed because of metadata 00266 } 00267 00268 void 00269 SimpleFileLog::truncateSuffix(uint64_t lastEntryId) 00270 { 00271 uint64_t old = getLastLogIndex(); 00272 memoryLog.truncateSuffix(lastEntryId); 00273 // update metadata before removing files in case of interruption 00274 updateMetadata(); 00275 for (uint64_t entryId = old; entryId > getLastLogIndex(); --entryId) 00276 FilesystemUtil::removeFile(dir, format("%016lx", entryId)); 00277 // fsync(dir) not needed because of metadata 00278 } 00279 00280 const SimpleFileLog::Entry& 00281 SimpleFileLog::getEntry(uint64_t i) const 00282 { 00283 return memoryLog.getEntry(i); 00284 } 00285 00286 uint64_t 00287 SimpleFileLog::getLogStartIndex() const 00288 { 00289 return memoryLog.getLogStartIndex(); 00290 } 00291 00292 uint64_t 00293 SimpleFileLog::getLastLogIndex() const 00294 { 00295 return memoryLog.getLastLogIndex(); 00296 } 00297 00298 uint64_t 00299 SimpleFileLog::getSizeBytes() const 00300 { 00301 return memoryLog.getSizeBytes(); 00302 } 00303 00304 void 00305 SimpleFileLog::updateMetadata() 00306 { 00307 // sync file to disk 00308 FilesystemUtil::fsync(updateMetadataCallerSync()); 00309 // sync directory entry to disk (needed if we created file) 00310 FilesystemUtil::fsync(dir); 00311 } 00312 00313 FilesystemUtil::File 00314 SimpleFileLog::updateMetadataCallerSync() 00315 { 00316 *metadata.mutable_raft_metadata() = Log::metadata; 00317 metadata.set_entries_start(memoryLog.getLogStartIndex()); 00318 metadata.set_entries_end(memoryLog.getLastLogIndex()); 00319 metadata.set_version(metadata.version() + 1); 00320 if (metadata.version() % 2 == 1) { 00321 return protoToFile(metadata, dir, "metadata1"); 00322 } else { 00323 return protoToFile(metadata, dir, "metadata2"); 00324 } 00325 } 00326 00327 std::vector<uint64_t> 00328 SimpleFileLog::getEntryIds() const 00329 { 00330 std::vector<std::string> filenames = FilesystemUtil::ls(dir); 00331 std::vector<uint64_t> entryIds; 00332 for (auto it = filenames.begin(); it != filenames.end(); ++it) { 00333 const std::string& filename = *it; 00334 if (filename == "metadata1" || 00335 filename == "metadata2" || 00336 filename == "unknown") { 00337 continue; 00338 } 00339 uint64_t entryId; 00340 unsigned bytesConsumed; 00341 int matched = sscanf(filename.c_str(), "%016lx%n", // NOLINT 00342 &entryId, &bytesConsumed); 00343 if (matched != 1 || bytesConsumed != filename.length()) { 00344 WARNING("%s doesn't look like a valid entry ID (from %s)", 00345 filename.c_str(), 00346 (dir.path + "/" + filename).c_str()); 00347 continue; 00348 } 00349 entryIds.push_back(entryId); 00350 } 00351 return entryIds; 00352 } 00353 00354 Log::Entry 00355 SimpleFileLog::read(const std::string& entryPath) const 00356 { 00357 Protocol::Raft::Entry entry; 00358 std::string error = fileToProto(dir, entryPath, entry); 00359 if (!error.empty()) 00360 PANIC("Could not parse file: %s", error.c_str()); 00361 return entry; 00362 } 00363 00364 } // namespace LogCabin::Storage 00365 } // namespace LogCabin