X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=Scheduler%2FScheduler.cc;h=1d0a10775a99d4920e70224ca1b1a21a77c29da7;hb=09010bdcf81888480d4d481a523f9714a89f2625;hp=5285fa7da8b2e77fd443843830e7b23d8619b80a;hpb=3d16600678b948ff3bd0e4fd2a1a800fcc629a03;p=senf.git diff --git a/Scheduler/Scheduler.cc b/Scheduler/Scheduler.cc index 5285fa7..1d0a107 100644 --- a/Scheduler/Scheduler.cc +++ b/Scheduler/Scheduler.cc @@ -1,9 +1,9 @@ // $Id$ // // Copyright (C) 2006 -// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) -// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) -// Stefan Bund +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY +// Stefan Bund // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -23,9 +23,6 @@ /** \file \brief Scheduler non-inline non-template implementation - \idea Implement signal handling (See source for more discussion - about this) - \idea Multithreading support: To support multithreading, the static member Scheduler::instance() must return a thread-local value (that is Scheduler::instance() must allocate one Scheduler @@ -34,195 +31,62 @@ threads) */ -// Here a basic concept of how to add signal support to the scheduler: -// -// Every signal to be reported by the scheduler will be asigned a -// generic signal handler by the scheduler. This signal handler will -// use longjmp (juck) to report this signal back to the scheduler -// main-loop. -// -// To make this safe, the main-loop will look something like: -// -// int signal = setjmp(jmpBuffer_); -// if (signal == 0) { -// // unblock all signals which are registered with the -// // scheduler -// // call epoll -// // block all relevant signals again -// } -// -// // now handle the event -// -// The signal handler is then simply defined as -// -// static void Scheduler::sigHandler(int signal) -// { -// // make sure to restore the signal handler here if -// // necessary -// longjmp(Scheduler::instance().jmpBuffer_,signal); -// } -// -// You should use sigaction to register the signal handlers and define -// a sa_mask so all Scheduler-registered signals are automatically -// *blocked* whenever one of the signals is called (including the -// called signal!) (This also means, we will have to re-register all -// signals if we change the registration of some signal since the -// sa_mask changes). This ensures, that no two signals can be -// delivered on top of each other. And of course any signal registered -// with the scheduler must be blocked as soon as it is registered with -// the scheduler. - #include "Scheduler.hh" //#include "Scheduler.ih" // Custom includes -#include -#include -#include "Utils/Exception.hh" - -static const int EPollInitialSize = 16; +#include "SignalEvent.hh" #define prefix_ ///////////////////////////////cc.p//////////////////////////////////////// -prefix_ senf::Scheduler::Scheduler() - : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false), - eventTime_(0) -{ - if (epollFd_<0) - throw SystemException(errno); -} - -prefix_ void senf::Scheduler::do_add(int fd, SimpleCallback const & cb, int eventMask) +prefix_ void senf::Scheduler::process() { - FdTable::iterator i (fdTable_.find(fd)); - int action (EPOLL_CTL_MOD); - if (i == fdTable_.end()) { - action = EPOLL_CTL_ADD; - i = fdTable_.insert(std::make_pair(fd, EventSpec())).first; + terminate_ = false; + while(! terminate_ && ! (fdDispatcher_.empty() && + scheduler::detail::TimerDispatcher::instance().empty() && + fileDispatcher_.empty())) { + scheduler::detail::SignalDispatcher::instance().unblockSignals(); + scheduler::detail::TimerDispatcher::instance().unblockSignals(); + scheduler::FdManager::instance().processOnce(); + scheduler::detail::TimerDispatcher::instance().blockSignals(); + scheduler::detail::SignalDispatcher::instance().blockSignals(); + fileDispatcher_.prepareRun(); + scheduler::FIFORunner::instance().run(); } - - if (eventMask & EV_READ) i->second.cb_read = cb; - if (eventMask & EV_PRIO) i->second.cb_prio = cb; - if (eventMask & EV_WRITE) i->second.cb_write = cb; - - epoll_event ev; - memset(&ev,0,sizeof(ev)); - ev.events = i->second.epollMask(); - ev.data.fd = fd; - - if (epoll_ctl(epollFd_, action, fd, &ev)<0) - throw SystemException(errno); } -prefix_ void senf::Scheduler::do_remove(int fd, int eventMask) +prefix_ void senf::Scheduler::restart() { - FdTable::iterator i (fdTable_.find(fd)); - if (i == fdTable_.end()) - return; - - if (eventMask & EV_READ) i->second.cb_read = 0; - if (eventMask & EV_PRIO) i->second.cb_prio = 0; - if (eventMask & EV_WRITE) i->second.cb_write = 0; - - epoll_event ev; - memset(&ev,0,sizeof(ev)); - ev.events = i->second.epollMask(); - ev.data.fd = fd; - - int action (EPOLL_CTL_MOD); - if (ev.events==0) { - action = EPOLL_CTL_DEL; - fdTable_.erase(i); - } - - if (epoll_ctl(epollFd_, action, fd, &ev)<0) - throw SystemException(errno); + scheduler::FdManager* fdm (&scheduler::FdManager::instance()); + scheduler::FIFORunner* ffr (&scheduler::FIFORunner::instance()); + scheduler::FdDispatcher* fdd (&fdDispatcher_); + scheduler::detail::TimerDispatcher* td (&scheduler::detail::TimerDispatcher::instance()); + scheduler::detail::SignalDispatcher* sd (&scheduler::detail::SignalDispatcher::instance()); + scheduler::FileDispatcher* fld (&fileDispatcher_); + + fld->~FileDispatcher(); + sd->~SignalDispatcher(); + td->~TimerDispatcher(); + fdd->~FdDispatcher(); + ffr->~FIFORunner(); + fdm->~FdManager(); + + new (fdm) scheduler::FdManager(); + new (ffr) scheduler::FIFORunner(); + new (fdd) scheduler::FdDispatcher(*fdm, *ffr); + new (td) scheduler::detail::TimerDispatcher(); + new (sd) scheduler::detail::SignalDispatcher(); + new (fld) scheduler::FileDispatcher(*fdm, *ffr); } +/////////////////////////////////////////////////////////////////////////// +// senf::SchedulerLogTimeSource -prefix_ int senf::Scheduler::EventSpec::epollMask() +prefix_ senf::log::time_type senf::SchedulerLogTimeSource::operator()() const { - int mask (0); - if (cb_read) mask |= EPOLLIN; - if (cb_prio) mask |= EPOLLPRI; - if (cb_write) mask |= EPOLLOUT; - return mask; -} - -prefix_ void senf::Scheduler::process() -{ - terminate_ = false; - eventTime_ = ClockService::now(); - while (! terminate_) { - while ( ! timerQueue_.empty() && timerQueue_.top()->second.timeout <= eventTime_ ) { - TimerMap::iterator i (timerQueue_.top()); - if (! i->second.canceled) - i->second.cb(); - timerMap_.erase(i); - timerQueue_.pop(); - } - - if (terminate_) - return; - - int timeout (MinTimeout); - if (! timerQueue_.empty()) { - ClockService::clock_type delta ( - (timerQueue_.top()->second.timeout - eventTime_)/1000000UL); - if (deltasecond); - - unsigned extraFlags (0); - if (ev.events & EPOLLHUP) extraFlags |= EV_HUP; - if (ev.events & EPOLLERR) extraFlags |= EV_ERR; - - if (ev.events & EPOLLIN) { - BOOST_ASSERT(spec.cb_read); - spec.cb_read(EventId(EV_READ | extraFlags)); - } - else if (ev.events & EPOLLPRI) { - BOOST_ASSERT(spec.cb_prio); - spec.cb_prio(EventId(EV_PRIO | extraFlags)); - } - else if (ev.events & EPOLLOUT) { - BOOST_ASSERT(spec.cb_write); - spec.cb_write(EventId(EV_WRITE | extraFlags)); - } - else { - // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI. - // In this case we will signal all registered callbacks. The callbacks must be - // prepared to be called multiple times if they are registered to more than - // one event. - if (spec.cb_write) - spec.cb_write(EventId(extraFlags)); - if (spec.cb_prio) - spec.cb_prio(EventId(extraFlags)); - if (spec.cb_read) - spec.cb_read(EventId(extraFlags)); - } - } + return Scheduler::instance().eventTime(); } ///////////////////////////////cc.e////////////////////////////////////////