X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=Scheduler%2FScheduler.cc;h=a79307ac5ca989e8ccc909552429e078672dcb0b;hb=bd9f9d3fd6fbcff0112a7bf48ab9284da9576b11;hp=3d61946a8fdae0f6c6b68bbbb895e71d29a96b16;hpb=bf6bdcb90de19ed474535d41c5519518921b717b;p=senf.git diff --git a/Scheduler/Scheduler.cc b/Scheduler/Scheduler.cc index 3d61946..a79307a 100644 --- a/Scheduler/Scheduler.cc +++ b/Scheduler/Scheduler.cc @@ -31,320 +31,163 @@ threads) */ -// Here a basic concept of how to add signal support to the scheduler: -// -// ... no, I had overlooked one race condition. So back to the signal-pipe approach ... - #include "Scheduler.hh" //#include "Scheduler.ih" // Custom includes -#include "../Utils/senfassert.hh" -#include -#include -#include -#include -#include "../Utils/Exception.hh" -#include "../Utils/Backtrace.hh" - -static const int EPollInitialSize = 16; #define prefix_ ///////////////////////////////cc.p//////////////////////////////////////// -prefix_ senf::Scheduler::Scheduler() - : files_(0), timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false), - eventTime_(0), eventEarly_(ClockService::milliseconds(11)), eventAdjust_(0) -{ - if (epollFd_<0) - SENF_THROW_SYSTEM_EXCEPTION("::epoll_create()"); - - if (::pipe(sigpipe_) < 0) - SENF_THROW_SYSTEM_EXCEPTION("::pipe()"); - - int flags (::fcntl(sigpipe_[1],F_GETFL)); - if (flags < 0) - SENF_THROW_SYSTEM_EXCEPTION("::fcntl(F_GETFL)"); - flags |= O_NONBLOCK; - if (::fcntl(sigpipe_[1], F_SETFL, flags) < 0) - SENF_THROW_SYSTEM_EXCEPTION("::fcntl(F_SETFL)"); - - ::epoll_event ev; - ::memset(&ev, 0, sizeof(ev)); - ev.events = EV_READ; - ev.data.fd = sigpipe_[0]; - if (::epoll_ctl(epollFd_, EPOLL_CTL_ADD, sigpipe_[0], &ev) < 0) - SENF_THROW_SYSTEM_EXCEPTION("::epoll_ctl(EPOLL_CTL_ADD)"); +namespace { + bool terminate_ (false); + bool running_ (false); } -prefix_ void senf::Scheduler::registerSignal(unsigned signal, SimpleCallback const & cb) +prefix_ void senf::scheduler::terminate() { - ::sigset_t sig; - ::sigemptyset(&sig); - if (::sigaddset(&sig, signal) < 0) - throw InvalidSignalNumberException(); - ::sigprocmask(SIG_BLOCK, &sig, 0); - ::sigaddset(&sigset_, signal); - if (sigHandlers_.size() <= signal) - sigHandlers_.resize(signal+1); - sigHandlers_[signal] = cb; - - registerSigHandlers(); + terminate_ = true; } -prefix_ void senf::Scheduler::unregisterSignal(unsigned signal) +prefix_ void senf::scheduler::yield() { - if (::sigdelset(&sigset_, signal) < 0) - throw InvalidSignalNumberException(); - sigHandlers_[signal] = 0; - ::signal(signal, SIG_DFL); - registerSigHandlers(); + senf::scheduler::detail::FIFORunner::instance().yield(); } -prefix_ void senf::Scheduler::do_add(int fd, FdCallback const & cb, int eventMask) +prefix_ bool senf::scheduler::running() { - if (eventMask == 0) - return; - - FdTable::iterator i (fdTable_.find(fd)); - int action (EPOLL_CTL_MOD); - if (i == fdTable_.end()) { - action = EPOLL_CTL_ADD; - i = fdTable_.insert(std::make_pair(fd, EventSpec())).first; - } - else if (i->second.epollMask() == 0) { - action = EPOLL_CTL_ADD; - fdErase_.erase( std::remove(fdErase_.begin(), fdErase_.end(), unsigned(fd)), - fdErase_.end() ); - } - - if (eventMask & EV_READ) i->second.cb_read = cb; - if (eventMask & EV_PRIO) i->second.cb_prio = cb; - if (eventMask & EV_WRITE) i->second.cb_write = cb; - - epoll_event ev; - memset(&ev,0,sizeof(ev)); - ev.events = i->second.epollMask(); - ev.data.fd = fd; - - for (;;) { - if ( (!i->second.file) && (epoll_ctl(epollFd_, action, fd, &ev) < 0) ) { - switch (errno) { - case EPERM : - // Argh ... epoll does not support ordinary files :-( :-( - i->second.file = true; - ++ files_; - return; - case ENOENT : - action = EPOLL_CTL_ADD; - break; - default: - SENF_THROW_SYSTEM_EXCEPTION("::epoll_ctl()"); - } - } - else - return; - } + return running_; } -prefix_ void senf::Scheduler::do_remove(int fd, int eventMask) +prefix_ senf::ClockService::clock_type senf::scheduler::now() { - if (eventMask == 0) - return; - - FdTable::iterator i (fdTable_.find(fd)); - if (i == fdTable_.end()) - return; - - if (eventMask & EV_READ) i->second.cb_read = 0; - if (eventMask & EV_PRIO) i->second.cb_prio = 0; - if (eventMask & EV_WRITE) i->second.cb_write = 0; - - epoll_event ev; - memset(&ev,0,sizeof(ev)); - ev.events = i->second.epollMask(); - ev.data.fd = fd; + return running() ? eventTime() : ClockService::now(); +} - int action (EPOLL_CTL_MOD); - bool file (i->second.file); - if (ev.events==0) { - action = EPOLL_CTL_DEL; - fdErase_.push_back(fd); - } +namespace { + + // We don't want try { } catch(...) { ... throw; } since that will make debugging more + // difficult: the stack backtrace for an unexpected exception would always end here. + struct SchedulerScopedInit + { + SchedulerScopedInit() + { + senf::scheduler::detail::FIFORunner::instance().startWatchdog(); + senf::scheduler::detail::SignalDispatcher::instance().unblockSignals(); + senf::scheduler::detail::TimerDispatcher::instance().enable(); + running_ = true; + } - if (! file && epoll_ctl(epollFd_, action, fd, &ev) < 0 && errno != ENOENT) - SENF_THROW_SYSTEM_EXCEPTION("::epoll_ctl()"); - if (file) - -- files_; + ~SchedulerScopedInit() + { + senf::scheduler::detail::TimerDispatcher::instance().disable(); + senf::scheduler::detail::SignalDispatcher::instance().blockSignals(); + senf::scheduler::detail::FIFORunner::instance().stopWatchdog(); + running_ = false; + } + }; } -prefix_ void senf::Scheduler::registerSigHandlers() +prefix_ void senf::scheduler::process() { - for (unsigned signal (1); signal < sigHandlers_.size(); ++signal) { - if (sigHandlers_[signal]) { - struct ::sigaction sa; - sa.sa_sigaction = & Scheduler::sigHandler; - sa.sa_mask = sigset_; - sa.sa_flags = SA_SIGINFO; - if (signal == SIGCHLD) - sa.sa_flags |= SA_NOCLDSTOP; - if (::sigaction(signal, &sa, 0) < 0) - SENF_THROW_SYSTEM_EXCEPTION("::sigaction()"); - } + SchedulerScopedInit initScheduler; + terminate_ = false; + running_ = true; + detail::TimerDispatcher::instance().reschedule(); + while(! terminate_ && ! (detail::FdDispatcher::instance().empty() && + detail::TimerDispatcher::instance().empty() && + detail::FileDispatcher::instance().empty())) { + detail::FdManager::instance().processOnce(); + detail::FileDispatcher::instance().prepareRun(); + detail::EventHookDispatcher::instance().prepareRun(); + detail::TimerDispatcher::instance().prepareRun(); + detail::FIFORunner::instance().run(); + detail::TimerDispatcher::instance().reschedule(); } } -prefix_ void senf::Scheduler::sigHandler(int signal, ::siginfo_t * siginfo, void *) +prefix_ void senf::scheduler::restart() { - // This is a bit unsafe. Better write single bytes and place the siginfo into an explicit - // queue. Since signals are only unblocked during epoll_wait, we even wouldn't need to - // synchronize access to that queue any further. - - ::write(instance().sigpipe_[1], siginfo, sizeof(*siginfo)); - - // We ignore errors. The file handle is set to non-blocking IO. If any failure occurs (pipe - // full), the signal will be dropped. That's like kernel signal handling which may also drop - // signals. + detail::FdManager* fdm (&detail::FdManager::instance()); + detail::FIFORunner* ffr (&detail::FIFORunner::instance()); + detail::FdDispatcher* fdd (&detail::FdDispatcher::instance()); + detail::TimerDispatcher* tdd (&detail::TimerDispatcher::instance()); + detail::SignalDispatcher* sdd (&detail::SignalDispatcher::instance()); + detail::FileDispatcher* fld (&detail::FileDispatcher::instance()); + detail::EventHookDispatcher* eed (&detail::EventHookDispatcher::instance()); + + eed->~EventHookDispatcher(); + fld->~FileDispatcher(); + sdd->~SignalDispatcher(); + tdd->~TimerDispatcher(); + fdd->~FdDispatcher(); + ffr->~FIFORunner(); + fdm->~FdManager(); + + new (fdm) detail::FdManager(); + new (ffr) detail::FIFORunner(); + new (fdd) detail::FdDispatcher(); + new (tdd) detail::TimerDispatcher(); + new (sdd) detail::SignalDispatcher(); + new (fld) detail::FileDispatcher(); + new (eed) detail::EventHookDispatcher(); } -prefix_ int senf::Scheduler::EventSpec::epollMask() - const +prefix_ bool senf::scheduler::empty() { - int mask (0); - if (cb_read) mask |= EPOLLIN; - if (cb_prio) mask |= EPOLLPRI; - if (cb_write) mask |= EPOLLOUT; - return mask; + return detail::FdDispatcher::instance().empty() + && detail::TimerDispatcher::instance().empty() + && detail::FileDispatcher::instance().empty() + && detail::SignalDispatcher::instance().empty() + && detail::EventHookDispatcher::instance().empty(); } -prefix_ void senf::Scheduler::process() +prefix_ void senf::scheduler::hiresTimers() { - terminate_ = false; - eventTime_ = ClockService::now(); - while (! terminate_) { - - // Since a callback may have disabled further timers, we need to check for canceled timeouts - // again. - - while (! timerQueue_.empty()) { - TimerMap::iterator i (timerQueue_.top()); - if (! i->second.canceled) - break; - timerMap_.erase(i); - timerQueue_.pop(); - } - - for (FdEraseList::iterator i (fdErase_.begin()); i != fdErase_.end(); ++i) - fdTable_.erase(*i); - fdErase_.clear(); - - int timeout (-1); - if (files_ > 0) - timeout = 0; - else { - if (timerQueue_.empty()) { - if (fdTable_.empty()) - break; - } - else { - ClockService::clock_type delta ( - (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL); - timeout = delta < 0 ? 0 : delta; - } - } - - ///\todo Handle more than one epoll_event per call - struct epoll_event ev; - - if (::sigprocmask(SIG_UNBLOCK, &sigset_, 0) < 0) - SENF_THROW_SYSTEM_EXCEPTION("::sigprocmask(SIG_UNBLOCK)"); - int events (::epoll_wait(epollFd_, &ev, 1, timeout)); - if (::sigprocmask(SIG_BLOCK, &sigset_, 0) < 0) - SENF_THROW_SYSTEM_EXCEPTION("::sigprocmask(SIG_BLOCK)"); - - if (events<0) - if (errno != EINTR) - SENF_THROW_SYSTEM_EXCEPTION("::epoll_wait()"); - - eventTime_ = ClockService::now(); - - // We always run timeout handlers. This is important, even if a file-descriptor is signaled - // since some descriptors (e.g. real files) will *always* be ready and we still may want to - // handle timers. Time handlers are run before file events to not delay them unnecessarily. - - while (! timerQueue_.empty()) { - TimerMap::iterator i (timerQueue_.top()); - if (i->second.canceled) - ; - else if (i->second.timeout <= eventTime_ + eventEarly_) - i->second.cb(); - else - break; - timerQueue_.pop(); - timerMap_.erase(i); - } - - // Check the signal queue - if (events > 0 && ev.data.fd == sigpipe_[0]) { - ::siginfo_t siginfo; - if (::read(sigpipe_[0], &siginfo, sizeof(siginfo)) < int(sizeof(siginfo))) { - // We ignore truncated records which may only occur if the signal - // queue became filled up - SENF_LOG((senf::log::IMPORTANT)("Truncated signal record!")); - continue; - } - if (siginfo.si_signo < int(sigHandlers_.size()) && sigHandlers_[siginfo.si_signo]) - sigHandlers_[siginfo.si_signo](); - continue; - } +#ifdef HAVE_TIMERFD + if (haveScalableHiresTimers()) + detail::TimerDispatcher::instance().timerSource( + std::auto_ptr(new detail::TimerFDTimerSource())); + else +#endif + detail::TimerDispatcher::instance().timerSource( + std::auto_ptr(new detail::POSIXTimerSource())); +} - for (FdTable::iterator i = fdTable_.begin(); i != fdTable_.end(); ++i) { - EventSpec & spec (i->second); +/////////////////////////////////////////////////////////////////////////// +// senf::schedulerLogTimeSource - if (! (spec.file || (events > 0 && i->first == ev.data.fd))) - continue; - - unsigned extraFlags (0); - unsigned mask (spec.file ? spec.epollMask() : ev.events); +prefix_ senf::log::time_type senf::scheduler::LogTimeSource::operator()() + const +{ + return eventTime(); +} - if (mask & EPOLLHUP) extraFlags |= EV_HUP; - if (mask & EPOLLERR) extraFlags |= EV_ERR; +/////////////////////////////////////////////////////////////////////////// +// senf::scheduler::BlockSignals - if (mask & EPOLLIN) { - SENF_ASSERT(spec.cb_read); - spec.cb_read(EventId(EV_READ | extraFlags)); - } - else if (mask & EPOLLPRI) { - SENF_ASSERT(spec.cb_prio); - spec.cb_prio(EventId(EV_PRIO | extraFlags)); - } - else if (mask & EPOLLOUT) { - SENF_ASSERT(spec.cb_write); - spec.cb_write(EventId(EV_WRITE | extraFlags)); - } - else { - // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI. - // In this case we will signal all registered callbacks. The callbacks must be - // prepared to be called multiple times if they are registered to more than - // one event. - if (spec.cb_write) - spec.cb_write(EventId(extraFlags)); - if (spec.cb_prio) - spec.cb_prio(EventId(extraFlags)); - if (spec.cb_read) - spec.cb_read(EventId(extraFlags)); - } - } - } +prefix_ senf::scheduler::BlockSignals::BlockSignals(bool initiallyBlocked) + : blocked_ (false) +{ + ::sigfillset(&allSigs_); + if (initiallyBlocked) + block(); } -/////////////////////////////////////////////////////////////////////////// -// senf::SchedulerLogTimeSource +prefix_ void senf::scheduler::BlockSignals::block() +{ + if (blocked_) + return; + ::sigprocmask(SIG_BLOCK, &allSigs_, &savedSigs_); + blocked_ = true; +} -prefix_ senf::log::time_type senf::SchedulerLogTimeSource::operator()() - const +prefix_ void senf::scheduler::BlockSignals::unblock() { - return Scheduler::instance().eventTime(); + if (!blocked_) + return; + ::sigprocmask(SIG_SETMASK, &savedSigs_, 0); + blocked_ = false; } ///////////////////////////////cc.e////////////////////////////////////////