From: g0dil Date: Mon, 5 Nov 2007 15:37:01 +0000 (+0000) Subject: Scheduler: Add POSIX/UNIX signal support X-Git-Url: http://g0dil.de/git?a=commitdiff_plain;h=e67faa588ac375105c0a6d12c79ef93d5499bcab;p=senf.git Scheduler: Add POSIX/UNIX signal support Scheduler: Simplify callback parameter (remove mostly ignored callback handle parameter) Scheduler: Add timeoutEarly and timeoutAdjust parameters git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@494 270642c3-0616-0410-b53a-bc976706d245 --- diff --git a/Examples/DVBAdapter/MPEdec.cc b/Examples/DVBAdapter/MPEdec.cc index b32417e..9abf13c 100644 --- a/Examples/DVBAdapter/MPEdec.cc +++ b/Examples/DVBAdapter/MPEdec.cc @@ -65,7 +65,7 @@ public: } private: - void dumpSection(senf::FileHandle /* ignored */, senf::Scheduler::EventId event) + void dumpSection(senf::Scheduler::EventId event) { std::string data (handle.read()); senf::DatagramSection section (senf::DatagramSection::create(data)); @@ -98,6 +98,6 @@ int main(int argc, char const * argv[]) // c-file-style: "senf" // indent-tabs-mode: nil // ispell-local-dictionary: "american" -// compile-command: "scons -u test" +// compile-command: "scons -u" // comment-column: 40 // End: diff --git a/Examples/DVBAdapter/ULEdec.cc b/Examples/DVBAdapter/ULEdec.cc index 395688c..8a00a30 100644 --- a/Examples/DVBAdapter/ULEdec.cc +++ b/Examples/DVBAdapter/ULEdec.cc @@ -59,7 +59,7 @@ ULEdec::ULEdec(unsigned short adapter, unsigned short device) this->priv_sndu_type_1 = false; } -void ULEdec::handleEvent(senf::FileHandle, senf::Scheduler::EventId event) +void ULEdec::handleEvent(senf::Scheduler::EventId event) { senf::TransportPacket ts_packet ( senf::TransportPacket::create(188, senf::TransportPacket::noinit)); diff --git a/Examples/DVBAdapter/ULEdec.hh b/Examples/DVBAdapter/ULEdec.hh index c6dc738..64c9867 100644 --- a/Examples/DVBAdapter/ULEdec.hh +++ b/Examples/DVBAdapter/ULEdec.hh @@ -51,7 +51,7 @@ private: bool priv_sndu_type_1; iterator snduPacketData_iter; - void handleEvent(senf::FileHandle, senf::Scheduler::EventId event); + void handleEvent(senf::Scheduler::EventId event); void handleTSPacket(senf::TransportPacket tsPacket); void handleSNDUPacket(); @@ -82,6 +82,6 @@ struct ULEdecException : public std::exception // c-file-style: "senf" // indent-tabs-mode: nil // ispell-local-dictionary: "american" -// compile-command: "scons -u test" +// compile-command: "scons -u" // comment-column: 40 // End: diff --git a/Examples/MCSniffer/MCSniffer.cc b/Examples/MCSniffer/MCSniffer.cc index 252734d..0cdf72c 100644 --- a/Examples/MCSniffer/MCSniffer.cc +++ b/Examples/MCSniffer/MCSniffer.cc @@ -57,7 +57,7 @@ public: } private: - void dumpPacket(senf::FileHandle /* ignored */, senf::Scheduler::EventId event) + void dumpPacket(senf::Scheduler::EventId event) { std::string data (sock.read()); senf::EthernetPacket packet ( diff --git a/Examples/Sniffer/Sniffer.cc b/Examples/Sniffer/Sniffer.cc index 8c384ff..7fdd19a 100644 --- a/Examples/Sniffer/Sniffer.cc +++ b/Examples/Sniffer/Sniffer.cc @@ -82,7 +82,7 @@ public: } private: - void dumpPacket(senf::FileHandle /* ignored */, senf::Scheduler::EventId event) + void dumpPacket(senf::Scheduler::EventId event) { senf::EthernetPacket packet ( senf::EthernetPacket::create(senf::EthernetPacket::noinit)); diff --git a/Examples/TCPClientServer/server.cc b/Examples/TCPClientServer/server.cc index 0504948..4f3e12f 100644 --- a/Examples/TCPClientServer/server.cc +++ b/Examples/TCPClientServer/server.cc @@ -50,12 +50,12 @@ public: } private: - void accept(senf::FileHandle /* ignored */, senf::Scheduler::EventId event) + void accept(senf::Scheduler::EventId event) { senf::TCPv4ClientSocketHandle clientSock (serverSock.accept()); senf::Scheduler::instance().add( clientSock, - senf::membind(&Server::readFromClient, this), + boost::bind(&Server::readFromClient, this, clientSock, _1), senf::Scheduler::EV_READ); } diff --git a/PPI/IOEvent.cc b/PPI/IOEvent.cc index b63ca24..e32b884 100644 --- a/PPI/IOEvent.cc +++ b/PPI/IOEvent.cc @@ -41,7 +41,7 @@ prefix_ void senf::ppi::IOEvent::v_enable() { - Scheduler::instance().add(fd_, boost::bind(&IOEvent::cb,this,_1,_2), + Scheduler::instance().add(fd_, boost::bind(&IOEvent::cb,this,_1), Scheduler::EventId(events_)); } @@ -50,7 +50,7 @@ prefix_ void senf::ppi::IOEvent::v_disable() Scheduler::instance().remove(fd_, Scheduler::EventId(events_)); } -prefix_ void senf::ppi::IOEvent::cb(int, Scheduler::EventId event) +prefix_ void senf::ppi::IOEvent::cb(Scheduler::EventId event) { if ((event & ~events_) != 0) { if (event & Err) diff --git a/PPI/IOEvent.hh b/PPI/IOEvent.hh index f4b2e77..d8a2b04 100644 --- a/PPI/IOEvent.hh +++ b/PPI/IOEvent.hh @@ -109,7 +109,7 @@ namespace ppi { virtual void v_enable(); virtual void v_disable(); - void cb(int, Scheduler::EventId event); + void cb(Scheduler::EventId event); int fd_; unsigned events_; diff --git a/Scheduler/Scheduler.cc b/Scheduler/Scheduler.cc index adbb6a6..09628f1 100644 --- a/Scheduler/Scheduler.cc +++ b/Scheduler/Scheduler.cc @@ -36,41 +36,7 @@ // Here a basic concept of how to add signal support to the scheduler: // -// Every signal to be reported by the scheduler will be asigned a -// generic signal handler by the scheduler. This signal handler will -// use longjmp (juck) to report this signal back to the scheduler -// main-loop. -// -// To make this safe, the main-loop will look something like: -// -// int signal = setjmp(jmpBuffer_); -// if (signal == 0) { -// // unblock all signals which are registered with the -// // scheduler -// // call epoll -// // block all relevant signals again -// } -// -// // now handle the event -// -// The signal handler is then simply defined as -// -// static void Scheduler::sigHandler(int signal) -// { -// // make sure to restore the signal handler here if -// // necessary -// longjmp(Scheduler::instance().jmpBuffer_,signal); -// } -// -// You should use sigaction to register the signal handlers and define -// a sa_mask so all Scheduler-registered signals are automatically -// *blocked* whenever one of the signals is called (including the -// called signal!) (This also means, we will have to re-register all -// signals if we change the registration of some signal since the -// sa_mask changes). This ensures, that no two signals can be -// delivered on top of each other. And of course any signal registered -// with the scheduler must be blocked as soon as it is registered with -// the scheduler. +// ... no, I had overlooked one race condition. So back to the signal-pipe approach ... #include "Scheduler.hh" //#include "Scheduler.ih" @@ -78,6 +44,8 @@ // Custom includes #include #include +#include +#include #include "../Utils/Exception.hh" static const int EPollInitialSize = 16; @@ -87,13 +55,54 @@ static const int EPollInitialSize = 16; prefix_ senf::Scheduler::Scheduler() : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false), - eventTime_(0) + eventTime_(0), eventEarly_(ClockService::milliseconds(11)), eventAdjust_(0) { if (epollFd_<0) throw SystemException(errno); + + if (::pipe(sigpipe_) < 0) + throw SystemException(errno); + + int flags (::fcntl(sigpipe_[1],F_GETFL)); + if (flags < 0) + throw SystemException(errno); + flags |= O_NONBLOCK; + if (::fcntl(sigpipe_[1], F_SETFL, flags) < 0) + throw SystemException(errno); + + ::epoll_event ev; + ::memset(&ev, 0, sizeof(ev)); + ev.events = EV_READ; + ev.data.fd = sigpipe_[0]; + if (::epoll_ctl(epollFd_, EPOLL_CTL_ADD, sigpipe_[0], &ev) < 0) + throw SystemException(errno); +} + +prefix_ void senf::Scheduler::registerSignal(unsigned signal, SimpleCallback const & cb) +{ + ::sigset_t sig; + ::sigemptyset(&sig); + if (::sigaddset(&sig, signal) < 0) + throw InvalidSignalNumberException(); + ::sigprocmask(SIG_BLOCK, &sig, 0); + ::sigaddset(&sigset_, signal); + if (sigHandlers_.size() <= signal) + sigHandlers_.resize(signal+1); + sigHandlers_[signal] = cb; + + registerSigHandlers(); } -prefix_ void senf::Scheduler::do_add(int fd, SimpleCallback const & cb, int eventMask) +prefix_ void senf::Scheduler::unregisterSignal(unsigned signal) +{ + if (::sigdelset(&sigset_, signal) < 0) + throw InvalidSignalNumberException(); + sigHandlers_[signal] = 0; + ::signal(signal, SIG_DFL); + registerSigHandlers(); +} + +prefix_ void senf::Scheduler::do_add(int fd, FdCallback const & cb, int eventMask) { FdTable::iterator i (fdTable_.find(fd)); int action (EPOLL_CTL_MOD); @@ -140,6 +149,33 @@ prefix_ void senf::Scheduler::do_remove(int fd, int eventMask) throw SystemException(errno); } +prefix_ void senf::Scheduler::registerSigHandlers() +{ + for (unsigned signal; signal < sigHandlers_.size(); ++signal) + if (sigHandlers_[signal]) { + struct ::sigaction sa; + sa.sa_sigaction = & Scheduler::sigHandler; + sa.sa_mask = sigset_; + sa.sa_flags = SA_SIGINFO; + if (signal == SIGCHLD) + sa.sa_flags |= SA_NOCLDSTOP; + if (::sigaction(signal, &sa, 0) < 0) + throw SystemException(errno); + } +} + +prefix_ void senf::Scheduler::sigHandler(int signal, ::siginfo_t * siginfo, void *) +{ + // This is a bit unsafe. Better write single bytes and place the siginfo into an explicit + // queue. Since signals are only unblocked during epoll_wait, we even wouldn't need to + // synchronize access to that queue any further. + + ::write(instance().sigpipe_[1], siginfo, sizeof(siginfo)); + + // We ignore errors. The file handle is set to non-blocking IO. If any failure occurs (pipe + // full), the signal will be dropped. That's like kernel signal handling which may also drop + // signals. +} prefix_ int senf::Scheduler::EventSpec::epollMask() const @@ -175,29 +211,32 @@ prefix_ void senf::Scheduler::process() } else { ClockService::clock_type delta ( - (timerQueue_.top()->second.timeout - eventTime_)/1000000UL); + (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL); timeout = delta < 0 ? 0 : delta; } ///\todo Handle more than one epoll_event per call struct epoll_event ev; - int events = epoll_wait(epollFd_, &ev, 1, timeout); + + ::sigprocmask(SIG_UNBLOCK, &sigset_, 0); + int events (epoll_wait(epollFd_, &ev, 1, timeout)); + ::sigprocmask(SIG_BLOCK, &sigset_, 0); + if (events<0) if (errno != EINTR) throw SystemException(errno); eventTime_ = ClockService::now(); - // We always run event handlers. This is important, even if a file-descriptor is signaled + // We always run timeout handlers. This is important, even if a file-descriptor is signaled // since some descriptors (e.g. real files) will *always* be ready and we still may want to - // handle timers. - // Time handlers are run before file events to not delay them unnecessarily. + // handle timers. Time handlers are run before file events to not delay them unnecessarily. while (! timerQueue_.empty()) { TimerMap::iterator i (timerQueue_.top()); if (i->second.canceled) ; - else if (i->second.timeout <= eventTime_) + else if (i->second.timeout <= eventTime_ + eventEarly_) i->second.cb(); else break; @@ -208,6 +247,18 @@ prefix_ void senf::Scheduler::process() if (events <= 0) continue; + // Check the signal queue + if (ev.data.fd == sigpipe_[0]) { + ::siginfo_t siginfo; + if (::read(sigpipe_[0], &siginfo, sizeof(siginfo)) < int(sizeof(siginfo))) + // We ignore truncated records which may only occur if the signal + // queue became filled up + continue; + if (siginfo.si_signo < int(sigHandlers_.size()) && sigHandlers_[siginfo.si_signo]) + sigHandlers_[siginfo.si_signo](); + continue; + } + FdTable::iterator i = fdTable_.find(ev.data.fd); BOOST_ASSERT (i != fdTable_.end() ); EventSpec spec (i->second); diff --git a/Scheduler/Scheduler.cci b/Scheduler/Scheduler.cci index f88a979..286d8ec 100644 --- a/Scheduler/Scheduler.cci +++ b/Scheduler/Scheduler.cci @@ -38,7 +38,7 @@ prefix_ senf::Scheduler::Scheduler & senf::Scheduler::instance() } prefix_ unsigned senf::Scheduler::timeout(ClockService::clock_type timeout, - TimerCallback const & cb) + SimpleCallback const & cb) { ++ timerIdCounter_; TimerMap::iterator i ( diff --git a/Scheduler/Scheduler.cti b/Scheduler/Scheduler.cti index e3cb21f..628f68d 100644 --- a/Scheduler/Scheduler.cti +++ b/Scheduler/Scheduler.cti @@ -33,14 +33,10 @@ ///////////////////////////////cti.p/////////////////////////////////////// template -prefix_ void senf::Scheduler::add(Handle const & handle, - typename GenericCallback::Callback const & cb, - int eventMask) +prefix_ void senf::Scheduler::add(Handle const & handle, FdCallback const & cb, int eventMask) { // retrieve_filehandle is found via ADL - SimpleCallback scb (boost::bind(cb,handle,_1)); - int fd = retrieve_filehandle(handle); - do_add(fd,scb,eventMask); + do_add(retrieve_filehandle(handle),cb,eventMask); } template diff --git a/Scheduler/Scheduler.hh b/Scheduler/Scheduler.hh index babc199..7a958e3 100644 --- a/Scheduler/Scheduler.hh +++ b/Scheduler/Scheduler.hh @@ -28,6 +28,8 @@ #define HH_Scheduler_ 1 // Custom includes +#include +#include #include #include #include @@ -45,26 +47,92 @@ namespace senf { /** \brief Singleton class to manage the event loop - This class manages a single select() type event loop. A customer of this class may register - any number of file descriptors with this class and pass callback functions to be called on - input, output or error. This functions are specified using boost::function objects (See Boost.Function) + The Scheduler singleton manages the central event loop. It manages and dispatches all types + of events managed by the scheduler library: + \li File descriptor notifications + \li Timeouts + \li UNIX Signals - The Scheduler is based on a generic handle representation. The only information needed from - a handle, is the intrinsic file descriptor. Any object for which the statement + The scheduler is entered by calling it's process() member. This call will continue to run as + long as there is something to do, or until one of the handlers calls terminate(). The + Scheduler has 'something to do' as long as there is any file descriptor or timeout active. + + The Scheduler only provides low level primitive scheduling capability. Additional helpers + are defined on top of this functionality (e.g. ReadHelper or WriteHelper or the interval + timers of the PPI). + + \section sched_handlers Handlers + + All handlers are managed as generic Boost.Function objects. This allows + to pass any callable as a handler. Depending on the type of handler, some additional + arguments may be passed to the handler by the scheduler. If you want to pass additional + arguments to the handler, use Boost.Bind)). + + + \section sched_fd File descriptors + + File descriptors are managed using add() or remove() \code - int fd = retrieve_filehandle(object); + Scheduler::instance().add(handle, &callback); + Scheduler::instance().remove(handle); \endcode - is valid and places the relevant file descriptor into fd can be used as a Handle type. There - is an implementation of retrieve_filehandle(int) within the library to handle explicit file - descriptors. The Socket library provides an - implementation of retrieve_filehandle(FileHandle handle). If you want to support - some other handle type, just define an appropriate \c retrieve_filehandle function in - that types namespace. + The callback will be called with one additional argument. This argument is the event mask of + type EventId. This mask will tell, which of the registered events are + signaled. Additionally, EV_HUP or EV_ERR on hangup or error condition on the handle. + + There is always only one handler registered for a file descriptor (registering multiple + callbacks for a single fd does not make sense). + + The scheduler will accept an almost arbitrary object as it's first argument. It will use + \code + int fd = retrieve_filehandle(handle); + \endcode + To fetch the file handle given some abstract handle type. The definition of + retrieve_filehandle() will be found using ADL. + - It is important to note, that for every combination of file descriptor and event, only a \e - single handler may be installed. Installing more handlers does not make sense. If you need - to distribute data to several interested parties, you must take care of this yourself. + \section sched_timers Timers + + The Scheduler has very simple timer support. There is only one type of timer: A single-shot + deadline timer. More complex timers are built based on this. Timers are managed using + timeout() and cancelTimeout() + \code + int id = Scheduler::instance().timeout(Scheduler::instance().eventTime() + ClockService::milliseconds(100), + &callback); + Scheduler::instance().cancelTimeout(id); + \endcode + Timing is based on the ClockService, which provides a high resolution and strictly + monotonous time source. Registering a timeout will fire the callback when the target time is + reached. The timer may be canceled by passing the returned \a id to cancelTimeout(). + + There are two parameters which adjust the exact: \a timeoutEarly and \a timeoutAdjust. \a + timeoutEarly is the time, a callback may be called before the deadline time is + reached. Setting this value below the scheduling granularity of the kernel will have the + scheduler go into a busy wait (that is, an endless loop consuming 100% of CPU + recources) until the deadline time is reached! This is seldom desired. The default setting + of 11ms is adequate in most cases (it's slightly above the lowest linux scheduling + granularity). + + The other timeout scheduling parameter is \a timeoutAdjust. This value will be added to the + timeout value before calculating the next delay value thereby compensating for \a + timeoutEarly. By default, this value is set to 0 but may be changed if needed. + + + \section sched_signals POSIX/UNIX signals + + The Scheduler also incorporates standard POSIX/UNIX signals. Signals registered with the + scheduler will be handled \e synchronously within the event loop. + \code + Scheduler::instance().registerSignal(SIGUSR1, &callback); + Scheduler::instance().unregisterSignal(SIGUSR1); + \endcode + When registering a signal with the scheduler, that signal will automatically be blocked so + it can be handled within the scheduler. + + A registered signal does \e not count as 'something to do'. It is therefore not possible to + wait for signals \e only. \todo Fix EventId parameter (probably to int) to allow |-ing without casting ... */ @@ -87,15 +155,17 @@ namespace senf { sole member is a typedef symbol defining the callback type given the handle type. The Callback is any callable object taking a \c Handle and an \c EventId as argument. - */ template struct GenericCallback { typedef boost::function::param_type, EventId) > Callback; }; + */ + + typedef boost::function FdCallback; /** \brief Callback type for timer events */ - typedef boost::function TimerCallback; + typedef boost::function SimpleCallback; /////////////////////////////////////////////////////////////////////////// ///\name Structors and default members @@ -123,9 +193,11 @@ namespace senf { ///@} /////////////////////////////////////////////////////////////////////////// + ///\name File Descriptors + ///\{ + template - void add(Handle const & handle, - typename GenericCallback::Callback const & cb, + void add(Handle const & handle, FdCallback const & cb, int eventMask = EV_ALL); ///< Add file handle event callback /**< add() will add a callback to the Scheduler. The callback will be called for the given type of event on @@ -148,19 +220,56 @@ namespace senf { \param[in] eventMask arbitrary combination via '|' operator of EventId designators. */ - unsigned timeout(ClockService::clock_type timeout, TimerCallback const & cb); + ///\} + + ///\name Timeouts + ///\{ + + unsigned timeout(ClockService::clock_type timeout, SimpleCallback const & cb); ///< Add timeout event /**< \param[in] timeout timeout in nanoseconds \param[in] cb callback to call after \a timeout milliseconds */ - void cancelTimeout(unsigned id); + void cancelTimeout(unsigned id); ///< Cancel timeout \a id + + ClockService::clock_type timeoutEarly() const; + ///< Fetch the \a timeoutEarly parameter + void timeoutEarly(ClockService::clock_type v); + ///< Set the \a timeoutEarly parameter + + ClockService::clock_type timeoutAdjust() const;\ + ///< Fetch the \a timeoutAdjust parameter + void timeoutAdjust(ClockService::clock_type v); + ///< Set the \a timeoutAdjust parameter + + ///\} + + ///\name Signal handlers + ///\{ + + void registerSignal(unsigned signal, SimpleCallback const & cb); + ///< Add signal handler + /**< \param[in] signal signal number to register handler for + \param[in] cb callback to call whenever \a signal is + delivered. */ + + void unregisterSignal(unsigned signal); + ///< Remove signal handler for \a signal + + struct InvalidSignalNumberException : public std::exception + { virtual char const * what() const throw() + { return "senf::Scheduler::InvalidSignalNumberException"; } }; + + + ///\} void process(); ///< Event handler main loop /**< This member must be called at some time to enter the event handler main loop. Only while this function is running any events are handled. The call will return only, if any callback calls terminate(). */ + void terminate(); ///< Called by callbacks to terminate the main loop /**< This member may be called by any callback to tell the main loop to terminate. The main loop will return to @@ -172,20 +281,23 @@ namespace senf { protected: private: - typedef boost::function SimpleCallback; - Scheduler(); - void do_add(int fd, SimpleCallback const & cb, int eventMask = EV_ALL); + void do_add(int fd, FdCallback const & cb, int eventMask = EV_ALL); void do_remove(int fd, int eventMask = EV_ALL); + void registerSigHandlers(); + static void sigHandler(int signal, ::siginfo_t * siginfo, void *); + +# ifndef DOXYGEN + /** \brief Descriptor event specification \internal */ struct EventSpec { - SimpleCallback cb_read; - SimpleCallback cb_prio; - SimpleCallback cb_write; + FdCallback cb_read; + FdCallback cb_prio; + FdCallback cb_write; int epollMask() const; }; @@ -195,21 +307,25 @@ namespace senf { struct TimerSpec { TimerSpec() : timeout(), cb() {} - TimerSpec(ClockService::clock_type timeout_, TimerCallback cb_, unsigned id_) + TimerSpec(ClockService::clock_type timeout_, SimpleCallback cb_, unsigned id_) : timeout(timeout_), cb(cb_), id(id_), canceled(false) {} bool operator< (TimerSpec const & other) const { return timeout > other.timeout; } ClockService::clock_type timeout; - TimerCallback cb; + SimpleCallback cb; unsigned id; bool canceled; }; +# endif + typedef std::map FdTable; typedef std::map TimerMap; // sorted by id +# ifndef DOXYGEN + struct TimerSpecCompare { typedef TimerMap::iterator first_argument_type; @@ -219,16 +335,28 @@ namespace senf { result_type operator()(first_argument_type a, second_argument_type b); }; +# endif + typedef std::priority_queue, TimerSpecCompare> TimerQueue; // sorted by time + typedef std::vector SigHandlers; + FdTable fdTable_; + unsigned timerIdCounter_; TimerQueue timerQueue_; TimerMap timerMap_; + + SigHandlers sigHandlers_; + ::sigset_t sigset_; + int sigpipe_[2]; + int epollFd_; bool terminate_; ClockService::clock_type eventTime_; + ClockService::clock_type eventEarly_; + ClockService::clock_type eventAdjust_; }; /** \brief Default file descriptor accessor diff --git a/Scheduler/Scheduler.test.cc b/Scheduler/Scheduler.test.cc index f3f498e..a9bd18e 100644 --- a/Scheduler/Scheduler.test.cc +++ b/Scheduler/Scheduler.test.cc @@ -186,9 +186,24 @@ namespace { bool is_close(ClockService::clock_type a, ClockService::clock_type b) { - return (a