}
private:
- void dumpSection(senf::FileHandle /* ignored */, senf::Scheduler::EventId event)
+ void dumpSection(senf::Scheduler::EventId event)
{
std::string data (handle.read());
senf::DatagramSection section (senf::DatagramSection::create(data));
// c-file-style: "senf"
// indent-tabs-mode: nil
// ispell-local-dictionary: "american"
-// compile-command: "scons -u test"
+// compile-command: "scons -u"
// comment-column: 40
// End:
this->priv_sndu_type_1 = false;
}
-void ULEdec::handleEvent(senf::FileHandle, senf::Scheduler::EventId event)
+void ULEdec::handleEvent(senf::Scheduler::EventId event)
{
senf::TransportPacket ts_packet (
senf::TransportPacket::create(188, senf::TransportPacket::noinit));
bool priv_sndu_type_1;
iterator snduPacketData_iter;
- void handleEvent(senf::FileHandle, senf::Scheduler::EventId event);
+ void handleEvent(senf::Scheduler::EventId event);
void handleTSPacket(senf::TransportPacket tsPacket);
void handleSNDUPacket();
// c-file-style: "senf"
// indent-tabs-mode: nil
// ispell-local-dictionary: "american"
-// compile-command: "scons -u test"
+// compile-command: "scons -u"
// comment-column: 40
// End:
}
private:
- void dumpPacket(senf::FileHandle /* ignored */, senf::Scheduler::EventId event)
+ void dumpPacket(senf::Scheduler::EventId event)
{
std::string data (sock.read());
senf::EthernetPacket packet (
}
private:
- void dumpPacket(senf::FileHandle /* ignored */, senf::Scheduler::EventId event)
+ void dumpPacket(senf::Scheduler::EventId event)
{
senf::EthernetPacket packet (
senf::EthernetPacket::create(senf::EthernetPacket::noinit));
}
private:
- void accept(senf::FileHandle /* ignored */, senf::Scheduler::EventId event)
+ void accept(senf::Scheduler::EventId event)
{
senf::TCPv4ClientSocketHandle clientSock (serverSock.accept());
senf::Scheduler::instance().add(
clientSock,
- senf::membind(&Server::readFromClient, this),
+ boost::bind(&Server::readFromClient, this, clientSock, _1),
senf::Scheduler::EV_READ);
}
prefix_ void senf::ppi::IOEvent::v_enable()
{
- Scheduler::instance().add(fd_, boost::bind(&IOEvent::cb,this,_1,_2),
+ Scheduler::instance().add(fd_, boost::bind(&IOEvent::cb,this,_1),
Scheduler::EventId(events_));
}
Scheduler::instance().remove(fd_, Scheduler::EventId(events_));
}
-prefix_ void senf::ppi::IOEvent::cb(int, Scheduler::EventId event)
+prefix_ void senf::ppi::IOEvent::cb(Scheduler::EventId event)
{
if ((event & ~events_) != 0) {
if (event & Err)
virtual void v_enable();
virtual void v_disable();
- void cb(int, Scheduler::EventId event);
+ void cb(Scheduler::EventId event);
int fd_;
unsigned events_;
// Here a basic concept of how to add signal support to the scheduler:
//
-// Every signal to be reported by the scheduler will be asigned a
-// generic signal handler by the scheduler. This signal handler will
-// use longjmp (juck) to report this signal back to the scheduler
-// main-loop.
-//
-// To make this safe, the main-loop will look something like:
-//
-// int signal = setjmp(jmpBuffer_);
-// if (signal == 0) {
-// // unblock all signals which are registered with the
-// // scheduler
-// // call epoll
-// // block all relevant signals again
-// }
-//
-// // now handle the event
-//
-// The signal handler is then simply defined as
-//
-// static void Scheduler::sigHandler(int signal)
-// {
-// // make sure to restore the signal handler here if
-// // necessary
-// longjmp(Scheduler::instance().jmpBuffer_,signal);
-// }
-//
-// You should use sigaction to register the signal handlers and define
-// a sa_mask so all Scheduler-registered signals are automatically
-// *blocked* whenever one of the signals is called (including the
-// called signal!) (This also means, we will have to re-register all
-// signals if we change the registration of some signal since the
-// sa_mask changes). This ensures, that no two signals can be
-// delivered on top of each other. And of course any signal registered
-// with the scheduler must be blocked as soon as it is registered with
-// the scheduler.
+// ... no, I had overlooked one race condition. So back to the signal-pipe approach ...
#include "Scheduler.hh"
//#include "Scheduler.ih"
// Custom includes
#include <errno.h>
#include <sys/epoll.h>
+#include <unistd.h>
+#include <fcntl.h>
#include "../Utils/Exception.hh"
static const int EPollInitialSize = 16;
prefix_ senf::Scheduler::Scheduler()
: timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false),
- eventTime_(0)
+ eventTime_(0), eventEarly_(ClockService::milliseconds(11)), eventAdjust_(0)
{
if (epollFd_<0)
throw SystemException(errno);
+
+ if (::pipe(sigpipe_) < 0)
+ throw SystemException(errno);
+
+ int flags (::fcntl(sigpipe_[1],F_GETFL));
+ if (flags < 0)
+ throw SystemException(errno);
+ flags |= O_NONBLOCK;
+ if (::fcntl(sigpipe_[1], F_SETFL, flags) < 0)
+ throw SystemException(errno);
+
+ ::epoll_event ev;
+ ::memset(&ev, 0, sizeof(ev));
+ ev.events = EV_READ;
+ ev.data.fd = sigpipe_[0];
+ if (::epoll_ctl(epollFd_, EPOLL_CTL_ADD, sigpipe_[0], &ev) < 0)
+ throw SystemException(errno);
+}
+
+prefix_ void senf::Scheduler::registerSignal(unsigned signal, SimpleCallback const & cb)
+{
+ ::sigset_t sig;
+ ::sigemptyset(&sig);
+ if (::sigaddset(&sig, signal) < 0)
+ throw InvalidSignalNumberException();
+ ::sigprocmask(SIG_BLOCK, &sig, 0);
+ ::sigaddset(&sigset_, signal);
+ if (sigHandlers_.size() <= signal)
+ sigHandlers_.resize(signal+1);
+ sigHandlers_[signal] = cb;
+
+ registerSigHandlers();
}
-prefix_ void senf::Scheduler::do_add(int fd, SimpleCallback const & cb, int eventMask)
+prefix_ void senf::Scheduler::unregisterSignal(unsigned signal)
+{
+ if (::sigdelset(&sigset_, signal) < 0)
+ throw InvalidSignalNumberException();
+ sigHandlers_[signal] = 0;
+ ::signal(signal, SIG_DFL);
+ registerSigHandlers();
+}
+
+prefix_ void senf::Scheduler::do_add(int fd, FdCallback const & cb, int eventMask)
{
FdTable::iterator i (fdTable_.find(fd));
int action (EPOLL_CTL_MOD);
throw SystemException(errno);
}
+prefix_ void senf::Scheduler::registerSigHandlers()
+{
+ for (unsigned signal; signal < sigHandlers_.size(); ++signal)
+ if (sigHandlers_[signal]) {
+ struct ::sigaction sa;
+ sa.sa_sigaction = & Scheduler::sigHandler;
+ sa.sa_mask = sigset_;
+ sa.sa_flags = SA_SIGINFO;
+ if (signal == SIGCHLD)
+ sa.sa_flags |= SA_NOCLDSTOP;
+ if (::sigaction(signal, &sa, 0) < 0)
+ throw SystemException(errno);
+ }
+}
+
+prefix_ void senf::Scheduler::sigHandler(int signal, ::siginfo_t * siginfo, void *)
+{
+ // This is a bit unsafe. Better write single bytes and place the siginfo into an explicit
+ // queue. Since signals are only unblocked during epoll_wait, we even wouldn't need to
+ // synchronize access to that queue any further.
+
+ ::write(instance().sigpipe_[1], siginfo, sizeof(siginfo));
+
+ // We ignore errors. The file handle is set to non-blocking IO. If any failure occurs (pipe
+ // full), the signal will be dropped. That's like kernel signal handling which may also drop
+ // signals.
+}
prefix_ int senf::Scheduler::EventSpec::epollMask()
const
}
else {
ClockService::clock_type delta (
- (timerQueue_.top()->second.timeout - eventTime_)/1000000UL);
+ (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL);
timeout = delta < 0 ? 0 : delta;
}
///\todo Handle more than one epoll_event per call
struct epoll_event ev;
- int events = epoll_wait(epollFd_, &ev, 1, timeout);
+
+ ::sigprocmask(SIG_UNBLOCK, &sigset_, 0);
+ int events (epoll_wait(epollFd_, &ev, 1, timeout));
+ ::sigprocmask(SIG_BLOCK, &sigset_, 0);
+
if (events<0)
if (errno != EINTR)
throw SystemException(errno);
eventTime_ = ClockService::now();
- // We always run event handlers. This is important, even if a file-descriptor is signaled
+ // We always run timeout handlers. This is important, even if a file-descriptor is signaled
// since some descriptors (e.g. real files) will *always* be ready and we still may want to
- // handle timers.
- // Time handlers are run before file events to not delay them unnecessarily.
+ // handle timers. Time handlers are run before file events to not delay them unnecessarily.
while (! timerQueue_.empty()) {
TimerMap::iterator i (timerQueue_.top());
if (i->second.canceled)
;
- else if (i->second.timeout <= eventTime_)
+ else if (i->second.timeout <= eventTime_ + eventEarly_)
i->second.cb();
else
break;
if (events <= 0)
continue;
+ // Check the signal queue
+ if (ev.data.fd == sigpipe_[0]) {
+ ::siginfo_t siginfo;
+ if (::read(sigpipe_[0], &siginfo, sizeof(siginfo)) < int(sizeof(siginfo)))
+ // We ignore truncated records which may only occur if the signal
+ // queue became filled up
+ continue;
+ if (siginfo.si_signo < int(sigHandlers_.size()) && sigHandlers_[siginfo.si_signo])
+ sigHandlers_[siginfo.si_signo]();
+ continue;
+ }
+
FdTable::iterator i = fdTable_.find(ev.data.fd);
BOOST_ASSERT (i != fdTable_.end() );
EventSpec spec (i->second);
}
prefix_ unsigned senf::Scheduler::timeout(ClockService::clock_type timeout,
- TimerCallback const & cb)
+ SimpleCallback const & cb)
{
++ timerIdCounter_;
TimerMap::iterator i (
///////////////////////////////cti.p///////////////////////////////////////
template <class Handle>
-prefix_ void senf::Scheduler::add(Handle const & handle,
- typename GenericCallback<Handle>::Callback const & cb,
- int eventMask)
+prefix_ void senf::Scheduler::add(Handle const & handle, FdCallback const & cb, int eventMask)
{
// retrieve_filehandle is found via ADL
- SimpleCallback scb (boost::bind(cb,handle,_1));
- int fd = retrieve_filehandle(handle);
- do_add(fd,scb,eventMask);
+ do_add(retrieve_filehandle(handle),cb,eventMask);
}
template <class Handle>
#define HH_Scheduler_ 1
// Custom includes
+#include <signal.h>
+#include <setjmp.h>
#include <map>
#include <queue>
#include <boost/function.hpp>
/** \brief Singleton class to manage the event loop
- This class manages a single select() type event loop. A customer of this class may register
- any number of file descriptors with this class and pass callback functions to be called on
- input, output or error. This functions are specified using boost::function objects (See <a
- href="http://www.boost.org/doc/html/function.html">Boost.Function</a>)
+ The Scheduler singleton manages the central event loop. It manages and dispatches all types
+ of events managed by the scheduler library:
+ \li File descriptor notifications
+ \li Timeouts
+ \li UNIX Signals
- The Scheduler is based on a generic handle representation. The only information needed from
- a handle, is the intrinsic file descriptor. Any object for which the statement
+ The scheduler is entered by calling it's process() member. This call will continue to run as
+ long as there is something to do, or until one of the handlers calls terminate(). The
+ Scheduler has 'something to do' as long as there is any file descriptor or timeout active.
+
+ The Scheduler only provides low level primitive scheduling capability. Additional helpers
+ are defined on top of this functionality (e.g. ReadHelper or WriteHelper or the interval
+ timers of the PPI).
+
+ \section sched_handlers Handlers
+
+ All handlers are managed as generic <a
+ href="http://www.boost.org/doc/html/function.html">Boost.Function</a> objects. This allows
+ to pass any callable as a handler. Depending on the type of handler, some additional
+ arguments may be passed to the handler by the scheduler. If you want to pass additional
+ arguments to the handler, use <a
+ href="http://www.boost.org/libs/bind/bind.html">Boost.Bind</a>)).
+
+
+ \section sched_fd File descriptors
+
+ File descriptors are managed using add() or remove()
\code
- int fd = retrieve_filehandle(object);
+ Scheduler::instance().add(handle, &callback);
+ Scheduler::instance().remove(handle);
\endcode
- is valid and places the relevant file descriptor into fd can be used as a Handle type. There
- is an implementation of retrieve_filehandle(int) within the library to handle explicit file
- descriptors. The <a href="../../../Socket/doc/html/index.html">Socket library</a> provides an
- implementation of <tt>retrieve_filehandle(FileHandle handle)</tt>. If you want to support
- some other handle type, just define an appropriate \c retrieve_filehandle function <em>in
- that types namespace</em>.
+ The callback will be called with one additional argument. This argument is the event mask of
+ type EventId. This mask will tell, which of the registered events are
+ signaled. Additionally, EV_HUP or EV_ERR on hangup or error condition on the handle.
+
+ There is always only one handler registered for a file descriptor (registering multiple
+ callbacks for a single fd does not make sense).
+
+ The scheduler will accept an almost arbitrary object as it's first argument. It will use
+ \code
+ int fd = retrieve_filehandle(handle);
+ \endcode
+ To fetch the file handle given some abstract handle type. The definition of
+ retrieve_filehandle() will be found using ADL.
+
- It is important to note, that for every combination of file descriptor and event, only a \e
- single handler may be installed. Installing more handlers does not make sense. If you need
- to distribute data to several interested parties, you must take care of this yourself.
+ \section sched_timers Timers
+
+ The Scheduler has very simple timer support. There is only one type of timer: A single-shot
+ deadline timer. More complex timers are built based on this. Timers are managed using
+ timeout() and cancelTimeout()
+ \code
+ int id = Scheduler::instance().timeout(Scheduler::instance().eventTime() + ClockService::milliseconds(100),
+ &callback);
+ Scheduler::instance().cancelTimeout(id);
+ \endcode
+ Timing is based on the ClockService, which provides a high resolution and strictly
+ monotonous time source. Registering a timeout will fire the callback when the target time is
+ reached. The timer may be canceled by passing the returned \a id to cancelTimeout().
+
+ There are two parameters which adjust the exact: \a timeoutEarly and \a timeoutAdjust. \a
+ timeoutEarly is the time, a callback may be called before the deadline time is
+ reached. Setting this value below the scheduling granularity of the kernel will have the
+ scheduler go into a <em>busy wait</em> (that is, an endless loop consuming 100% of CPU
+ recources) until the deadline time is reached! This is seldom desired. The default setting
+ of 11ms is adequate in most cases (it's slightly above the lowest linux scheduling
+ granularity).
+
+ The other timeout scheduling parameter is \a timeoutAdjust. This value will be added to the
+ timeout value before calculating the next delay value thereby compensating for \a
+ timeoutEarly. By default, this value is set to 0 but may be changed if needed.
+
+
+ \section sched_signals POSIX/UNIX signals
+
+ The Scheduler also incorporates standard POSIX/UNIX signals. Signals registered with the
+ scheduler will be handled \e synchronously within the event loop.
+ \code
+ Scheduler::instance().registerSignal(SIGUSR1, &callback);
+ Scheduler::instance().unregisterSignal(SIGUSR1);
+ \endcode
+ When registering a signal with the scheduler, that signal will automatically be blocked so
+ it can be handled within the scheduler.
+
+ A registered signal does \e not count as 'something to do'. It is therefore not possible to
+ wait for signals \e only.
\todo Fix EventId parameter (probably to int) to allow |-ing without casting ...
*/
sole member is a typedef symbol defining the callback type given the handle type.
The Callback is any callable object taking a \c Handle and an \c EventId as argument.
- */
template <class Handle>
struct GenericCallback {
typedef boost::function<void (typename boost::call_traits<Handle>::param_type,
EventId) > Callback;
};
+ */
+
+ typedef boost::function<void (EventId)> FdCallback;
/** \brief Callback type for timer events */
- typedef boost::function<void ()> TimerCallback;
+ typedef boost::function<void ()> SimpleCallback;
///////////////////////////////////////////////////////////////////////////
///\name Structors and default members
///@}
///////////////////////////////////////////////////////////////////////////
+ ///\name File Descriptors
+ ///\{
+
template <class Handle>
- void add(Handle const & handle,
- typename GenericCallback<Handle>::Callback const & cb,
+ void add(Handle const & handle, FdCallback const & cb,
int eventMask = EV_ALL); ///< Add file handle event callback
/**< add() will add a callback to the Scheduler. The
callback will be called for the given type of event on
\param[in] eventMask arbitrary combination via '|'
operator of EventId designators. */
- unsigned timeout(ClockService::clock_type timeout, TimerCallback const & cb);
+ ///\}
+
+ ///\name Timeouts
+ ///\{
+
+ unsigned timeout(ClockService::clock_type timeout, SimpleCallback const & cb);
///< Add timeout event
/**< \param[in] timeout timeout in nanoseconds
\param[in] cb callback to call after \a timeout
milliseconds */
- void cancelTimeout(unsigned id);
+ void cancelTimeout(unsigned id); ///< Cancel timeout \a id
+
+ ClockService::clock_type timeoutEarly() const;
+ ///< Fetch the \a timeoutEarly parameter
+ void timeoutEarly(ClockService::clock_type v);
+ ///< Set the \a timeoutEarly parameter
+
+ ClockService::clock_type timeoutAdjust() const;\
+ ///< Fetch the \a timeoutAdjust parameter
+ void timeoutAdjust(ClockService::clock_type v);
+ ///< Set the \a timeoutAdjust parameter
+
+ ///\}
+
+ ///\name Signal handlers
+ ///\{
+
+ void registerSignal(unsigned signal, SimpleCallback const & cb);
+ ///< Add signal handler
+ /**< \param[in] signal signal number to register handler for
+ \param[in] cb callback to call whenever \a signal is
+ delivered. */
+
+ void unregisterSignal(unsigned signal);
+ ///< Remove signal handler for \a signal
+
+ struct InvalidSignalNumberException : public std::exception
+ { virtual char const * what() const throw()
+ { return "senf::Scheduler::InvalidSignalNumberException"; } };
+
+
+ ///\}
void process(); ///< Event handler main loop
/**< This member must be called at some time to enter the
event handler main loop. Only while this function is
running any events are handled. The call will return
only, if any callback calls terminate(). */
+
void terminate(); ///< Called by callbacks to terminate the main loop
/**< This member may be called by any callback to tell the
main loop to terminate. The main loop will return to
protected:
private:
- typedef boost::function<void (EventId)> SimpleCallback;
-
Scheduler();
- void do_add(int fd, SimpleCallback const & cb, int eventMask = EV_ALL);
+ void do_add(int fd, FdCallback const & cb, int eventMask = EV_ALL);
void do_remove(int fd, int eventMask = EV_ALL);
+ void registerSigHandlers();
+ static void sigHandler(int signal, ::siginfo_t * siginfo, void *);
+
+# ifndef DOXYGEN
+
/** \brief Descriptor event specification
\internal */
struct EventSpec
{
- SimpleCallback cb_read;
- SimpleCallback cb_prio;
- SimpleCallback cb_write;
+ FdCallback cb_read;
+ FdCallback cb_prio;
+ FdCallback cb_write;
int epollMask() const;
};
struct TimerSpec
{
TimerSpec() : timeout(), cb() {}
- TimerSpec(ClockService::clock_type timeout_, TimerCallback cb_, unsigned id_)
+ TimerSpec(ClockService::clock_type timeout_, SimpleCallback cb_, unsigned id_)
: timeout(timeout_), cb(cb_), id(id_), canceled(false) {}
bool operator< (TimerSpec const & other) const
{ return timeout > other.timeout; }
ClockService::clock_type timeout;
- TimerCallback cb;
+ SimpleCallback cb;
unsigned id;
bool canceled;
};
+# endif
+
typedef std::map<int,EventSpec> FdTable;
typedef std::map<unsigned,TimerSpec> TimerMap; // sorted by id
+# ifndef DOXYGEN
+
struct TimerSpecCompare
{
typedef TimerMap::iterator first_argument_type;
result_type operator()(first_argument_type a, second_argument_type b);
};
+# endif
+
typedef std::priority_queue<TimerMap::iterator, std::vector<TimerMap::iterator>,
TimerSpecCompare> TimerQueue; // sorted by time
+ typedef std::vector<SimpleCallback> SigHandlers;
+
FdTable fdTable_;
+
unsigned timerIdCounter_;
TimerQueue timerQueue_;
TimerMap timerMap_;
+
+ SigHandlers sigHandlers_;
+ ::sigset_t sigset_;
+ int sigpipe_[2];
+
int epollFd_;
bool terminate_;
ClockService::clock_type eventTime_;
+ ClockService::clock_type eventEarly_;
+ ClockService::clock_type eventAdjust_;
};
/** \brief Default file descriptor accessor
bool is_close(ClockService::clock_type a, ClockService::clock_type b)
{
- return (a<b ? b-a : a-b) < 10100000UL; // a little bit over 10ms
+ return (a<b ? b-a : a-b) < ClockService::milliseconds(15);
}
+
+ ClockService::clock_type sigtime (0);
+ void sigusr()
+ {
+ sigtime = ClockService::now();
+ Scheduler::instance().terminate();
+ }
+
+ void delay(unsigned long milliseconds)
+ {
+ struct timespec ts;
+ ts.tv_sec = milliseconds / 1000;
+ ts.tv_nsec = (milliseconds % 1000) * 1000000;
+ while (nanosleep(&ts,&ts) < 0 && errno == EINTR) ;
+ }
}
BOOST_AUTO_UNIT_TEST(scheduler)
BOOST_CHECK_NO_THROW( Scheduler::instance() );
- BOOST_CHECK_NO_THROW( Scheduler::instance().add(sock,&callback,Scheduler::EV_READ) );
+ BOOST_CHECK_NO_THROW( Scheduler::instance().add(sock,boost::bind(&callback, sock, _1),
+ Scheduler::EV_READ) );
event = Scheduler::EV_NONE;
BOOST_CHECK_NO_THROW( Scheduler::instance().process() );
BOOST_CHECK_EQUAL( event, Scheduler::EV_READ );
BOOST_CHECK_PREDICATE( is_close, (ClockService::now()) (t+ClockService::milliseconds(200)) );
HandleWrapper handle(sock,"TheTag");
- BOOST_CHECK_NO_THROW( Scheduler::instance().add(handle,&handleCallback,Scheduler::EV_WRITE) );
+ BOOST_CHECK_NO_THROW( Scheduler::instance().add(handle,
+ boost::bind(&handleCallback,handle,_1),
+ Scheduler::EV_WRITE) );
strcpy(buffer,"WRITE");
size=5;
event = Scheduler::EV_NONE;
buffer[size]=0;
BOOST_CHECK_EQUAL( buffer, "OK" );
+ BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(
+ ClockService::now()+ClockService::milliseconds(200),&timeout) );
+ BOOST_CHECK_NO_THROW( Scheduler::instance().registerSignal(SIGUSR1, &sigusr) );
+ t = ClockService::now();
+ ::kill(::getpid(), SIGUSR1);
+ delay(100);
+ BOOST_CHECK_NO_THROW( Scheduler::instance().process() );
+ BOOST_CHECK_PREDICATE( is_close, (ClockService::now()) (t+ClockService::milliseconds(100)) );
+
///////////////////////////////////////////////////////////////////////////
close(sock);