PPI: Complete connector implementation
g0dil [Mon, 13 Aug 2007 15:38:11 +0000 (15:38 +0000)]
PPI: Implement throttling
PPI: Implement ModuleManager and ppi::init
PPI: Implement ThresholdQueuing
PPI: Hugely extended routing classes

git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@390 270642c3-0616-0410-b53a-bc976706d245

23 files changed:
PPI/Connectors.cc
PPI/Connectors.cci
PPI/Connectors.cti
PPI/Connectors.hh
PPI/Connectors.test.cc
PPI/DebugModules.cci
PPI/DebugModules.hh
PPI/DebugModules.test.cc
PPI/Module.cci
PPI/Module.hh
PPI/ModuleManager.cc [new file with mode: 0644]
PPI/ModuleManager.cci [new file with mode: 0644]
PPI/ModuleManager.hh [new file with mode: 0644]
PPI/Queueing.cc
PPI/Queueing.cci [new file with mode: 0644]
PPI/Queueing.hh
PPI/Route.cci
PPI/Route.cti
PPI/Route.hh
PPI/Route.ih
PPI/Setup.cci
PPI/Setup.hh
PPI/predecl.hh

index 8b02972..7fe75a0 100644 (file)
 //#include "Connectors.ih"
 
 // Custom includes
+#include "Route.hh"
 
 //#include "Connectors.mpp"
 #define prefix_
 ///////////////////////////////cc.p////////////////////////////////////////
 
 ///////////////////////////////////////////////////////////////////////////
-// protected members
+// senf::ppi::connector::PassiveConnector
 
-prefix_ senf::ppi::connector::Connector::~Connector()
-{}
+////////////////////////////////////////
+// private members
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveConnector
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle()
+{
+    if (throttleCallback_)
+        throttleCallback_();
+    NotifyRoutes::const_iterator i (notifyRoutes_.begin());
+    NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
+    for (; i != i_end; ++i)
+        (*i)->notifyThrottle();
+}
 
-prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
+prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle()
 {
-    peer_ = & target;
-    target.peer_ = this;
+    if (unthrottleCallback_)
+        unthrottleCallback_();
+    NotifyRoutes::const_iterator i (notifyRoutes_.begin());
+    NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
+    for (; i != i_end; ++i)
+        (*i)->notifyUnthrottle();
+}
+
+prefix_ void senf::ppi::connector::ActiveConnector::registerRoute(ForwardingRoute & route)
+{
+    notifyRoutes_.push_back(&route);
 }
 
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::connector::InputConnector
 
+prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()()
+{
+    if (empty())
+        v_requestEvent();
+    Packet p;
+    if (! empty()) {
+        p = peek();
+        queue_.pop_back();
+        v_dequeueEvent();
+    }
+    return p;
+}
+
 ////////////////////////////////////////
 // private members
 
@@ -78,21 +117,25 @@ prefix_ void senf::ppi::connector::ActiveInput::v_requestEvent()
 
 prefix_ void senf::ppi::connector::PassiveInput::v_enqueueEvent()
 {
-    ///\fixme Emit notifications when qstate_ changes
-    if (qdisc_)
-        qstate_ = qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
-    else
-        qstate_ = empty()?QueueingDiscipline::UNTHROTTLED:QueueingDiscipline::THROTTLED;
     emit();
+    qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
 }
 
 prefix_ void senf::ppi::connector::PassiveInput::v_dequeueEvent()
 {
-    ///\fixme Emit notifications when qstate_ changes
-    if (qdisc_)
-        qstate_ = qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
-    else
-        qstate_ = empty()?QueueingDiscipline::UNTHROTTLED:QueueingDiscipline::THROTTLED;
+    qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
+}
+
+prefix_ void senf::ppi::connector::PassiveInput::v_unthrottleEvent()
+{
+    size_type n (queueSize());
+    while (n) {
+        emit();
+        size_type nn (queueSize());
+        if (n == nn)
+            break;
+        n = nn;
+    }
 }
 
 ///////////////////////////////cc.e////////////////////////////////////////
index 4b51a58..3862aab 100644 (file)
 // senf::ppi::connector::Connector
 
 prefix_ senf::ppi::connector::Connector & senf::ppi::connector::Connector::peer()
+    const
 {
     BOOST_ASSERT(peer_);
     return *peer_;
 }
 
 prefix_ senf::ppi::module::Module & senf::ppi::connector::Connector::module()
+    const
 {
     BOOST_ASSERT(module_);
     return *module_;
@@ -50,6 +52,15 @@ prefix_ senf::ppi::connector::Connector::Connector()
     : peer_(), module_()
 {}
 
+prefix_ senf::ppi::connector::Connector::~Connector()
+{}
+
+prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
+{
+    peer_ = & target;
+    target.peer_ = this;
+}
+
 ////////////////////////////////////////
 // private members
 
@@ -59,18 +70,39 @@ prefix_ void senf::ppi::connector::Connector::setModule(module::Module & module)
 }
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveConnector
+// senf::ppi::connector::PassiveConnector
 
-////////////////////////////////////////
-// protected members
+prefix_ bool senf::ppi::connector::PassiveConnector::throttled()
+    const
+{
+    return nativeThrottled_ || remoteThrottled_;
+}
 
-prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
-{}
+prefix_ bool senf::ppi::connector::PassiveConnector::nativeThrottled()
+    const
+{
+    return nativeThrottled_;
+}
 
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::PassiveConnector
+prefix_ void senf::ppi::connector::PassiveConnector::throttle()
+{
+    if (!throttled())
+        emitThrottle();
+    nativeThrottled_ = true;
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::unthrottle()
+{
+    if (throttled() && ! remoteThrottled_) {
+        nativeThrottled_ = false;
+        emitUnthrottle();
+    } else
+        nativeThrottled_ = false;
+
+}
 
 prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveConnector::peer()
+    const
 {
     return dynamic_cast<ActiveConnector&>(Connector::peer());
 }
@@ -79,51 +111,104 @@ prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveCon
 // protected members
 
 prefix_ senf::ppi::connector::PassiveConnector::PassiveConnector()
-    : callback_()
+    : callback_(), remoteThrottled_(), nativeThrottled_()
 {}
 
 prefix_ void senf::ppi::connector::PassiveConnector::emit()
 {
     BOOST_ASSERT(callback_);
-    callback_();
+    if (!throttled())
+        callback_();
+}
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::PassiveConnector::notifyThrottle()
+{
+    if (!throttled()) {
+        remoteThrottled_ = true;
+        emitThrottle();
+    } 
+    else 
+        remoteThrottled_ = true;
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::notifyUnthrottle()
+{
+    if (throttled() && !nativeThrottled_) {
+        remoteThrottled_ = false;
+        emitUnthrottle();
+    } 
+    else
+        remoteThrottled_ = false;
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::emitThrottle()
+{
+    peer().notifyThrottle();
 }
 
+prefix_ void senf::ppi::connector::PassiveConnector::emitUnthrottle()
+{
+    peer().notifyUnthrottle();
+    v_unthrottleEvent();
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
+{}
+
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::InputConnector
+// senf::ppi::connector::ActiveConnector
 
-prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()()
+prefix_ senf::ppi::connector::PassiveConnector & senf::ppi::connector::ActiveConnector::peer()
+    const
 {
-    v_requestEvent();
-    Packet p (peek());
-    queue_.pop_back();
-    v_dequeueEvent();
-    return p;
+    return dynamic_cast<PassiveConnector&>(Connector::peer());
 }
 
-prefix_ bool senf::ppi::connector::InputConnector::boolean_test()
+prefix_ void senf::ppi::connector::ActiveConnector::onThrottle()
 {
-    ///\fixme Add additional active/passive throttle/unthrottle conditions (make virtual ?)
-    return ! empty();
+    throttleCallback_ = Callback();
 }
 
+prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle()
+{
+    unthrottleCallback_ = Callback();
+}
+
+////////////////////////////////////////
+// protected members
+
+prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
+    : throttleCallback_(), unthrottleCallback_(), notifyRoutes_()
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::InputConnector
+
 prefix_ senf::ppi::connector::OutputConnector & senf::ppi::connector::InputConnector::peer()
+    const
 {
     return dynamic_cast<OutputConnector &>(Connector::peer());
 }
 
 prefix_ senf::ppi::connector::InputConnector::queue_iterator
 senf::ppi::connector::InputConnector::begin()
+    const
 {
     return queue_.begin();
 }
 
 prefix_ senf::ppi::connector::InputConnector::queue_iterator
 senf::ppi::connector::InputConnector::end()
+    const
 {
     return queue_.end();
 }
 
 prefix_ senf::Packet senf::ppi::connector::InputConnector::peek()
+    const
 {
     BOOST_ASSERT( ! queue_.empty() );
     return queue_.back();
@@ -131,11 +216,13 @@ prefix_ senf::Packet senf::ppi::connector::InputConnector::peek()
 
 prefix_ senf::ppi::connector::InputConnector::size_type
 senf::ppi::connector::InputConnector::queueSize()
+    const
 {
     return queue_.size();
 }
 
 prefix_ bool senf::ppi::connector::InputConnector::empty()
+    const
 {
     return queue_.empty();
 }
@@ -164,6 +251,7 @@ prefix_ void senf::ppi::connector::OutputConnector::operator()(Packet p)
 }
 
 prefix_ senf::ppi::connector::InputConnector & senf::ppi::connector::OutputConnector::peer()
+    const
 {
     return dynamic_cast<InputConnector&>(Connector::peer());
 }
@@ -178,22 +266,36 @@ prefix_ senf::ppi::connector::OutputConnector::OutputConnector()
 // senf::ppi::connector::PassiveInput
 
 prefix_ senf::ppi::connector::PassiveInput::PassiveInput()
-    : qdisc_(), qstate_(QueueingDiscipline::UNTHROTTLED)
+    : qdisc_(new ThresholdQueueing(1,0))
 {}
 
 prefix_ senf::ppi::connector::ActiveOutput & senf::ppi::connector::PassiveInput::peer()
+    const
 {
     return dynamic_cast<ActiveOutput&>(Connector::peer());
 }
 
+prefix_ bool senf::ppi::connector::PassiveInput::boolean_test()
+    const
+{
+    return ! empty();
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::connector::PassiveOutput
 
 prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::PassiveOutput::peer()
+    const
 {
     return dynamic_cast<ActiveInput&>(Connector::peer());
 }
 
+prefix_ bool senf::ppi::connector::PassiveOutput::boolean_test()
+    const
+{
+    return  true;
+}
+
 prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target)
 {
     Connector::connect(target);
@@ -203,10 +305,17 @@ prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target)
 // senf::ppi::connector::ActiveInput
 
 prefix_ senf::ppi::connector::PassiveOutput & senf::ppi::connector::ActiveInput::peer()
+    const
 {
     return dynamic_cast<PassiveOutput&>(Connector::peer());
 }
 
+prefix_ bool senf::ppi::connector::ActiveInput::boolean_test()
+    const
+{
+    return ! empty() || ! peer().throttled();
+}
+
 prefix_ void senf::ppi::connector::ActiveInput::request()
 {
     peer().emit();
@@ -216,10 +325,17 @@ prefix_ void senf::ppi::connector::ActiveInput::request()
 // senf::ppi::connector::ActiveOutput
 
 prefix_ senf::ppi::connector::PassiveInput & senf::ppi::connector::ActiveOutput::peer()
+    const
 {
     return dynamic_cast<PassiveInput&>(Connector::peer());
 }
 
+prefix_ bool senf::ppi::connector::ActiveOutput::boolean_test()
+    const
+{
+    return ! peer().throttled();
+}
+
 prefix_ void senf::ppi::connector::ActiveOutput::connect(PassiveInput & target)
 {
     Connector::connect(target);
index 9c77368..992bd8c 100644 (file)
@@ -40,12 +40,27 @@ prefix_ void senf::ppi::connector::PassiveConnector::onRequest(Handler handler)
 }
 
 ///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveConnector
+
+template <class Handler>
+prefix_ void senf::ppi::connector::ActiveConnector::onThrottle(Handler handler)
+{
+    throttleCallback_ = detail::Callback<>::make(handler, module());
+}
+
+template <class Handler>
+prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle(Handler handler)
+{
+    unthrottleCallback_ = detail::Callback<>::make(handler, module());
+}
+
+///////////////////////////////////////////////////////////////////////////
 // senf::ppi::connector::PassiveInput
 
 template <class QDisc>
 prefix_ void senf::ppi::connector::PassiveInput::qdisc(QDisc const & disc)
 {
-    qdisc_ = boost::scoped_ptr<QueueingDiscipline>(new QDisc(disc));
+    qdisc_.reset(new QDisc(disc));
 }
 
 ///////////////////////////////cti.e///////////////////////////////////////
index 6ca4aa5..6e01ca3 100644 (file)
@@ -67,8 +67,8 @@ namespace connector {
         : boost::noncopyable
     {
     public:
-        Connector & peer();             ///< Get peer connected to this connector
-        module::Module & module();      ///< Get this connectors containing module
+        Connector & peer() const;       ///< Get peer connected to this connector
+        module::Module & module() const; ///< Get this connectors containing module
 
     protected:
         Connector();
@@ -119,13 +119,13 @@ namespace connector {
                                                  operation is to be performed. */
 
         
-        bool throttled();               ///< Get accumulative throttling state
-        bool nativeThrottled();         ///< Get native throttling state
+        bool throttled() const;         ///< Get accumulative throttling state
+        bool nativeThrottled() const;   ///< Get native throttling state
 
         void throttle();                ///< Set native throttling
         void unthrottle();              ///< Revoke native throttling
         
-        ActiveConnector & peer();
+        ActiveConnector & peer() const;
 
     protected:
         PassiveConnector();
@@ -133,13 +133,25 @@ namespace connector {
         void emit();
 
     private:
+        // Called by the routing to change the remote throttling state
         void notifyThrottle();          ///< Forward a throttling notification to this connector
         void notifyUnthrottle();        ///< Forward an unthrottling notification to this connector
 
+        // Internal members to emit throttling notifications
+        void emitThrottle();
+        void emitUnthrottle();
+
+        // Called after unthrottling the connector
+        virtual void v_unthrottleEvent();
+
         typedef detail::Callback<>::type Callback;
         Callback callback_;
-        
-        friend class ActiveConnector;
+
+        bool remoteThrottled_;
+        bool nativeThrottled_;
+
+        friend class senf::ppi::detail::ForwardForwardingRouteImplementation;
+        friend class senf::ppi::detail::BackwardForwardingRouteImplementation;
     };
 
     /** \brief Active connector baseclass
@@ -155,9 +167,10 @@ namespace connector {
     class ActiveConnector 
         : public virtual Connector
     {
+        typedef detail::Callback<>::type Callback;
     public:
         template <class Handler>
-        void onThrottle(Handler handle); ///< Register throttle notification handler
+        void onThrottle(Handler handler); ///< Register throttle notification handler
                                         /**< The handler register here will be called, whenever a
                                              throttle notification comes in. The \a handler argument
                                              is either an arbitrary callable object or it is a
@@ -165,11 +178,12 @@ namespace connector {
                                              this input. In the second case, the pointer will
                                              automatically be bound to the containing instance.
 
-                                             \param[in] handle Handler to call on throttle
+                                             \param[in] handler Handler to call on throttle
                                                  notifications. */
+        void onThrottle();
 
         template <class Handler>
-        void onUnthrottle(Handler handle); ///< Register unthrottle notification handler
+        void onUnthrottle(Handler handler); ///< Register unthrottle notification handler
                                         /**< The handler register here will be called, whenever an
                                              unthrottle notification comes in. The \a handler
                                              argument is either an arbitrary callable object or it
@@ -179,11 +193,29 @@ namespace connector {
 
                                              \param[in] handle Handler to call on unthrottle
                                                  notifications. */
+        void onUnthrottle();
 
-        PassiveConnector & peer();
+        PassiveConnector & peer() const;
 
     protected:
         ActiveConnector();
+
+    private:
+        // called by the peer() to forward throttling notifications
+        void notifyThrottle();
+        void notifyUnthrottle();
+
+        // called by ForwardingRoute to register a new route
+        void registerRoute(ForwardingRoute & route);
+
+        Callback throttleCallback_;
+        Callback unthrottleCallback_;
+
+        typedef std::vector<ForwardingRoute*> NotifyRoutes;
+        NotifyRoutes notifyRoutes_;
+
+        friend class senf::ppi::ForwardingRoute;
+        friend class PassiveConnector;
     };
 
     /** \brief Input connector baseclass
@@ -207,8 +239,7 @@ namespace connector {
             be added to the queue before it can be processed.
      */
     class InputConnector 
-        : public virtual Connector,
-          public SafeBool<InputConnector>
+        : public virtual Connector
     {
         typedef std::deque<Packet> Queue;
     public:
@@ -224,28 +255,15 @@ namespace connector {
                                              request cannot be fulfilled, this is considered to be a
                                              logic error in the module implementation and an
                                              exception is raised. */
-        bool boolean_test ();           ///< Check packet availability
-                                        /**< Using any input connector in a boolean context will
-                                             check, whether an input request can be fulfilled. This
-                                             is always possible if the queue is non-empty. If the
-                                             input is active, it also returns when the connected
-                                             passive output is not throttled so new packets can be
-                                             requested. 
-
-                                             Calling the operator() member is an error if this test
-                                             returns \c false
-
-                                             \returns \c true if operator() can be called, \c false
-                                                 otherwise */
 
-        OutputConnector & peer();
+        OutputConnector & peer() const;
 
-        queue_iterator begin();         ///< Access queue begin (head)
-        queue_iterator end();           ///< Access queue past-the-end (tail)
-        Packet peek();                  ///< Return head element from the queue
+        queue_iterator begin() const;   ///< Access queue begin (head)
+        queue_iterator end() const;     ///< Access queue past-the-end (tail)
+        Packet peek() const;            ///< Return head element from the queue
 
-        size_type queueSize();          ///< Return number of elements in the queue
-        bool empty();                   ///< Return queueSize() == 0
+        size_type queueSize() const;    ///< Return number of elements in the queue
+        bool empty() const;             ///< Return queueSize() == 0
 
     protected:
         InputConnector();
@@ -274,7 +292,7 @@ namespace connector {
     public:
         void operator()(Packet p);        ///< Send out a packet
 
-        InputConnector & peer();
+        InputConnector & peer() const;
 
     protected:
         OutputConnector();
@@ -285,21 +303,23 @@ namespace connector {
 
     /** \brief Combination of PassiveConnector and InputConnector
 
-        In addition to the native and the forwarded throttling state, the PassiveInput manages a
-        queue throttling state. This state is automatically managed by a queueing discipline. The
-        standard queueing discipline is ThresholdQueueing, which throttles the connection whenever
-        the queue length reaches the high threshold and unthrottles the connection when the queue
-        reaches the low threshold. The default queueing discipline is
+        The PassiveInput automatically controls the connectors throttling state using a queueing
+        discipline. The standard queueing discipline is ThresholdQueueing, which throttles the
+        connection whenever the queue length reaches the high threshold and unthrottles the
+        connection when the queue reaches the low threshold. The default queueing discipline is
         <tt>ThresholdQueueing(1,0)</tt> which will throttle the input whenever the queue is
         non-empty.
      */
     class PassiveInput 
-        : public PassiveConnector, public InputConnector
+        : public PassiveConnector, public InputConnector,
+          public SafeBool<PassiveInput>
     {
     public:
         PassiveInput();
 
-        ActiveOutput & peer();
+        ActiveOutput & peer() const;
+
+        bool boolean_test() const;
 
         template <class QDisc>
         void qdisc(QDisc const & disc); ///< Change the queueing discipline
@@ -311,18 +331,21 @@ namespace connector {
     private:
         void v_enqueueEvent();
         void v_dequeueEvent();
+        void v_unthrottleEvent();
 
         boost::scoped_ptr<QueueingDiscipline> qdisc_;
-        QueueingDiscipline::State qstate_;
     };
 
     /** \brief Combination of PassiveConnector and OutputConnector
      */
     class PassiveOutput
-        : public PassiveConnector, public OutputConnector
+        : public PassiveConnector, public OutputConnector,
+          public SafeBool<PassiveOutput>
     {
     public:
-        ActiveInput & peer();
+        ActiveInput & peer() const;
+
+        bool boolean_test() const;
 
         void connect(ActiveInput & target);
 
@@ -332,10 +355,13 @@ namespace connector {
     /** \brief Combination of ActiveConnector and InputConnector
      */
     class ActiveInput
-        : public ActiveConnector, public InputConnector
+        : public ActiveConnector, public InputConnector,
+          public SafeBool<ActiveInput>
     {
     public:
-        PassiveOutput & peer();
+        PassiveOutput & peer() const;
+
+        bool boolean_test() const;
 
         void request();                 ///< request more packets without dequeuing any packet
 
@@ -346,10 +372,13 @@ namespace connector {
     /** \brief Combination of ActiveConnector and OutputConnector
      */
     class ActiveOutput
-        : public ActiveConnector, public OutputConnector
+        : public ActiveConnector, public OutputConnector,
+          public SafeBool<ActiveOutput>
     {
     public:
-        PassiveInput & peer();
+        PassiveInput & peer() const;
+
+        bool boolean_test() const;
 
         void connect(PassiveInput & target);
     };
index 694697b..80548f2 100644 (file)
@@ -37,8 +37,8 @@
 #define prefix_
 ///////////////////////////////cc.p////////////////////////////////////////
 
-namespace debug = senf::ppi::module::debug;
 namespace ppi = senf::ppi;
+namespace debug = ppi::module::debug;
 
 // For each type of connector we use the corresponding debug module. Additionally, we always need
 // the corresponding connected module since otherwise the connectors cannot be connected anywhere
@@ -53,6 +53,7 @@ BOOST_AUTO_UNIT_TEST(connector)
     debug::PassivePacketSink target;
 
     ppi::connect(source.output,target.input);
+    ppi::init();
 
     BOOST_CHECK_EQUAL( & source.output.module(), & source );
     BOOST_CHECK_EQUAL( & target.input.module(), & target );
@@ -66,11 +67,11 @@ BOOST_AUTO_UNIT_TEST(passiveConnector)
     debug::PassivePacketSink target;
 
     ppi::connect(source.output,target.input);
+    ppi::init();
 
     // onRequest is implicitly tested within the PassivePacketSink implementation which is tested
     // in DebugModules.test.cc
 
-#if 0
     target.input.throttle();
     BOOST_CHECK( target.input.throttled() );
     BOOST_CHECK( target.input.nativeThrottled() );
@@ -78,9 +79,210 @@ BOOST_AUTO_UNIT_TEST(passiveConnector)
     target.input.unthrottle();
     BOOST_CHECK( ! target.input.throttled() );
     BOOST_CHECK( ! target.input.nativeThrottled() );
-#endif
+
+    BOOST_CHECK_EQUAL( & target.input.peer(), & source.output );
 }
 
+namespace {
+    
+    bool called = false;
+    
+    void handler() { called = true; }
+}
+
+BOOST_AUTO_UNIT_TEST(activeConnector)
+{
+    debug::ActivePacketSource source;
+    debug::PassivePacketSink target;
+
+    ppi::connect(source.output,target.input);
+    ppi::init();
+
+    source.output.onThrottle(handler);
+    BOOST_CHECK( ! called );
+    target.input.throttle();
+    BOOST_CHECK( called );
+    called = false;
+    target.input.unthrottle();
+    BOOST_CHECK( ! called );
+    source.output.onThrottle();
+    source.output.onUnthrottle(handler);
+    BOOST_CHECK( ! called );
+    target.input.throttle();
+    BOOST_CHECK( ! called );
+    target.input.unthrottle();
+    BOOST_CHECK( called );
+    source.output.onUnthrottle();
+    called = false;
+    BOOST_CHECK( ! called );
+    target.input.throttle();
+    target.input.unthrottle();
+    BOOST_CHECK( ! called );
+
+    BOOST_CHECK_EQUAL( & source.output.peer(), & target.input );
+}
+
+BOOST_AUTO_UNIT_TEST(inputConnector)
+{
+    debug::ActivePacketSource source;
+    debug::PassivePacketSink target;
+
+    ppi::connect(source.output,target.input);
+    ppi::init();
+
+    // operator() is implicitly tested within the Active/PassivePacketSink implementation which is
+    // tested in DebugModules.test.cc
+
+    // peek() is implicitly tested within the Active/PassivePacketSink implementation
+
+    BOOST_CHECK_EQUAL ( & target.input.peer(), & source.output );
+    
+    BOOST_CHECK( target.input.begin() == target.input.end() );
+    BOOST_CHECK_EQUAL( target.input.queueSize(), 0u );
+    BOOST_CHECK( target.input.empty() );
+}
+
+BOOST_AUTO_UNIT_TEST(outputConnector)
+{
+    debug::ActivePacketSource source;
+    debug::PassivePacketSink target;
+
+    ppi::connect(source.output,target.input);
+    ppi::init();
+
+    // operator() is implicitly tested within the Active/PassivePacketSource implementation which is
+    // tested in DebugModules.test.cc
+
+    BOOST_CHECK_EQUAL( & source.output.peer(), & target.input );
+}
+
+namespace {
+
+    class PassiveInputTest
+        : public ppi::module::Module
+    {
+    public:
+        ppi::connector::PassiveInput input;
+
+        PassiveInputTest() : counter() {
+            noroute(input);
+            input.onRequest(&PassiveInputTest::request);
+        }
+
+        void request() {
+            ++ counter;
+        }
+
+        unsigned counter;
+    };
+}
+
+BOOST_AUTO_UNIT_TEST(passiveInput)
+{
+    debug::ActivePacketSource source;
+    PassiveInputTest target;
+
+    ppi::connect(source.output,target.input);
+    ppi::init();
+
+    BOOST_CHECK_EQUAL( & target.input.peer(), & source.output );
+    
+    target.input.throttle();
+    senf::Packet p (senf::DataPacket::create());
+    source.submit(p);
+    
+    BOOST_CHECK_EQUAL( target.counter, 0u );
+    BOOST_CHECK( target.input );
+    BOOST_CHECK_EQUAL( target.input.queueSize(), 1u );
+    target.input.unthrottle();
+    BOOST_CHECK( target.input );
+    BOOST_CHECK_EQUAL( target.counter, 1u );
+    
+    BOOST_CHECK( target.input() == p );
+    BOOST_CHECK( ! target.input );
+    
+    source.submit(p);
+    
+    BOOST_CHECK_EQUAL( target.counter, 2u );
+    BOOST_CHECK( target.input.throttled() );
+    BOOST_CHECK( target.input() == p );
+    BOOST_CHECK( ! target.input.throttled() );
+
+    target.input.qdisc(ppi::ThresholdQueueing(2,0));
+
+    source.submit(p);
+    BOOST_CHECK ( ! target.input.throttled() );
+    source.submit(p);
+    BOOST_CHECK( target.input.throttled() );
+    target.input();
+    BOOST_CHECK( target.input.throttled() );
+    target.input();
+    BOOST_CHECK( ! target.input.throttled() );
+}
+
+BOOST_AUTO_UNIT_TEST(passiveOutput)
+{
+    debug::PassivePacketSource source;
+    debug::ActivePacketSink target;
+
+    ppi::connect(source.output,target.input);
+    ppi::init();
+
+    senf::Packet p (senf::DataPacket::create());
+    source.submit(p);
+
+    BOOST_CHECK_EQUAL( & source.output.peer(), & target.input );
+
+    BOOST_CHECK( source.output );
+
+    source.submit(p);
+    BOOST_CHECK( target.request() == p );
+    
+    // connect() is tested indirectly via ppi::connect
+}
+
+BOOST_AUTO_UNIT_TEST(activeInput)
+{
+    debug::PassivePacketSource source;
+    debug::ActivePacketSink target;
+
+    ppi::connect(source.output,target.input);
+    ppi::init();
+
+    BOOST_CHECK_EQUAL( & target.input.peer(), & source.output );
+
+    BOOST_CHECK ( ! target.input );
+
+    senf::Packet p (senf::DataPacket::create());
+    source.submit(p);
+
+    BOOST_CHECK( target.input );
+    BOOST_CHECK( target.request() == p );
+
+    source.submit(p);
+    target.input.request();
+    BOOST_CHECK_EQUAL( target.input.queueSize(), 1u );
+    BOOST_CHECK( target.input );
+    BOOST_CHECK( target.request() == p );
+}
+
+BOOST_AUTO_UNIT_TEST(activeOutput)
+{
+    debug::ActivePacketSource source;
+    debug::PassivePacketSink target;
+
+    ppi::connect(source.output,target.input);
+    ppi::init();
+    
+    BOOST_CHECK_EQUAL( & source.output.peer(), & target.input );
+    BOOST_CHECK( source.output );
+    target.input.throttle();
+    BOOST_CHECK( ! source.output );
+
+    // connect() is tested indirectly via ppi::connect
+}
+
+
 ///////////////////////////////cc.e////////////////////////////////////////
 #undef prefix_
 
index 6accafa..aa2a49e 100644 (file)
@@ -53,6 +53,7 @@ prefix_ senf::ppi::module::debug::PassivePacketSource::PassivePacketSource()
 prefix_ void senf::ppi::module::debug::PassivePacketSource::submit(Packet packet)
 {
     packets_.push_back(packet);
+    output.unthrottle();
 }
 
 prefix_ bool senf::ppi::module::debug::PassivePacketSource::empty()
@@ -71,8 +72,16 @@ senf::ppi::module::debug::PassivePacketSource::size()
 
 prefix_ void senf::ppi::module::debug::PassivePacketSource::request()
 {
+    BOOST_ASSERT( ! packets_.empty() );
     output(packets_.front());
     packets_.pop_front();
+    if (packets_.empty())
+        output.throttle();
+}
+
+prefix_ void senf::ppi::module::debug::PassivePacketSource::init()
+{
+    output.throttle();
 }
 
 ///////////////////////////////////////////////////////////////////////////
index 5545871..3ff2548 100644 (file)
@@ -69,6 +69,7 @@ namespace debug {
 
     private:
         void request();
+        void init();
         
         Queue packets_;
     };
index fd95c32..3f19899 100644 (file)
@@ -48,12 +48,16 @@ BOOST_AUTO_UNIT_TEST(debugModules)
         debug::PassivePacketSink sink;
 
         ppi::connect(source.output, sink.input);
-        
+        ppi::init();
+    
         senf::PacketData::byte data[] = { 0x13u, 0x24u, 0x35u };
         senf::Packet p (senf::DataPacket::create(data));
 
+        BOOST_CHECK( ! sink.input.throttled() );
+
         source.submit(p);
-        
+
+        BOOST_CHECK( ! sink.input.throttled() );
         BOOST_CHECK_EQUAL( sink.size(), 1u );
         BOOST_CHECK( ! sink.empty() );
         BOOST_CHECK_EQUAL( 
@@ -73,6 +77,7 @@ BOOST_AUTO_UNIT_TEST(debugModules)
         debug::ActivePacketSink sink;
 
         ppi::connect(source.output, sink.input);
+        ppi::init();
 
         senf::PacketData::byte data[] = { 0x13u, 0x24u, 0x35u };
         senf::Packet p (senf::DataPacket::create(data));
index 04ecc61..382ac57 100644 (file)
@@ -27,6 +27,7 @@
 #include "Route.hh"
 #include "Connectors.hh"
 #include "EventManager.hh"
+#include "ModuleManager.hh"
 
 #define prefix_ inline
 ///////////////////////////////cci.p///////////////////////////////////////
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::module::Module
 
-prefix_ void senf::ppi::module::Module::noroute(connector::Connector & connector)
+prefix_ senf::ppi::module::Module::~Module()
 {
-    registerConnector(connector);
-    connector.setModule(*this);
+    moduleManager().unregisterModule(*this);
 }
 
 prefix_ boost::posix_time::ptime senf::ppi::module::Module::eventTime()
@@ -49,16 +49,32 @@ prefix_ boost::posix_time::ptime senf::ppi::module::Module::eventTime()
 // protected members
 
 prefix_ senf::ppi::module::Module::Module()
-{}
+{
+    moduleManager().registerModule(*this);
+}
+
+prefix_ void senf::ppi::module::Module::noroute(connector::Connector & connector)
+{
+    registerConnector(connector);
+    connector.setModule(*this);
+}
 
 ////////////////////////////////////////
 // private members
 
+prefix_ void senf::ppi::module::Module::init()
+{}
+
 prefix_ senf::ppi::EventManager & senf::ppi::module::Module::eventManager()
 {
     return EventManager::instance();
 }
 
+prefix_ senf::ppi::ModuleManager & senf::ppi::module::Module::moduleManager()
+{
+    return ModuleManager::instance();
+}
+
 prefix_ void senf::ppi::module::Module::registerConnector(connector::Connector & connector)
 {
     connectorRegistry_.push_back(&connector);
index 1a3d22b..0002b05 100644 (file)
@@ -57,6 +57,9 @@ namespace module {
     class Module
         : boost::noncopyable
     {
+    public:
+        virtual ~Module();
+
     protected:
         Module();
 
@@ -122,7 +125,10 @@ namespace module {
                                               ///< event
 
     private:
+        virtual void init();
+
         EventManager & eventManager();
+        ModuleManager & moduleManager();
         
         void registerConnector(connector::Connector & connector);
         RouteBase & addRoute(std::auto_ptr<RouteBase> route);
@@ -135,6 +141,7 @@ namespace module {
 
         template <class Source, class Target>
         friend class detail::RouteHelper;
+        friend class senf::ppi::ModuleManager;
     };
 
     /** \brief Connect compatible connectors
diff --git a/PPI/ModuleManager.cc b/PPI/ModuleManager.cc
new file mode 100644 (file)
index 0000000..55c65d2
--- /dev/null
@@ -0,0 +1,67 @@
+// $Id$
+//
+// Copyright (C) 2007 
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief ModuleManager non-inline non-template implementation */
+
+#include "ModuleManager.hh"
+//#include "ModuleManager.ih"
+
+// Custom includes
+#include "Scheduler/Scheduler.hh"
+#include "Module.hh"
+
+//#include "ModuleManager.mpp"
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::ModuleManager
+
+prefix_ void senf::ppi::ModuleManager::init()
+{
+    ModuleRegistry::const_iterator i (moduleRegistry_.begin());
+    ModuleRegistry::const_iterator const i_end (moduleRegistry_.end());
+    for (; i != i_end; ++i)
+        (*i)->init();
+}
+
+prefix_ void senf::ppi::ModuleManager::run()
+{
+    init();
+    Scheduler::instance().process();
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
+//#include "ModuleManager.mpp"
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
diff --git a/PPI/ModuleManager.cci b/PPI/ModuleManager.cci
new file mode 100644 (file)
index 0000000..f8b2a08
--- /dev/null
@@ -0,0 +1,65 @@
+// $Id$
+//
+// Copyright (C) 2007 
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief ModuleManager inline non-template implementation */
+
+// Custom includes
+#include <algorithm>
+
+#define prefix_ inline
+///////////////////////////////cci.p///////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::ModuleManager
+
+prefix_ senf::ppi::ModuleManager & senf::ppi::ModuleManager::instance()
+{
+    static ModuleManager manager;
+    return manager;
+}
+
+prefix_ void senf::ppi::ModuleManager::registerModule(module::Module & module)
+{
+    moduleRegistry_.push_back(&module);
+}
+
+prefix_ void senf::ppi::ModuleManager::unregisterModule(module::Module & module)
+{
+    moduleRegistry_.erase(
+        std::remove(moduleRegistry_.begin(), moduleRegistry_.end(), & module),
+        moduleRegistry_.end());
+}
+
+///////////////////////////////cci.e///////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
diff --git a/PPI/ModuleManager.hh b/PPI/ModuleManager.hh
new file mode 100644 (file)
index 0000000..abf7cbc
--- /dev/null
@@ -0,0 +1,92 @@
+// $Id$
+//
+// Copyright (C) 2007 
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief ModuleManager public header */
+
+#ifndef HH_ModuleManager_
+#define HH_ModuleManager_ 1
+
+// Custom includes
+#include <vector>
+#include "predecl.hh"
+
+//#include "ModuleManager.mpp"
+///////////////////////////////hh.p////////////////////////////////////////
+
+namespace senf {
+namespace ppi {
+
+    /** \brief
+      */
+    class ModuleManager
+    {
+    public:
+        ///////////////////////////////////////////////////////////////////////////
+        ///\name Structors and default members
+        ///@{
+
+        static ModuleManager & instance();
+
+        // default default constructor
+        // default copy constructor
+        // default copy assignment
+        // default destructor
+
+        // no conversion constructors
+
+        ///@}
+        ///////////////////////////////////////////////////////////////////////////
+
+        void registerModule(module::Module & module);
+        void unregisterModule(module::Module & module);
+        
+        void init();
+        void run();
+
+    protected:
+
+    private:
+        typedef std::vector<module::Module *> ModuleRegistry;
+
+        ModuleRegistry moduleRegistry_;
+    };
+
+
+}}
+
+///////////////////////////////hh.e////////////////////////////////////////
+#include "ModuleManager.cci"
+//#include "ModuleManager.ct"
+//#include "ModuleManager.cti"
+#endif
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
index c084f51..aa59e3d 100644 (file)
 //#include "Queueing.ih"
 
 // Custom includes
+#include "Connectors.hh"
 
 //#include "Queueing.mpp"
 #define prefix_
 ///////////////////////////////cc.p////////////////////////////////////////
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::QueueingDiscipline
+// senf::ppi::ThresholdQueueing
 
-prefix_ senf::ppi::QueueingDiscipline::~QueueingDiscipline()
-{}
+prefix_ void senf::ppi::ThresholdQueueing::update(connector::PassiveInput & input, Event event)
+{
+    switch (event) {
+    case ENQUEUE:    
+        if (input.queueSize() >= high_)
+            input.throttle();
+        break;
+    case DEQUEUE:
+        if (input.queueSize() <= low_)
+            input.unthrottle();
+        break;
+    }
+}
 
 ///////////////////////////////cc.e////////////////////////////////////////
 #undef prefix_
diff --git a/PPI/Queueing.cci b/PPI/Queueing.cci
new file mode 100644 (file)
index 0000000..76eea26
--- /dev/null
@@ -0,0 +1,56 @@
+// $Id$
+//
+// Copyright (C) 2007 
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief Queueing inline non-template implementation */
+
+// Custom includes
+
+#define prefix_ inline
+///////////////////////////////cci.p///////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::QueueingDiscipline
+
+prefix_ senf::ppi::QueueingDiscipline::~QueueingDiscipline()
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::ThresholdQueueing
+
+prefix_ senf::ppi::ThresholdQueueing::ThresholdQueueing(unsigned high, unsigned low)
+    : high_(high), low_(low)
+{}
+
+///////////////////////////////cci.e///////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
index 7b7421e..59e8e67 100644 (file)
@@ -43,6 +43,12 @@ namespace ppi {
         operating system by sending throttling events. The PPI will never loose a packet internally
         (if not a module explicitly does so), however it may disable reception of new incoming
         packets which will then probably be dropped by the operating system.
+
+        \attention Notifications may be forwarded to the QueueingDiscipline implementation
+            out-of-order: A dequeue event may be notified before the corresponding enqueue
+            event (this happens to optimize away transient throttling state changes which would
+            otherwise occur if a packet is entered into the queue and then removed from it in the
+            same processing step).
      */
     class QueueingDiscipline
     {
@@ -50,22 +56,35 @@ namespace ppi {
         virtual ~QueueingDiscipline();
 
         enum Event { ENQUEUE, DEQUEUE }; ///< Possible queueing events
-        enum State { THROTTLED, UNTHROTTLED }; ///< Possible queueing states
         
-        virtual State update(connector::PassiveInput & input, Event event) = 0;
+        virtual void update(connector::PassiveInput & input, Event event) = 0;
                                         ///< Calculate new queueing state
                                         /**< Whenever the queue is manipulated, this member is
-                                             called to calculate the new throttling state.
+                                             called to calculate the new throttling state. The
+                                             member must call \a input's \c throttle() or \c
+                                             unthrottle() member to set the new throttling state.
                                              
                                              \param[in] input Connector holding the queue
-                                             \param[in] event Type of event triggering the update
-                                             \returns new throttling state */
+                                             \param[in] event Type of event triggering the update */
+    };
+
+    class ThresholdQueueing
+        : public QueueingDiscipline
+    {
+    public:
+        ThresholdQueueing(unsigned high, unsigned low);
+
+        virtual void update(connector::PassiveInput & input, Event event);
+
+    private:
+        unsigned high_;
+        unsigned low_;
     };
 
 }}
 
 ///////////////////////////////hh.e////////////////////////////////////////
-//#include "Queueing.cci"
+#include "Queueing.cci"
 //#include "Queueing.ct"
 //#include "Queueing.cti"
 #endif
index 100b09c..01c6f3a 100644 (file)
@@ -24,6 +24,8 @@
     \brief Route inline non-template implementation */
 
 // Custom includes
+#include "Connectors.hh"
+#include "Events.hh"
 
 #define prefix_ inline
 ///////////////////////////////cci.p///////////////////////////////////////
@@ -31,6 +33,9 @@
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::RouteBase
 
+prefix_ senf::ppi::RouteBase::~RouteBase()
+{}
+
 ////////////////////////////////////////
 // protected members
 
@@ -38,6 +43,202 @@ prefix_ senf::ppi::RouteBase::RouteBase(module::Module & module)
     : module_(&module)
 {}
 
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::ForwardingRoute
+
+prefix_ bool senf::ppi::ForwardingRoute::autoThrottling()
+{
+    return autoThrottling_;
+}
+
+prefix_ void senf::ppi::ForwardingRoute::autoThrottling(bool state)
+{
+    autoThrottling_ = state;
+}
+
+////////////////////////////////////////
+// protected members
+
+prefix_ senf::ppi::ForwardingRoute::ForwardingRoute(module::Module & module)
+    : RouteBase(module), autoThrottling_(false)
+{}
+
+prefix_ void senf::ppi::ForwardingRoute::registerRoute(connector::ActiveConnector & connector)
+{
+    connector.registerRoute(*this);
+}
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::ForwardingRoute::notifyThrottle()
+{
+    v_notifyThrottle();
+}
+
+prefix_ void senf::ppi::ForwardingRoute::notifyUnthrottle()
+{
+    v_notifyUnthrottle();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::NonForwardingRouteImplementation
+
+prefix_ senf::ppi::detail::NonForwardingRouteImplementation::
+NonForwardingRouteImplementation(module::Module & module, connector::InputConnector & source,
+                                 connector::OutputConnector & target)
+    : RouteBase(module), source_(&source), target_(&target)
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::NonForwardingRouteToEventImplementation
+
+prefix_
+senf::ppi::detail::NonForwardingRouteToEventImplementation::
+NonForwardingRouteToEventImplementation(module::Module & module,
+                                        connector::InputConnector & source,
+                                        EventDescriptor & target)
+    : RouteBase(module), source_(&source), target_(&target)
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::NonForwardingRouteFromEventImplementation
+
+prefix_
+senf::ppi::detail::NonForwardingRouteFromEventImplementation::
+NonForwardingRouteFromEventImplementation(module::Module & module, EventDescriptor & source,
+                                          connector::OutputConnector & target)
+    : RouteBase(module), source_(&source), target_(&target)
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::ForwardForwardingRouteImplementation
+
+prefix_
+senf::ppi::detail::ForwardForwardingRouteImplementation::
+ForwardForwardingRouteImplementation(module::Module & module, connector::ActiveInput & source,
+                                     connector::PassiveOutput & target)
+    : ForwardingRoute(module), source_(&source), target_(&target)
+{
+    registerRoute(*source_);
+}
+
+prefix_ void senf::ppi::detail::ForwardForwardingRouteImplementation::v_notifyThrottle()
+{
+    if (autoThrottling())
+        target_->notifyThrottle();
+}
+
+prefix_ void senf::ppi::detail::ForwardForwardingRouteImplementation::v_notifyUnthrottle()
+{
+    if (autoThrottling())
+        target_->notifyUnthrottle();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::BackwardForwardingRouteImplementation
+
+prefix_
+senf::ppi::detail::BackwardForwardingRouteImplementation::
+BackwardForwardingRouteImplementation(module::Module & module,
+                                      connector::PassiveInput & source,
+                                      connector::ActiveOutput & target)
+    : ForwardingRoute(module), source_(&source), target_(&target)
+{
+    registerRoute(*target_);
+}
+
+prefix_ void senf::ppi::detail::BackwardForwardingRouteImplementation::v_notifyThrottle()
+{ 
+    if (autoThrottling())
+        source_->notifyThrottle();
+}
+
+prefix_ void senf::ppi::detail::BackwardForwardingRouteImplementation::v_notifyUnthrottle()
+{
+    if (autoThrottling())
+        source_->notifyUnthrottle();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::ForwardForwardingRouteToEventImplementation
+
+prefix_
+senf::ppi::detail::ForwardForwardingRouteToEventImplementation::
+ForwardForwardingRouteToEventImplementation(module::Module & module,
+                                            connector::ActiveInput & source,
+                                            EventDescriptor & target)
+    : ForwardingRoute(module), source_(&source), target_(&target)
+{
+    registerRoute(*source_);
+}
+
+prefix_ void senf::ppi::detail::ForwardForwardingRouteToEventImplementation::v_notifyThrottle()
+{
+    if (autoThrottling())
+        target_->enabled(false);
+}
+
+prefix_ void
+senf::ppi::detail::ForwardForwardingRouteToEventImplementation::v_notifyUnthrottle()
+{
+    if (autoThrottling())
+        target_->enabled(true);
+}
+
+///////////////////////////////////////////////////////////////////////////
+//senf::ppi::detail::BackwardForwardingRouteFromEventImplementation
+
+prefix_
+senf::ppi::detail::BackwardForwardingRouteFromEventImplementation::
+BackwardForwardingRouteFromEventImplementation(module::Module & module,
+                                               EventDescriptor & source,
+                                               connector::ActiveOutput & target)
+    : ForwardingRoute(module), source_(&source), target_(&target)
+{
+    registerRoute(*target_);
+}
+
+prefix_ void
+senf::ppi::detail::BackwardForwardingRouteFromEventImplementation::v_notifyThrottle()
+{
+    if (autoThrottling())
+        source_->enabled(false);
+}
+
+prefix_ void
+senf::ppi::detail::BackwardForwardingRouteFromEventImplementation::v_notifyUnthrottle()
+{ 
+    if (autoThrottling())
+        source_->enabled(true);
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::RouteImplementation<connector::ActiveInput, connector::PassiveOutput, 
+//                                        false, false>
+
+prefix_
+senf::ppi::detail::RouteImplementation<senf::ppi::connector::ActiveInput, 
+                                       senf::ppi::connector::PassiveOutput, 
+                                       false, false>::
+RouteImplementation(module::Module & module, connector::ActiveInput & source,
+                    connector::PassiveOutput & target)
+    : ForwardForwardingRouteImplementation(module, source, target)
+{}
+
+////////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::RouteImplementation<connector::PassiveInput, connector::ActiveOutput, 
+//                                        false, false>
+
+prefix_
+senf::ppi::detail::RouteImplementation<senf::ppi::connector::PassiveInput, 
+                                       senf::ppi::connector::ActiveOutput, 
+                                       false, false>::
+RouteImplementation(module::Module & module, connector::PassiveInput & source,
+                    connector::ActiveOutput & target)
+    : BackwardForwardingRouteImplementation(module, source, target)
+{}
+
 ///////////////////////////////cci.e///////////////////////////////////////
 #undef prefix_
 
index 3d0f465..015ea90 100644 (file)
 ///////////////////////////////cti.p///////////////////////////////////////
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::Route<Source,Target>
+// senf::ppi::detail::RouteImplementation<Source,Target,srcEvent,trgEvent>
 
-////////////////////////////////////////
-// protected members
+template <class Source, class Target, bool srcEvent, bool trgEvent>
+prefix_
+senf::ppi::detail::RouteImplementation<Source,Target,srcEvent,trgEvent>::
+RouteImplementation(module::Module & module, Source & source, Target & target)
+    : NonForwardingRouteImplementation(module, source, target)
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::RouteImplementation<Source, Target, true, false>
 
 template <class Source, class Target>
-prefix_ senf::ppi::Route<Source,Target>::Route(module::Module & module, Source & source,
-                                               Target & target)
-    : Implementation(module, source, target)
+prefix_
+senf::ppi::detail::RouteImplementation<Source, Target, true, false>::
+RouteImplementation(module::Module & module, Source & source, Target & target)
+    : NonForwardingRouteFromEventImplementation(module, source, target)
 {}
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::detail::RouteImplementation<Source,Target,srcEvent,trgEvent>
+// senf::ppi::detail::RouteImplementation<Source, Target, false, true>
 
-////////////////////////////////////////
-// protected members
-
-template <bool srcEvent, bool trgEvent>
-prefix_ senf::ppi::detail::RouteImplementation<srcEvent,trgEvent>::
-RouteImplementation(module::Module & module, connector::InputConnector & source,
-                    connector::OutputConnector & target)
-    : RouteBase(module), source_(&source), target_(&target)
+template <class Source, class Target>
+prefix_
+senf::ppi::detail::RouteImplementation<Source, Target, false, true>::
+RouteImplementation(module::Module & module, Source & source, Target & target)
+    : NonForwardingRouteToEventImplementation(module, source, target)
 {}
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::detail::RouteImplementation<true,false>
+// senf::ppi::detail::RouteImplementation<connector::ActiveInput, Event, false, true>
 
-////////////////////////////////////////
-// protected members
+template <class Event>
+prefix_
+senf::ppi::detail::RouteImplementation<senf::ppi::connector::ActiveInput, Event, false, true>::
+RouteImplementation(module::Module & module, connector::ActiveInput & source, Event & target)
+    : ForwardForwardingRouteToEventImplementation(module, source, target)
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::RouteImplementation<Event, connector::ActiveOutput, true, false>
 
-prefix_ senf::ppi::detail::RouteImplementation<true,false>::
-RouteImplementation(module::Module & module, EventDescriptor & source,
-                    connector::OutputConnector & target)
-    : RouteBase(module), source_(&source), target_(&target)
+template <class Event>
+prefix_
+senf::ppi::detail::RouteImplementation<Event, senf::ppi::connector::ActiveOutput, true, false>::
+RouteImplementation(module::Module & module, Event & source, connector::ActiveOutput & target)
+    : BackwardForwardingRouteFromEventImplementation(module, source, target)
 {}
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::detail::RouteImplementation<false,true>
+// senf::ppi::Route<Source,Target>
 
 ////////////////////////////////////////
 // protected members
 
-prefix_ senf::ppi::detail::RouteImplementation<false,true>::
-RouteImplementation(module::Module & module, connector::InputConnector & source,
-                    EventDescriptor & target)
-: RouteBase(module), source_(&source), target_(&target)
+template <class Source, class Target>
+prefix_ senf::ppi::Route<Source,Target>::Route(module::Module & module, Source & source,
+                                               Target & target)
+    : Implementation(module, source, target)
 {}
 
 ///////////////////////////////cti.e///////////////////////////////////////
index 54c4139..e3eee1c 100644 (file)
@@ -37,6 +37,20 @@ namespace ppi {
     class RouteBase
     {
     public:
+        virtual ~RouteBase();
+
+    protected:
+        RouteBase(module::Module & module);
+
+    private:
+        module::Module * module_;
+    };
+
+    class ForwardingRoute
+        : public RouteBase
+    {
+    public:
+        bool autoThrottling();
         void autoThrottling(bool state); ///< Change automatic throttle notification forwarding
                                         /**< By default, throttle notifications are automatically
                                              forwarded from active to passive connectors. This may
@@ -50,23 +64,31 @@ namespace ppi {
                                              disable the event whenever a throttling notification
                                              comes in. Respective for unthrottle notifications.
 
-                                             \param[in] state New throttle forwarding state 
-
-                                             \implementation This class will be implemented using a
-                                                 baseclass, this template and several
-                                                 specializations. However, this is an implementation
-                                                 detail which does not affect the exposed
-                                                 interface. */
-
+                                             \param[in] state New throttle forwarding state */
+        
     protected:
-        RouteBase(module::Module & module);
+        ForwardingRoute(module::Module & module);
+
+        // Called to register this route with the connectors forwarding information base
+        void registerRoute(connector::ActiveConnector & connector);
 
     private:
-        module::Module * module_;
+        // called to forward a throttling notification along the route
+        void notifyThrottle();
+        void notifyUnthrottle();
+
+        // Implemented in the derived classes to forward throttling notifications
+        virtual void v_notifyThrottle() = 0;
+        virtual void v_notifyUnthrottle() = 0;
+
+        bool autoThrottling_;
+
+        friend class connector::ActiveConnector;
     };
 
 }}
 
+// We need detail::RouteImplementation here ...
 #include "Route.ih"
 
 namespace senf {
@@ -79,13 +101,10 @@ namespace ppi {
      */
     template <class Source, class Target>
     class Route
-        : public detail::RouteImplementation< boost::is_base_of<EventDescriptor,Source>::value,
-                                              boost::is_base_of<EventDescriptor,Target>::value >
+        : public detail::RouteImplementation<Source,Target>
     {
     private:
-        typedef detail::RouteImplementation< 
-            boost::is_base_of<EventDescriptor,Source>::value,
-            boost::is_base_of<EventDescriptor,Target>::value > Implementation;
+        typedef detail::RouteImplementation<Source,Target> Implementation;
         
         Route(module::Module & module, Source & source, Target & target);
 
index 143a7a4..1e76429 100644 (file)
@@ -34,50 +34,183 @@ namespace senf {
 namespace ppi {
 namespace detail {
 
-    template <bool srcEvent, bool trgEvent>
-    class RouteImplementation
+    // Valid Forwarding routes:
+    //   Forward throttling
+    //     ActiveInput -> PassiveOutput
+    //       tempalte<> RouteImplementation<ActiveInput,PassiveOutput,false,false>
+    //     ActiveInput -> Event
+    //       template<class Event> class RouteImplementation<ActiveInput, Event, false, true>
+    //   Backward throttling
+    //     PassiveInput -> ActiveOutput
+    //       template<> RouteImplementation<PassiveInput, ActiveOutput, false, false>
+    //     Event -> ActiveOutput
+    //       template<class Event> class RouteImplementation<Event, ActiveOutput, true, false>
+
+    class NonForwardingRouteImplementation
         : public RouteBase
     {
     protected:
-        RouteImplementation(module::Module & module, 
-                            connector::InputConnector & source, 
-                            connector::OutputConnector & target);
+        NonForwardingRouteImplementation(module::Module & module, 
+                                         connector::InputConnector & source,
+                                         connector::OutputConnector & target);
 
     private:
         connector::InputConnector * source_;
         connector::OutputConnector * target_;
     };
 
-#   ifndef DOXYGEN
+    class NonForwardingRouteToEventImplementation
+        : public RouteBase
+    {
+    protected:
+        NonForwardingRouteToEventImplementation(module::Module & module,
+                                                connector::InputConnector & source,
+                                                EventDescriptor & target);
+        
+    private:
+        connector::InputConnector * source_;
+        EventDescriptor * target_;
+    };
 
-    template <>
-    class RouteImplementation<true,false>
+    class NonForwardingRouteFromEventImplementation
         : public RouteBase
     {
     protected:
-        RouteImplementation(module::Module & module, 
-                            EventDescriptor & source, 
-                            connector::OutputConnector & target);
+        NonForwardingRouteFromEventImplementation(module::Module & module,
+                                                  EventDescriptor & source,
+                                                  connector::OutputConnector & target);
 
     private:
         EventDescriptor * source_;
         connector::OutputConnector * target_;
     };
 
-    template<>
-    class RouteImplementation<false,true>
-        : public RouteBase
+    class ForwardForwardingRouteImplementation
+        : public ForwardingRoute
     {
     protected:
-        RouteImplementation(module::Module & module, 
-                            connector::InputConnector & source,
-                            EventDescriptor & target);
+        ForwardForwardingRouteImplementation(module::Module & module,
+                                             connector::ActiveInput & source,
+                                             connector::PassiveOutput & target);
 
     private:
-        connector::InputConnector * source_;
+        virtual void v_notifyThrottle();
+        virtual void v_notifyUnthrottle();
+
+        connector::ActiveInput * source_;
+        connector::PassiveOutput * target_;
+    };
+
+    class BackwardForwardingRouteImplementation
+        : public ForwardingRoute
+    {
+    protected:
+        BackwardForwardingRouteImplementation(module::Module & module,
+                                              connector::PassiveInput & source,
+                                              connector::ActiveOutput & target);
+
+    private:
+        virtual void v_notifyThrottle();
+        virtual void v_notifyUnthrottle();
+        
+        connector::PassiveInput * source_;
+        connector::ActiveOutput * target_;
+    };
+
+    class ForwardForwardingRouteToEventImplementation
+        : public ForwardingRoute
+    {
+    protected:
+        ForwardForwardingRouteToEventImplementation(module::Module & module,
+                                                    connector::ActiveInput & source,
+                                                    EventDescriptor & target);
+
+    private:
+        virtual void v_notifyThrottle();
+        virtual void v_notifyUnthrottle();
+
+        connector::ActiveInput * source_;
         EventDescriptor * target_;
     };
 
+    class BackwardForwardingRouteFromEventImplementation
+        : public ForwardingRoute
+    {
+    protected:
+        BackwardForwardingRouteFromEventImplementation(module::Module & module,
+                                                       EventDescriptor & source,
+                                                       connector::ActiveOutput & target); 
+
+    private:
+        virtual void v_notifyThrottle();
+        virtual void v_notifyUnthrottle();
+
+        EventDescriptor * source_;
+        connector::ActiveOutput * target_;
+    };
+
+    template <class Source, class Target, bool srcEvent, bool trgEvent>
+    class RouteImplementation
+        : public NonForwardingRouteImplementation
+    {
+    protected:
+        RouteImplementation(module::Module & module, Source & source, Target & target);
+    };
+
+#   ifndef DOXYGEN
+
+    template <class Source, class Target>
+    class RouteImplementation<Source, Target, true, false>
+        : public NonForwardingRouteFromEventImplementation
+    {
+    protected:
+        RouteImplementation(module::Module & module, Source & source, Target & target);
+    };
+
+    template<class Source, class Target>
+    class RouteImplementation<Source, Target, false, true>
+        : public NonForwardingRouteToEventImplementation
+    {
+    protected:
+        RouteImplementation(module::Module & module, Source & source, Target & target);
+    };
+
+    template<>
+    class RouteImplementation<connector::ActiveInput, connector::PassiveOutput, false, false>
+        : public ForwardForwardingRouteImplementation
+    {
+    protected:
+        RouteImplementation(module::Module & module, connector::ActiveInput & source, 
+                            connector::PassiveOutput & target);
+    };
+
+    template <class Event>
+    class RouteImplementation<connector::ActiveInput, Event, false, true>
+        : public ForwardForwardingRouteToEventImplementation
+    {
+    protected:
+        RouteImplementation(module::Module & module, connector::ActiveInput & source, 
+                            Event & target);
+    };
+
+    template <>
+    class RouteImplementation<connector::PassiveInput, connector::ActiveOutput, false, false>
+        : public BackwardForwardingRouteImplementation
+    {
+    protected:
+        RouteImplementation(module::Module & module, connector::PassiveInput & source, 
+                            connector::ActiveOutput & target);
+    };
+    
+    template <class Event>
+    class RouteImplementation<Event, connector::ActiveOutput, true, false>
+        : public BackwardForwardingRouteFromEventImplementation
+    {
+    protected:
+        RouteImplementation(module::Module & module, Event & source, 
+                            connector::ActiveOutput & target);
+    };
+
 #   endif
 
 }}}
index f14f33b..01c3168 100644 (file)
@@ -25,7 +25,7 @@
 
 // Custom includes
 #include "Connectors.hh"
-#include "Scheduler/Scheduler.hh"
+#include "ModuleManager.hh"
 
 #define prefix_ inline
 ///////////////////////////////cci.p///////////////////////////////////////
@@ -44,7 +44,12 @@ prefix_ void senf::ppi::connect(connector::PassiveOutput & source,
 
 prefix_ void senf::ppi::run()
 {
-    Scheduler::instance().process();
+    ModuleManager::instance().run();
+}
+
+prefix_ void senf::ppi::init()
+{
+    ModuleManager::instance().init();
 }
 
 ///////////////////////////////cci.e///////////////////////////////////////
index 5f0613a..2920e1e 100644 (file)
@@ -39,6 +39,7 @@ namespace ppi {
     void connect(connector::PassiveOutput & source, connector::ActiveInput & target);
 
     void run();
+    void init();
 
 }}
 
index 92dd000..be9c053 100644 (file)
@@ -39,14 +39,26 @@ namespace ppi {
     template <class EventType=void> class EventImplementation;
     class EventManager;
     class RouteBase;
+    class ForwardingRoute;
     template <class Source, class Target> class Route;
     class QueueingDiscipline;
+    class ModuleManager;
 
     namespace detail {
         class EventBindingBase;
         template <class EvImpl> class EventBinding;
         template <class EventType> struct EventArgType;
-        template <bool srcEvent, bool trgEvent> class RouteImplementation;
+        class NonForwardingRouteImplementation;
+        class NonForwardingRouteToEventImplementation;
+        class NonForwardingRouteFromEventImplementation;
+        class ForwardForwardingRouteImplementation;
+        class BackwardForwardingRouteImplementation;
+        class ForwardForwardingRouteToEventImplementation;
+        class BackwardForwardingRouteFromEventImplementation;
+        template <class Source, class Target, 
+                  bool srcEvent = boost::is_base_of<EventDescriptor,Source>::value,
+                  bool trgEvent = boost::is_base_of<EventDescriptor,Target>::value>
+            class RouteImplementation;
     }
 
     namespace module {