X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=Scheduler%2FScheduler.cc;h=adbb6a6b5f0fe5d3101084f4ee4f081973a23acb;hb=82ad2ed94c12c3e53097fef92978de8c28239fab;hp=01e33c89b6c2390a63c659af273509ceee02d370;hpb=032707d24b1059febe83ce56b11fd79df106c6e2;p=senf.git diff --git a/Scheduler/Scheduler.cc b/Scheduler/Scheduler.cc index 01e33c8..adbb6a6 100644 --- a/Scheduler/Scheduler.cc +++ b/Scheduler/Scheduler.cc @@ -1,6 +1,6 @@ // $Id$ // -// Copyright (C) 2006 +// Copyright (C) 2006 // Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) // Kompetenzzentrum fuer Satelitenkommunikation (SatCom) // Stefan Bund @@ -21,16 +21,17 @@ // 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. /** \file + \brief Scheduler non-inline non-template implementation \idea Implement signal handling (See source for more discussion - about this) + about this) \idea Multithreading support: To support multithreading, the static member Scheduler::instance() must return a thread-local value (that is Scheduler::instance() must allocate one Scheduler - instance per thread) - - \fixme Test2 + instance per thread). Another possibility would be to distribute + the async load unto several threads (one scheduler for multiple + threads) */ // Here a basic concept of how to add signal support to the scheduler: @@ -49,7 +50,7 @@ // // call epoll // // block all relevant signals again // } -// +// // // now handle the event // // The signal handler is then simply defined as @@ -64,12 +65,12 @@ // You should use sigaction to register the signal handlers and define // a sa_mask so all Scheduler-registered signals are automatically // *blocked* whenever one of the signals is called (including the -// called signal!). This ensures, that no two signals can be delivered -// on top of each other. And of course any signal registered with the -// scheduler must be blocked as soon as it is registered with the -// scheduler. - -// Definition of non-inline non-template functions +// called signal!) (This also means, we will have to re-register all +// signals if we change the registration of some signal since the +// sa_mask changes). This ensures, that no two signals can be +// delivered on top of each other. And of course any signal registered +// with the scheduler must be blocked as soon as it is registered with +// the scheduler. #include "Scheduler.hh" //#include "Scheduler.ih" @@ -77,27 +78,16 @@ // Custom includes #include #include -#include "Utils/Exception.hh" -#include "Utils/MicroTime.hh" +#include "../Utils/Exception.hh" static const int EPollInitialSize = 16; #define prefix_ ///////////////////////////////cc.p//////////////////////////////////////// -prefix_ senf::Scheduler::Scheduler & senf::Scheduler::instance() -{ - static Scheduler instance; - return instance; -} - -prefix_ void senf::Scheduler::timeout(unsigned long timeout, TimerCallback const & cb) -{ - timerQueue_.push(TimerSpec(now()+1000*timeout,cb)); -} - prefix_ senf::Scheduler::Scheduler() - : epollFd_(epoll_create(EPollInitialSize)) + : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false), + eventTime_(0) { if (epollFd_<0) throw SystemException(errno); @@ -115,14 +105,12 @@ prefix_ void senf::Scheduler::do_add(int fd, SimpleCallback const & cb, int even if (eventMask & EV_READ) i->second.cb_read = cb; if (eventMask & EV_PRIO) i->second.cb_prio = cb; if (eventMask & EV_WRITE) i->second.cb_write = cb; - if (eventMask & EV_HUP) i->second.cb_hup = cb; - if (eventMask & EV_ERR) i->second.cb_err = cb; epoll_event ev; memset(&ev,0,sizeof(ev)); ev.events = i->second.epollMask(); ev.data.fd = fd; - + if (epoll_ctl(epollFd_, action, fd, &ev)<0) throw SystemException(errno); } @@ -130,20 +118,18 @@ prefix_ void senf::Scheduler::do_add(int fd, SimpleCallback const & cb, int even prefix_ void senf::Scheduler::do_remove(int fd, int eventMask) { FdTable::iterator i (fdTable_.find(fd)); - if (i == fdTable_.end()) + if (i == fdTable_.end()) return; if (eventMask & EV_READ) i->second.cb_read = 0; if (eventMask & EV_PRIO) i->second.cb_prio = 0; if (eventMask & EV_WRITE) i->second.cb_write = 0; - if (eventMask & EV_HUP) i->second.cb_hup = 0; - if (eventMask & EV_ERR) i->second.cb_err = 0; epoll_event ev; memset(&ev,0,sizeof(ev)); ev.events = i->second.epollMask(); ev.data.fd = fd; - + int action (EPOLL_CTL_MOD); if (ev.events==0) { action = EPOLL_CTL_DEL; @@ -162,76 +148,120 @@ prefix_ int senf::Scheduler::EventSpec::epollMask() if (cb_read) mask |= EPOLLIN; if (cb_prio) mask |= EPOLLPRI; if (cb_write) mask |= EPOLLOUT; - if (cb_hup) mask |= EPOLLHUP; - if (cb_err) mask |= EPOLLERR; return mask; } prefix_ void senf::Scheduler::process() { terminate_ = false; + eventTime_ = ClockService::now(); while (! terminate_) { - MicroTime timeNow = now(); - while ( ! timerQueue_.empty() && timerQueue_.top().timeout <= timeNow ) { - timerQueue_.top().cb(); - timerQueue_.pop(); - } - if (terminate_) - return; - int timeout = timerQueue_.empty() ? -1 : int((timerQueue_.top().timeout - timeNow)/1000); - + // Since a callback may have disabled further timers, we need to check for canceled timeouts + // again. + + while (! timerQueue_.empty()) { + TimerMap::iterator i (timerQueue_.top()); + if (! i->second.canceled) + break; + timerMap_.erase(i); + timerQueue_.pop(); + } + + int timeout (-1); + if (timerQueue_.empty()) { + if (fdTable_.empty()) + break; + } + else { + ClockService::clock_type delta ( + (timerQueue_.top()->second.timeout - eventTime_)/1000000UL); + timeout = delta < 0 ? 0 : delta; + } + + ///\todo Handle more than one epoll_event per call struct epoll_event ev; int events = epoll_wait(epollFd_, &ev, 1, timeout); if (events<0) - // Hmm ... man epoll says, it will NOT return with EINTR ?? - throw SystemException(errno); - if (events==0) - // Timeout .. it will be run when reachiung the top of the loop + if (errno != EINTR) + throw SystemException(errno); + + eventTime_ = ClockService::now(); + + // We always run event handlers. This is important, even if a file-descriptor is signaled + // since some descriptors (e.g. real files) will *always* be ready and we still may want to + // handle timers. + // Time handlers are run before file events to not delay them unnecessarily. + + while (! timerQueue_.empty()) { + TimerMap::iterator i (timerQueue_.top()); + if (i->second.canceled) + ; + else if (i->second.timeout <= eventTime_) + i->second.cb(); + else + break; + timerQueue_.pop(); + timerMap_.erase(i); + } + + if (events <= 0) continue; - + FdTable::iterator i = fdTable_.find(ev.data.fd); BOOST_ASSERT (i != fdTable_.end() ); - EventSpec const & spec (i->second); + EventSpec spec (i->second); + + unsigned extraFlags (0); + if (ev.events & EPOLLHUP) extraFlags |= EV_HUP; + if (ev.events & EPOLLERR) extraFlags |= EV_ERR; if (ev.events & EPOLLIN) { - BOOST_ASSERT(spec.cb_read); - spec.cb_read(EV_READ); + BOOST_ASSERT(spec.cb_read); + spec.cb_read(EventId(EV_READ | extraFlags)); } else if (ev.events & EPOLLPRI) { BOOST_ASSERT(spec.cb_prio); - spec.cb_prio(EV_PRIO); + spec.cb_prio(EventId(EV_PRIO | extraFlags)); } else if (ev.events & EPOLLOUT) { BOOST_ASSERT(spec.cb_write); - spec.cb_write(EV_WRITE); + spec.cb_write(EventId(EV_WRITE | extraFlags)); } - - else if (ev.events & EPOLLHUP) { - if (spec.cb_hup) - spec.cb_hup(EV_HUP); - else if (ev.events & EPOLLERR) { - if (spec.cb_write) spec.cb_write(EV_HUP); - if (spec.cb_read) spec.cb_read(EV_HUP); - } - } - else if (ev.events & EPOLLERR && ! ev.events & EPOLLHUP) { - if (spec.cb_err) - spec.cb_err(EV_ERR); - else { - if (spec.cb_write) spec.cb_write(EV_ERR); - if (spec.cb_read) spec.cb_read(EV_ERR); - } + else { + // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI. + // In this case we will signal all registered callbacks. The callbacks must be + // prepared to be called multiple times if they are registered to more than + // one event. + if (spec.cb_write) + spec.cb_write(EventId(extraFlags)); + if (spec.cb_prio) + spec.cb_prio(EventId(extraFlags)); + if (spec.cb_read) + spec.cb_read(EventId(extraFlags)); } - } } +/////////////////////////////////////////////////////////////////////////// +// senf::SchedulerLogTimeSource + +prefix_ boost::posix_time::ptime senf::SchedulerLogTimeSource::operator()() + const +{ + return ClockService::abstime(Scheduler::instance().eventTime()); +} + ///////////////////////////////cc.e//////////////////////////////////////// #undef prefix_ // Local Variables: // mode: c++ +// fill-column: 100 // c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// comment-column: 40 // End: