X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=Scheduler%2FScheduler.cc;h=9ea0e481ea8f57df5f98fe04ae167e537c557761;hb=532240d72e09e19e57fac9bb55c2560b9c9e5b97;hp=fbc917d971843920f9bb8724d3907e865d074266;hpb=15c5e1939d77dfea97da38df7526bcb84a53460b;p=senf.git diff --git a/Scheduler/Scheduler.cc b/Scheduler/Scheduler.cc index fbc917d..9ea0e48 100644 --- a/Scheduler/Scheduler.cc +++ b/Scheduler/Scheduler.cc @@ -1,9 +1,9 @@ // $Id$ // -// Copyright (C) 2006 -// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) -// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) -// Stefan Bund +// Copyright (C) 2006 +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY +// Stefan Bund // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -21,214 +21,168 @@ // 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. /** \file - - \idea Implement signal handling (See source for more discussion - about this) + \brief Scheduler non-inline non-template implementation \idea Multithreading support: To support multithreading, the static member Scheduler::instance() must return a thread-local value (that is Scheduler::instance() must allocate one Scheduler - instance per thread) + instance per thread). Another possibility would be to distribute + the async load unto several threads (one scheduler for multiple + threads) */ -// Here a basic concept of how to add signal support to the scheduler: -// -// Every signal to be reported by the scheduler will be asigned a -// generic signal handler by the scheduler. This signal handler will -// use longjmp (juck) to report this signal back to the scheduler -// main-loop. -// -// To make this safe, the main-loop will look something like: -// -// int signal = setjmp(jmpBuffer_); -// if (signal == 0) { -// // unblock all signals which are registered with the -// // scheduler -// // call epoll -// // block all relevant signals again -// } -// -// // now handle the event -// -// The signal handler is then simply defined as -// -// static void Scheduler::sigHandler(int signal) -// { -// // make sure to restore the signal handler here if -// // necessary -// longjmp(Scheduler::instance().jmpBuffer_,signal); -// } -// -// You should use sigaction to register the signal handlers and define -// a sa_mask so all Scheduler-registered signals are automatically -// *blocked* whenever one of the signals is called (including the -// called signal!) (This also means, we will have to re-register all -// signals if we change the registration of some signal since the -// sa_mask changes). This ensures, that no two signals can be -// delivered on top of each other. And of course any signal registered -// with the scheduler must be blocked as soon as it is registered with -// the scheduler. - -// Definition of non-inline non-template functions - #include "Scheduler.hh" //#include "Scheduler.ih" // Custom includes -#include -#include -#include "Utils/Exception.hh" -#include "Utils/MicroTime.hh" - -static const int EPollInitialSize = 16; #define prefix_ ///////////////////////////////cc.p//////////////////////////////////////// -prefix_ senf::Scheduler::Scheduler & senf::Scheduler::instance() +namespace { + bool terminate_ (false); + bool running_ (false); +} + +prefix_ void senf::scheduler::terminate() { - static Scheduler instance; - return instance; + terminate_ = true; } -prefix_ void senf::Scheduler::timeout(unsigned long timeout, TimerCallback const & cb) +prefix_ bool senf::scheduler::running() { - timerQueue_.push(TimerSpec(now()+1000*timeout,cb)); + return running_; } -prefix_ senf::Scheduler::Scheduler() - : epollFd_(epoll_create(EPollInitialSize)) +prefix_ senf::ClockService::clock_type senf::scheduler::now() { - if (epollFd_<0) - throw SystemException(errno); + return running() ? eventTime() : ClockService::now(); +} + +namespace { + + // We don't want try { } catch(...) { ... throw; } since that will make debugging more + // difficult: the stack backtrace for an unexpected exception would always end here. + struct SchedulerScopedInit + { + SchedulerScopedInit() + { + senf::scheduler::detail::FIFORunner::instance().startWatchdog(); + senf::scheduler::detail::SignalDispatcher::instance().unblockSignals(); + senf::scheduler::detail::TimerDispatcher::instance().enable(); + running_ = true; + } + + ~SchedulerScopedInit() + { + senf::scheduler::detail::TimerDispatcher::instance().disable(); + senf::scheduler::detail::SignalDispatcher::instance().blockSignals(); + senf::scheduler::detail::FIFORunner::instance().stopWatchdog(); + running_ = false; + } + }; } -prefix_ void senf::Scheduler::do_add(int fd, SimpleCallback const & cb, int eventMask) +prefix_ void senf::scheduler::process() { - FdTable::iterator i (fdTable_.find(fd)); - int action (EPOLL_CTL_MOD); - if (i == fdTable_.end()) { - action = EPOLL_CTL_ADD; - i = fdTable_.insert(std::make_pair(fd, EventSpec())).first; + SchedulerScopedInit initScheduler; + terminate_ = false; + running_ = true; + detail::TimerDispatcher::instance().reschedule(); + while(! terminate_ && ! (detail::FdDispatcher::instance().empty() && + detail::TimerDispatcher::instance().empty() && + detail::FileDispatcher::instance().empty())) { + detail::FdManager::instance().processOnce(); + detail::FileDispatcher::instance().prepareRun(); + detail::EventHookDispatcher::instance().prepareRun(); + detail::TimerDispatcher::instance().prepareRun(); + detail::FIFORunner::instance().run(); + detail::TimerDispatcher::instance().reschedule(); } +} - if (eventMask & EV_READ) i->second.cb_read = cb; - if (eventMask & EV_PRIO) i->second.cb_prio = cb; - if (eventMask & EV_WRITE) i->second.cb_write = cb; - if (eventMask & EV_HUP) i->second.cb_hup = cb; - if (eventMask & EV_ERR) i->second.cb_err = cb; - - epoll_event ev; - memset(&ev,0,sizeof(ev)); - ev.events = i->second.epollMask(); - ev.data.fd = fd; +prefix_ void senf::scheduler::restart() +{ + detail::FdManager* fdm (&detail::FdManager::instance()); + detail::FIFORunner* ffr (&detail::FIFORunner::instance()); + detail::FdDispatcher* fdd (&detail::FdDispatcher::instance()); + detail::TimerDispatcher* tdd (&detail::TimerDispatcher::instance()); + detail::SignalDispatcher* sdd (&detail::SignalDispatcher::instance()); + detail::FileDispatcher* fld (&detail::FileDispatcher::instance()); + detail::EventHookDispatcher* eed (&detail::EventHookDispatcher::instance()); + + eed->~EventHookDispatcher(); + fld->~FileDispatcher(); + sdd->~SignalDispatcher(); + tdd->~TimerDispatcher(); + fdd->~FdDispatcher(); + ffr->~FIFORunner(); + fdm->~FdManager(); - if (epoll_ctl(epollFd_, action, fd, &ev)<0) - throw SystemException(errno); + new (fdm) detail::FdManager(); + new (ffr) detail::FIFORunner(); + new (fdd) detail::FdDispatcher(); + new (tdd) detail::TimerDispatcher(); + new (sdd) detail::SignalDispatcher(); + new (fld) detail::FileDispatcher(); + new (eed) detail::EventHookDispatcher(); } -prefix_ void senf::Scheduler::do_remove(int fd, int eventMask) +prefix_ bool senf::scheduler::empty() { - FdTable::iterator i (fdTable_.find(fd)); - if (i == fdTable_.end()) - return; + return detail::FdDispatcher::instance().empty() + && detail::TimerDispatcher::instance().empty() + && detail::FileDispatcher::instance().empty() + && detail::SignalDispatcher::instance().empty() + && detail::EventHookDispatcher::instance().empty(); +} - if (eventMask & EV_READ) i->second.cb_read = 0; - if (eventMask & EV_PRIO) i->second.cb_prio = 0; - if (eventMask & EV_WRITE) i->second.cb_write = 0; - if (eventMask & EV_HUP) i->second.cb_hup = 0; - if (eventMask & EV_ERR) i->second.cb_err = 0; +prefix_ void senf::scheduler::hiresTimers() +{ +#ifdef HAVE_TIMERFD + if (haveScalableHiresTimers()) + detail::TimerDispatcher::instance().timerSource( + std::auto_ptr(new detail::TimerFDTimerSource())); + else +#endif + detail::TimerDispatcher::instance().timerSource( + std::auto_ptr(new detail::POSIXTimerSource())); +} - epoll_event ev; - memset(&ev,0,sizeof(ev)); - ev.events = i->second.epollMask(); - ev.data.fd = fd; - - int action (EPOLL_CTL_MOD); - if (ev.events==0) { - action = EPOLL_CTL_DEL; - fdTable_.erase(i); - } +/////////////////////////////////////////////////////////////////////////// +// senf::schedulerLogTimeSource - if (epoll_ctl(epollFd_, action, fd, &ev)<0) - throw SystemException(errno); +prefix_ senf::log::time_type senf::scheduler::LogTimeSource::operator()() + const +{ + return eventTime(); } +/////////////////////////////////////////////////////////////////////////// +// senf::scheduler::BlockSignals -prefix_ int senf::Scheduler::EventSpec::epollMask() - const +prefix_ senf::scheduler::BlockSignals::BlockSignals(bool initiallyBlocked) + : blocked_ (false) { - int mask (0); - if (cb_read) mask |= EPOLLIN; - if (cb_prio) mask |= EPOLLPRI; - if (cb_write) mask |= EPOLLOUT; - if (cb_hup) mask |= EPOLLHUP; - if (cb_err) mask |= EPOLLERR; - return mask; + ::sigfillset(&allSigs_); + if (initiallyBlocked) + block(); } -prefix_ void senf::Scheduler::process() +prefix_ void senf::scheduler::BlockSignals::block() { - terminate_ = false; - while (! terminate_) { - - MicroTime timeNow = now(); - while ( ! timerQueue_.empty() && timerQueue_.top().timeout <= timeNow ) { - timerQueue_.top().cb(); - timerQueue_.pop(); - } - if (terminate_) - return; - int timeout = timerQueue_.empty() ? -1 : int((timerQueue_.top().timeout - timeNow)/1000); - - struct epoll_event ev; - int events = epoll_wait(epollFd_, &ev, 1, timeout); - if (events<0) - // Hmm ... man epoll says, it will NOT return with EINTR ?? - throw SystemException(errno); - if (events==0) - // Timeout .. it will be run when reachiung the top of the loop - continue; - - FdTable::iterator i = fdTable_.find(ev.data.fd); - BOOST_ASSERT (i != fdTable_.end() ); - EventSpec const & spec (i->second); - - if (ev.events & EPOLLIN) { - BOOST_ASSERT(spec.cb_read); - spec.cb_read(EV_READ); - } - else if (ev.events & EPOLLPRI) { - BOOST_ASSERT(spec.cb_prio); - spec.cb_prio(EV_PRIO); - } - else if (ev.events & EPOLLOUT) { - BOOST_ASSERT(spec.cb_write); - spec.cb_write(EV_WRITE); - } - - else if (ev.events & EPOLLHUP) { - if (spec.cb_hup) - spec.cb_hup(EV_HUP); - else if (ev.events & EPOLLERR) { - /// \fixme This is stupid, if cb_write and cb_read are - /// the same. The same below. We really have to - /// exactly define sane semantics of what to do on - /// EPOLLHUP and EPOLLERR. - if (spec.cb_write) spec.cb_write(EV_HUP); - if (spec.cb_read) spec.cb_read(EV_HUP); - } - } - else if (ev.events & EPOLLERR && ! ev.events & EPOLLHUP) { - if (spec.cb_err) - spec.cb_err(EV_ERR); - else { - if (spec.cb_write) spec.cb_write(EV_ERR); - if (spec.cb_read) spec.cb_read(EV_ERR); - } - } + if (blocked_) + return; + ::sigprocmask(SIG_BLOCK, &allSigs_, &savedSigs_); + blocked_ = true; +} - } +prefix_ void senf::scheduler::BlockSignals::unblock() +{ + if (!blocked_) + return; + ::sigprocmask(SIG_SETMASK, &savedSigs_, 0); + blocked_ = false; } ///////////////////////////////cc.e//////////////////////////////////////// @@ -237,5 +191,10 @@ prefix_ void senf::Scheduler::process() // Local Variables: // mode: c++ +// fill-column: 100 // c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// comment-column: 40 // End: