//#include "Connectors.ih"
// Custom includes
+#include "Route.hh"
//#include "Connectors.mpp"
#define prefix_
///////////////////////////////cc.p////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
-// protected members
+// senf::ppi::connector::PassiveConnector
-prefix_ senf::ppi::connector::Connector::~Connector()
-{}
+////////////////////////////////////////
+// private members
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveConnector
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle()
+{
+ if (throttleCallback_)
+ throttleCallback_();
+ NotifyRoutes::const_iterator i (notifyRoutes_.begin());
+ NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
+ for (; i != i_end; ++i)
+ (*i)->notifyThrottle();
+}
-prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
+prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle()
{
- peer_ = & target;
- target.peer_ = this;
+ if (unthrottleCallback_)
+ unthrottleCallback_();
+ NotifyRoutes::const_iterator i (notifyRoutes_.begin());
+ NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
+ for (; i != i_end; ++i)
+ (*i)->notifyUnthrottle();
+}
+
+prefix_ void senf::ppi::connector::ActiveConnector::registerRoute(ForwardingRoute & route)
+{
+ notifyRoutes_.push_back(&route);
}
///////////////////////////////////////////////////////////////////////////
// senf::ppi::connector::InputConnector
+prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()()
+{
+ if (empty())
+ v_requestEvent();
+ Packet p;
+ if (! empty()) {
+ p = peek();
+ queue_.pop_back();
+ v_dequeueEvent();
+ }
+ return p;
+}
+
////////////////////////////////////////
// private members
prefix_ void senf::ppi::connector::PassiveInput::v_enqueueEvent()
{
- ///\fixme Emit notifications when qstate_ changes
- if (qdisc_)
- qstate_ = qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
- else
- qstate_ = empty()?QueueingDiscipline::UNTHROTTLED:QueueingDiscipline::THROTTLED;
emit();
+ qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
}
prefix_ void senf::ppi::connector::PassiveInput::v_dequeueEvent()
{
- ///\fixme Emit notifications when qstate_ changes
- if (qdisc_)
- qstate_ = qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
- else
- qstate_ = empty()?QueueingDiscipline::UNTHROTTLED:QueueingDiscipline::THROTTLED;
+ qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
+}
+
+prefix_ void senf::ppi::connector::PassiveInput::v_unthrottleEvent()
+{
+ size_type n (queueSize());
+ while (n) {
+ emit();
+ size_type nn (queueSize());
+ if (n == nn)
+ break;
+ n = nn;
+ }
}
///////////////////////////////cc.e////////////////////////////////////////
// senf::ppi::connector::Connector
prefix_ senf::ppi::connector::Connector & senf::ppi::connector::Connector::peer()
+ const
{
BOOST_ASSERT(peer_);
return *peer_;
}
prefix_ senf::ppi::module::Module & senf::ppi::connector::Connector::module()
+ const
{
BOOST_ASSERT(module_);
return *module_;
: peer_(), module_()
{}
+prefix_ senf::ppi::connector::Connector::~Connector()
+{}
+
+prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
+{
+ peer_ = & target;
+ target.peer_ = this;
+}
+
////////////////////////////////////////
// private members
}
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveConnector
+// senf::ppi::connector::PassiveConnector
-////////////////////////////////////////
-// protected members
+prefix_ bool senf::ppi::connector::PassiveConnector::throttled()
+ const
+{
+ return nativeThrottled_ || remoteThrottled_;
+}
-prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
-{}
+prefix_ bool senf::ppi::connector::PassiveConnector::nativeThrottled()
+ const
+{
+ return nativeThrottled_;
+}
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::PassiveConnector
+prefix_ void senf::ppi::connector::PassiveConnector::throttle()
+{
+ if (!throttled())
+ emitThrottle();
+ nativeThrottled_ = true;
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::unthrottle()
+{
+ if (throttled() && ! remoteThrottled_) {
+ nativeThrottled_ = false;
+ emitUnthrottle();
+ } else
+ nativeThrottled_ = false;
+
+}
prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveConnector::peer()
+ const
{
return dynamic_cast<ActiveConnector&>(Connector::peer());
}
// protected members
prefix_ senf::ppi::connector::PassiveConnector::PassiveConnector()
- : callback_()
+ : callback_(), remoteThrottled_(), nativeThrottled_()
{}
prefix_ void senf::ppi::connector::PassiveConnector::emit()
{
BOOST_ASSERT(callback_);
- callback_();
+ if (!throttled())
+ callback_();
+}
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::PassiveConnector::notifyThrottle()
+{
+ if (!throttled()) {
+ remoteThrottled_ = true;
+ emitThrottle();
+ }
+ else
+ remoteThrottled_ = true;
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::notifyUnthrottle()
+{
+ if (throttled() && !nativeThrottled_) {
+ remoteThrottled_ = false;
+ emitUnthrottle();
+ }
+ else
+ remoteThrottled_ = false;
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::emitThrottle()
+{
+ peer().notifyThrottle();
}
+prefix_ void senf::ppi::connector::PassiveConnector::emitUnthrottle()
+{
+ peer().notifyUnthrottle();
+ v_unthrottleEvent();
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
+{}
+
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::InputConnector
+// senf::ppi::connector::ActiveConnector
-prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()()
+prefix_ senf::ppi::connector::PassiveConnector & senf::ppi::connector::ActiveConnector::peer()
+ const
{
- v_requestEvent();
- Packet p (peek());
- queue_.pop_back();
- v_dequeueEvent();
- return p;
+ return dynamic_cast<PassiveConnector&>(Connector::peer());
}
-prefix_ bool senf::ppi::connector::InputConnector::boolean_test()
+prefix_ void senf::ppi::connector::ActiveConnector::onThrottle()
{
- ///\fixme Add additional active/passive throttle/unthrottle conditions (make virtual ?)
- return ! empty();
+ throttleCallback_ = Callback();
}
+prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle()
+{
+ unthrottleCallback_ = Callback();
+}
+
+////////////////////////////////////////
+// protected members
+
+prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
+ : throttleCallback_(), unthrottleCallback_(), notifyRoutes_()
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::InputConnector
+
prefix_ senf::ppi::connector::OutputConnector & senf::ppi::connector::InputConnector::peer()
+ const
{
return dynamic_cast<OutputConnector &>(Connector::peer());
}
prefix_ senf::ppi::connector::InputConnector::queue_iterator
senf::ppi::connector::InputConnector::begin()
+ const
{
return queue_.begin();
}
prefix_ senf::ppi::connector::InputConnector::queue_iterator
senf::ppi::connector::InputConnector::end()
+ const
{
return queue_.end();
}
prefix_ senf::Packet senf::ppi::connector::InputConnector::peek()
+ const
{
BOOST_ASSERT( ! queue_.empty() );
return queue_.back();
prefix_ senf::ppi::connector::InputConnector::size_type
senf::ppi::connector::InputConnector::queueSize()
+ const
{
return queue_.size();
}
prefix_ bool senf::ppi::connector::InputConnector::empty()
+ const
{
return queue_.empty();
}
}
prefix_ senf::ppi::connector::InputConnector & senf::ppi::connector::OutputConnector::peer()
+ const
{
return dynamic_cast<InputConnector&>(Connector::peer());
}
// senf::ppi::connector::PassiveInput
prefix_ senf::ppi::connector::PassiveInput::PassiveInput()
- : qdisc_(), qstate_(QueueingDiscipline::UNTHROTTLED)
+ : qdisc_(new ThresholdQueueing(1,0))
{}
prefix_ senf::ppi::connector::ActiveOutput & senf::ppi::connector::PassiveInput::peer()
+ const
{
return dynamic_cast<ActiveOutput&>(Connector::peer());
}
+prefix_ bool senf::ppi::connector::PassiveInput::boolean_test()
+ const
+{
+ return ! empty();
+}
+
///////////////////////////////////////////////////////////////////////////
// senf::ppi::connector::PassiveOutput
prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::PassiveOutput::peer()
+ const
{
return dynamic_cast<ActiveInput&>(Connector::peer());
}
+prefix_ bool senf::ppi::connector::PassiveOutput::boolean_test()
+ const
+{
+ return true;
+}
+
prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target)
{
Connector::connect(target);
// senf::ppi::connector::ActiveInput
prefix_ senf::ppi::connector::PassiveOutput & senf::ppi::connector::ActiveInput::peer()
+ const
{
return dynamic_cast<PassiveOutput&>(Connector::peer());
}
+prefix_ bool senf::ppi::connector::ActiveInput::boolean_test()
+ const
+{
+ return ! empty() || ! peer().throttled();
+}
+
prefix_ void senf::ppi::connector::ActiveInput::request()
{
peer().emit();
// senf::ppi::connector::ActiveOutput
prefix_ senf::ppi::connector::PassiveInput & senf::ppi::connector::ActiveOutput::peer()
+ const
{
return dynamic_cast<PassiveInput&>(Connector::peer());
}
+prefix_ bool senf::ppi::connector::ActiveOutput::boolean_test()
+ const
+{
+ return ! peer().throttled();
+}
+
prefix_ void senf::ppi::connector::ActiveOutput::connect(PassiveInput & target)
{
Connector::connect(target);
}
///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveConnector
+
+template <class Handler>
+prefix_ void senf::ppi::connector::ActiveConnector::onThrottle(Handler handler)
+{
+ throttleCallback_ = detail::Callback<>::make(handler, module());
+}
+
+template <class Handler>
+prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle(Handler handler)
+{
+ unthrottleCallback_ = detail::Callback<>::make(handler, module());
+}
+
+///////////////////////////////////////////////////////////////////////////
// senf::ppi::connector::PassiveInput
template <class QDisc>
prefix_ void senf::ppi::connector::PassiveInput::qdisc(QDisc const & disc)
{
- qdisc_ = boost::scoped_ptr<QueueingDiscipline>(new QDisc(disc));
+ qdisc_.reset(new QDisc(disc));
}
///////////////////////////////cti.e///////////////////////////////////////
: boost::noncopyable
{
public:
- Connector & peer(); ///< Get peer connected to this connector
- module::Module & module(); ///< Get this connectors containing module
+ Connector & peer() const; ///< Get peer connected to this connector
+ module::Module & module() const; ///< Get this connectors containing module
protected:
Connector();
operation is to be performed. */
- bool throttled(); ///< Get accumulative throttling state
- bool nativeThrottled(); ///< Get native throttling state
+ bool throttled() const; ///< Get accumulative throttling state
+ bool nativeThrottled() const; ///< Get native throttling state
void throttle(); ///< Set native throttling
void unthrottle(); ///< Revoke native throttling
- ActiveConnector & peer();
+ ActiveConnector & peer() const;
protected:
PassiveConnector();
void emit();
private:
+ // Called by the routing to change the remote throttling state
void notifyThrottle(); ///< Forward a throttling notification to this connector
void notifyUnthrottle(); ///< Forward an unthrottling notification to this connector
+ // Internal members to emit throttling notifications
+ void emitThrottle();
+ void emitUnthrottle();
+
+ // Called after unthrottling the connector
+ virtual void v_unthrottleEvent();
+
typedef detail::Callback<>::type Callback;
Callback callback_;
-
- friend class ActiveConnector;
+
+ bool remoteThrottled_;
+ bool nativeThrottled_;
+
+ friend class senf::ppi::detail::ForwardForwardingRouteImplementation;
+ friend class senf::ppi::detail::BackwardForwardingRouteImplementation;
};
/** \brief Active connector baseclass
class ActiveConnector
: public virtual Connector
{
+ typedef detail::Callback<>::type Callback;
public:
template <class Handler>
- void onThrottle(Handler handle); ///< Register throttle notification handler
+ void onThrottle(Handler handler); ///< Register throttle notification handler
/**< The handler register here will be called, whenever a
throttle notification comes in. The \a handler argument
is either an arbitrary callable object or it is a
this input. In the second case, the pointer will
automatically be bound to the containing instance.
- \param[in] handle Handler to call on throttle
+ \param[in] handler Handler to call on throttle
notifications. */
+ void onThrottle();
template <class Handler>
- void onUnthrottle(Handler handle); ///< Register unthrottle notification handler
+ void onUnthrottle(Handler handler); ///< Register unthrottle notification handler
/**< The handler register here will be called, whenever an
unthrottle notification comes in. The \a handler
argument is either an arbitrary callable object or it
\param[in] handle Handler to call on unthrottle
notifications. */
+ void onUnthrottle();
- PassiveConnector & peer();
+ PassiveConnector & peer() const;
protected:
ActiveConnector();
+
+ private:
+ // called by the peer() to forward throttling notifications
+ void notifyThrottle();
+ void notifyUnthrottle();
+
+ // called by ForwardingRoute to register a new route
+ void registerRoute(ForwardingRoute & route);
+
+ Callback throttleCallback_;
+ Callback unthrottleCallback_;
+
+ typedef std::vector<ForwardingRoute*> NotifyRoutes;
+ NotifyRoutes notifyRoutes_;
+
+ friend class senf::ppi::ForwardingRoute;
+ friend class PassiveConnector;
};
/** \brief Input connector baseclass
be added to the queue before it can be processed.
*/
class InputConnector
- : public virtual Connector,
- public SafeBool<InputConnector>
+ : public virtual Connector
{
typedef std::deque<Packet> Queue;
public:
request cannot be fulfilled, this is considered to be a
logic error in the module implementation and an
exception is raised. */
- bool boolean_test (); ///< Check packet availability
- /**< Using any input connector in a boolean context will
- check, whether an input request can be fulfilled. This
- is always possible if the queue is non-empty. If the
- input is active, it also returns when the connected
- passive output is not throttled so new packets can be
- requested.
-
- Calling the operator() member is an error if this test
- returns \c false
-
- \returns \c true if operator() can be called, \c false
- otherwise */
- OutputConnector & peer();
+ OutputConnector & peer() const;
- queue_iterator begin(); ///< Access queue begin (head)
- queue_iterator end(); ///< Access queue past-the-end (tail)
- Packet peek(); ///< Return head element from the queue
+ queue_iterator begin() const; ///< Access queue begin (head)
+ queue_iterator end() const; ///< Access queue past-the-end (tail)
+ Packet peek() const; ///< Return head element from the queue
- size_type queueSize(); ///< Return number of elements in the queue
- bool empty(); ///< Return queueSize() == 0
+ size_type queueSize() const; ///< Return number of elements in the queue
+ bool empty() const; ///< Return queueSize() == 0
protected:
InputConnector();
public:
void operator()(Packet p); ///< Send out a packet
- InputConnector & peer();
+ InputConnector & peer() const;
protected:
OutputConnector();
/** \brief Combination of PassiveConnector and InputConnector
- In addition to the native and the forwarded throttling state, the PassiveInput manages a
- queue throttling state. This state is automatically managed by a queueing discipline. The
- standard queueing discipline is ThresholdQueueing, which throttles the connection whenever
- the queue length reaches the high threshold and unthrottles the connection when the queue
- reaches the low threshold. The default queueing discipline is
+ The PassiveInput automatically controls the connectors throttling state using a queueing
+ discipline. The standard queueing discipline is ThresholdQueueing, which throttles the
+ connection whenever the queue length reaches the high threshold and unthrottles the
+ connection when the queue reaches the low threshold. The default queueing discipline is
<tt>ThresholdQueueing(1,0)</tt> which will throttle the input whenever the queue is
non-empty.
*/
class PassiveInput
- : public PassiveConnector, public InputConnector
+ : public PassiveConnector, public InputConnector,
+ public SafeBool<PassiveInput>
{
public:
PassiveInput();
- ActiveOutput & peer();
+ ActiveOutput & peer() const;
+
+ bool boolean_test() const;
template <class QDisc>
void qdisc(QDisc const & disc); ///< Change the queueing discipline
private:
void v_enqueueEvent();
void v_dequeueEvent();
+ void v_unthrottleEvent();
boost::scoped_ptr<QueueingDiscipline> qdisc_;
- QueueingDiscipline::State qstate_;
};
/** \brief Combination of PassiveConnector and OutputConnector
*/
class PassiveOutput
- : public PassiveConnector, public OutputConnector
+ : public PassiveConnector, public OutputConnector,
+ public SafeBool<PassiveOutput>
{
public:
- ActiveInput & peer();
+ ActiveInput & peer() const;
+
+ bool boolean_test() const;
void connect(ActiveInput & target);
/** \brief Combination of ActiveConnector and InputConnector
*/
class ActiveInput
- : public ActiveConnector, public InputConnector
+ : public ActiveConnector, public InputConnector,
+ public SafeBool<ActiveInput>
{
public:
- PassiveOutput & peer();
+ PassiveOutput & peer() const;
+
+ bool boolean_test() const;
void request(); ///< request more packets without dequeuing any packet
/** \brief Combination of ActiveConnector and OutputConnector
*/
class ActiveOutput
- : public ActiveConnector, public OutputConnector
+ : public ActiveConnector, public OutputConnector,
+ public SafeBool<ActiveOutput>
{
public:
- PassiveInput & peer();
+ PassiveInput & peer() const;
+
+ bool boolean_test() const;
void connect(PassiveInput & target);
};
#define prefix_
///////////////////////////////cc.p////////////////////////////////////////
-namespace debug = senf::ppi::module::debug;
namespace ppi = senf::ppi;
+namespace debug = ppi::module::debug;
// For each type of connector we use the corresponding debug module. Additionally, we always need
// the corresponding connected module since otherwise the connectors cannot be connected anywhere
debug::PassivePacketSink target;
ppi::connect(source.output,target.input);
+ ppi::init();
BOOST_CHECK_EQUAL( & source.output.module(), & source );
BOOST_CHECK_EQUAL( & target.input.module(), & target );
debug::PassivePacketSink target;
ppi::connect(source.output,target.input);
+ ppi::init();
// onRequest is implicitly tested within the PassivePacketSink implementation which is tested
// in DebugModules.test.cc
-#if 0
target.input.throttle();
BOOST_CHECK( target.input.throttled() );
BOOST_CHECK( target.input.nativeThrottled() );
target.input.unthrottle();
BOOST_CHECK( ! target.input.throttled() );
BOOST_CHECK( ! target.input.nativeThrottled() );
-#endif
+
+ BOOST_CHECK_EQUAL( & target.input.peer(), & source.output );
}
+namespace {
+
+ bool called = false;
+
+ void handler() { called = true; }
+}
+
+BOOST_AUTO_UNIT_TEST(activeConnector)
+{
+ debug::ActivePacketSource source;
+ debug::PassivePacketSink target;
+
+ ppi::connect(source.output,target.input);
+ ppi::init();
+
+ source.output.onThrottle(handler);
+ BOOST_CHECK( ! called );
+ target.input.throttle();
+ BOOST_CHECK( called );
+ called = false;
+ target.input.unthrottle();
+ BOOST_CHECK( ! called );
+ source.output.onThrottle();
+ source.output.onUnthrottle(handler);
+ BOOST_CHECK( ! called );
+ target.input.throttle();
+ BOOST_CHECK( ! called );
+ target.input.unthrottle();
+ BOOST_CHECK( called );
+ source.output.onUnthrottle();
+ called = false;
+ BOOST_CHECK( ! called );
+ target.input.throttle();
+ target.input.unthrottle();
+ BOOST_CHECK( ! called );
+
+ BOOST_CHECK_EQUAL( & source.output.peer(), & target.input );
+}
+
+BOOST_AUTO_UNIT_TEST(inputConnector)
+{
+ debug::ActivePacketSource source;
+ debug::PassivePacketSink target;
+
+ ppi::connect(source.output,target.input);
+ ppi::init();
+
+ // operator() is implicitly tested within the Active/PassivePacketSink implementation which is
+ // tested in DebugModules.test.cc
+
+ // peek() is implicitly tested within the Active/PassivePacketSink implementation
+
+ BOOST_CHECK_EQUAL ( & target.input.peer(), & source.output );
+
+ BOOST_CHECK( target.input.begin() == target.input.end() );
+ BOOST_CHECK_EQUAL( target.input.queueSize(), 0u );
+ BOOST_CHECK( target.input.empty() );
+}
+
+BOOST_AUTO_UNIT_TEST(outputConnector)
+{
+ debug::ActivePacketSource source;
+ debug::PassivePacketSink target;
+
+ ppi::connect(source.output,target.input);
+ ppi::init();
+
+ // operator() is implicitly tested within the Active/PassivePacketSource implementation which is
+ // tested in DebugModules.test.cc
+
+ BOOST_CHECK_EQUAL( & source.output.peer(), & target.input );
+}
+
+namespace {
+
+ class PassiveInputTest
+ : public ppi::module::Module
+ {
+ public:
+ ppi::connector::PassiveInput input;
+
+ PassiveInputTest() : counter() {
+ noroute(input);
+ input.onRequest(&PassiveInputTest::request);
+ }
+
+ void request() {
+ ++ counter;
+ }
+
+ unsigned counter;
+ };
+}
+
+BOOST_AUTO_UNIT_TEST(passiveInput)
+{
+ debug::ActivePacketSource source;
+ PassiveInputTest target;
+
+ ppi::connect(source.output,target.input);
+ ppi::init();
+
+ BOOST_CHECK_EQUAL( & target.input.peer(), & source.output );
+
+ target.input.throttle();
+ senf::Packet p (senf::DataPacket::create());
+ source.submit(p);
+
+ BOOST_CHECK_EQUAL( target.counter, 0u );
+ BOOST_CHECK( target.input );
+ BOOST_CHECK_EQUAL( target.input.queueSize(), 1u );
+ target.input.unthrottle();
+ BOOST_CHECK( target.input );
+ BOOST_CHECK_EQUAL( target.counter, 1u );
+
+ BOOST_CHECK( target.input() == p );
+ BOOST_CHECK( ! target.input );
+
+ source.submit(p);
+
+ BOOST_CHECK_EQUAL( target.counter, 2u );
+ BOOST_CHECK( target.input.throttled() );
+ BOOST_CHECK( target.input() == p );
+ BOOST_CHECK( ! target.input.throttled() );
+
+ target.input.qdisc(ppi::ThresholdQueueing(2,0));
+
+ source.submit(p);
+ BOOST_CHECK ( ! target.input.throttled() );
+ source.submit(p);
+ BOOST_CHECK( target.input.throttled() );
+ target.input();
+ BOOST_CHECK( target.input.throttled() );
+ target.input();
+ BOOST_CHECK( ! target.input.throttled() );
+}
+
+BOOST_AUTO_UNIT_TEST(passiveOutput)
+{
+ debug::PassivePacketSource source;
+ debug::ActivePacketSink target;
+
+ ppi::connect(source.output,target.input);
+ ppi::init();
+
+ senf::Packet p (senf::DataPacket::create());
+ source.submit(p);
+
+ BOOST_CHECK_EQUAL( & source.output.peer(), & target.input );
+
+ BOOST_CHECK( source.output );
+
+ source.submit(p);
+ BOOST_CHECK( target.request() == p );
+
+ // connect() is tested indirectly via ppi::connect
+}
+
+BOOST_AUTO_UNIT_TEST(activeInput)
+{
+ debug::PassivePacketSource source;
+ debug::ActivePacketSink target;
+
+ ppi::connect(source.output,target.input);
+ ppi::init();
+
+ BOOST_CHECK_EQUAL( & target.input.peer(), & source.output );
+
+ BOOST_CHECK ( ! target.input );
+
+ senf::Packet p (senf::DataPacket::create());
+ source.submit(p);
+
+ BOOST_CHECK( target.input );
+ BOOST_CHECK( target.request() == p );
+
+ source.submit(p);
+ target.input.request();
+ BOOST_CHECK_EQUAL( target.input.queueSize(), 1u );
+ BOOST_CHECK( target.input );
+ BOOST_CHECK( target.request() == p );
+}
+
+BOOST_AUTO_UNIT_TEST(activeOutput)
+{
+ debug::ActivePacketSource source;
+ debug::PassivePacketSink target;
+
+ ppi::connect(source.output,target.input);
+ ppi::init();
+
+ BOOST_CHECK_EQUAL( & source.output.peer(), & target.input );
+ BOOST_CHECK( source.output );
+ target.input.throttle();
+ BOOST_CHECK( ! source.output );
+
+ // connect() is tested indirectly via ppi::connect
+}
+
+
///////////////////////////////cc.e////////////////////////////////////////
#undef prefix_
prefix_ void senf::ppi::module::debug::PassivePacketSource::submit(Packet packet)
{
packets_.push_back(packet);
+ output.unthrottle();
}
prefix_ bool senf::ppi::module::debug::PassivePacketSource::empty()
prefix_ void senf::ppi::module::debug::PassivePacketSource::request()
{
+ BOOST_ASSERT( ! packets_.empty() );
output(packets_.front());
packets_.pop_front();
+ if (packets_.empty())
+ output.throttle();
+}
+
+prefix_ void senf::ppi::module::debug::PassivePacketSource::init()
+{
+ output.throttle();
}
///////////////////////////////////////////////////////////////////////////
private:
void request();
+ void init();
Queue packets_;
};
debug::PassivePacketSink sink;
ppi::connect(source.output, sink.input);
-
+ ppi::init();
+
senf::PacketData::byte data[] = { 0x13u, 0x24u, 0x35u };
senf::Packet p (senf::DataPacket::create(data));
+ BOOST_CHECK( ! sink.input.throttled() );
+
source.submit(p);
-
+
+ BOOST_CHECK( ! sink.input.throttled() );
BOOST_CHECK_EQUAL( sink.size(), 1u );
BOOST_CHECK( ! sink.empty() );
BOOST_CHECK_EQUAL(
debug::ActivePacketSink sink;
ppi::connect(source.output, sink.input);
+ ppi::init();
senf::PacketData::byte data[] = { 0x13u, 0x24u, 0x35u };
senf::Packet p (senf::DataPacket::create(data));
#include "Route.hh"
#include "Connectors.hh"
#include "EventManager.hh"
+#include "ModuleManager.hh"
#define prefix_ inline
///////////////////////////////cci.p///////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
// senf::ppi::module::Module
-prefix_ void senf::ppi::module::Module::noroute(connector::Connector & connector)
+prefix_ senf::ppi::module::Module::~Module()
{
- registerConnector(connector);
- connector.setModule(*this);
+ moduleManager().unregisterModule(*this);
}
prefix_ boost::posix_time::ptime senf::ppi::module::Module::eventTime()
// protected members
prefix_ senf::ppi::module::Module::Module()
-{}
+{
+ moduleManager().registerModule(*this);
+}
+
+prefix_ void senf::ppi::module::Module::noroute(connector::Connector & connector)
+{
+ registerConnector(connector);
+ connector.setModule(*this);
+}
////////////////////////////////////////
// private members
+prefix_ void senf::ppi::module::Module::init()
+{}
+
prefix_ senf::ppi::EventManager & senf::ppi::module::Module::eventManager()
{
return EventManager::instance();
}
+prefix_ senf::ppi::ModuleManager & senf::ppi::module::Module::moduleManager()
+{
+ return ModuleManager::instance();
+}
+
prefix_ void senf::ppi::module::Module::registerConnector(connector::Connector & connector)
{
connectorRegistry_.push_back(&connector);
class Module
: boost::noncopyable
{
+ public:
+ virtual ~Module();
+
protected:
Module();
///< event
private:
+ virtual void init();
+
EventManager & eventManager();
+ ModuleManager & moduleManager();
void registerConnector(connector::Connector & connector);
RouteBase & addRoute(std::auto_ptr<RouteBase> route);
template <class Source, class Target>
friend class detail::RouteHelper;
+ friend class senf::ppi::ModuleManager;
};
/** \brief Connect compatible connectors
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief ModuleManager non-inline non-template implementation */
+
+#include "ModuleManager.hh"
+//#include "ModuleManager.ih"
+
+// Custom includes
+#include "Scheduler/Scheduler.hh"
+#include "Module.hh"
+
+//#include "ModuleManager.mpp"
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::ModuleManager
+
+prefix_ void senf::ppi::ModuleManager::init()
+{
+ ModuleRegistry::const_iterator i (moduleRegistry_.begin());
+ ModuleRegistry::const_iterator const i_end (moduleRegistry_.end());
+ for (; i != i_end; ++i)
+ (*i)->init();
+}
+
+prefix_ void senf::ppi::ModuleManager::run()
+{
+ init();
+ Scheduler::instance().process();
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
+//#include "ModuleManager.mpp"
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief ModuleManager inline non-template implementation */
+
+// Custom includes
+#include <algorithm>
+
+#define prefix_ inline
+///////////////////////////////cci.p///////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::ModuleManager
+
+prefix_ senf::ppi::ModuleManager & senf::ppi::ModuleManager::instance()
+{
+ static ModuleManager manager;
+ return manager;
+}
+
+prefix_ void senf::ppi::ModuleManager::registerModule(module::Module & module)
+{
+ moduleRegistry_.push_back(&module);
+}
+
+prefix_ void senf::ppi::ModuleManager::unregisterModule(module::Module & module)
+{
+ moduleRegistry_.erase(
+ std::remove(moduleRegistry_.begin(), moduleRegistry_.end(), & module),
+ moduleRegistry_.end());
+}
+
+///////////////////////////////cci.e///////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief ModuleManager public header */
+
+#ifndef HH_ModuleManager_
+#define HH_ModuleManager_ 1
+
+// Custom includes
+#include <vector>
+#include "predecl.hh"
+
+//#include "ModuleManager.mpp"
+///////////////////////////////hh.p////////////////////////////////////////
+
+namespace senf {
+namespace ppi {
+
+ /** \brief
+ */
+ class ModuleManager
+ {
+ public:
+ ///////////////////////////////////////////////////////////////////////////
+ ///\name Structors and default members
+ ///@{
+
+ static ModuleManager & instance();
+
+ // default default constructor
+ // default copy constructor
+ // default copy assignment
+ // default destructor
+
+ // no conversion constructors
+
+ ///@}
+ ///////////////////////////////////////////////////////////////////////////
+
+ void registerModule(module::Module & module);
+ void unregisterModule(module::Module & module);
+
+ void init();
+ void run();
+
+ protected:
+
+ private:
+ typedef std::vector<module::Module *> ModuleRegistry;
+
+ ModuleRegistry moduleRegistry_;
+ };
+
+
+}}
+
+///////////////////////////////hh.e////////////////////////////////////////
+#include "ModuleManager.cci"
+//#include "ModuleManager.ct"
+//#include "ModuleManager.cti"
+#endif
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
//#include "Queueing.ih"
// Custom includes
+#include "Connectors.hh"
//#include "Queueing.mpp"
#define prefix_
///////////////////////////////cc.p////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::QueueingDiscipline
+// senf::ppi::ThresholdQueueing
-prefix_ senf::ppi::QueueingDiscipline::~QueueingDiscipline()
-{}
+prefix_ void senf::ppi::ThresholdQueueing::update(connector::PassiveInput & input, Event event)
+{
+ switch (event) {
+ case ENQUEUE:
+ if (input.queueSize() >= high_)
+ input.throttle();
+ break;
+ case DEQUEUE:
+ if (input.queueSize() <= low_)
+ input.unthrottle();
+ break;
+ }
+}
///////////////////////////////cc.e////////////////////////////////////////
#undef prefix_
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief Queueing inline non-template implementation */
+
+// Custom includes
+
+#define prefix_ inline
+///////////////////////////////cci.p///////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::QueueingDiscipline
+
+prefix_ senf::ppi::QueueingDiscipline::~QueueingDiscipline()
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::ThresholdQueueing
+
+prefix_ senf::ppi::ThresholdQueueing::ThresholdQueueing(unsigned high, unsigned low)
+ : high_(high), low_(low)
+{}
+
+///////////////////////////////cci.e///////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
operating system by sending throttling events. The PPI will never loose a packet internally
(if not a module explicitly does so), however it may disable reception of new incoming
packets which will then probably be dropped by the operating system.
+
+ \attention Notifications may be forwarded to the QueueingDiscipline implementation
+ out-of-order: A dequeue event may be notified before the corresponding enqueue
+ event (this happens to optimize away transient throttling state changes which would
+ otherwise occur if a packet is entered into the queue and then removed from it in the
+ same processing step).
*/
class QueueingDiscipline
{
virtual ~QueueingDiscipline();
enum Event { ENQUEUE, DEQUEUE }; ///< Possible queueing events
- enum State { THROTTLED, UNTHROTTLED }; ///< Possible queueing states
- virtual State update(connector::PassiveInput & input, Event event) = 0;
+ virtual void update(connector::PassiveInput & input, Event event) = 0;
///< Calculate new queueing state
/**< Whenever the queue is manipulated, this member is
- called to calculate the new throttling state.
+ called to calculate the new throttling state. The
+ member must call \a input's \c throttle() or \c
+ unthrottle() member to set the new throttling state.
\param[in] input Connector holding the queue
- \param[in] event Type of event triggering the update
- \returns new throttling state */
+ \param[in] event Type of event triggering the update */
+ };
+
+ class ThresholdQueueing
+ : public QueueingDiscipline
+ {
+ public:
+ ThresholdQueueing(unsigned high, unsigned low);
+
+ virtual void update(connector::PassiveInput & input, Event event);
+
+ private:
+ unsigned high_;
+ unsigned low_;
};
}}
///////////////////////////////hh.e////////////////////////////////////////
-//#include "Queueing.cci"
+#include "Queueing.cci"
//#include "Queueing.ct"
//#include "Queueing.cti"
#endif
\brief Route inline non-template implementation */
// Custom includes
+#include "Connectors.hh"
+#include "Events.hh"
#define prefix_ inline
///////////////////////////////cci.p///////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
// senf::ppi::RouteBase
+prefix_ senf::ppi::RouteBase::~RouteBase()
+{}
+
////////////////////////////////////////
// protected members
: module_(&module)
{}
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::ForwardingRoute
+
+prefix_ bool senf::ppi::ForwardingRoute::autoThrottling()
+{
+ return autoThrottling_;
+}
+
+prefix_ void senf::ppi::ForwardingRoute::autoThrottling(bool state)
+{
+ autoThrottling_ = state;
+}
+
+////////////////////////////////////////
+// protected members
+
+prefix_ senf::ppi::ForwardingRoute::ForwardingRoute(module::Module & module)
+ : RouteBase(module), autoThrottling_(false)
+{}
+
+prefix_ void senf::ppi::ForwardingRoute::registerRoute(connector::ActiveConnector & connector)
+{
+ connector.registerRoute(*this);
+}
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::ForwardingRoute::notifyThrottle()
+{
+ v_notifyThrottle();
+}
+
+prefix_ void senf::ppi::ForwardingRoute::notifyUnthrottle()
+{
+ v_notifyUnthrottle();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::NonForwardingRouteImplementation
+
+prefix_ senf::ppi::detail::NonForwardingRouteImplementation::
+NonForwardingRouteImplementation(module::Module & module, connector::InputConnector & source,
+ connector::OutputConnector & target)
+ : RouteBase(module), source_(&source), target_(&target)
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::NonForwardingRouteToEventImplementation
+
+prefix_
+senf::ppi::detail::NonForwardingRouteToEventImplementation::
+NonForwardingRouteToEventImplementation(module::Module & module,
+ connector::InputConnector & source,
+ EventDescriptor & target)
+ : RouteBase(module), source_(&source), target_(&target)
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::NonForwardingRouteFromEventImplementation
+
+prefix_
+senf::ppi::detail::NonForwardingRouteFromEventImplementation::
+NonForwardingRouteFromEventImplementation(module::Module & module, EventDescriptor & source,
+ connector::OutputConnector & target)
+ : RouteBase(module), source_(&source), target_(&target)
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::ForwardForwardingRouteImplementation
+
+prefix_
+senf::ppi::detail::ForwardForwardingRouteImplementation::
+ForwardForwardingRouteImplementation(module::Module & module, connector::ActiveInput & source,
+ connector::PassiveOutput & target)
+ : ForwardingRoute(module), source_(&source), target_(&target)
+{
+ registerRoute(*source_);
+}
+
+prefix_ void senf::ppi::detail::ForwardForwardingRouteImplementation::v_notifyThrottle()
+{
+ if (autoThrottling())
+ target_->notifyThrottle();
+}
+
+prefix_ void senf::ppi::detail::ForwardForwardingRouteImplementation::v_notifyUnthrottle()
+{
+ if (autoThrottling())
+ target_->notifyUnthrottle();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::BackwardForwardingRouteImplementation
+
+prefix_
+senf::ppi::detail::BackwardForwardingRouteImplementation::
+BackwardForwardingRouteImplementation(module::Module & module,
+ connector::PassiveInput & source,
+ connector::ActiveOutput & target)
+ : ForwardingRoute(module), source_(&source), target_(&target)
+{
+ registerRoute(*target_);
+}
+
+prefix_ void senf::ppi::detail::BackwardForwardingRouteImplementation::v_notifyThrottle()
+{
+ if (autoThrottling())
+ source_->notifyThrottle();
+}
+
+prefix_ void senf::ppi::detail::BackwardForwardingRouteImplementation::v_notifyUnthrottle()
+{
+ if (autoThrottling())
+ source_->notifyUnthrottle();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::ForwardForwardingRouteToEventImplementation
+
+prefix_
+senf::ppi::detail::ForwardForwardingRouteToEventImplementation::
+ForwardForwardingRouteToEventImplementation(module::Module & module,
+ connector::ActiveInput & source,
+ EventDescriptor & target)
+ : ForwardingRoute(module), source_(&source), target_(&target)
+{
+ registerRoute(*source_);
+}
+
+prefix_ void senf::ppi::detail::ForwardForwardingRouteToEventImplementation::v_notifyThrottle()
+{
+ if (autoThrottling())
+ target_->enabled(false);
+}
+
+prefix_ void
+senf::ppi::detail::ForwardForwardingRouteToEventImplementation::v_notifyUnthrottle()
+{
+ if (autoThrottling())
+ target_->enabled(true);
+}
+
+///////////////////////////////////////////////////////////////////////////
+//senf::ppi::detail::BackwardForwardingRouteFromEventImplementation
+
+prefix_
+senf::ppi::detail::BackwardForwardingRouteFromEventImplementation::
+BackwardForwardingRouteFromEventImplementation(module::Module & module,
+ EventDescriptor & source,
+ connector::ActiveOutput & target)
+ : ForwardingRoute(module), source_(&source), target_(&target)
+{
+ registerRoute(*target_);
+}
+
+prefix_ void
+senf::ppi::detail::BackwardForwardingRouteFromEventImplementation::v_notifyThrottle()
+{
+ if (autoThrottling())
+ source_->enabled(false);
+}
+
+prefix_ void
+senf::ppi::detail::BackwardForwardingRouteFromEventImplementation::v_notifyUnthrottle()
+{
+ if (autoThrottling())
+ source_->enabled(true);
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::RouteImplementation<connector::ActiveInput, connector::PassiveOutput,
+// false, false>
+
+prefix_
+senf::ppi::detail::RouteImplementation<senf::ppi::connector::ActiveInput,
+ senf::ppi::connector::PassiveOutput,
+ false, false>::
+RouteImplementation(module::Module & module, connector::ActiveInput & source,
+ connector::PassiveOutput & target)
+ : ForwardForwardingRouteImplementation(module, source, target)
+{}
+
+////////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::RouteImplementation<connector::PassiveInput, connector::ActiveOutput,
+// false, false>
+
+prefix_
+senf::ppi::detail::RouteImplementation<senf::ppi::connector::PassiveInput,
+ senf::ppi::connector::ActiveOutput,
+ false, false>::
+RouteImplementation(module::Module & module, connector::PassiveInput & source,
+ connector::ActiveOutput & target)
+ : BackwardForwardingRouteImplementation(module, source, target)
+{}
+
///////////////////////////////cci.e///////////////////////////////////////
#undef prefix_
///////////////////////////////cti.p///////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::Route<Source,Target>
+// senf::ppi::detail::RouteImplementation<Source,Target,srcEvent,trgEvent>
-////////////////////////////////////////
-// protected members
+template <class Source, class Target, bool srcEvent, bool trgEvent>
+prefix_
+senf::ppi::detail::RouteImplementation<Source,Target,srcEvent,trgEvent>::
+RouteImplementation(module::Module & module, Source & source, Target & target)
+ : NonForwardingRouteImplementation(module, source, target)
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::RouteImplementation<Source, Target, true, false>
template <class Source, class Target>
-prefix_ senf::ppi::Route<Source,Target>::Route(module::Module & module, Source & source,
- Target & target)
- : Implementation(module, source, target)
+prefix_
+senf::ppi::detail::RouteImplementation<Source, Target, true, false>::
+RouteImplementation(module::Module & module, Source & source, Target & target)
+ : NonForwardingRouteFromEventImplementation(module, source, target)
{}
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::detail::RouteImplementation<Source,Target,srcEvent,trgEvent>
+// senf::ppi::detail::RouteImplementation<Source, Target, false, true>
-////////////////////////////////////////
-// protected members
-
-template <bool srcEvent, bool trgEvent>
-prefix_ senf::ppi::detail::RouteImplementation<srcEvent,trgEvent>::
-RouteImplementation(module::Module & module, connector::InputConnector & source,
- connector::OutputConnector & target)
- : RouteBase(module), source_(&source), target_(&target)
+template <class Source, class Target>
+prefix_
+senf::ppi::detail::RouteImplementation<Source, Target, false, true>::
+RouteImplementation(module::Module & module, Source & source, Target & target)
+ : NonForwardingRouteToEventImplementation(module, source, target)
{}
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::detail::RouteImplementation<true,false>
+// senf::ppi::detail::RouteImplementation<connector::ActiveInput, Event, false, true>
-////////////////////////////////////////
-// protected members
+template <class Event>
+prefix_
+senf::ppi::detail::RouteImplementation<senf::ppi::connector::ActiveInput, Event, false, true>::
+RouteImplementation(module::Module & module, connector::ActiveInput & source, Event & target)
+ : ForwardForwardingRouteToEventImplementation(module, source, target)
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::RouteImplementation<Event, connector::ActiveOutput, true, false>
-prefix_ senf::ppi::detail::RouteImplementation<true,false>::
-RouteImplementation(module::Module & module, EventDescriptor & source,
- connector::OutputConnector & target)
- : RouteBase(module), source_(&source), target_(&target)
+template <class Event>
+prefix_
+senf::ppi::detail::RouteImplementation<Event, senf::ppi::connector::ActiveOutput, true, false>::
+RouteImplementation(module::Module & module, Event & source, connector::ActiveOutput & target)
+ : BackwardForwardingRouteFromEventImplementation(module, source, target)
{}
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::detail::RouteImplementation<false,true>
+// senf::ppi::Route<Source,Target>
////////////////////////////////////////
// protected members
-prefix_ senf::ppi::detail::RouteImplementation<false,true>::
-RouteImplementation(module::Module & module, connector::InputConnector & source,
- EventDescriptor & target)
-: RouteBase(module), source_(&source), target_(&target)
+template <class Source, class Target>
+prefix_ senf::ppi::Route<Source,Target>::Route(module::Module & module, Source & source,
+ Target & target)
+ : Implementation(module, source, target)
{}
///////////////////////////////cti.e///////////////////////////////////////
class RouteBase
{
public:
+ virtual ~RouteBase();
+
+ protected:
+ RouteBase(module::Module & module);
+
+ private:
+ module::Module * module_;
+ };
+
+ class ForwardingRoute
+ : public RouteBase
+ {
+ public:
+ bool autoThrottling();
void autoThrottling(bool state); ///< Change automatic throttle notification forwarding
/**< By default, throttle notifications are automatically
forwarded from active to passive connectors. This may
disable the event whenever a throttling notification
comes in. Respective for unthrottle notifications.
- \param[in] state New throttle forwarding state
-
- \implementation This class will be implemented using a
- baseclass, this template and several
- specializations. However, this is an implementation
- detail which does not affect the exposed
- interface. */
-
+ \param[in] state New throttle forwarding state */
+
protected:
- RouteBase(module::Module & module);
+ ForwardingRoute(module::Module & module);
+
+ // Called to register this route with the connectors forwarding information base
+ void registerRoute(connector::ActiveConnector & connector);
private:
- module::Module * module_;
+ // called to forward a throttling notification along the route
+ void notifyThrottle();
+ void notifyUnthrottle();
+
+ // Implemented in the derived classes to forward throttling notifications
+ virtual void v_notifyThrottle() = 0;
+ virtual void v_notifyUnthrottle() = 0;
+
+ bool autoThrottling_;
+
+ friend class connector::ActiveConnector;
};
}}
+// We need detail::RouteImplementation here ...
#include "Route.ih"
namespace senf {
*/
template <class Source, class Target>
class Route
- : public detail::RouteImplementation< boost::is_base_of<EventDescriptor,Source>::value,
- boost::is_base_of<EventDescriptor,Target>::value >
+ : public detail::RouteImplementation<Source,Target>
{
private:
- typedef detail::RouteImplementation<
- boost::is_base_of<EventDescriptor,Source>::value,
- boost::is_base_of<EventDescriptor,Target>::value > Implementation;
+ typedef detail::RouteImplementation<Source,Target> Implementation;
Route(module::Module & module, Source & source, Target & target);
namespace ppi {
namespace detail {
- template <bool srcEvent, bool trgEvent>
- class RouteImplementation
+ // Valid Forwarding routes:
+ // Forward throttling
+ // ActiveInput -> PassiveOutput
+ // tempalte<> RouteImplementation<ActiveInput,PassiveOutput,false,false>
+ // ActiveInput -> Event
+ // template<class Event> class RouteImplementation<ActiveInput, Event, false, true>
+ // Backward throttling
+ // PassiveInput -> ActiveOutput
+ // template<> RouteImplementation<PassiveInput, ActiveOutput, false, false>
+ // Event -> ActiveOutput
+ // template<class Event> class RouteImplementation<Event, ActiveOutput, true, false>
+
+ class NonForwardingRouteImplementation
: public RouteBase
{
protected:
- RouteImplementation(module::Module & module,
- connector::InputConnector & source,
- connector::OutputConnector & target);
+ NonForwardingRouteImplementation(module::Module & module,
+ connector::InputConnector & source,
+ connector::OutputConnector & target);
private:
connector::InputConnector * source_;
connector::OutputConnector * target_;
};
-# ifndef DOXYGEN
+ class NonForwardingRouteToEventImplementation
+ : public RouteBase
+ {
+ protected:
+ NonForwardingRouteToEventImplementation(module::Module & module,
+ connector::InputConnector & source,
+ EventDescriptor & target);
+
+ private:
+ connector::InputConnector * source_;
+ EventDescriptor * target_;
+ };
- template <>
- class RouteImplementation<true,false>
+ class NonForwardingRouteFromEventImplementation
: public RouteBase
{
protected:
- RouteImplementation(module::Module & module,
- EventDescriptor & source,
- connector::OutputConnector & target);
+ NonForwardingRouteFromEventImplementation(module::Module & module,
+ EventDescriptor & source,
+ connector::OutputConnector & target);
private:
EventDescriptor * source_;
connector::OutputConnector * target_;
};
- template<>
- class RouteImplementation<false,true>
- : public RouteBase
+ class ForwardForwardingRouteImplementation
+ : public ForwardingRoute
{
protected:
- RouteImplementation(module::Module & module,
- connector::InputConnector & source,
- EventDescriptor & target);
+ ForwardForwardingRouteImplementation(module::Module & module,
+ connector::ActiveInput & source,
+ connector::PassiveOutput & target);
private:
- connector::InputConnector * source_;
+ virtual void v_notifyThrottle();
+ virtual void v_notifyUnthrottle();
+
+ connector::ActiveInput * source_;
+ connector::PassiveOutput * target_;
+ };
+
+ class BackwardForwardingRouteImplementation
+ : public ForwardingRoute
+ {
+ protected:
+ BackwardForwardingRouteImplementation(module::Module & module,
+ connector::PassiveInput & source,
+ connector::ActiveOutput & target);
+
+ private:
+ virtual void v_notifyThrottle();
+ virtual void v_notifyUnthrottle();
+
+ connector::PassiveInput * source_;
+ connector::ActiveOutput * target_;
+ };
+
+ class ForwardForwardingRouteToEventImplementation
+ : public ForwardingRoute
+ {
+ protected:
+ ForwardForwardingRouteToEventImplementation(module::Module & module,
+ connector::ActiveInput & source,
+ EventDescriptor & target);
+
+ private:
+ virtual void v_notifyThrottle();
+ virtual void v_notifyUnthrottle();
+
+ connector::ActiveInput * source_;
EventDescriptor * target_;
};
+ class BackwardForwardingRouteFromEventImplementation
+ : public ForwardingRoute
+ {
+ protected:
+ BackwardForwardingRouteFromEventImplementation(module::Module & module,
+ EventDescriptor & source,
+ connector::ActiveOutput & target);
+
+ private:
+ virtual void v_notifyThrottle();
+ virtual void v_notifyUnthrottle();
+
+ EventDescriptor * source_;
+ connector::ActiveOutput * target_;
+ };
+
+ template <class Source, class Target, bool srcEvent, bool trgEvent>
+ class RouteImplementation
+ : public NonForwardingRouteImplementation
+ {
+ protected:
+ RouteImplementation(module::Module & module, Source & source, Target & target);
+ };
+
+# ifndef DOXYGEN
+
+ template <class Source, class Target>
+ class RouteImplementation<Source, Target, true, false>
+ : public NonForwardingRouteFromEventImplementation
+ {
+ protected:
+ RouteImplementation(module::Module & module, Source & source, Target & target);
+ };
+
+ template<class Source, class Target>
+ class RouteImplementation<Source, Target, false, true>
+ : public NonForwardingRouteToEventImplementation
+ {
+ protected:
+ RouteImplementation(module::Module & module, Source & source, Target & target);
+ };
+
+ template<>
+ class RouteImplementation<connector::ActiveInput, connector::PassiveOutput, false, false>
+ : public ForwardForwardingRouteImplementation
+ {
+ protected:
+ RouteImplementation(module::Module & module, connector::ActiveInput & source,
+ connector::PassiveOutput & target);
+ };
+
+ template <class Event>
+ class RouteImplementation<connector::ActiveInput, Event, false, true>
+ : public ForwardForwardingRouteToEventImplementation
+ {
+ protected:
+ RouteImplementation(module::Module & module, connector::ActiveInput & source,
+ Event & target);
+ };
+
+ template <>
+ class RouteImplementation<connector::PassiveInput, connector::ActiveOutput, false, false>
+ : public BackwardForwardingRouteImplementation
+ {
+ protected:
+ RouteImplementation(module::Module & module, connector::PassiveInput & source,
+ connector::ActiveOutput & target);
+ };
+
+ template <class Event>
+ class RouteImplementation<Event, connector::ActiveOutput, true, false>
+ : public BackwardForwardingRouteFromEventImplementation
+ {
+ protected:
+ RouteImplementation(module::Module & module, Event & source,
+ connector::ActiveOutput & target);
+ };
+
# endif
}}}
// Custom includes
#include "Connectors.hh"
-#include "Scheduler/Scheduler.hh"
+#include "ModuleManager.hh"
#define prefix_ inline
///////////////////////////////cci.p///////////////////////////////////////
prefix_ void senf::ppi::run()
{
- Scheduler::instance().process();
+ ModuleManager::instance().run();
+}
+
+prefix_ void senf::ppi::init()
+{
+ ModuleManager::instance().init();
}
///////////////////////////////cci.e///////////////////////////////////////
void connect(connector::PassiveOutput & source, connector::ActiveInput & target);
void run();
+ void init();
}}
template <class EventType=void> class EventImplementation;
class EventManager;
class RouteBase;
+ class ForwardingRoute;
template <class Source, class Target> class Route;
class QueueingDiscipline;
+ class ModuleManager;
namespace detail {
class EventBindingBase;
template <class EvImpl> class EventBinding;
template <class EventType> struct EventArgType;
- template <bool srcEvent, bool trgEvent> class RouteImplementation;
+ class NonForwardingRouteImplementation;
+ class NonForwardingRouteToEventImplementation;
+ class NonForwardingRouteFromEventImplementation;
+ class ForwardForwardingRouteImplementation;
+ class BackwardForwardingRouteImplementation;
+ class ForwardForwardingRouteToEventImplementation;
+ class BackwardForwardingRouteFromEventImplementation;
+ template <class Source, class Target,
+ bool srcEvent = boost::is_base_of<EventDescriptor,Source>::value,
+ bool trgEvent = boost::is_base_of<EventDescriptor,Target>::value>
+ class RouteImplementation;
}
namespace module {