LogCabin
|
00001 /* Copyright (c) 2011-2014 Stanford University 00002 * Copyright (c) 2015 Diego Ongaro 00003 * 00004 * Permission to use, copy, modify, and distribute this software for any 00005 * purpose with or without fee is hereby granted, provided that the above 00006 * copyright notice and this permission notice appear in all copies. 00007 * 00008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES 00009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 00010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR 00011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 00012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 00013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 00014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 */ 00016 00017 #include <cassert> 00018 #include <cstring> 00019 #include <sys/epoll.h> 00020 #include <sys/timerfd.h> 00021 #include <unistd.h> 00022 00023 #include "Core/Debug.h" 00024 #include "Core/ThreadId.h" 00025 #include "Event/File.h" 00026 #include "Event/Loop.h" 00027 #include "Event/Timer.h" 00028 00029 namespace LogCabin { 00030 namespace Event { 00031 00032 namespace { 00033 00034 /// Helper for Loop constructor. 00035 int 00036 createEpollFd() 00037 { 00038 int epollfd = epoll_create1(0); 00039 if (epollfd < 0) 00040 PANIC("epoll_create1 failed: %s", strerror(errno)); 00041 return epollfd; 00042 } 00043 00044 } // anonymous namespace 00045 00046 ////////// Loop::Lock ////////// 00047 00048 Loop::Lock::Lock(Event::Loop& eventLoop) 00049 : eventLoop(eventLoop) 00050 { 00051 { 00052 std::unique_lock<std::mutex> lockGuard(eventLoop.mutex); 00053 ++eventLoop.numLocks; 00054 if (eventLoop.runningThread != Core::ThreadId::getId() && 00055 eventLoop.lockOwner != Core::ThreadId::getId()) { 00056 // This is an actual lock: we're not running inside the event loop, and 00057 // we're not recursively locking. 00058 if (eventLoop.runningThread != Core::ThreadId::NONE) 00059 eventLoop.breakTimer.schedule(0); 00060 while (eventLoop.runningThread != Core::ThreadId::NONE || 00061 eventLoop.lockOwner != Core::ThreadId::NONE) { 00062 eventLoop.safeToLock.wait(lockGuard); 00063 } 00064 // Take ownership of the lock 00065 eventLoop.lockOwner = Core::ThreadId::getId(); 00066 } 00067 ++eventLoop.numActiveLocks; 00068 } 00069 eventLoop.extraMutexToSatisfyRaceDetector.lock(); 00070 } 00071 00072 Loop::Lock::~Lock() 00073 { 00074 eventLoop.extraMutexToSatisfyRaceDetector.unlock(); 00075 std::lock_guard<std::mutex> lockGuard(eventLoop.mutex); 00076 --eventLoop.numLocks; 00077 --eventLoop.numActiveLocks; 00078 if (eventLoop.numActiveLocks == 0) { 00079 eventLoop.lockOwner = Core::ThreadId::NONE; 00080 if (eventLoop.numLocks == 0) 00081 eventLoop.unlocked.notify_one(); 00082 else 00083 eventLoop.safeToLock.notify_one(); 00084 } 00085 } 00086 00087 ////////// Loop::NullTimer ////////// 00088 00089 void 00090 Loop::NullTimer::handleTimerEvent() 00091 { 00092 // do nothing 00093 } 00094 00095 ////////// Loop ////////// 00096 00097 Loop::Loop() 00098 : epollfd(createEpollFd()) 00099 , breakTimer() 00100 , shouldExit(false) 00101 , mutex() 00102 , runningThread(Core::ThreadId::NONE) 00103 , numLocks(0) 00104 , numActiveLocks(0) 00105 , lockOwner(Core::ThreadId::NONE) 00106 , safeToLock() 00107 , unlocked() 00108 , extraMutexToSatisfyRaceDetector() 00109 , breakTimerMonitor(*this, breakTimer) 00110 { 00111 } 00112 00113 Loop::~Loop() 00114 { 00115 breakTimerMonitor.disableForever(); 00116 if (epollfd >= 0) { 00117 int r = close(epollfd); 00118 if (r != 0) 00119 PANIC("Could not close epollfd %d: %s", epollfd, strerror(errno)); 00120 } 00121 } 00122 00123 void 00124 Loop::runForever() 00125 { 00126 while (true) { 00127 { // Handle Loop::Lock requests and exiting. 00128 std::unique_lock<std::mutex> lockGuard(mutex); 00129 runningThread = Core::ThreadId::NONE; 00130 // Wait for all Locks to finish up 00131 while (numLocks > 0) { 00132 safeToLock.notify_one(); 00133 unlocked.wait(lockGuard); 00134 } 00135 if (shouldExit) { 00136 shouldExit = false; 00137 return; 00138 } 00139 runningThread = Core::ThreadId::getId(); 00140 } 00141 std::unique_lock<decltype(extraMutexToSatisfyRaceDetector)> 00142 extraLockGuard(extraMutexToSatisfyRaceDetector); 00143 00144 // Block in the kernel for events, then process them. 00145 // TODO(ongaro): It'd be more efficient to handle more than 1 event at 00146 // a time, but that complicates the interface: if a handler removes 00147 // itself from the poll set and deletes itself, we don't want further 00148 // events to call that same handler. For example, if a socket is 00149 // dup()ed so that the receive side handles events separately from the 00150 // send side, both are active events, and the first deletes the object, 00151 // this could cause trouble. 00152 enum { NUM_EVENTS = 1 }; 00153 struct epoll_event events[NUM_EVENTS]; 00154 int r = epoll_wait(epollfd, events, NUM_EVENTS, -1); 00155 if (r <= 0) { 00156 if (errno == EINTR) // caused by GDB 00157 continue; 00158 PANIC("epoll_wait failed: %s", strerror(errno)); 00159 } 00160 for (int i = 0; i < r; ++i) { 00161 Event::File& file = *static_cast<Event::File*>(events[i].data.ptr); 00162 file.handleFileEvent(events[i].events); 00163 } 00164 } 00165 } 00166 00167 void 00168 Loop::exit() 00169 { 00170 Event::Loop::Lock lock(*this); 00171 shouldExit = true; 00172 } 00173 00174 } // namespace LogCabin::Event 00175 } // namespace LogCabin