X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=Scheduler%2FScheduler.cc;h=9f222bfd789304d1f8347afb8c41e3fd62e10f1a;hb=17e24d84603667395e9ffa786a9cdbb722bf9c1f;hp=adbb6a6b5f0fe5d3101084f4ee4f081973a23acb;hpb=82ad2ed94c12c3e53097fef92978de8c28239fab;p=senf.git diff --git a/Scheduler/Scheduler.cc b/Scheduler/Scheduler.cc index adbb6a6..9f222bf 100644 --- a/Scheduler/Scheduler.cc +++ b/Scheduler/Scheduler.cc @@ -36,41 +36,7 @@ // Here a basic concept of how to add signal support to the scheduler: // -// Every signal to be reported by the scheduler will be asigned a -// generic signal handler by the scheduler. This signal handler will -// use longjmp (juck) to report this signal back to the scheduler -// main-loop. -// -// To make this safe, the main-loop will look something like: -// -// int signal = setjmp(jmpBuffer_); -// if (signal == 0) { -// // unblock all signals which are registered with the -// // scheduler -// // call epoll -// // block all relevant signals again -// } -// -// // now handle the event -// -// The signal handler is then simply defined as -// -// static void Scheduler::sigHandler(int signal) -// { -// // make sure to restore the signal handler here if -// // necessary -// longjmp(Scheduler::instance().jmpBuffer_,signal); -// } -// -// You should use sigaction to register the signal handlers and define -// a sa_mask so all Scheduler-registered signals are automatically -// *blocked* whenever one of the signals is called (including the -// called signal!) (This also means, we will have to re-register all -// signals if we change the registration of some signal since the -// sa_mask changes). This ensures, that no two signals can be -// delivered on top of each other. And of course any signal registered -// with the scheduler must be blocked as soon as it is registered with -// the scheduler. +// ... no, I had overlooked one race condition. So back to the signal-pipe approach ... #include "Scheduler.hh" //#include "Scheduler.ih" @@ -78,6 +44,8 @@ // Custom includes #include #include +#include +#include #include "../Utils/Exception.hh" static const int EPollInitialSize = 16; @@ -87,13 +55,54 @@ static const int EPollInitialSize = 16; prefix_ senf::Scheduler::Scheduler() : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false), - eventTime_(0) + eventTime_(0), eventEarly_(ClockService::milliseconds(11)), eventAdjust_(0) { if (epollFd_<0) throw SystemException(errno); + + if (::pipe(sigpipe_) < 0) + throw SystemException(errno); + + int flags (::fcntl(sigpipe_[1],F_GETFL)); + if (flags < 0) + throw SystemException(errno); + flags |= O_NONBLOCK; + if (::fcntl(sigpipe_[1], F_SETFL, flags) < 0) + throw SystemException(errno); + + ::epoll_event ev; + ::memset(&ev, 0, sizeof(ev)); + ev.events = EV_READ; + ev.data.fd = sigpipe_[0]; + if (::epoll_ctl(epollFd_, EPOLL_CTL_ADD, sigpipe_[0], &ev) < 0) + throw SystemException(errno); +} + +prefix_ void senf::Scheduler::registerSignal(unsigned signal, SimpleCallback const & cb) +{ + ::sigset_t sig; + ::sigemptyset(&sig); + if (::sigaddset(&sig, signal) < 0) + throw InvalidSignalNumberException(); + ::sigprocmask(SIG_BLOCK, &sig, 0); + ::sigaddset(&sigset_, signal); + if (sigHandlers_.size() <= signal) + sigHandlers_.resize(signal+1); + sigHandlers_[signal] = cb; + + registerSigHandlers(); } -prefix_ void senf::Scheduler::do_add(int fd, SimpleCallback const & cb, int eventMask) +prefix_ void senf::Scheduler::unregisterSignal(unsigned signal) +{ + if (::sigdelset(&sigset_, signal) < 0) + throw InvalidSignalNumberException(); + sigHandlers_[signal] = 0; + ::signal(signal, SIG_DFL); + registerSigHandlers(); +} + +prefix_ void senf::Scheduler::do_add(int fd, FdCallback const & cb, int eventMask) { FdTable::iterator i (fdTable_.find(fd)); int action (EPOLL_CTL_MOD); @@ -140,6 +149,33 @@ prefix_ void senf::Scheduler::do_remove(int fd, int eventMask) throw SystemException(errno); } +prefix_ void senf::Scheduler::registerSigHandlers() +{ + for (unsigned signal; signal < sigHandlers_.size(); ++signal) + if (sigHandlers_[signal]) { + struct ::sigaction sa; + sa.sa_sigaction = & Scheduler::sigHandler; + sa.sa_mask = sigset_; + sa.sa_flags = SA_SIGINFO; + if (signal == SIGCHLD) + sa.sa_flags |= SA_NOCLDSTOP; + if (::sigaction(signal, &sa, 0) < 0) + throw SystemException(errno); + } +} + +prefix_ void senf::Scheduler::sigHandler(int signal, ::siginfo_t * siginfo, void *) +{ + // This is a bit unsafe. Better write single bytes and place the siginfo into an explicit + // queue. Since signals are only unblocked during epoll_wait, we even wouldn't need to + // synchronize access to that queue any further. + + ::write(instance().sigpipe_[1], siginfo, sizeof(*siginfo)); + + // We ignore errors. The file handle is set to non-blocking IO. If any failure occurs (pipe + // full), the signal will be dropped. That's like kernel signal handling which may also drop + // signals. +} prefix_ int senf::Scheduler::EventSpec::epollMask() const @@ -175,29 +211,32 @@ prefix_ void senf::Scheduler::process() } else { ClockService::clock_type delta ( - (timerQueue_.top()->second.timeout - eventTime_)/1000000UL); + (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL); timeout = delta < 0 ? 0 : delta; } ///\todo Handle more than one epoll_event per call struct epoll_event ev; - int events = epoll_wait(epollFd_, &ev, 1, timeout); + + ::sigprocmask(SIG_UNBLOCK, &sigset_, 0); + int events (epoll_wait(epollFd_, &ev, 1, timeout)); + ::sigprocmask(SIG_BLOCK, &sigset_, 0); + if (events<0) if (errno != EINTR) throw SystemException(errno); eventTime_ = ClockService::now(); - // We always run event handlers. This is important, even if a file-descriptor is signaled + // We always run timeout handlers. This is important, even if a file-descriptor is signaled // since some descriptors (e.g. real files) will *always* be ready and we still may want to - // handle timers. - // Time handlers are run before file events to not delay them unnecessarily. + // handle timers. Time handlers are run before file events to not delay them unnecessarily. while (! timerQueue_.empty()) { TimerMap::iterator i (timerQueue_.top()); if (i->second.canceled) ; - else if (i->second.timeout <= eventTime_) + else if (i->second.timeout <= eventTime_ + eventEarly_) i->second.cb(); else break; @@ -208,6 +247,20 @@ prefix_ void senf::Scheduler::process() if (events <= 0) continue; + // Check the signal queue + if (ev.data.fd == sigpipe_[0]) { + ::siginfo_t siginfo; + if (::read(sigpipe_[0], &siginfo, sizeof(siginfo)) < int(sizeof(siginfo))) { + // We ignore truncated records which may only occur if the signal + // queue became filled up + SENF_LOG((senf::log::IMPORTANT)("Truncated signal record!")); + continue; + } + if (siginfo.si_signo < int(sigHandlers_.size()) && sigHandlers_[siginfo.si_signo]) + sigHandlers_[siginfo.si_signo](); + continue; + } + FdTable::iterator i = fdTable_.find(ev.data.fd); BOOST_ASSERT (i != fdTable_.end() ); EventSpec spec (i->second);