X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=Scheduler%2FScheduler.cc;h=758b980de29c7ac51eddd6d3d4450bbc6f74236a;hb=be33ff96c5b89738694da272d8610564cce48bfb;hp=09971a84e29de57b8df910c748a80316cbeb6bea;hpb=271789888cd1ae6361607616f9f4e6e460e192c4;p=senf.git diff --git a/Scheduler/Scheduler.cc b/Scheduler/Scheduler.cc index 09971a8..758b980 100644 --- a/Scheduler/Scheduler.cc +++ b/Scheduler/Scheduler.cc @@ -1,9 +1,9 @@ // $Id$ // // Copyright (C) 2006 -// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) -// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) -// Stefan Bund +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY +// Stefan Bund // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -23,9 +23,6 @@ /** \file \brief Scheduler non-inline non-template implementation - \idea Implement signal handling (See source for more discussion - about this) - \idea Multithreading support: To support multithreading, the static member Scheduler::instance() must return a thread-local value (that is Scheduler::instance() must allocate one Scheduler @@ -36,48 +33,17 @@ // Here a basic concept of how to add signal support to the scheduler: // -// Every signal to be reported by the scheduler will be asigned a -// generic signal handler by the scheduler. This signal handler will -// use longjmp (juck) to report this signal back to the scheduler -// main-loop. -// -// To make this safe, the main-loop will look something like: -// -// int signal = setjmp(jmpBuffer_); -// if (signal == 0) { -// // unblock all signals which are registered with the -// // scheduler -// // call epoll -// // block all relevant signals again -// } -// -// // now handle the event -// -// The signal handler is then simply defined as -// -// static void Scheduler::sigHandler(int signal) -// { -// // make sure to restore the signal handler here if -// // necessary -// longjmp(Scheduler::instance().jmpBuffer_,signal); -// } -// -// You should use sigaction to register the signal handlers and define -// a sa_mask so all Scheduler-registered signals are automatically -// *blocked* whenever one of the signals is called (including the -// called signal!) (This also means, we will have to re-register all -// signals if we change the registration of some signal since the -// sa_mask changes). This ensures, that no two signals can be -// delivered on top of each other. And of course any signal registered -// with the scheduler must be blocked as soon as it is registered with -// the scheduler. +// ... no, I had overlooked one race condition. So back to the signal-pipe approach ... #include "Scheduler.hh" //#include "Scheduler.ih" // Custom includes +#include "../Utils/senfassert.hh" #include #include +#include +#include #include "../Utils/Exception.hh" static const int EPollInitialSize = 16; @@ -86,21 +52,70 @@ static const int EPollInitialSize = 16; ///////////////////////////////cc.p//////////////////////////////////////// prefix_ senf::Scheduler::Scheduler() - : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false), - eventTime_(0) + : files_(0), timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false), + eventTime_(0), eventEarly_(ClockService::milliseconds(11)), eventAdjust_(0) { if (epollFd_<0) - throw SystemException(errno); + SENF_THROW_SYSTEM_EXCEPTION(""); + + if (::pipe(sigpipe_) < 0) + SENF_THROW_SYSTEM_EXCEPTION(""); + + int flags (::fcntl(sigpipe_[1],F_GETFL)); + if (flags < 0) + SENF_THROW_SYSTEM_EXCEPTION(""); + flags |= O_NONBLOCK; + if (::fcntl(sigpipe_[1], F_SETFL, flags) < 0) + SENF_THROW_SYSTEM_EXCEPTION(""); + + ::epoll_event ev; + ::memset(&ev, 0, sizeof(ev)); + ev.events = EV_READ; + ev.data.fd = sigpipe_[0]; + if (::epoll_ctl(epollFd_, EPOLL_CTL_ADD, sigpipe_[0], &ev) < 0) + SENF_THROW_SYSTEM_EXCEPTION(""); +} + +prefix_ void senf::Scheduler::registerSignal(unsigned signal, SimpleCallback const & cb) +{ + ::sigset_t sig; + ::sigemptyset(&sig); + if (::sigaddset(&sig, signal) < 0) + throw InvalidSignalNumberException(); + ::sigprocmask(SIG_BLOCK, &sig, 0); + ::sigaddset(&sigset_, signal); + if (sigHandlers_.size() <= signal) + sigHandlers_.resize(signal+1); + sigHandlers_[signal] = cb; + + registerSigHandlers(); } -prefix_ void senf::Scheduler::do_add(int fd, SimpleCallback const & cb, int eventMask) +prefix_ void senf::Scheduler::unregisterSignal(unsigned signal) { + if (::sigdelset(&sigset_, signal) < 0) + throw InvalidSignalNumberException(); + sigHandlers_[signal] = 0; + ::signal(signal, SIG_DFL); + registerSigHandlers(); +} + +prefix_ void senf::Scheduler::do_add(int fd, FdCallback const & cb, int eventMask) +{ + if (eventMask == 0) + return; + FdTable::iterator i (fdTable_.find(fd)); int action (EPOLL_CTL_MOD); if (i == fdTable_.end()) { action = EPOLL_CTL_ADD; i = fdTable_.insert(std::make_pair(fd, EventSpec())).first; } + if (i->second.epollMask() == 0) { + action = EPOLL_CTL_ADD; + fdErase_.erase( std::remove(fdErase_.begin(), fdErase_.end(), unsigned(fd)), + fdErase_.end() ); + } if (eventMask & EV_READ) i->second.cb_read = cb; if (eventMask & EV_PRIO) i->second.cb_prio = cb; @@ -111,12 +126,22 @@ prefix_ void senf::Scheduler::do_add(int fd, SimpleCallback const & cb, int even ev.events = i->second.epollMask(); ev.data.fd = fd; - if (epoll_ctl(epollFd_, action, fd, &ev)<0) - throw SystemException(errno); + if (! i->second.file && epoll_ctl(epollFd_, action, fd, &ev) < 0) { + if (errno == EPERM) { + // Argh ... epoll does not support ordinary files :-( :-( + i->second.file = true; + ++ files_; + } + else + SENF_THROW_SYSTEM_EXCEPTION("::epoll_ctl()"); + } } prefix_ void senf::Scheduler::do_remove(int fd, int eventMask) { + if (eventMask == 0) + return; + FdTable::iterator i (fdTable_.find(fd)); if (i == fdTable_.end()) return; @@ -131,15 +156,46 @@ prefix_ void senf::Scheduler::do_remove(int fd, int eventMask) ev.data.fd = fd; int action (EPOLL_CTL_MOD); + bool file (i->second.file); if (ev.events==0) { action = EPOLL_CTL_DEL; - fdTable_.erase(i); + fdErase_.push_back(fd); } - if (epoll_ctl(epollFd_, action, fd, &ev)<0) - throw SystemException(errno); + if (! file && epoll_ctl(epollFd_, action, fd, &ev) < 0) + SENF_THROW_SYSTEM_EXCEPTION("::epoll_ctl()"); + if (file) + -- files_; +} + +prefix_ void senf::Scheduler::registerSigHandlers() +{ + for (unsigned signal (1); signal < sigHandlers_.size(); ++signal) { + if (sigHandlers_[signal]) { + struct ::sigaction sa; + sa.sa_sigaction = & Scheduler::sigHandler; + sa.sa_mask = sigset_; + sa.sa_flags = SA_SIGINFO; + if (signal == SIGCHLD) + sa.sa_flags |= SA_NOCLDSTOP; + if (::sigaction(signal, &sa, 0) < 0) + SENF_THROW_SYSTEM_EXCEPTION(""); + } + } } +prefix_ void senf::Scheduler::sigHandler(int signal, ::siginfo_t * siginfo, void *) +{ + // This is a bit unsafe. Better write single bytes and place the siginfo into an explicit + // queue. Since signals are only unblocked during epoll_wait, we even wouldn't need to + // synchronize access to that queue any further. + + ::write(instance().sigpipe_[1], siginfo, sizeof(*siginfo)); + + // We ignore errors. The file handle is set to non-blocking IO. If any failure occurs (pipe + // full), the signal will be dropped. That's like kernel signal handling which may also drop + // signals. +} prefix_ int senf::Scheduler::EventSpec::epollMask() const @@ -168,36 +224,47 @@ prefix_ void senf::Scheduler::process() timerQueue_.pop(); } + for (FdEraseList::iterator i (fdErase_.begin()); i != fdErase_.end(); ++i) + fdTable_.erase(*i); + fdErase_.clear(); + int timeout (-1); - if (timerQueue_.empty()) { - if (fdTable_.empty()) - break; - } + if (files_ > 0) + timeout = 0; else { - ClockService::clock_type delta ( - (timerQueue_.top()->second.timeout - eventTime_)/1000000UL); - timeout = delta < 0 ? 0 : delta; + if (timerQueue_.empty()) { + if (fdTable_.empty()) + break; + } + else { + ClockService::clock_type delta ( + (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL); + timeout = delta < 0 ? 0 : delta; + } } ///\todo Handle more than one epoll_event per call struct epoll_event ev; - int events = epoll_wait(epollFd_, &ev, 1, timeout); + + ::sigprocmask(SIG_UNBLOCK, &sigset_, 0); + int events (epoll_wait(epollFd_, &ev, 1, timeout)); + ::sigprocmask(SIG_BLOCK, &sigset_, 0); + if (events<0) if (errno != EINTR) - throw SystemException(errno); + SENF_THROW_SYSTEM_EXCEPTION(""); eventTime_ = ClockService::now(); - // We always run event handlers. This is important, even if a file-descriptor is signaled + // We always run timeout handlers. This is important, even if a file-descriptor is signaled // since some descriptors (e.g. real files) will *always* be ready and we still may want to - // handle timers. - // Time handlers are run before file events to not delay them unnecessarily. + // handle timers. Time handlers are run before file events to not delay them unnecessarily. while (! timerQueue_.empty()) { TimerMap::iterator i (timerQueue_.top()); if (i->second.canceled) ; - else if (i->second.timeout <= eventTime_) + else if (i->second.timeout <= eventTime_ + eventEarly_) i->second.cb(); else break; @@ -205,44 +272,69 @@ prefix_ void senf::Scheduler::process() timerMap_.erase(i); } - if (events <= 0) + // Check the signal queue + if (events > 0 && ev.data.fd == sigpipe_[0]) { + ::siginfo_t siginfo; + if (::read(sigpipe_[0], &siginfo, sizeof(siginfo)) < int(sizeof(siginfo))) { + // We ignore truncated records which may only occur if the signal + // queue became filled up + SENF_LOG((senf::log::IMPORTANT)("Truncated signal record!")); + continue; + } + if (siginfo.si_signo < int(sigHandlers_.size()) && sigHandlers_[siginfo.si_signo]) + sigHandlers_[siginfo.si_signo](); continue; - - FdTable::iterator i = fdTable_.find(ev.data.fd); - BOOST_ASSERT (i != fdTable_.end() ); - EventSpec spec (i->second); - - unsigned extraFlags (0); - if (ev.events & EPOLLHUP) extraFlags |= EV_HUP; - if (ev.events & EPOLLERR) extraFlags |= EV_ERR; - - if (ev.events & EPOLLIN) { - BOOST_ASSERT(spec.cb_read); - spec.cb_read(EventId(EV_READ | extraFlags)); - } - else if (ev.events & EPOLLPRI) { - BOOST_ASSERT(spec.cb_prio); - spec.cb_prio(EventId(EV_PRIO | extraFlags)); } - else if (ev.events & EPOLLOUT) { - BOOST_ASSERT(spec.cb_write); - spec.cb_write(EventId(EV_WRITE | extraFlags)); - } - else { - // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI. - // In this case we will signal all registered callbacks. The callbacks must be - // prepared to be called multiple times if they are registered to more than - // one event. - if (spec.cb_write) - spec.cb_write(EventId(extraFlags)); - if (spec.cb_prio) - spec.cb_prio(EventId(extraFlags)); - if (spec.cb_read) - spec.cb_read(EventId(extraFlags)); + + for (FdTable::iterator i = fdTable_.begin(); i != fdTable_.end(); ++i) { + EventSpec & spec (i->second); + + if (! (spec.file || (events > 0 && i->first == ev.data.fd))) + continue; + + unsigned extraFlags (0); + unsigned mask (spec.file ? spec.epollMask() : ev.events); + + if (mask & EPOLLHUP) extraFlags |= EV_HUP; + if (mask & EPOLLERR) extraFlags |= EV_ERR; + + if (mask & EPOLLIN) { + SENF_ASSERT(spec.cb_read); + spec.cb_read(EventId(EV_READ | extraFlags)); + } + else if (mask & EPOLLPRI) { + SENF_ASSERT(spec.cb_prio); + spec.cb_prio(EventId(EV_PRIO | extraFlags)); + } + else if (mask & EPOLLOUT) { + SENF_ASSERT(spec.cb_write); + spec.cb_write(EventId(EV_WRITE | extraFlags)); + } + else { + // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI. + // In this case we will signal all registered callbacks. The callbacks must be + // prepared to be called multiple times if they are registered to more than + // one event. + if (spec.cb_write) + spec.cb_write(EventId(extraFlags)); + if (spec.cb_prio) + spec.cb_prio(EventId(extraFlags)); + if (spec.cb_read) + spec.cb_read(EventId(extraFlags)); + } } } } +/////////////////////////////////////////////////////////////////////////// +// senf::SchedulerLogTimeSource + +prefix_ boost::posix_time::ptime senf::SchedulerLogTimeSource::operator()() + const +{ + return ClockService::abstime(Scheduler::instance().eventTime()); +} + ///////////////////////////////cc.e//////////////////////////////////////// #undef prefix_