Scheduler: Change ClockService implementation to utilize the POSIX CLOCK_MONOTONIC...
g0dil [Fri, 4 Jul 2008 12:32:20 +0000 (12:32 +0000)]
Scheduler: Fix all 'throw SystemException' statments to use SENF_THROW_SYSTEM_EXCEPTION
Scheduler: Implement TimerDispatcher

git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@888 270642c3-0616-0410-b53a-bc976706d245

16 files changed:
Scheduler/ClockService.cc
Scheduler/ClockService.cci
Scheduler/ClockService.hh
Scheduler/ClockService.test.cc
Scheduler/FIFORunner.cc
Scheduler/FdDispatcher.hh
Scheduler/Poller.ct
Scheduler/SignalDispatcher.cc
Scheduler/SignalDispatcher.cci
Scheduler/SignalDispatcher.hh
Scheduler/SignalDispatcher.test.cc
Scheduler/TimerDispatcher.cc [new file with mode: 0644]
Scheduler/TimerDispatcher.cci [new file with mode: 0644]
Scheduler/TimerDispatcher.hh [new file with mode: 0644]
Scheduler/TimerDispatcher.test.cc [new file with mode: 0644]
Utils/Logger/TimeSource.cc

index 97c5fbc..d9733d8 100644 (file)
 #define prefix_
 ///////////////////////////////cc.p////////////////////////////////////////
 
-#define CheckError(op,args) if (op args < 0) SENF_THROW_SYSTEM_EXCEPTION(# op)
-
-///////////////////////////////////////////////////////////////////////////
-// senf::ClockService::Impl
-
-struct senf::ClockService::Impl 
-{
-    Impl();
-
-    void block();
-    void unblock();
-
-    /// Internal: temporarily block signals (RAII idiom)
-    struct Blocker {
-        Blocker(Impl * i) : impl(i) { impl->block(); }
-        ~Blocker() { impl->unblock(); }
-        Impl * impl;
-    };
-
-    static void timer(int);
-
-    struct sigaction oldaction;
-    struct itimerval olditimer;
-    sigset_t alrm_set;
-};
-
-prefix_ senf::ClockService::Impl::Impl()
-{
-    CheckError( sigemptyset, (&alrm_set) );
-    CheckError( sigaddset, (&alrm_set, SIGALRM) );
-}
-
-prefix_ void senf::ClockService::Impl::block()
-{
-    CheckError( sigprocmask, (SIG_BLOCK, &alrm_set, 0) );
-}
-
-prefix_ void senf::ClockService::Impl::unblock()
-{
-    CheckError( sigprocmask, (SIG_UNBLOCK, &alrm_set, 0) );
-}
-
-prefix_ void senf::ClockService::Impl::timer(int)
-{
-    ClockService::instance().timer();
-}
-
-///////////////////////////////////////////////////////////////////////////
-// senf::ClockService
-
-prefix_ senf::ClockService::~ClockService()
-{
-    setitimer(ITIMER_REAL, &impl_->olditimer, 0);
-    sigaction(SIGALRM, &impl_->oldaction, 0);
-}
-
-////////////////////////////////////////
-// private members
-
-prefix_ senf::ClockService::ClockService()
-    : impl_(new ClockService::Impl())
-{
-    restart_m(false);
-}
-
-prefix_ void senf::ClockService::timer()
-{
-    boost::posix_time::ptime time (boost::posix_time::microsec_clock::universal_time());
-    if (checkSkew(time))
-        clockSkew(time, heartbeat_ + boost::posix_time::seconds(
-                      ClockService::CheckInterval));
-    heartbeat_ = time;
-}
-
-prefix_ void senf::ClockService::restart_m(bool restart)
-{
-    if (restart)
-        // if any syscall fails, the alarm signal stays blocked which is correct
-        impl_->block(); 
-
-    base_ = boost::posix_time::microsec_clock::universal_time();
-    heartbeat_ = base_;
-
-    struct sigaction action;
-    action.sa_handler = & senf::ClockService::Impl::timer;
-    CheckError( sigemptyset, (&action.sa_mask) );
-    action.sa_flags = SA_RESTART;
-    CheckError( sigaction, (SIGALRM, &action, restart ? 0 : &impl_->oldaction) );
-
-    restartTimer(restart);
-    
-    impl_->unblock();
-}
-
-prefix_ void senf::ClockService::restartTimer(bool restart)
-{
-    struct itimerval itimer;
-    itimer.it_interval.tv_sec = CheckInterval;
-    itimer.it_interval.tv_usec = 0;
-    itimer.it_value.tv_sec = CheckInterval;
-    itimer.it_value.tv_usec = 0;
-    CheckError( setitimer, (ITIMER_REAL, &itimer, restart ? 0 : &impl_->olditimer) );
-}
-
-prefix_ void senf::ClockService::updateSkew(boost::posix_time::ptime time)
-{
-    Impl::Blocker alrmBlocker (impl_.get());
-    
-    // Make a second 'checkSkew' test, this time with SIGALRM blocked. See
-    // senf::ClockService::now_i()
-
-    if (checkSkew(time)) {
-        struct itimerval itimer;
-        CheckError( getitimer, (ITIMER_REAL, &itimer) );
-        clockSkew(time, (heartbeat_ 
-                         + boost::posix_time::seconds(CheckInterval) 
-                         - boost::posix_time::seconds(itimer.it_value.tv_sec)
-                         - boost::posix_time::microseconds(itimer.it_value.tv_usec)));
-        heartbeat_ = time;
-        restartTimer();
-    }
-}
-
 ///////////////////////////////cc.e////////////////////////////////////////
 #undef prefix_
 //#include "ClockService.mpp"
index 32c7748..c24b736 100644 (file)
@@ -24,7 +24,9 @@
     \brief ClockService inline non-template implementation */
 
 // Custom includes
+#include <time.h>
 #include <boost/date_time/posix_time/posix_time_types.hpp>
+#include "../Utils/Exception.hh"
 
 #define prefix_ inline
 ///////////////////////////////cci.p///////////////////////////////////////
 ///////////////////////////////////////////////////////////////////////////
 // senf::ClockService
 
-////////////////////////////////////////
-// private members
-
-prefix_ bool senf::ClockService::checkSkew(boost::posix_time::ptime time)
+prefix_ senf::ClockService::clock_type senf::ClockService::now()
 {
-    boost::posix_time::ptime h (heartbeat_); // reduce chance for race condition
-    return time < h || (time - h) > boost::posix_time::seconds(2*CheckInterval);
+    struct timespec spec;
+    if (clock_gettime(CLOCK_MONOTONIC, &spec) < 0)
+        SENF_THROW_SYSTEM_EXCEPTION("clock_gettime()");
+    return spec.tv_sec * 1000000000LL + spec.tv_nsec;
 }
 
-prefix_ void senf::ClockService::clockSkew(boost::posix_time::ptime time,
-                                           boost::posix_time::ptime expected)
-{
-    base_ += (time - expected);
-}
+////////////////////////////////////////
+// private members
 
 prefix_ senf::ClockService::clock_type senf::ClockService::clock_m(abstime_type time)
 {
-    ///\fixme What happens, if base_ is changed in SIGALRM while reading it here ?
-
-    // Idea: Have *two* base values: one is written by the SIGALRM handler, the other is only
-    // Written by synchronous code. If they differ, we block signals, copy over and continue.  If
-    // they transiently differ because we are reading the SIGALRM value while it is being changed
-    // this does not matter: We will then still copy it over.
-
-    boost::posix_time::time_duration delta (time - base_);
-    return clock_type( delta.ticks() )
+    boost::posix_time::time_duration delta (time - baseAbstime_);
+    return baseClock_ + clock_type( delta.ticks() )
         * clock_type( 1000000000UL / boost::posix_time::time_duration::ticks_per_second() );
 }
 
-prefix_ senf::ClockService::clock_type senf::ClockService::now_m()
-{
-    // We want to make the normal case (no skew) really fast. This first 'checkSkew' *might*
-    // transiently fail if a SIGALRM is delivered in the midst of the test. updateSkew will
-    // therefore block signals and do the check again to make sure.
-    //
-    // The opposite case (the test returns 'false' even though it should return 'true') is so highly
-    // improbable that it is treated as academic. (it will be catched by the next SIGALRM)
-
-    boost::posix_time::ptime time (boost::posix_time::microsec_clock::universal_time());
-    if (checkSkew(time)) 
-        updateSkew(time);
-    
-    // 'clock' will pick up the corrected base_ value if needed.
-    return clock_m(time);
-}
-
 prefix_ senf::ClockService::abstime_type senf::ClockService::abstime_m(clock_type clock)
 {
 #ifdef BOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
-    return base_ + boost::posix_time::nanoseconds(clock);
+    return baseAbstime_ + boost::posix_time::nanoseconds(clock-baseClock_);
 #else
-    return base_ + boost::posix_time::microseconds((clock+500)/1000);
+    return baseAbstime_ + boost::posix_time::microseconds((clock-baseClock_+500)/1000);
 #endif
 }
 
-// public members
+prefix_ senf::ClockService::ClockService()
+{
+    restart_m();
+}
 
-prefix_ senf::ClockService::clock_type senf::ClockService::now()
+prefix_ void senf::ClockService::restart_m()
 {
-    return instance().now_m();
+    baseAbstime_ = boost::posix_time::microsec_clock::universal_time();
+    baseClock_ = now();
 }
 
+// public members
+
 prefix_ senf::ClockService::abstime_type senf::ClockService::abstime(clock_type clock)
 {
     return instance().abstime_m(clock);
index 3be5a0a..da50573 100644 (file)
@@ -114,15 +114,6 @@ namespace senf {
          */
         typedef boost::posix_time::ptime abstime_type;
 
-        static unsigned const CheckInterval = 10;
-
-        ///////////////////////////////////////////////////////////////////////////
-        ///\name Structors and default members
-        ///@{
-
-        ~ClockService();
-
-        ///@}
         ///////////////////////////////////////////////////////////////////////////
 
         static clock_type now();  ///< Return current clock value
@@ -171,30 +162,16 @@ namespace senf {
     private:
         ClockService();
 
-        void timer();
-
-        clock_type now_m();
         abstime_type abstime_m(clock_type clock);
         clock_type clock_m(abstime_type time);
-        void restart_m(bool restart = true);
-
-        bool checkSkew(boost::posix_time::ptime time);
-        void updateSkew(boost::posix_time::ptime time);
-        void clockSkew(boost::posix_time::ptime time, boost::posix_time::ptime expected);
-
-        void restartTimer(bool restart = true);
+        void restart_m();
 
-        boost::posix_time::ptime base_;
-        boost::posix_time::ptime heartbeat_;
+        boost::posix_time::ptime baseAbstime_;
+        clock_type baseClock_;
 
-        // I don't want this header to depend on the legacy C headers.
         /// Internal: ClockService private data (PIMPL idiom)
-        struct Impl;
-        boost::scoped_ptr<Impl> impl_;
 
-        friend class Impl;
 #ifndef DOXYGEN
-        friend class senf::detail::ClockServiceTest;
         friend class singleton<ClockService>;
 #endif
     };
index 6885067..182755b 100644 (file)
 #define prefix_
 ///////////////////////////////cc.p////////////////////////////////////////
 
-namespace senf {
-namespace detail {
-
-    struct ClockServiceTest
-    {
-        static boost::posix_time::ptime & base() 
-            { return senf::ClockService::instance().base_; }
-        static boost::posix_time::ptime & heartbeat()
-            { return senf::ClockService::instance().heartbeat_; }
-    };
-
-}}
-
 namespace {
 
     bool is_close_clock(senf::ClockService::clock_type a, senf::ClockService::clock_type b, 
@@ -85,35 +72,18 @@ BOOST_AUTO_UNIT_TEST(clockService)
                            (t2) );
 
     t1 = t2;
-    // We shift both heartbeat() and base() back 1 minute. This is the same as
-    // moving the current time forward 1 minute.
-    boost::posix_time::ptime b (senf::detail::ClockServiceTest::base());
-    boost::posix_time::ptime h (senf::detail::ClockServiceTest::heartbeat());
-    senf::detail::ClockServiceTest::heartbeat() -= boost::posix_time::minutes(1);
-    senf::detail::ClockServiceTest::base() -= boost::posix_time::minutes(1);
 
     // Wait for SIGALRM and let the signal handler do the clock-skew detection
-    delay(senf::ClockService::CheckInterval*1000);
-
-    BOOST_CHECK_PREDICATE( is_close_pt,
-                           (b)
-                           (senf::detail::ClockServiceTest::base()) );
-    BOOST_CHECK_PREDICATE( is_close_pt,
-                           (h+boost::posix_time::seconds(senf::ClockService::CheckInterval))
-                           (senf::detail::ClockServiceTest::heartbeat()) );
+    delay(1*1000);
 
     t2 = senf::ClockService::now();
     BOOST_CHECK_PREDICATE( is_close_clock,
-                           (t1 + senf::ClockService::seconds(senf::ClockService::CheckInterval))
+                           (t1 + senf::ClockService::seconds(1))
                            (t2)
                            (senf::ClockService::milliseconds(500)) );
 
     t1 = t2;
 
-    senf::detail::ClockServiceTest::heartbeat() -= boost::posix_time::minutes(1);
-    senf::detail::ClockServiceTest::base() -= boost::posix_time::minutes(1);
-
-    // Let now() do the clock skew detection using getitimer() ...
     delay(200);
     BOOST_CHECK_PREDICATE( is_close_clock,
                            (t1 + senf::ClockService::milliseconds(200))
index 8a1293b..169bcf8 100644 (file)
 #define prefix_
 ///////////////////////////////cc.p////////////////////////////////////////
 
+// At the moment, the FIFORunner is not very efficient with many non-runnable tasks since the
+// complete list of tasks is traversed on each run().
+//
+// To optimize this, we woould need a way to find the relative ordering of two tasks in O(1) (at the
+// moment, this is an O)(N) operation by traversing the list).
+//
+// One idea is, to give each task an 'order' value. Whenever a task is added at the end, it's order
+// value is set to the order value of the last task + 1. Whenever the order value such added exceeds
+// some threshold (e.g. 2^31 -1 or some such), the task list is traversed from beginning to end to
+// assign new consecutive order values. This O(N) operation is so seldom, that it is amortized over
+// a very long time.
+//
+// With this value at hand, we can do several optimizations: One idea would be the following: The
+// runnable set always has two types of tasks: There are tasks, which are heavily active and are
+// signaled constantly and other tasks which lie dormant most of the time. Those dormant tasks will
+// end up at the beginning of the task queue.
+//
+// With the above defined 'ordering' field available, we can manage an iterator pointing to the
+// first and the last runnable task. This will often help a lot since the group of runnable tasks
+// will mostly be localized to the end of the queue. only occasionally one of the dormant tasks will
+// be runnable. This additional traversal time will be amortized over a larger time.
+
 prefix_ void senf::scheduler::FIFORunner::dequeue(TaskInfo * task)
 {
     TaskList::iterator i (TaskList::current(*task));
index bb947ea..e586592 100644 (file)
@@ -58,7 +58,7 @@ namespace scheduler {
         ///\name Structors and default members
         ///@{
         
-        explicit FdDispatcher(FdManager & manager, FIFORunner & runner);
+        FdDispatcher(FdManager & manager, FIFORunner & runner);
         ~FdDispatcher();
 
         ///@}
index d0737a8..3948a22 100644 (file)
@@ -43,7 +43,7 @@ prefix_ bool senf::scheduler::Poller<Value>::set(int fd, int events, Value * dat
             return true;
     if (errno == EPERM)
         return false;
-    throw SystemException("epolll_ctl");
+    SENF_THROW_SYSTEM_EXCEPTION("epolll_ctl()");
 }
 
 template <class Value>
@@ -51,7 +51,7 @@ prefix_ void senf::scheduler::Poller<Value>::remove(int fd)
 {
     if (epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, 0) == -1)
         if (errno != ENOENT && errno != EBADF)
-            throw SystemException("epoll_ctl");
+            SENF_THROW_SYSTEM_EXCEPTION("epoll_ctl()");
 }
 
 template <class Value>
@@ -64,7 +64,7 @@ prefix_ typename senf::scheduler::Poller<Value>::range senf::scheduler::Poller<V
         if (rv == -1) {
             if (errno == EINTR)
                 continue;
-            throw SystemException("epoll_wait");
+            SENF_THROW_SYSTEM_EXCEPTION("epoll_wait()");
         }
         break;
     }
index df23c1c..490c2c2 100644 (file)
@@ -41,7 +41,7 @@ prefix_ senf::scheduler::SignalDispatcher::SignalDispatcher(FdManager & manager,
 {
     SENF_ASSERT( !instance_ );
     if (pipe(sigPipe_) <0)
-        throw SystemException("pipe()");
+        SENF_THROW_SYSTEM_EXCEPTION("pipe()");
     sigemptyset(&sigSet_);
     instance_ = this;
     manager_.set(sigPipe_[0], FdManager::EV_READ, this);
@@ -49,11 +49,11 @@ prefix_ senf::scheduler::SignalDispatcher::SignalDispatcher(FdManager & manager,
 
 prefix_ senf::scheduler::SignalDispatcher::~SignalDispatcher()
 {
-    sigprocmask(SIG_UNBLOCK, & sigSet_, 0);
     for (HandlerMap::iterator i (handlers_.begin()); i != handlers_.end(); ++i) {
         ::signal(i->first, SIG_DFL);
         runner_.dequeue(&i->second);
     }
+    sigprocmask(SIG_UNBLOCK, &sigSet_, 0);
     manager_.remove(sigPipe_[0]);
     close(sigPipe_[0]);
     close(sigPipe_[1]);
@@ -86,7 +86,7 @@ prefix_ void senf::scheduler::SignalDispatcher::add(int signal, Callback const &
         if (j->first == SIGCLD)
             act.sa_flags |= SA_NOCLDSTOP;
         if (sigaction(j->first, &act, 0) < 0)
-            throw SystemException("sigaction()");
+            SENF_THROW_SYSTEM_EXCEPTION("sigaction()");
     }
 }
 
index b590200..088fc92 100644 (file)
@@ -52,9 +52,6 @@ prefix_ senf::scheduler::SignalDispatcher::SignalEvent::SignalEvent(Callback cb_
     : cb (cb_)
 {}
 
-prefix_ senf::scheduler::SignalDispatcher::SignalEvent::~SignalEvent()
-{}
-
 prefix_ void senf::scheduler::SignalDispatcher::SignalEvent::run()
 {
     cb(siginfo);
index 752f063..b2c44a4 100644 (file)
@@ -53,7 +53,7 @@ namespace scheduler {
         ///\name Structors and default members
         ///@{
 
-        explicit SignalDispatcher(FdManager & manager, FIFORunner & runner);
+        SignalDispatcher(FdManager & manager, FIFORunner & runner);
         ~SignalDispatcher();
 
         ///@}
@@ -72,7 +72,6 @@ namespace scheduler {
             : public FIFORunner::TaskInfo
         {
             explicit SignalEvent(Callback cb_);
-            virtual ~SignalEvent();
             virtual void run();
 
             siginfo_t siginfo;
index 0b97e18..d0bd0da 100644 (file)
@@ -42,6 +42,7 @@ namespace {
     {
         called = true;
     }
+
 }
 
 BOOST_AUTO_UNIT_TEST(signalDispatcher)
diff --git a/Scheduler/TimerDispatcher.cc b/Scheduler/TimerDispatcher.cc
new file mode 100644 (file)
index 0000000..225e608
--- /dev/null
@@ -0,0 +1,184 @@
+// $Id$
+//
+// Copyright (C) 2008 
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief TimerDispatcher non-inline non-template implementation */
+
+#include "TimerDispatcher.hh"
+//#include "TimerDispatcher.ih"
+
+// Custom includes
+
+//#include "TimerDispatcher.mpp"
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+unsigned senf::scheduler::TimerDispatcher::useCount_ (0);
+
+prefix_ senf::scheduler::TimerDispatcher::TimerDispatcher(FdManager & manager,
+                                                          FIFORunner & runner)
+    : manager_ (manager), runner_ (runner), lastId_ (0), blocked_ (true)
+{
+    if (pipe(timerPipe_) < 0)
+        SENF_THROW_SYSTEM_EXCEPTION("pipe()");
+    manager_.set(timerPipe_[0], FdManager::EV_READ, this);
+
+    sigemptyset(&sigSet_);
+    sigaddset(&sigSet_, SIGALRM);
+    sigprocmask(SIG_BLOCK, &sigSet_, 0);
+
+    if (useCount_ == 0) {
+        struct sigaction act;
+        act.sa_sigaction = &sigHandler;
+        act.sa_mask = sigSet_;
+        act.sa_flags = SA_SIGINFO | SA_RESTART;
+        if (sigaction(SIGALRM, &act, 0) < 0)
+            SENF_THROW_SYSTEM_EXCEPTION("sigaction()");
+    }
+
+    struct sigevent ev;
+    ev.sigev_notify = SIGEV_SIGNAL;
+    ev.sigev_signo = SIGALRM;
+    ev.sigev_value.sival_ptr = this;
+    if (timer_create(CLOCK_MONOTONIC, &ev, &timerId_) < 0)
+        SENF_THROW_SYSTEM_EXCEPTION("timer_create()");
+    
+    ++ useCount_;
+}
+
+prefix_ senf::scheduler::TimerDispatcher::~TimerDispatcher()
+{
+    -- useCount_;
+
+    TimerMap::iterator i (timers_.begin());
+    TimerMap::iterator const i_end (timers_.end());
+    for (; i != i_end; ++i)
+        runner_.dequeue(&(i->second));
+
+    timer_delete(timerId_);
+    if (useCount_ == 0)
+        ::signal(SIGALRM, SIG_IGN);
+    sigprocmask(SIG_UNBLOCK, &sigSet_, 0);
+    manager_.remove(timerPipe_[0]);
+    close(timerPipe_[0]);
+    close(timerPipe_[1]);
+}
+
+prefix_ senf::scheduler::TimerDispatcher::timer_id
+senf::scheduler::TimerDispatcher::add(ClockService::clock_type timeout, Callback const & cb)
+{
+    while (timerIdIndex_.find(++lastId_) != timerIdIndex_.end()) ;
+    TimerMap::iterator i (timers_.insert(std::make_pair(timeout, TimerEvent(lastId_, cb, *this))));
+    timerIdIndex_.insert(std::make_pair(lastId_, i));
+    runner_.enqueue(&(i->second));
+    if (! blocked_)
+        reschedule();
+    return lastId_;
+}
+
+prefix_ void senf::scheduler::TimerDispatcher::remove(timer_id id)
+{
+    TimerIdMap::iterator i (timerIdIndex_.find(id));
+    if (i == timerIdIndex_.end())
+        return;
+    runner_.dequeue(&(i->second->second));
+    timers_.erase(i->second);
+    timerIdIndex_.erase(i);
+    if (! blocked_)
+        reschedule();
+}
+
+prefix_ void senf::scheduler::TimerDispatcher::blockSignals()
+{
+    if (blocked_) 
+        return;
+    sigprocmask(SIG_BLOCK, &sigSet_, 0);
+    blocked_ = true;
+}
+
+prefix_ void senf::scheduler::TimerDispatcher::unblockSignals()
+{
+    if (! blocked_)
+        return;
+    reschedule();
+    sigprocmask(SIG_UNBLOCK, &sigSet_, 0);
+    blocked_ = false;
+}
+
+prefix_ void senf::scheduler::TimerDispatcher::signal(int events)
+{
+    siginfo_t info;
+    if (read(timerPipe_[0], &info, sizeof(info)) < int(sizeof(info)))
+        return;
+
+    TimerMap::iterator i (timers_.begin());
+    TimerMap::iterator const i_end (timers_.end());
+    ClockService::clock_type now (ClockService::now());
+    for (; i != i_end && i->first <= now ; ++i)
+        i->second.runnable = true;
+}
+
+prefix_ void senf::scheduler::TimerDispatcher::sigHandler(int signal, ::siginfo_t * siginfo,
+                                                          void *)
+{
+    // The manpage says, si_signo is unused in linux so we set it here
+    siginfo->si_signo = signal; 
+    // We can't do much on error anyway so we ignore errors here
+    if (siginfo->si_value.sival_ptr == 0)
+        return;
+    write(static_cast<TimerDispatcher*>(siginfo->si_value.sival_ptr)->timerPipe_[1], 
+          siginfo, sizeof(*siginfo));
+}
+
+prefix_ void senf::scheduler::TimerDispatcher::reschedule()
+{
+    struct itimerspec timer;
+    timer.it_interval.tv_sec = 0;
+    timer.it_interval.tv_nsec = 0;
+    if (timers_.empty()) {
+        timer.it_value.tv_sec = 0;
+        timer.it_value.tv_nsec = 0;
+    }
+    else {
+        ClockService::clock_type next (timers_.begin()->first);
+        timer.it_value.tv_sec = ClockService::in_seconds(next);
+        timer.it_value.tv_nsec = ClockService::in_nanoseconds(
+            next - ClockService::seconds(timer.it_value.tv_sec));
+    }
+    if (timer_settime(timerId_, TIMER_ABSTIME, &timer, 0)<0)
+        SENF_THROW_SYSTEM_EXCEPTION("timer_settime()");
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
+//#include "TimerDispatcher.mpp"
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
diff --git a/Scheduler/TimerDispatcher.cci b/Scheduler/TimerDispatcher.cci
new file mode 100644 (file)
index 0000000..d0a3308
--- /dev/null
@@ -0,0 +1,58 @@
+// $Id$
+//
+// Copyright (C) 2008 
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief TimerDispatcher inline non-template implementation */
+
+//#include "TimerDispatcher.ih"
+
+// Custom includes
+#include <unistd.h>
+
+#define prefix_ inline
+///////////////////////////////cci.p///////////////////////////////////////
+
+prefix_ senf::scheduler::TimerDispatcher::TimerEvent::TimerEvent(timer_id id_,
+                                                                 Callback const & cb_,
+                                                                 TimerDispatcher & dispatcher_)
+    : id (id_), cb (cb_), dispatcher (dispatcher_)
+{}
+
+prefix_ void senf::scheduler::TimerDispatcher::TimerEvent::run()
+{
+    cb();
+    dispatcher.remove(id);
+}
+
+///////////////////////////////cci.e///////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
diff --git a/Scheduler/TimerDispatcher.hh b/Scheduler/TimerDispatcher.hh
new file mode 100644 (file)
index 0000000..1a90ed5
--- /dev/null
@@ -0,0 +1,124 @@
+// $Id$
+//
+// Copyright (C) 2008 
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief TimerDispatcher public header */
+
+#ifndef HH_TimerDispatcher_
+#define HH_TimerDispatcher_ 1
+
+// Custom includes
+#include <signal.h>
+#include <set>
+#include <map>
+#include "ClockService.hh"
+#include "FdManager.hh"
+#include "FIFORunner.hh"
+
+//#include "TimerDispatcher.mpp"
+///////////////////////////////hh.p////////////////////////////////////////
+
+namespace senf {
+namespace scheduler {
+
+    /** \brief
+      */
+    class TimerDispatcher
+        : public FdManager::Event
+    {
+    public:
+        ///////////////////////////////////////////////////////////////////////////
+        // Types
+
+        typedef boost::function<void ()> Callback;
+        typedef unsigned timer_id;
+
+        ///////////////////////////////////////////////////////////////////////////
+        ///\name Structors and default members
+        ///@{
+
+        TimerDispatcher(FdManager & manager, FIFORunner & runner);
+        ~TimerDispatcher();
+
+        ///@}
+        ///////////////////////////////////////////////////////////////////////////
+
+        timer_id add(ClockService::clock_type timeout, Callback const & cb);
+        void remove(timer_id id);
+
+        void blockSignals();
+        void unblockSignals();
+
+    protected:
+
+    private:
+        struct TimerEvent
+            : public FIFORunner::TaskInfo
+        {
+            TimerEvent(timer_id id_, Callback const & cb_, TimerDispatcher & dispatcher_);
+            virtual void run();
+
+            timer_id id;
+            Callback cb;
+            TimerDispatcher & dispatcher;
+        };
+
+        virtual void signal(int events);
+        static void sigHandler(int signal, ::siginfo_t * siginfo, void *);
+        void reschedule();
+
+        FdManager & manager_;
+        FIFORunner & runner_;
+
+        typedef std::multimap<ClockService::clock_type, TimerEvent> TimerMap;
+        typedef std::map<int, TimerMap::iterator> TimerIdMap;
+
+        TimerMap timers_;
+        TimerIdMap timerIdIndex_;
+        timer_id lastId_;
+
+        int timerPipe_[2];
+        sigset_t sigSet_;
+        bool blocked_;
+        timer_t timerId_;
+
+        static unsigned useCount_;
+    };
+
+}}
+
+///////////////////////////////hh.e////////////////////////////////////////
+#include "TimerDispatcher.cci"
+//#include "TimerDispatcher.ct"
+//#include "TimerDispatcher.cti"
+#endif
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
diff --git a/Scheduler/TimerDispatcher.test.cc b/Scheduler/TimerDispatcher.test.cc
new file mode 100644 (file)
index 0000000..46e9a82
--- /dev/null
@@ -0,0 +1,87 @@
+// $Id$
+//
+// Copyright (C) 2008 
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief TimerDispatcher.test unit tests */
+
+//#include "TimerDispatcher.test.hh"
+//#include "TimerDispatcher.test.ih"
+
+// Custom includes
+#include "TimerDispatcher.hh"
+
+#include "../Utils//auto_unit_test.hh"
+#include <boost/test/test_tools.hpp>
+
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+namespace {
+
+    bool is_close(senf::ClockService::clock_type a, senf::ClockService::clock_type b)
+    {
+        return (a<b ? b-a : a-b) < senf::ClockService::milliseconds(100);
+    }
+    
+    bool called = false;
+    void handler()
+    {
+        called = true;
+    }
+
+}
+
+BOOST_AUTO_UNIT_TEST(timerDispatcher)
+{
+    senf::scheduler::FdManager manager;
+    senf::scheduler::FIFORunner runner;
+    senf::scheduler::TimerDispatcher dispatcher (manager, runner);
+    manager.timeout(1000);
+
+    senf::ClockService::clock_type t (senf::ClockService::now());
+    senf::scheduler::TimerDispatcher::timer_id id;
+    SENF_CHECK_NO_THROW(
+        id = dispatcher.add( t + senf::ClockService::milliseconds(500), &handler ) );
+    SENF_CHECK_NO_THROW( dispatcher.unblockSignals() );
+    SENF_CHECK_NO_THROW( manager.processOnce() );
+    SENF_CHECK_NO_THROW( dispatcher.blockSignals() );
+    SENF_CHECK_NO_THROW( runner.run() );
+    senf::ClockService::clock_type t2 (senf::ClockService::now());
+    BOOST_CHECK( called );
+    BOOST_CHECK_PREDICATE( is_close, (t2)(t + senf::ClockService::milliseconds(500)) );
+
+    SENF_CHECK_NO_THROW( dispatcher.remove(id) );
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
index 0cab0d9..61b7253 100644 (file)
@@ -48,7 +48,7 @@ prefix_ senf::log::time_type senf::log::SystemTimeSource::operator()()
 {
     struct ::timespec tm;
     if (::clock_gettime(CLOCK_MONOTONIC, &tm) < 0)
-        SENF_THROW_SYSTEM_EXCEPTION("::timer_gettime()");
+        SENF_THROW_SYSTEM_EXCEPTION("::clock_gettime()");
     return static_cast<time_type>(tm.tv_sec)*1000000000ll+tm.tv_nsec;
 }