From: g0dil Date: Mon, 13 Aug 2007 15:38:11 +0000 (+0000) Subject: PPI: Complete connector implementation X-Git-Url: http://g0dil.de/git?a=commitdiff_plain;h=f539f4271d470794a773a92bacd8ba086c9bc1cd;p=senf.git PPI: Complete connector implementation PPI: Implement throttling PPI: Implement ModuleManager and ppi::init PPI: Implement ThresholdQueuing PPI: Hugely extended routing classes git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@390 270642c3-0616-0410-b53a-bc976706d245 --- diff --git a/PPI/Connectors.cc b/PPI/Connectors.cc index 8b02972..7fe75a0 100644 --- a/PPI/Connectors.cc +++ b/PPI/Connectors.cc @@ -27,26 +27,65 @@ //#include "Connectors.ih" // Custom includes +#include "Route.hh" //#include "Connectors.mpp" #define prefix_ ///////////////////////////////cc.p//////////////////////////////////////// /////////////////////////////////////////////////////////////////////////// -// protected members +// senf::ppi::connector::PassiveConnector -prefix_ senf::ppi::connector::Connector::~Connector() -{} +//////////////////////////////////////// +// private members + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::ActiveConnector + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle() +{ + if (throttleCallback_) + throttleCallback_(); + NotifyRoutes::const_iterator i (notifyRoutes_.begin()); + NotifyRoutes::const_iterator const i_end (notifyRoutes_.end()); + for (; i != i_end; ++i) + (*i)->notifyThrottle(); +} -prefix_ void senf::ppi::connector::Connector::connect(Connector & target) +prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle() { - peer_ = & target; - target.peer_ = this; + if (unthrottleCallback_) + unthrottleCallback_(); + NotifyRoutes::const_iterator i (notifyRoutes_.begin()); + NotifyRoutes::const_iterator const i_end (notifyRoutes_.end()); + for (; i != i_end; ++i) + (*i)->notifyUnthrottle(); +} + +prefix_ void senf::ppi::connector::ActiveConnector::registerRoute(ForwardingRoute & route) +{ + notifyRoutes_.push_back(&route); } /////////////////////////////////////////////////////////////////////////// // senf::ppi::connector::InputConnector +prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()() +{ + if (empty()) + v_requestEvent(); + Packet p; + if (! empty()) { + p = peek(); + queue_.pop_back(); + v_dequeueEvent(); + } + return p; +} + //////////////////////////////////////// // private members @@ -78,21 +117,25 @@ prefix_ void senf::ppi::connector::ActiveInput::v_requestEvent() prefix_ void senf::ppi::connector::PassiveInput::v_enqueueEvent() { - ///\fixme Emit notifications when qstate_ changes - if (qdisc_) - qstate_ = qdisc_->update(*this, QueueingDiscipline::ENQUEUE); - else - qstate_ = empty()?QueueingDiscipline::UNTHROTTLED:QueueingDiscipline::THROTTLED; emit(); + qdisc_->update(*this, QueueingDiscipline::ENQUEUE); } prefix_ void senf::ppi::connector::PassiveInput::v_dequeueEvent() { - ///\fixme Emit notifications when qstate_ changes - if (qdisc_) - qstate_ = qdisc_->update(*this, QueueingDiscipline::DEQUEUE); - else - qstate_ = empty()?QueueingDiscipline::UNTHROTTLED:QueueingDiscipline::THROTTLED; + qdisc_->update(*this, QueueingDiscipline::DEQUEUE); +} + +prefix_ void senf::ppi::connector::PassiveInput::v_unthrottleEvent() +{ + size_type n (queueSize()); + while (n) { + emit(); + size_type nn (queueSize()); + if (n == nn) + break; + n = nn; + } } ///////////////////////////////cc.e//////////////////////////////////////// diff --git a/PPI/Connectors.cci b/PPI/Connectors.cci index 4b51a58..3862aab 100644 --- a/PPI/Connectors.cci +++ b/PPI/Connectors.cci @@ -32,12 +32,14 @@ // senf::ppi::connector::Connector prefix_ senf::ppi::connector::Connector & senf::ppi::connector::Connector::peer() + const { BOOST_ASSERT(peer_); return *peer_; } prefix_ senf::ppi::module::Module & senf::ppi::connector::Connector::module() + const { BOOST_ASSERT(module_); return *module_; @@ -50,6 +52,15 @@ prefix_ senf::ppi::connector::Connector::Connector() : peer_(), module_() {} +prefix_ senf::ppi::connector::Connector::~Connector() +{} + +prefix_ void senf::ppi::connector::Connector::connect(Connector & target) +{ + peer_ = & target; + target.peer_ = this; +} + //////////////////////////////////////// // private members @@ -59,18 +70,39 @@ prefix_ void senf::ppi::connector::Connector::setModule(module::Module & module) } /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::ActiveConnector +// senf::ppi::connector::PassiveConnector -//////////////////////////////////////// -// protected members +prefix_ bool senf::ppi::connector::PassiveConnector::throttled() + const +{ + return nativeThrottled_ || remoteThrottled_; +} -prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector() -{} +prefix_ bool senf::ppi::connector::PassiveConnector::nativeThrottled() + const +{ + return nativeThrottled_; +} -/////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::PassiveConnector +prefix_ void senf::ppi::connector::PassiveConnector::throttle() +{ + if (!throttled()) + emitThrottle(); + nativeThrottled_ = true; +} + +prefix_ void senf::ppi::connector::PassiveConnector::unthrottle() +{ + if (throttled() && ! remoteThrottled_) { + nativeThrottled_ = false; + emitUnthrottle(); + } else + nativeThrottled_ = false; + +} prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveConnector::peer() + const { return dynamic_cast(Connector::peer()); } @@ -79,51 +111,104 @@ prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveCon // protected members prefix_ senf::ppi::connector::PassiveConnector::PassiveConnector() - : callback_() + : callback_(), remoteThrottled_(), nativeThrottled_() {} prefix_ void senf::ppi::connector::PassiveConnector::emit() { BOOST_ASSERT(callback_); - callback_(); + if (!throttled()) + callback_(); +} + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::connector::PassiveConnector::notifyThrottle() +{ + if (!throttled()) { + remoteThrottled_ = true; + emitThrottle(); + } + else + remoteThrottled_ = true; +} + +prefix_ void senf::ppi::connector::PassiveConnector::notifyUnthrottle() +{ + if (throttled() && !nativeThrottled_) { + remoteThrottled_ = false; + emitUnthrottle(); + } + else + remoteThrottled_ = false; +} + +prefix_ void senf::ppi::connector::PassiveConnector::emitThrottle() +{ + peer().notifyThrottle(); } +prefix_ void senf::ppi::connector::PassiveConnector::emitUnthrottle() +{ + peer().notifyUnthrottle(); + v_unthrottleEvent(); +} + +prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent() +{} + /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::InputConnector +// senf::ppi::connector::ActiveConnector -prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()() +prefix_ senf::ppi::connector::PassiveConnector & senf::ppi::connector::ActiveConnector::peer() + const { - v_requestEvent(); - Packet p (peek()); - queue_.pop_back(); - v_dequeueEvent(); - return p; + return dynamic_cast(Connector::peer()); } -prefix_ bool senf::ppi::connector::InputConnector::boolean_test() +prefix_ void senf::ppi::connector::ActiveConnector::onThrottle() { - ///\fixme Add additional active/passive throttle/unthrottle conditions (make virtual ?) - return ! empty(); + throttleCallback_ = Callback(); } +prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle() +{ + unthrottleCallback_ = Callback(); +} + +//////////////////////////////////////// +// protected members + +prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector() + : throttleCallback_(), unthrottleCallback_(), notifyRoutes_() +{} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::InputConnector + prefix_ senf::ppi::connector::OutputConnector & senf::ppi::connector::InputConnector::peer() + const { return dynamic_cast(Connector::peer()); } prefix_ senf::ppi::connector::InputConnector::queue_iterator senf::ppi::connector::InputConnector::begin() + const { return queue_.begin(); } prefix_ senf::ppi::connector::InputConnector::queue_iterator senf::ppi::connector::InputConnector::end() + const { return queue_.end(); } prefix_ senf::Packet senf::ppi::connector::InputConnector::peek() + const { BOOST_ASSERT( ! queue_.empty() ); return queue_.back(); @@ -131,11 +216,13 @@ prefix_ senf::Packet senf::ppi::connector::InputConnector::peek() prefix_ senf::ppi::connector::InputConnector::size_type senf::ppi::connector::InputConnector::queueSize() + const { return queue_.size(); } prefix_ bool senf::ppi::connector::InputConnector::empty() + const { return queue_.empty(); } @@ -164,6 +251,7 @@ prefix_ void senf::ppi::connector::OutputConnector::operator()(Packet p) } prefix_ senf::ppi::connector::InputConnector & senf::ppi::connector::OutputConnector::peer() + const { return dynamic_cast(Connector::peer()); } @@ -178,22 +266,36 @@ prefix_ senf::ppi::connector::OutputConnector::OutputConnector() // senf::ppi::connector::PassiveInput prefix_ senf::ppi::connector::PassiveInput::PassiveInput() - : qdisc_(), qstate_(QueueingDiscipline::UNTHROTTLED) + : qdisc_(new ThresholdQueueing(1,0)) {} prefix_ senf::ppi::connector::ActiveOutput & senf::ppi::connector::PassiveInput::peer() + const { return dynamic_cast(Connector::peer()); } +prefix_ bool senf::ppi::connector::PassiveInput::boolean_test() + const +{ + return ! empty(); +} + /////////////////////////////////////////////////////////////////////////// // senf::ppi::connector::PassiveOutput prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::PassiveOutput::peer() + const { return dynamic_cast(Connector::peer()); } +prefix_ bool senf::ppi::connector::PassiveOutput::boolean_test() + const +{ + return true; +} + prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target) { Connector::connect(target); @@ -203,10 +305,17 @@ prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target) // senf::ppi::connector::ActiveInput prefix_ senf::ppi::connector::PassiveOutput & senf::ppi::connector::ActiveInput::peer() + const { return dynamic_cast(Connector::peer()); } +prefix_ bool senf::ppi::connector::ActiveInput::boolean_test() + const +{ + return ! empty() || ! peer().throttled(); +} + prefix_ void senf::ppi::connector::ActiveInput::request() { peer().emit(); @@ -216,10 +325,17 @@ prefix_ void senf::ppi::connector::ActiveInput::request() // senf::ppi::connector::ActiveOutput prefix_ senf::ppi::connector::PassiveInput & senf::ppi::connector::ActiveOutput::peer() + const { return dynamic_cast(Connector::peer()); } +prefix_ bool senf::ppi::connector::ActiveOutput::boolean_test() + const +{ + return ! peer().throttled(); +} + prefix_ void senf::ppi::connector::ActiveOutput::connect(PassiveInput & target) { Connector::connect(target); diff --git a/PPI/Connectors.cti b/PPI/Connectors.cti index 9c77368..992bd8c 100644 --- a/PPI/Connectors.cti +++ b/PPI/Connectors.cti @@ -40,12 +40,27 @@ prefix_ void senf::ppi::connector::PassiveConnector::onRequest(Handler handler) } /////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::ActiveConnector + +template +prefix_ void senf::ppi::connector::ActiveConnector::onThrottle(Handler handler) +{ + throttleCallback_ = detail::Callback<>::make(handler, module()); +} + +template +prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle(Handler handler) +{ + unthrottleCallback_ = detail::Callback<>::make(handler, module()); +} + +/////////////////////////////////////////////////////////////////////////// // senf::ppi::connector::PassiveInput template prefix_ void senf::ppi::connector::PassiveInput::qdisc(QDisc const & disc) { - qdisc_ = boost::scoped_ptr(new QDisc(disc)); + qdisc_.reset(new QDisc(disc)); } ///////////////////////////////cti.e/////////////////////////////////////// diff --git a/PPI/Connectors.hh b/PPI/Connectors.hh index 6ca4aa5..6e01ca3 100644 --- a/PPI/Connectors.hh +++ b/PPI/Connectors.hh @@ -67,8 +67,8 @@ namespace connector { : boost::noncopyable { public: - Connector & peer(); ///< Get peer connected to this connector - module::Module & module(); ///< Get this connectors containing module + Connector & peer() const; ///< Get peer connected to this connector + module::Module & module() const; ///< Get this connectors containing module protected: Connector(); @@ -119,13 +119,13 @@ namespace connector { operation is to be performed. */ - bool throttled(); ///< Get accumulative throttling state - bool nativeThrottled(); ///< Get native throttling state + bool throttled() const; ///< Get accumulative throttling state + bool nativeThrottled() const; ///< Get native throttling state void throttle(); ///< Set native throttling void unthrottle(); ///< Revoke native throttling - ActiveConnector & peer(); + ActiveConnector & peer() const; protected: PassiveConnector(); @@ -133,13 +133,25 @@ namespace connector { void emit(); private: + // Called by the routing to change the remote throttling state void notifyThrottle(); ///< Forward a throttling notification to this connector void notifyUnthrottle(); ///< Forward an unthrottling notification to this connector + // Internal members to emit throttling notifications + void emitThrottle(); + void emitUnthrottle(); + + // Called after unthrottling the connector + virtual void v_unthrottleEvent(); + typedef detail::Callback<>::type Callback; Callback callback_; - - friend class ActiveConnector; + + bool remoteThrottled_; + bool nativeThrottled_; + + friend class senf::ppi::detail::ForwardForwardingRouteImplementation; + friend class senf::ppi::detail::BackwardForwardingRouteImplementation; }; /** \brief Active connector baseclass @@ -155,9 +167,10 @@ namespace connector { class ActiveConnector : public virtual Connector { + typedef detail::Callback<>::type Callback; public: template - void onThrottle(Handler handle); ///< Register throttle notification handler + void onThrottle(Handler handler); ///< Register throttle notification handler /**< The handler register here will be called, whenever a throttle notification comes in. The \a handler argument is either an arbitrary callable object or it is a @@ -165,11 +178,12 @@ namespace connector { this input. In the second case, the pointer will automatically be bound to the containing instance. - \param[in] handle Handler to call on throttle + \param[in] handler Handler to call on throttle notifications. */ + void onThrottle(); template - void onUnthrottle(Handler handle); ///< Register unthrottle notification handler + void onUnthrottle(Handler handler); ///< Register unthrottle notification handler /**< The handler register here will be called, whenever an unthrottle notification comes in. The \a handler argument is either an arbitrary callable object or it @@ -179,11 +193,29 @@ namespace connector { \param[in] handle Handler to call on unthrottle notifications. */ + void onUnthrottle(); - PassiveConnector & peer(); + PassiveConnector & peer() const; protected: ActiveConnector(); + + private: + // called by the peer() to forward throttling notifications + void notifyThrottle(); + void notifyUnthrottle(); + + // called by ForwardingRoute to register a new route + void registerRoute(ForwardingRoute & route); + + Callback throttleCallback_; + Callback unthrottleCallback_; + + typedef std::vector NotifyRoutes; + NotifyRoutes notifyRoutes_; + + friend class senf::ppi::ForwardingRoute; + friend class PassiveConnector; }; /** \brief Input connector baseclass @@ -207,8 +239,7 @@ namespace connector { be added to the queue before it can be processed. */ class InputConnector - : public virtual Connector, - public SafeBool + : public virtual Connector { typedef std::deque Queue; public: @@ -224,28 +255,15 @@ namespace connector { request cannot be fulfilled, this is considered to be a logic error in the module implementation and an exception is raised. */ - bool boolean_test (); ///< Check packet availability - /**< Using any input connector in a boolean context will - check, whether an input request can be fulfilled. This - is always possible if the queue is non-empty. If the - input is active, it also returns when the connected - passive output is not throttled so new packets can be - requested. - - Calling the operator() member is an error if this test - returns \c false - - \returns \c true if operator() can be called, \c false - otherwise */ - OutputConnector & peer(); + OutputConnector & peer() const; - queue_iterator begin(); ///< Access queue begin (head) - queue_iterator end(); ///< Access queue past-the-end (tail) - Packet peek(); ///< Return head element from the queue + queue_iterator begin() const; ///< Access queue begin (head) + queue_iterator end() const; ///< Access queue past-the-end (tail) + Packet peek() const; ///< Return head element from the queue - size_type queueSize(); ///< Return number of elements in the queue - bool empty(); ///< Return queueSize() == 0 + size_type queueSize() const; ///< Return number of elements in the queue + bool empty() const; ///< Return queueSize() == 0 protected: InputConnector(); @@ -274,7 +292,7 @@ namespace connector { public: void operator()(Packet p); ///< Send out a packet - InputConnector & peer(); + InputConnector & peer() const; protected: OutputConnector(); @@ -285,21 +303,23 @@ namespace connector { /** \brief Combination of PassiveConnector and InputConnector - In addition to the native and the forwarded throttling state, the PassiveInput manages a - queue throttling state. This state is automatically managed by a queueing discipline. The - standard queueing discipline is ThresholdQueueing, which throttles the connection whenever - the queue length reaches the high threshold and unthrottles the connection when the queue - reaches the low threshold. The default queueing discipline is + The PassiveInput automatically controls the connectors throttling state using a queueing + discipline. The standard queueing discipline is ThresholdQueueing, which throttles the + connection whenever the queue length reaches the high threshold and unthrottles the + connection when the queue reaches the low threshold. The default queueing discipline is ThresholdQueueing(1,0) which will throttle the input whenever the queue is non-empty. */ class PassiveInput - : public PassiveConnector, public InputConnector + : public PassiveConnector, public InputConnector, + public SafeBool { public: PassiveInput(); - ActiveOutput & peer(); + ActiveOutput & peer() const; + + bool boolean_test() const; template void qdisc(QDisc const & disc); ///< Change the queueing discipline @@ -311,18 +331,21 @@ namespace connector { private: void v_enqueueEvent(); void v_dequeueEvent(); + void v_unthrottleEvent(); boost::scoped_ptr qdisc_; - QueueingDiscipline::State qstate_; }; /** \brief Combination of PassiveConnector and OutputConnector */ class PassiveOutput - : public PassiveConnector, public OutputConnector + : public PassiveConnector, public OutputConnector, + public SafeBool { public: - ActiveInput & peer(); + ActiveInput & peer() const; + + bool boolean_test() const; void connect(ActiveInput & target); @@ -332,10 +355,13 @@ namespace connector { /** \brief Combination of ActiveConnector and InputConnector */ class ActiveInput - : public ActiveConnector, public InputConnector + : public ActiveConnector, public InputConnector, + public SafeBool { public: - PassiveOutput & peer(); + PassiveOutput & peer() const; + + bool boolean_test() const; void request(); ///< request more packets without dequeuing any packet @@ -346,10 +372,13 @@ namespace connector { /** \brief Combination of ActiveConnector and OutputConnector */ class ActiveOutput - : public ActiveConnector, public OutputConnector + : public ActiveConnector, public OutputConnector, + public SafeBool { public: - PassiveInput & peer(); + PassiveInput & peer() const; + + bool boolean_test() const; void connect(PassiveInput & target); }; diff --git a/PPI/Connectors.test.cc b/PPI/Connectors.test.cc index 694697b..80548f2 100644 --- a/PPI/Connectors.test.cc +++ b/PPI/Connectors.test.cc @@ -37,8 +37,8 @@ #define prefix_ ///////////////////////////////cc.p//////////////////////////////////////// -namespace debug = senf::ppi::module::debug; namespace ppi = senf::ppi; +namespace debug = ppi::module::debug; // For each type of connector we use the corresponding debug module. Additionally, we always need // the corresponding connected module since otherwise the connectors cannot be connected anywhere @@ -53,6 +53,7 @@ BOOST_AUTO_UNIT_TEST(connector) debug::PassivePacketSink target; ppi::connect(source.output,target.input); + ppi::init(); BOOST_CHECK_EQUAL( & source.output.module(), & source ); BOOST_CHECK_EQUAL( & target.input.module(), & target ); @@ -66,11 +67,11 @@ BOOST_AUTO_UNIT_TEST(passiveConnector) debug::PassivePacketSink target; ppi::connect(source.output,target.input); + ppi::init(); // onRequest is implicitly tested within the PassivePacketSink implementation which is tested // in DebugModules.test.cc -#if 0 target.input.throttle(); BOOST_CHECK( target.input.throttled() ); BOOST_CHECK( target.input.nativeThrottled() ); @@ -78,9 +79,210 @@ BOOST_AUTO_UNIT_TEST(passiveConnector) target.input.unthrottle(); BOOST_CHECK( ! target.input.throttled() ); BOOST_CHECK( ! target.input.nativeThrottled() ); -#endif + + BOOST_CHECK_EQUAL( & target.input.peer(), & source.output ); } +namespace { + + bool called = false; + + void handler() { called = true; } +} + +BOOST_AUTO_UNIT_TEST(activeConnector) +{ + debug::ActivePacketSource source; + debug::PassivePacketSink target; + + ppi::connect(source.output,target.input); + ppi::init(); + + source.output.onThrottle(handler); + BOOST_CHECK( ! called ); + target.input.throttle(); + BOOST_CHECK( called ); + called = false; + target.input.unthrottle(); + BOOST_CHECK( ! called ); + source.output.onThrottle(); + source.output.onUnthrottle(handler); + BOOST_CHECK( ! called ); + target.input.throttle(); + BOOST_CHECK( ! called ); + target.input.unthrottle(); + BOOST_CHECK( called ); + source.output.onUnthrottle(); + called = false; + BOOST_CHECK( ! called ); + target.input.throttle(); + target.input.unthrottle(); + BOOST_CHECK( ! called ); + + BOOST_CHECK_EQUAL( & source.output.peer(), & target.input ); +} + +BOOST_AUTO_UNIT_TEST(inputConnector) +{ + debug::ActivePacketSource source; + debug::PassivePacketSink target; + + ppi::connect(source.output,target.input); + ppi::init(); + + // operator() is implicitly tested within the Active/PassivePacketSink implementation which is + // tested in DebugModules.test.cc + + // peek() is implicitly tested within the Active/PassivePacketSink implementation + + BOOST_CHECK_EQUAL ( & target.input.peer(), & source.output ); + + BOOST_CHECK( target.input.begin() == target.input.end() ); + BOOST_CHECK_EQUAL( target.input.queueSize(), 0u ); + BOOST_CHECK( target.input.empty() ); +} + +BOOST_AUTO_UNIT_TEST(outputConnector) +{ + debug::ActivePacketSource source; + debug::PassivePacketSink target; + + ppi::connect(source.output,target.input); + ppi::init(); + + // operator() is implicitly tested within the Active/PassivePacketSource implementation which is + // tested in DebugModules.test.cc + + BOOST_CHECK_EQUAL( & source.output.peer(), & target.input ); +} + +namespace { + + class PassiveInputTest + : public ppi::module::Module + { + public: + ppi::connector::PassiveInput input; + + PassiveInputTest() : counter() { + noroute(input); + input.onRequest(&PassiveInputTest::request); + } + + void request() { + ++ counter; + } + + unsigned counter; + }; +} + +BOOST_AUTO_UNIT_TEST(passiveInput) +{ + debug::ActivePacketSource source; + PassiveInputTest target; + + ppi::connect(source.output,target.input); + ppi::init(); + + BOOST_CHECK_EQUAL( & target.input.peer(), & source.output ); + + target.input.throttle(); + senf::Packet p (senf::DataPacket::create()); + source.submit(p); + + BOOST_CHECK_EQUAL( target.counter, 0u ); + BOOST_CHECK( target.input ); + BOOST_CHECK_EQUAL( target.input.queueSize(), 1u ); + target.input.unthrottle(); + BOOST_CHECK( target.input ); + BOOST_CHECK_EQUAL( target.counter, 1u ); + + BOOST_CHECK( target.input() == p ); + BOOST_CHECK( ! target.input ); + + source.submit(p); + + BOOST_CHECK_EQUAL( target.counter, 2u ); + BOOST_CHECK( target.input.throttled() ); + BOOST_CHECK( target.input() == p ); + BOOST_CHECK( ! target.input.throttled() ); + + target.input.qdisc(ppi::ThresholdQueueing(2,0)); + + source.submit(p); + BOOST_CHECK ( ! target.input.throttled() ); + source.submit(p); + BOOST_CHECK( target.input.throttled() ); + target.input(); + BOOST_CHECK( target.input.throttled() ); + target.input(); + BOOST_CHECK( ! target.input.throttled() ); +} + +BOOST_AUTO_UNIT_TEST(passiveOutput) +{ + debug::PassivePacketSource source; + debug::ActivePacketSink target; + + ppi::connect(source.output,target.input); + ppi::init(); + + senf::Packet p (senf::DataPacket::create()); + source.submit(p); + + BOOST_CHECK_EQUAL( & source.output.peer(), & target.input ); + + BOOST_CHECK( source.output ); + + source.submit(p); + BOOST_CHECK( target.request() == p ); + + // connect() is tested indirectly via ppi::connect +} + +BOOST_AUTO_UNIT_TEST(activeInput) +{ + debug::PassivePacketSource source; + debug::ActivePacketSink target; + + ppi::connect(source.output,target.input); + ppi::init(); + + BOOST_CHECK_EQUAL( & target.input.peer(), & source.output ); + + BOOST_CHECK ( ! target.input ); + + senf::Packet p (senf::DataPacket::create()); + source.submit(p); + + BOOST_CHECK( target.input ); + BOOST_CHECK( target.request() == p ); + + source.submit(p); + target.input.request(); + BOOST_CHECK_EQUAL( target.input.queueSize(), 1u ); + BOOST_CHECK( target.input ); + BOOST_CHECK( target.request() == p ); +} + +BOOST_AUTO_UNIT_TEST(activeOutput) +{ + debug::ActivePacketSource source; + debug::PassivePacketSink target; + + ppi::connect(source.output,target.input); + ppi::init(); + + BOOST_CHECK_EQUAL( & source.output.peer(), & target.input ); + BOOST_CHECK( source.output ); + target.input.throttle(); + BOOST_CHECK( ! source.output ); + + // connect() is tested indirectly via ppi::connect +} + + ///////////////////////////////cc.e//////////////////////////////////////// #undef prefix_ diff --git a/PPI/DebugModules.cci b/PPI/DebugModules.cci index 6accafa..aa2a49e 100644 --- a/PPI/DebugModules.cci +++ b/PPI/DebugModules.cci @@ -53,6 +53,7 @@ prefix_ senf::ppi::module::debug::PassivePacketSource::PassivePacketSource() prefix_ void senf::ppi::module::debug::PassivePacketSource::submit(Packet packet) { packets_.push_back(packet); + output.unthrottle(); } prefix_ bool senf::ppi::module::debug::PassivePacketSource::empty() @@ -71,8 +72,16 @@ senf::ppi::module::debug::PassivePacketSource::size() prefix_ void senf::ppi::module::debug::PassivePacketSource::request() { + BOOST_ASSERT( ! packets_.empty() ); output(packets_.front()); packets_.pop_front(); + if (packets_.empty()) + output.throttle(); +} + +prefix_ void senf::ppi::module::debug::PassivePacketSource::init() +{ + output.throttle(); } /////////////////////////////////////////////////////////////////////////// diff --git a/PPI/DebugModules.hh b/PPI/DebugModules.hh index 5545871..3ff2548 100644 --- a/PPI/DebugModules.hh +++ b/PPI/DebugModules.hh @@ -69,6 +69,7 @@ namespace debug { private: void request(); + void init(); Queue packets_; }; diff --git a/PPI/DebugModules.test.cc b/PPI/DebugModules.test.cc index fd95c32..3f19899 100644 --- a/PPI/DebugModules.test.cc +++ b/PPI/DebugModules.test.cc @@ -48,12 +48,16 @@ BOOST_AUTO_UNIT_TEST(debugModules) debug::PassivePacketSink sink; ppi::connect(source.output, sink.input); - + ppi::init(); + senf::PacketData::byte data[] = { 0x13u, 0x24u, 0x35u }; senf::Packet p (senf::DataPacket::create(data)); + BOOST_CHECK( ! sink.input.throttled() ); + source.submit(p); - + + BOOST_CHECK( ! sink.input.throttled() ); BOOST_CHECK_EQUAL( sink.size(), 1u ); BOOST_CHECK( ! sink.empty() ); BOOST_CHECK_EQUAL( @@ -73,6 +77,7 @@ BOOST_AUTO_UNIT_TEST(debugModules) debug::ActivePacketSink sink; ppi::connect(source.output, sink.input); + ppi::init(); senf::PacketData::byte data[] = { 0x13u, 0x24u, 0x35u }; senf::Packet p (senf::DataPacket::create(data)); diff --git a/PPI/Module.cci b/PPI/Module.cci index 04ecc61..382ac57 100644 --- a/PPI/Module.cci +++ b/PPI/Module.cci @@ -27,6 +27,7 @@ #include "Route.hh" #include "Connectors.hh" #include "EventManager.hh" +#include "ModuleManager.hh" #define prefix_ inline ///////////////////////////////cci.p/////////////////////////////////////// @@ -34,10 +35,9 @@ /////////////////////////////////////////////////////////////////////////// // senf::ppi::module::Module -prefix_ void senf::ppi::module::Module::noroute(connector::Connector & connector) +prefix_ senf::ppi::module::Module::~Module() { - registerConnector(connector); - connector.setModule(*this); + moduleManager().unregisterModule(*this); } prefix_ boost::posix_time::ptime senf::ppi::module::Module::eventTime() @@ -49,16 +49,32 @@ prefix_ boost::posix_time::ptime senf::ppi::module::Module::eventTime() // protected members prefix_ senf::ppi::module::Module::Module() -{} +{ + moduleManager().registerModule(*this); +} + +prefix_ void senf::ppi::module::Module::noroute(connector::Connector & connector) +{ + registerConnector(connector); + connector.setModule(*this); +} //////////////////////////////////////// // private members +prefix_ void senf::ppi::module::Module::init() +{} + prefix_ senf::ppi::EventManager & senf::ppi::module::Module::eventManager() { return EventManager::instance(); } +prefix_ senf::ppi::ModuleManager & senf::ppi::module::Module::moduleManager() +{ + return ModuleManager::instance(); +} + prefix_ void senf::ppi::module::Module::registerConnector(connector::Connector & connector) { connectorRegistry_.push_back(&connector); diff --git a/PPI/Module.hh b/PPI/Module.hh index 1a3d22b..0002b05 100644 --- a/PPI/Module.hh +++ b/PPI/Module.hh @@ -57,6 +57,9 @@ namespace module { class Module : boost::noncopyable { + public: + virtual ~Module(); + protected: Module(); @@ -122,7 +125,10 @@ namespace module { ///< event private: + virtual void init(); + EventManager & eventManager(); + ModuleManager & moduleManager(); void registerConnector(connector::Connector & connector); RouteBase & addRoute(std::auto_ptr route); @@ -135,6 +141,7 @@ namespace module { template friend class detail::RouteHelper; + friend class senf::ppi::ModuleManager; }; /** \brief Connect compatible connectors diff --git a/PPI/ModuleManager.cc b/PPI/ModuleManager.cc new file mode 100644 index 0000000..55c65d2 --- /dev/null +++ b/PPI/ModuleManager.cc @@ -0,0 +1,67 @@ +// $Id$ +// +// Copyright (C) 2007 +// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) +// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Stefan Bund +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief ModuleManager non-inline non-template implementation */ + +#include "ModuleManager.hh" +//#include "ModuleManager.ih" + +// Custom includes +#include "Scheduler/Scheduler.hh" +#include "Module.hh" + +//#include "ModuleManager.mpp" +#define prefix_ +///////////////////////////////cc.p//////////////////////////////////////// + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::ModuleManager + +prefix_ void senf::ppi::ModuleManager::init() +{ + ModuleRegistry::const_iterator i (moduleRegistry_.begin()); + ModuleRegistry::const_iterator const i_end (moduleRegistry_.end()); + for (; i != i_end; ++i) + (*i)->init(); +} + +prefix_ void senf::ppi::ModuleManager::run() +{ + init(); + Scheduler::instance().process(); +} + +///////////////////////////////cc.e//////////////////////////////////////// +#undef prefix_ +//#include "ModuleManager.mpp" + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/PPI/ModuleManager.cci b/PPI/ModuleManager.cci new file mode 100644 index 0000000..f8b2a08 --- /dev/null +++ b/PPI/ModuleManager.cci @@ -0,0 +1,65 @@ +// $Id$ +// +// Copyright (C) 2007 +// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) +// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Stefan Bund +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief ModuleManager inline non-template implementation */ + +// Custom includes +#include + +#define prefix_ inline +///////////////////////////////cci.p/////////////////////////////////////// + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::ModuleManager + +prefix_ senf::ppi::ModuleManager & senf::ppi::ModuleManager::instance() +{ + static ModuleManager manager; + return manager; +} + +prefix_ void senf::ppi::ModuleManager::registerModule(module::Module & module) +{ + moduleRegistry_.push_back(&module); +} + +prefix_ void senf::ppi::ModuleManager::unregisterModule(module::Module & module) +{ + moduleRegistry_.erase( + std::remove(moduleRegistry_.begin(), moduleRegistry_.end(), & module), + moduleRegistry_.end()); +} + +///////////////////////////////cci.e/////////////////////////////////////// +#undef prefix_ + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/PPI/ModuleManager.hh b/PPI/ModuleManager.hh new file mode 100644 index 0000000..abf7cbc --- /dev/null +++ b/PPI/ModuleManager.hh @@ -0,0 +1,92 @@ +// $Id$ +// +// Copyright (C) 2007 +// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) +// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Stefan Bund +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief ModuleManager public header */ + +#ifndef HH_ModuleManager_ +#define HH_ModuleManager_ 1 + +// Custom includes +#include +#include "predecl.hh" + +//#include "ModuleManager.mpp" +///////////////////////////////hh.p//////////////////////////////////////// + +namespace senf { +namespace ppi { + + /** \brief + */ + class ModuleManager + { + public: + /////////////////////////////////////////////////////////////////////////// + ///\name Structors and default members + ///@{ + + static ModuleManager & instance(); + + // default default constructor + // default copy constructor + // default copy assignment + // default destructor + + // no conversion constructors + + ///@} + /////////////////////////////////////////////////////////////////////////// + + void registerModule(module::Module & module); + void unregisterModule(module::Module & module); + + void init(); + void run(); + + protected: + + private: + typedef std::vector ModuleRegistry; + + ModuleRegistry moduleRegistry_; + }; + + +}} + +///////////////////////////////hh.e//////////////////////////////////////// +#include "ModuleManager.cci" +//#include "ModuleManager.ct" +//#include "ModuleManager.cti" +#endif + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/PPI/Queueing.cc b/PPI/Queueing.cc index c084f51..aa59e3d 100644 --- a/PPI/Queueing.cc +++ b/PPI/Queueing.cc @@ -27,16 +27,28 @@ //#include "Queueing.ih" // Custom includes +#include "Connectors.hh" //#include "Queueing.mpp" #define prefix_ ///////////////////////////////cc.p//////////////////////////////////////// /////////////////////////////////////////////////////////////////////////// -// senf::ppi::QueueingDiscipline +// senf::ppi::ThresholdQueueing -prefix_ senf::ppi::QueueingDiscipline::~QueueingDiscipline() -{} +prefix_ void senf::ppi::ThresholdQueueing::update(connector::PassiveInput & input, Event event) +{ + switch (event) { + case ENQUEUE: + if (input.queueSize() >= high_) + input.throttle(); + break; + case DEQUEUE: + if (input.queueSize() <= low_) + input.unthrottle(); + break; + } +} ///////////////////////////////cc.e//////////////////////////////////////// #undef prefix_ diff --git a/PPI/Queueing.cci b/PPI/Queueing.cci new file mode 100644 index 0000000..76eea26 --- /dev/null +++ b/PPI/Queueing.cci @@ -0,0 +1,56 @@ +// $Id$ +// +// Copyright (C) 2007 +// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) +// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Stefan Bund +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief Queueing inline non-template implementation */ + +// Custom includes + +#define prefix_ inline +///////////////////////////////cci.p/////////////////////////////////////// + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::QueueingDiscipline + +prefix_ senf::ppi::QueueingDiscipline::~QueueingDiscipline() +{} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::ThresholdQueueing + +prefix_ senf::ppi::ThresholdQueueing::ThresholdQueueing(unsigned high, unsigned low) + : high_(high), low_(low) +{} + +///////////////////////////////cci.e/////////////////////////////////////// +#undef prefix_ + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/PPI/Queueing.hh b/PPI/Queueing.hh index 7b7421e..59e8e67 100644 --- a/PPI/Queueing.hh +++ b/PPI/Queueing.hh @@ -43,6 +43,12 @@ namespace ppi { operating system by sending throttling events. The PPI will never loose a packet internally (if not a module explicitly does so), however it may disable reception of new incoming packets which will then probably be dropped by the operating system. + + \attention Notifications may be forwarded to the QueueingDiscipline implementation + out-of-order: A dequeue event may be notified before the corresponding enqueue + event (this happens to optimize away transient throttling state changes which would + otherwise occur if a packet is entered into the queue and then removed from it in the + same processing step). */ class QueueingDiscipline { @@ -50,22 +56,35 @@ namespace ppi { virtual ~QueueingDiscipline(); enum Event { ENQUEUE, DEQUEUE }; ///< Possible queueing events - enum State { THROTTLED, UNTHROTTLED }; ///< Possible queueing states - virtual State update(connector::PassiveInput & input, Event event) = 0; + virtual void update(connector::PassiveInput & input, Event event) = 0; ///< Calculate new queueing state /**< Whenever the queue is manipulated, this member is - called to calculate the new throttling state. + called to calculate the new throttling state. The + member must call \a input's \c throttle() or \c + unthrottle() member to set the new throttling state. \param[in] input Connector holding the queue - \param[in] event Type of event triggering the update - \returns new throttling state */ + \param[in] event Type of event triggering the update */ + }; + + class ThresholdQueueing + : public QueueingDiscipline + { + public: + ThresholdQueueing(unsigned high, unsigned low); + + virtual void update(connector::PassiveInput & input, Event event); + + private: + unsigned high_; + unsigned low_; }; }} ///////////////////////////////hh.e//////////////////////////////////////// -//#include "Queueing.cci" +#include "Queueing.cci" //#include "Queueing.ct" //#include "Queueing.cti" #endif diff --git a/PPI/Route.cci b/PPI/Route.cci index 100b09c..01c6f3a 100644 --- a/PPI/Route.cci +++ b/PPI/Route.cci @@ -24,6 +24,8 @@ \brief Route inline non-template implementation */ // Custom includes +#include "Connectors.hh" +#include "Events.hh" #define prefix_ inline ///////////////////////////////cci.p/////////////////////////////////////// @@ -31,6 +33,9 @@ /////////////////////////////////////////////////////////////////////////// // senf::ppi::RouteBase +prefix_ senf::ppi::RouteBase::~RouteBase() +{} + //////////////////////////////////////// // protected members @@ -38,6 +43,202 @@ prefix_ senf::ppi::RouteBase::RouteBase(module::Module & module) : module_(&module) {} +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::ForwardingRoute + +prefix_ bool senf::ppi::ForwardingRoute::autoThrottling() +{ + return autoThrottling_; +} + +prefix_ void senf::ppi::ForwardingRoute::autoThrottling(bool state) +{ + autoThrottling_ = state; +} + +//////////////////////////////////////// +// protected members + +prefix_ senf::ppi::ForwardingRoute::ForwardingRoute(module::Module & module) + : RouteBase(module), autoThrottling_(false) +{} + +prefix_ void senf::ppi::ForwardingRoute::registerRoute(connector::ActiveConnector & connector) +{ + connector.registerRoute(*this); +} + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::ForwardingRoute::notifyThrottle() +{ + v_notifyThrottle(); +} + +prefix_ void senf::ppi::ForwardingRoute::notifyUnthrottle() +{ + v_notifyUnthrottle(); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::NonForwardingRouteImplementation + +prefix_ senf::ppi::detail::NonForwardingRouteImplementation:: +NonForwardingRouteImplementation(module::Module & module, connector::InputConnector & source, + connector::OutputConnector & target) + : RouteBase(module), source_(&source), target_(&target) +{} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::NonForwardingRouteToEventImplementation + +prefix_ +senf::ppi::detail::NonForwardingRouteToEventImplementation:: +NonForwardingRouteToEventImplementation(module::Module & module, + connector::InputConnector & source, + EventDescriptor & target) + : RouteBase(module), source_(&source), target_(&target) +{} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::NonForwardingRouteFromEventImplementation + +prefix_ +senf::ppi::detail::NonForwardingRouteFromEventImplementation:: +NonForwardingRouteFromEventImplementation(module::Module & module, EventDescriptor & source, + connector::OutputConnector & target) + : RouteBase(module), source_(&source), target_(&target) +{} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::ForwardForwardingRouteImplementation + +prefix_ +senf::ppi::detail::ForwardForwardingRouteImplementation:: +ForwardForwardingRouteImplementation(module::Module & module, connector::ActiveInput & source, + connector::PassiveOutput & target) + : ForwardingRoute(module), source_(&source), target_(&target) +{ + registerRoute(*source_); +} + +prefix_ void senf::ppi::detail::ForwardForwardingRouteImplementation::v_notifyThrottle() +{ + if (autoThrottling()) + target_->notifyThrottle(); +} + +prefix_ void senf::ppi::detail::ForwardForwardingRouteImplementation::v_notifyUnthrottle() +{ + if (autoThrottling()) + target_->notifyUnthrottle(); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::BackwardForwardingRouteImplementation + +prefix_ +senf::ppi::detail::BackwardForwardingRouteImplementation:: +BackwardForwardingRouteImplementation(module::Module & module, + connector::PassiveInput & source, + connector::ActiveOutput & target) + : ForwardingRoute(module), source_(&source), target_(&target) +{ + registerRoute(*target_); +} + +prefix_ void senf::ppi::detail::BackwardForwardingRouteImplementation::v_notifyThrottle() +{ + if (autoThrottling()) + source_->notifyThrottle(); +} + +prefix_ void senf::ppi::detail::BackwardForwardingRouteImplementation::v_notifyUnthrottle() +{ + if (autoThrottling()) + source_->notifyUnthrottle(); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::ForwardForwardingRouteToEventImplementation + +prefix_ +senf::ppi::detail::ForwardForwardingRouteToEventImplementation:: +ForwardForwardingRouteToEventImplementation(module::Module & module, + connector::ActiveInput & source, + EventDescriptor & target) + : ForwardingRoute(module), source_(&source), target_(&target) +{ + registerRoute(*source_); +} + +prefix_ void senf::ppi::detail::ForwardForwardingRouteToEventImplementation::v_notifyThrottle() +{ + if (autoThrottling()) + target_->enabled(false); +} + +prefix_ void +senf::ppi::detail::ForwardForwardingRouteToEventImplementation::v_notifyUnthrottle() +{ + if (autoThrottling()) + target_->enabled(true); +} + +/////////////////////////////////////////////////////////////////////////// +//senf::ppi::detail::BackwardForwardingRouteFromEventImplementation + +prefix_ +senf::ppi::detail::BackwardForwardingRouteFromEventImplementation:: +BackwardForwardingRouteFromEventImplementation(module::Module & module, + EventDescriptor & source, + connector::ActiveOutput & target) + : ForwardingRoute(module), source_(&source), target_(&target) +{ + registerRoute(*target_); +} + +prefix_ void +senf::ppi::detail::BackwardForwardingRouteFromEventImplementation::v_notifyThrottle() +{ + if (autoThrottling()) + source_->enabled(false); +} + +prefix_ void +senf::ppi::detail::BackwardForwardingRouteFromEventImplementation::v_notifyUnthrottle() +{ + if (autoThrottling()) + source_->enabled(true); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::RouteImplementation + +prefix_ +senf::ppi::detail::RouteImplementation:: +RouteImplementation(module::Module & module, connector::ActiveInput & source, + connector::PassiveOutput & target) + : ForwardForwardingRouteImplementation(module, source, target) +{} + +//////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::RouteImplementation + +prefix_ +senf::ppi::detail::RouteImplementation:: +RouteImplementation(module::Module & module, connector::PassiveInput & source, + connector::ActiveOutput & target) + : BackwardForwardingRouteImplementation(module, source, target) +{} + ///////////////////////////////cci.e/////////////////////////////////////// #undef prefix_ diff --git a/PPI/Route.cti b/PPI/Route.cti index 3d0f465..015ea90 100644 --- a/PPI/Route.cti +++ b/PPI/Route.cti @@ -31,52 +31,65 @@ ///////////////////////////////cti.p/////////////////////////////////////// /////////////////////////////////////////////////////////////////////////// -// senf::ppi::Route +// senf::ppi::detail::RouteImplementation -//////////////////////////////////////// -// protected members +template +prefix_ +senf::ppi::detail::RouteImplementation:: +RouteImplementation(module::Module & module, Source & source, Target & target) + : NonForwardingRouteImplementation(module, source, target) +{} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::RouteImplementation template -prefix_ senf::ppi::Route::Route(module::Module & module, Source & source, - Target & target) - : Implementation(module, source, target) +prefix_ +senf::ppi::detail::RouteImplementation:: +RouteImplementation(module::Module & module, Source & source, Target & target) + : NonForwardingRouteFromEventImplementation(module, source, target) {} /////////////////////////////////////////////////////////////////////////// -// senf::ppi::detail::RouteImplementation +// senf::ppi::detail::RouteImplementation -//////////////////////////////////////// -// protected members - -template -prefix_ senf::ppi::detail::RouteImplementation:: -RouteImplementation(module::Module & module, connector::InputConnector & source, - connector::OutputConnector & target) - : RouteBase(module), source_(&source), target_(&target) +template +prefix_ +senf::ppi::detail::RouteImplementation:: +RouteImplementation(module::Module & module, Source & source, Target & target) + : NonForwardingRouteToEventImplementation(module, source, target) {} /////////////////////////////////////////////////////////////////////////// -// senf::ppi::detail::RouteImplementation +// senf::ppi::detail::RouteImplementation -//////////////////////////////////////// -// protected members +template +prefix_ +senf::ppi::detail::RouteImplementation:: +RouteImplementation(module::Module & module, connector::ActiveInput & source, Event & target) + : ForwardForwardingRouteToEventImplementation(module, source, target) +{} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::RouteImplementation -prefix_ senf::ppi::detail::RouteImplementation:: -RouteImplementation(module::Module & module, EventDescriptor & source, - connector::OutputConnector & target) - : RouteBase(module), source_(&source), target_(&target) +template +prefix_ +senf::ppi::detail::RouteImplementation:: +RouteImplementation(module::Module & module, Event & source, connector::ActiveOutput & target) + : BackwardForwardingRouteFromEventImplementation(module, source, target) {} /////////////////////////////////////////////////////////////////////////// -// senf::ppi::detail::RouteImplementation +// senf::ppi::Route //////////////////////////////////////// // protected members -prefix_ senf::ppi::detail::RouteImplementation:: -RouteImplementation(module::Module & module, connector::InputConnector & source, - EventDescriptor & target) -: RouteBase(module), source_(&source), target_(&target) +template +prefix_ senf::ppi::Route::Route(module::Module & module, Source & source, + Target & target) + : Implementation(module, source, target) {} ///////////////////////////////cti.e/////////////////////////////////////// diff --git a/PPI/Route.hh b/PPI/Route.hh index 54c4139..e3eee1c 100644 --- a/PPI/Route.hh +++ b/PPI/Route.hh @@ -37,6 +37,20 @@ namespace ppi { class RouteBase { public: + virtual ~RouteBase(); + + protected: + RouteBase(module::Module & module); + + private: + module::Module * module_; + }; + + class ForwardingRoute + : public RouteBase + { + public: + bool autoThrottling(); void autoThrottling(bool state); ///< Change automatic throttle notification forwarding /**< By default, throttle notifications are automatically forwarded from active to passive connectors. This may @@ -50,23 +64,31 @@ namespace ppi { disable the event whenever a throttling notification comes in. Respective for unthrottle notifications. - \param[in] state New throttle forwarding state - - \implementation This class will be implemented using a - baseclass, this template and several - specializations. However, this is an implementation - detail which does not affect the exposed - interface. */ - + \param[in] state New throttle forwarding state */ + protected: - RouteBase(module::Module & module); + ForwardingRoute(module::Module & module); + + // Called to register this route with the connectors forwarding information base + void registerRoute(connector::ActiveConnector & connector); private: - module::Module * module_; + // called to forward a throttling notification along the route + void notifyThrottle(); + void notifyUnthrottle(); + + // Implemented in the derived classes to forward throttling notifications + virtual void v_notifyThrottle() = 0; + virtual void v_notifyUnthrottle() = 0; + + bool autoThrottling_; + + friend class connector::ActiveConnector; }; }} +// We need detail::RouteImplementation here ... #include "Route.ih" namespace senf { @@ -79,13 +101,10 @@ namespace ppi { */ template class Route - : public detail::RouteImplementation< boost::is_base_of::value, - boost::is_base_of::value > + : public detail::RouteImplementation { private: - typedef detail::RouteImplementation< - boost::is_base_of::value, - boost::is_base_of::value > Implementation; + typedef detail::RouteImplementation Implementation; Route(module::Module & module, Source & source, Target & target); diff --git a/PPI/Route.ih b/PPI/Route.ih index 143a7a4..1e76429 100644 --- a/PPI/Route.ih +++ b/PPI/Route.ih @@ -34,50 +34,183 @@ namespace senf { namespace ppi { namespace detail { - template - class RouteImplementation + // Valid Forwarding routes: + // Forward throttling + // ActiveInput -> PassiveOutput + // tempalte<> RouteImplementation + // ActiveInput -> Event + // template class RouteImplementation + // Backward throttling + // PassiveInput -> ActiveOutput + // template<> RouteImplementation + // Event -> ActiveOutput + // template class RouteImplementation + + class NonForwardingRouteImplementation : public RouteBase { protected: - RouteImplementation(module::Module & module, - connector::InputConnector & source, - connector::OutputConnector & target); + NonForwardingRouteImplementation(module::Module & module, + connector::InputConnector & source, + connector::OutputConnector & target); private: connector::InputConnector * source_; connector::OutputConnector * target_; }; -# ifndef DOXYGEN + class NonForwardingRouteToEventImplementation + : public RouteBase + { + protected: + NonForwardingRouteToEventImplementation(module::Module & module, + connector::InputConnector & source, + EventDescriptor & target); + + private: + connector::InputConnector * source_; + EventDescriptor * target_; + }; - template <> - class RouteImplementation + class NonForwardingRouteFromEventImplementation : public RouteBase { protected: - RouteImplementation(module::Module & module, - EventDescriptor & source, - connector::OutputConnector & target); + NonForwardingRouteFromEventImplementation(module::Module & module, + EventDescriptor & source, + connector::OutputConnector & target); private: EventDescriptor * source_; connector::OutputConnector * target_; }; - template<> - class RouteImplementation - : public RouteBase + class ForwardForwardingRouteImplementation + : public ForwardingRoute { protected: - RouteImplementation(module::Module & module, - connector::InputConnector & source, - EventDescriptor & target); + ForwardForwardingRouteImplementation(module::Module & module, + connector::ActiveInput & source, + connector::PassiveOutput & target); private: - connector::InputConnector * source_; + virtual void v_notifyThrottle(); + virtual void v_notifyUnthrottle(); + + connector::ActiveInput * source_; + connector::PassiveOutput * target_; + }; + + class BackwardForwardingRouteImplementation + : public ForwardingRoute + { + protected: + BackwardForwardingRouteImplementation(module::Module & module, + connector::PassiveInput & source, + connector::ActiveOutput & target); + + private: + virtual void v_notifyThrottle(); + virtual void v_notifyUnthrottle(); + + connector::PassiveInput * source_; + connector::ActiveOutput * target_; + }; + + class ForwardForwardingRouteToEventImplementation + : public ForwardingRoute + { + protected: + ForwardForwardingRouteToEventImplementation(module::Module & module, + connector::ActiveInput & source, + EventDescriptor & target); + + private: + virtual void v_notifyThrottle(); + virtual void v_notifyUnthrottle(); + + connector::ActiveInput * source_; EventDescriptor * target_; }; + class BackwardForwardingRouteFromEventImplementation + : public ForwardingRoute + { + protected: + BackwardForwardingRouteFromEventImplementation(module::Module & module, + EventDescriptor & source, + connector::ActiveOutput & target); + + private: + virtual void v_notifyThrottle(); + virtual void v_notifyUnthrottle(); + + EventDescriptor * source_; + connector::ActiveOutput * target_; + }; + + template + class RouteImplementation + : public NonForwardingRouteImplementation + { + protected: + RouteImplementation(module::Module & module, Source & source, Target & target); + }; + +# ifndef DOXYGEN + + template + class RouteImplementation + : public NonForwardingRouteFromEventImplementation + { + protected: + RouteImplementation(module::Module & module, Source & source, Target & target); + }; + + template + class RouteImplementation + : public NonForwardingRouteToEventImplementation + { + protected: + RouteImplementation(module::Module & module, Source & source, Target & target); + }; + + template<> + class RouteImplementation + : public ForwardForwardingRouteImplementation + { + protected: + RouteImplementation(module::Module & module, connector::ActiveInput & source, + connector::PassiveOutput & target); + }; + + template + class RouteImplementation + : public ForwardForwardingRouteToEventImplementation + { + protected: + RouteImplementation(module::Module & module, connector::ActiveInput & source, + Event & target); + }; + + template <> + class RouteImplementation + : public BackwardForwardingRouteImplementation + { + protected: + RouteImplementation(module::Module & module, connector::PassiveInput & source, + connector::ActiveOutput & target); + }; + + template + class RouteImplementation + : public BackwardForwardingRouteFromEventImplementation + { + protected: + RouteImplementation(module::Module & module, Event & source, + connector::ActiveOutput & target); + }; + # endif }}} diff --git a/PPI/Setup.cci b/PPI/Setup.cci index f14f33b..01c3168 100644 --- a/PPI/Setup.cci +++ b/PPI/Setup.cci @@ -25,7 +25,7 @@ // Custom includes #include "Connectors.hh" -#include "Scheduler/Scheduler.hh" +#include "ModuleManager.hh" #define prefix_ inline ///////////////////////////////cci.p/////////////////////////////////////// @@ -44,7 +44,12 @@ prefix_ void senf::ppi::connect(connector::PassiveOutput & source, prefix_ void senf::ppi::run() { - Scheduler::instance().process(); + ModuleManager::instance().run(); +} + +prefix_ void senf::ppi::init() +{ + ModuleManager::instance().init(); } ///////////////////////////////cci.e/////////////////////////////////////// diff --git a/PPI/Setup.hh b/PPI/Setup.hh index 5f0613a..2920e1e 100644 --- a/PPI/Setup.hh +++ b/PPI/Setup.hh @@ -39,6 +39,7 @@ namespace ppi { void connect(connector::PassiveOutput & source, connector::ActiveInput & target); void run(); + void init(); }} diff --git a/PPI/predecl.hh b/PPI/predecl.hh index 92dd000..be9c053 100644 --- a/PPI/predecl.hh +++ b/PPI/predecl.hh @@ -39,14 +39,26 @@ namespace ppi { template class EventImplementation; class EventManager; class RouteBase; + class ForwardingRoute; template class Route; class QueueingDiscipline; + class ModuleManager; namespace detail { class EventBindingBase; template class EventBinding; template struct EventArgType; - template class RouteImplementation; + class NonForwardingRouteImplementation; + class NonForwardingRouteToEventImplementation; + class NonForwardingRouteFromEventImplementation; + class ForwardForwardingRouteImplementation; + class BackwardForwardingRouteImplementation; + class ForwardForwardingRouteToEventImplementation; + class BackwardForwardingRouteFromEventImplementation; + template ::value, + bool trgEvent = boost::is_base_of::value> + class RouteImplementation; } namespace module {