PPI: Use ClockService instead of boost::posix_time
g0dil [Mon, 20 Aug 2007 11:21:00 +0000 (11:21 +0000)]
PPI: Implement IntervalTimer event
Scheduler: Change timeout() argyument to absolute time represented as ClockService::clock_type
Scheduler: Implement timer event cancelation
PPI: Implement RateStuffer in ppitest

git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@399 270642c3-0616-0410-b53a-bc976706d245

21 files changed:
PPI/EventManager.cci
PPI/EventManager.hh
PPI/Events.cti
PPI/Events.hh
PPI/IOEvent.hh
PPI/IntervalTimer.cc [new file with mode: 0644]
PPI/IntervalTimer.cci [new file with mode: 0644]
PPI/IntervalTimer.hh [new file with mode: 0644]
PPI/Module.cci
PPI/Module.hh
PPI/Module.test.cc
PPI/SocketReader.test.cc
PPI/SocketWriter.test.cc
PPI/detail/EventBinding.cc
PPI/detail/EventBinding.cti
PPI/detail/EventBinding.hh
PPI/ppitest/ppitest.cc
Scheduler/Scheduler.cc
Scheduler/Scheduler.cci
Scheduler/Scheduler.hh
Scheduler/Scheduler.test.cc

index 2d82a05..0829b38 100644 (file)
@@ -39,7 +39,7 @@ prefix_ senf::ppi::EventManager & senf::ppi::EventManager::instance()
     return manager;
 }
 
-prefix_ boost::posix_time::ptime senf::ppi::EventManager::eventTime()
+prefix_ senf::ClockService::clock_type senf::ppi::EventManager::eventTime()
 {
     return eventTime_;
 }
@@ -47,7 +47,7 @@ prefix_ boost::posix_time::ptime senf::ppi::EventManager::eventTime()
 ////////////////////////////////////////
 // private members
 
-prefix_ void senf::ppi::EventManager::eventTime(boost::posix_time::ptime time)
+prefix_ void senf::ppi::EventManager::eventTime(ClockService::clock_type time)
 {
     eventTime_ = time;
 }
index 18f41d1..5723de9 100644 (file)
@@ -27,8 +27,8 @@
 #define HH_EventManager_ 1
 
 // Custom includes
-#include <boost/date_time/posix_time/posix_time_types.hpp>
 #include <boost/ptr_container/ptr_vector.hpp>
+#include "Scheduler/ClockService.hh"
 #include "predecl.hh"
 #include "detail/Callback.hh"
 #include "detail/EventBinding.hh"
@@ -79,7 +79,7 @@ namespace ppi {
                            typename Callback<Descriptor>::type callback,
                            Descriptor & descriptor);
 
-        boost::posix_time::ptime eventTime();
+        ClockService::clock_type eventTime();
 
     protected:
 
@@ -89,9 +89,9 @@ namespace ppi {
         typedef boost::ptr_vector<detail::EventBindingBase> EventRegistrations;
         EventRegistrations registrations_;
 
-        void eventTime(boost::posix_time::ptime time);
+        void eventTime(ClockService::clock_type time);
 
-        boost::posix_time::ptime eventTime_;
+        ClockService::clock_type eventTime_;
 
         friend class detail::EventBindingBase;
         friend class module::Module;
index 428e2cd..4a128b4 100644 (file)
@@ -40,7 +40,7 @@
 template <class EventType, class Self>
 prefix_ void
 senf::ppi::EventImplementationHelper<EventType,Self>::callback(EventArg event,
-                                                               boost::posix_time::ptime time)
+                                                               ClockService::clock_type time)
 {
     binding().callback(event,time);
 }
@@ -70,7 +70,7 @@ senf::ppi::EventImplementationHelper<EventType,Self>::binding()
 
 template <class Self>
 prefix_ void
-senf::ppi::EventImplementationHelper<void,Self>::callback(boost::posix_time::ptime time)
+senf::ppi::EventImplementationHelper<void,Self>::callback(ClockService::clock_type time)
 {
     binding().callback(time);
 }
index 698a433..160d1b7 100644 (file)
@@ -26,7 +26,7 @@
 
 // Custom includes
 #include <vector>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include "Scheduler/ClockService.hh"
 #include "predecl.hh"
 
 //#include "Events.mpp"
@@ -84,7 +84,7 @@ namespace ppi {
     protected:
         typedef typename detail::EventArgType<EventType>::type EventArg;
 
-        void callback(EventArg event, boost::posix_time::ptime time);
+        void callback(EventArg event, ClockService::clock_type time);
         void callback(EventArg event);
 
     private:
@@ -95,7 +95,7 @@ namespace ppi {
     class EventImplementationHelper<void,Self>
     {
     protected:
-        void callback(boost::posix_time::ptime time);
+        void callback(ClockService::clock_type time);
         void callback();
 
     private:
index a61dec5..9157622 100644 (file)
@@ -27,8 +27,8 @@
 #define HH_IOEvent_ 1
 
 // Custom includes
-#include "Events.hh"
 #include "Scheduler/Scheduler.hh"
+#include "Events.hh"
 
 //#include "IOEvent.mpp"
 ///////////////////////////////hh.p////////////////////////////////////////
diff --git a/PPI/IntervalTimer.cc b/PPI/IntervalTimer.cc
new file mode 100644 (file)
index 0000000..ecda76d
--- /dev/null
@@ -0,0 +1,84 @@
+// $Id$
+//
+// Copyright (C) 2007 
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief IntervalTimer non-inline non-template implementation */
+
+#include "IntervalTimer.hh"
+//#include "IntervalTimer.ih"
+
+// Custom includes
+#include "Scheduler/Scheduler.hh"
+
+//#include "IntervalTimer.mpp"
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::IntervalTimer
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::IntervalTimer::v_enable()
+{
+    info_.intervalStart = ClockService::now();
+    info_.number = 0;
+    schedule();
+}
+
+prefix_ void senf::ppi::IntervalTimer::v_disable()
+{
+    Scheduler::instance().cancelTimeout(id_);
+}
+
+prefix_ void senf::ppi::IntervalTimer::schedule()
+{
+    info_.expected = info_.intervalStart + ( interval_ * (info_.number+1) ) / eventsPerInterval_;
+    id_ = Scheduler::instance().timeout(info_.expected, boost::bind(&IntervalTimer::cb,this));
+}
+
+prefix_ void senf::ppi::IntervalTimer::cb()
+{
+    callback(info_, info_.expected);
+    ++ info_.number;
+    if (info_.number >= eventsPerInterval_) {
+        info_.number = 0;
+        info_.intervalStart += interval_;
+    }
+    schedule();
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
+//#include "IntervalTimer.mpp"
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
diff --git a/PPI/IntervalTimer.cci b/PPI/IntervalTimer.cci
new file mode 100644 (file)
index 0000000..39d1ddb
--- /dev/null
@@ -0,0 +1,51 @@
+// $Id$
+//
+// Copyright (C) 2007 
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief IntervalTimer inline non-template implementation */
+
+// Custom includes
+
+#define prefix_ inline
+///////////////////////////////cci.p///////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::IntervalTimer
+
+prefix_ senf::ppi::IntervalTimer::IntervalTimer(ClockService::clock_type interval,
+                                                unsigned eventsPerInterval)
+    : interval_ (interval), eventsPerInterval_ (eventsPerInterval)
+{}
+
+///////////////////////////////cci.e///////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
diff --git a/PPI/IntervalTimer.hh b/PPI/IntervalTimer.hh
new file mode 100644 (file)
index 0000000..f057473
--- /dev/null
@@ -0,0 +1,97 @@
+// $Id$
+//
+// Copyright (C) 2007 
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief IntervalTimer public header */
+
+#ifndef HH_IntervalTimer_
+#define HH_IntervalTimer_ 1
+
+// Custom includes
+#include "Scheduler/ClockService.hh"
+#include "Events.hh"
+
+//#include "IntervalTimer.mpp"
+///////////////////////////////hh.p////////////////////////////////////////
+
+namespace senf {
+namespace ppi {
+
+    struct IntervalTimerEventInfo
+    {
+        ClockService::clock_type expected;
+        ClockService::clock_type intervalStart;
+        unsigned number;
+    };
+
+    /** \brief
+      */
+    class IntervalTimer
+        : public EventImplementation<IntervalTimerEventInfo>
+    {
+    public:
+        ///////////////////////////////////////////////////////////////////////////
+        // Types
+
+        ///////////////////////////////////////////////////////////////////////////
+        ///\name Structors and default members
+        ///@{
+
+        explicit IntervalTimer(ClockService::clock_type interval, 
+                               unsigned eventsPerInterval=1);
+
+        ///@}
+        ///////////////////////////////////////////////////////////////////////////
+
+    protected:
+
+    private:
+        virtual void v_enable();
+        virtual void v_disable();
+
+        void schedule();
+        void cb();
+
+        ClockService::clock_type interval_;
+        unsigned eventsPerInterval_;
+        IntervalTimerEventInfo info_;
+        unsigned id_;
+    };
+
+}}
+
+///////////////////////////////hh.e////////////////////////////////////////
+#include "IntervalTimer.cci"
+//#include "IntervalTimer.ct"
+//#include "IntervalTimer.cti"
+#endif
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
index 533fbd3..f436070 100644 (file)
@@ -40,7 +40,7 @@ prefix_ senf::ppi::module::Module::~Module()
     moduleManager().unregisterModule(*this);
 }
 
-prefix_ boost::posix_time::ptime senf::ppi::module::Module::eventTime()
+prefix_ senf::ClockService::clock_type senf::ppi::module::Module::eventTime()
 {
     return eventManager().eventTime();
 }
index c1b0a98..e5551ed 100644 (file)
@@ -28,8 +28,8 @@
 // Custom includes
 #include <vector>
 #include <boost/utility.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
 #include <boost/ptr_container/ptr_vector.hpp>
+#include "Scheduler/ClockService.hh"
 #include "predecl.hh"
 
 //#include "Module.mpp"
@@ -121,8 +121,8 @@ namespace module {
                                                  event is signaled
                                              \param[in] descriptor The type of event to register */
 
-        boost::posix_time::ptime eventTime(); ///< Return timestamp of the currently processing
-                                              ///< event
+        ClockService::clock_type eventTime(); ///< Return timestamp of the currently processing
+                                               ///< event
 
         void destroy();
 
index bfb2201..7c25134 100644 (file)
@@ -75,8 +75,7 @@ BOOST_AUTO_UNIT_TEST(module)
 
     tester.event.trigger();
     BOOST_CHECK_EQUAL( sink.size(), 1u );
-    BOOST_CHECK_EQUAL( (boost::posix_time::microsec_clock::universal_time() - 
-                        tester.eventTime()).total_seconds(), 0 );
+    BOOST_CHECK( senf::ClockService::now() - tester.eventTime() < 1000000000L );
 }
 
 ///////////////////////////////cc.e////////////////////////////////////////
index b17d6bc..14d283f 100644 (file)
@@ -64,7 +64,7 @@ BOOST_AUTO_UNIT_TEST(socketReader)
 
     senf::UDPv4ClientSocketHandle outputSocket;
     outputSocket.writeto(senf::INet4SocketAddress("localhost:44344"),data);
-    senf::Scheduler::instance().timeout(100000, &timeout);
+    senf::Scheduler::instance().timeout(senf::ClockService::now() + 100000000, &timeout);
     senf::ppi::run();
 
     BOOST_REQUIRE( ! sink.empty() );
index 64687ef..e053d43 100644 (file)
@@ -84,7 +84,8 @@ BOOST_AUTO_UNIT_TEST(activeSocketWriter)
 
     senf::UDPv4ClientSocketHandle inputSocket;
     inputSocket.bind(senf::INet4SocketAddress("localhost:44344"));
-    senf::Scheduler::instance().timeout(100000, &timeout);
+    senf::Scheduler::instance().timeout(
+        senf::ClockService::now() + 100000000, &timeout);
     source.submit(p);
     senf::ppi::run();
 
index 5a359ad..7a9ba01 100644 (file)
@@ -33,7 +33,7 @@
 #define prefix_
 ///////////////////////////////cc.p////////////////////////////////////////
 
-prefix_ void senf::ppi::detail::EventBindingBase::eventTime(boost::posix_time::ptime time)
+prefix_ void senf::ppi::detail::EventBindingBase::eventTime(ClockService::clock_type time)
 {
     // It's hard to make this inline because of a circular header dependency ...
     manager_->eventTime(time);
index f37637e..b59906d 100644 (file)
@@ -36,7 +36,7 @@
 template <class EventType, class Self>
 prefix_ void
 senf::ppi::detail::EventBindingHelper<EventType,Self>::callback(EventArg event,
-                                                                boost::posix_time::ptime time)
+                                                                ClockService::clock_type time)
 {
     self().eventTime(time);
     self().callback_(event);
@@ -45,7 +45,7 @@ senf::ppi::detail::EventBindingHelper<EventType,Self>::callback(EventArg event,
 template <class EventType, class Self>
 prefix_ void senf::ppi::detail::EventBindingHelper<EventType,Self>::callback(EventArg event)
 {
-    callback(event, boost::posix_time::microsec_clock::universal_time());
+    callback(event, ClockService::now());
 }
 
 ////////////////////////////////////////
@@ -62,7 +62,7 @@ prefix_ Self & senf::ppi::detail::EventBindingHelper<EventType,Self>::self()
 
 template <class Self>
 prefix_ void
-senf::ppi::detail::EventBindingHelper<void,Self>::callback(boost::posix_time::ptime time)
+senf::ppi::detail::EventBindingHelper<void,Self>::callback(ClockService::clock_type time)
 {
     self().eventTime(time);
     self().callback_();
@@ -71,7 +71,7 @@ senf::ppi::detail::EventBindingHelper<void,Self>::callback(boost::posix_time::pt
 template <class Self>
 prefix_ void senf::ppi::detail::EventBindingHelper<void,Self>::callback()
 {
-    callback(boost::posix_time::microsec_clock::universal_time());
+    callback(ClockService::now());
 }
 
 ////////////////////////////////////////
index 468e713..4698c81 100644 (file)
@@ -27,7 +27,7 @@
 #define HH_EventBinding_ 1
 
 // Custom includes
-#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include "Scheduler/ClockService.hh"
 #include "../predecl.hh"
 #include "Callback.hh"
 
@@ -47,7 +47,7 @@ namespace detail {
         EventBindingBase(EventManager & manager, module::Module & module, 
                          EventDescriptor & descriptor);
 
-        void eventTime(boost::posix_time::ptime time);
+        void eventTime(ClockService::clock_type time);
 
     private:
         EventManager * manager_;
@@ -63,7 +63,7 @@ namespace detail {
     public:
         typedef typename detail::EventArgType<EventType>::type EventArg;
 
-        void callback(EventArg event, boost::posix_time::ptime time);
+        void callback(EventArg event, ClockService::clock_type time);
         void callback(EventArg event);
         
     private:
@@ -74,7 +74,7 @@ namespace detail {
     class EventBindingHelper<void,Self>
     {
     public:
-        void callback(boost::posix_time::ptime time);
+        void callback(ClockService::clock_type time);
         void callback();
         
     private:
index f38ad88..ea29427 100644 (file)
@@ -1,6 +1,6 @@
 // $Id$
 //
-// Copyright (C) 2007 
+// Copyright (C) 2007
 // Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
 // Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
 //     Stefan Bund <g0dil@berlios.de>
 #include "Socket/Protocols/INet/INetAddressing.hh"
 #include "PPI/SocketReader.hh"
 #include "PPI/SocketWriter.hh"
+#include "PPI/Module.hh"
+#include "PPI/IntervalTimer.hh"
+#include "PPI/Joins.hh"
+#include "PPI/PassiveQueue.hh"
 #include "PPI/Setup.hh"
 
 //#include "ppitest.mpp"
 #define prefix_
 ///////////////////////////////cc.p////////////////////////////////////////
 
+namespace module = senf::ppi::module;
+namespace connector = senf::ppi::connector;
+namespace ppi = senf::ppi;
+
+namespace {
+
+    class RateFilter
+        : public module::Module
+    {
+        SENF_PPI_MODULE(RateFilter);
+    public:
+
+        connector::ActiveInput input;
+        connector::ActiveOutput output;
+
+        RateFilter(senf::ClockService::clock_type interval) : timer(interval) {
+            route(input,output);
+            route(input,timer);
+            registerEvent(&RateFilter::timeout, timer);
+        }
+
+    private:
+        void timeout() {
+            output(input());
+        }
+
+        ppi::IntervalTimer timer;
+    };
+
+    class CopyPacketGenerator
+        : public module::Module
+    {
+        SENF_PPI_MODULE(CopyPacketGenerator);
+    public:
+
+        connector::PassiveOutput output;
+
+        CopyPacketGenerator(senf::Packet p) : packet(p) {
+            noroute(output);
+            output.onRequest(&CopyPacketGenerator::request);
+        }
+
+    private:
+        void request() {
+            output(packet);
+        }
+
+        senf::Packet packet;
+    };
+}
+
+// Module setup:
+//
+// 'O'        = active connector
+// '>' or '<' = input connector
+//
+// [ udpReader ] O--> [ queue ] -->O [      ]
+//                                   [ join ] -->O [ rateFilter] O--> [ udpWriter ]
+//                [ generator ] -->O [      ]
+
 int main(int argc, char * argv[])
 {
-    namespace module = senf::ppi::module;
-    namespace ppi = senf::ppi;
-
     senf::UDPv4ClientSocketHandle inputSocket;
     inputSocket.bind(senf::INet4SocketAddress("0.0.0.0:44344"));
-    module::ActiveSocketReader<> udpReader (inputSocket);
 
     senf::ConnectedUDPv4ClientSocketHandle outputSocket(
         senf::INet4SocketAddress("localhost:44345"));
-    module::PassiveSocketWriter<> udpWriter (outputSocket);
-    
-    ppi::connect(udpReader.output, udpWriter.input);
+
+    module::ActiveSocketReader<>  udpReader  (inputSocket);
+    module::PassiveQueue          queue;
+    CopyPacketGenerator           generator  (senf::DataPacket::create(std::string("<idle>\n")));
+    module::PriorityJoin          join;
+    RateFilter                    rateFilter (1000000000ul);
+    module::PassiveSocketWriter<> udpWriter  (outputSocket);
+
+    ppi::connect( udpReader,  queue      );
+    ppi::connect( queue,      join       );
+    ppi::connect( generator,  join       );
+    ppi::connect( join,       rateFilter );
+    ppi::connect( rateFilter, udpWriter  );
 
     ppi::run();
-    
+
     return 0;
 }
 
index 816e663..177a874 100644 (file)
@@ -85,19 +85,8 @@ static const int EPollInitialSize = 16;
 #define prefix_
 ///////////////////////////////cc.p////////////////////////////////////////
 
-prefix_ senf::Scheduler::Scheduler & senf::Scheduler::instance()
-{
-    static Scheduler instance;
-    return instance;
-}
-
-prefix_ void senf::Scheduler::timeout(ClockService::clock_type timeout, TimerCallback const & cb)
-{
-    timerQueue_.push(TimerSpec(ClockService::now()+timeout,cb));
-}
-
 prefix_ senf::Scheduler::Scheduler()
-    : epollFd_ (epoll_create(EPollInitialSize))
+    : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false)
 {
     if (epollFd_<0)
         throw SystemException(errno);
@@ -167,8 +156,11 @@ prefix_ void senf::Scheduler::process()
     while (! terminate_) {
         ClockService::clock_type timeNow = ClockService::now();
 
-        while ( ! timerQueue_.empty() && timerQueue_.top().timeout <= timeNow ) {
-            timerQueue_.top().cb();
+        while ( ! timerQueue_.empty() && timerQueue_.top()->second.timeout <= timeNow ) {
+            TimerMap::iterator i (timerQueue_.top());
+            if (! i->second.canceled)
+                i->second.cb();
+            timerMap_.erase(i);
             timerQueue_.pop();
         }
 
@@ -177,7 +169,8 @@ prefix_ void senf::Scheduler::process()
 
         int timeout (MinTimeout);
         if (! timerQueue_.empty()) {
-            ClockService::clock_type delta ((timerQueue_.top().timeout - timeNow)/1000000UL);
+            ClockService::clock_type delta (
+                (timerQueue_.top()->second.timeout - timeNow)/1000000UL);
             if (delta<MinTimeout)
                 timeout = int(delta);
         }
index 2308705..fbae221 100644 (file)
 #define prefix_ inline
 ///////////////////////////////cci.p///////////////////////////////////////
 
+prefix_ senf::Scheduler::Scheduler & senf::Scheduler::instance()
+{
+    static Scheduler instance;
+    return instance;
+}
+
+prefix_ unsigned senf::Scheduler::timeout(ClockService::clock_type timeout,
+                                          TimerCallback const & cb)
+{
+    TimerMap::iterator i (
+        timerMap_.insert(std::make_pair(timerIdCounter_, 
+                                        TimerSpec(timeout,cb,timerIdCounter_))).first);
+    timerQueue_.push(i);
+    return timerIdCounter_++;
+}
+
+prefix_ void senf::Scheduler::cancelTimeout(unsigned id)
+{
+    TimerMap::iterator i (timerMap_.find(id));
+    if (i != timerMap_.end())
+        i->second.canceled = true;
+}
+
 prefix_ void senf::Scheduler::terminate()
 {
     terminate_ = true;
@@ -41,6 +64,12 @@ prefix_ int senf::retrieve_filehandle(int fd)
     return fd;
 }
 
+prefix_ senf::Scheduler::TimerSpecCompare::result_type
+senf::Scheduler::TimerSpecCompare::operator()(first_argument_type a, second_argument_type b)
+{
+    return a->second < b->second;
+}
+
 ///////////////////////////////cci.e///////////////////////////////////////
 #undef prefix_
 
index 4914e59..92eb86c 100644 (file)
@@ -147,7 +147,7 @@ namespace senf {
                                              \param[in] eventMask arbitrary combination via '|'
                                                  operator of EventId designators. */
 
-        void timeout(ClockService::clock_type timeout, TimerCallback const & cb); 
+        unsigned timeout(ClockService::clock_type timeout, TimerCallback const & cb); 
                                         ///< Add timeout event
                                         /**< \param[in] timeout timeout in nanoseconds
                                              \param[in] cb callback to call after \a timeout
@@ -155,6 +155,8 @@ namespace senf {
                                              \todo Return some kind of handle/pointer and add
                                                  support to update or revoke a timeout */
 
+        void cancelTimeout(unsigned id);
+
         void process();                 ///< Event handler main loop
                                         /**< This member must be called at some time to enter the
                                              event handler main loop. Only while this function is
@@ -196,21 +198,37 @@ namespace senf {
         struct TimerSpec
         {
             TimerSpec() : timeout(), cb() {}
-            TimerSpec(ClockService::clock_type timeout_, TimerCallback cb_)
-                : timeout(timeout_), cb(cb_) {}
+            TimerSpec(ClockService::clock_type timeout_, TimerCallback cb_, unsigned id_)
+                : timeout(timeout_), cb(cb_), id(id_), canceled(false) {}
 
             bool operator< (TimerSpec const & other) const
                 { return timeout > other.timeout; }
 
             ClockService::clock_type timeout;
             TimerCallback cb;
+            unsigned id;
+            bool canceled;
         };
 
         typedef std::map<int,EventSpec> FdTable;
-        typedef std::priority_queue<TimerSpec> TimerQueue;
+        typedef std::map<unsigned,TimerSpec> TimerMap;
+
+        struct TimerSpecCompare
+        {
+            typedef TimerMap::iterator first_argument_type;
+            typedef TimerMap::iterator second_argument_type;
+            typedef bool result_type;
+            
+            result_type operator()(first_argument_type a, second_argument_type b);
+        };
+
+        typedef std::priority_queue<TimerMap::iterator, std::vector<TimerMap::iterator>, 
+                                    TimerSpecCompare> TimerQueue;
 
         FdTable fdTable_;
+        unsigned timerIdCounter_;
         TimerQueue timerQueue_;
+        TimerMap timerMap_;
         int epollFd_;
         bool terminate_;
     };
index b87b508..102d1a3 100644 (file)
@@ -223,8 +223,10 @@ BOOST_AUTO_UNIT_TEST(scheduler)
     buffer[size]=0;
     BOOST_CHECK_EQUAL( buffer, "READ" );
 
-    BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(100000000UL,&timeout) );
-    BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(200000000UL,&timeout) );
+    BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(
+                              ClockService::now()+100000000UL,&timeout) );
+    BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(
+                              ClockService::now()+200000000UL,&timeout) );
     ClockService::clock_type t (ClockService::now());
     BOOST_CHECK_NO_THROW( Scheduler::instance().process() );
     BOOST_CHECK_PREDICATE( is_close, (ClockService::now()) (t+100000000UL) );