PPI: Add optional template arg for packet type to connectors
[senf.git] / PPI / Connectors.cci
index 60be4bd..9e0f832 100644 (file)
@@ -1,8 +1,8 @@
 // $Id$
 //
 // Copyright (C) 2007 
-// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
-// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Fraunhofer Institute for Open Communication Systems (FOKUS) 
+// Competence Center NETwork research (NET), St. Augustin, GERMANY 
 //     Stefan Bund <g0dil@berlios.de>
 //
 // This program is free software; you can redistribute it and/or modify
 // senf::ppi::connector::Connector
 
 prefix_ senf::ppi::connector::Connector & senf::ppi::connector::Connector::peer()
+    const
 {
     BOOST_ASSERT(peer_);
     return *peer_;
 }
 
 prefix_ senf::ppi::module::Module & senf::ppi::connector::Connector::module()
+    const
 {
     BOOST_ASSERT(module_);
     return *module_;
@@ -50,6 +52,19 @@ prefix_ senf::ppi::connector::Connector::Connector()
     : peer_(), module_()
 {}
 
+prefix_ senf::ppi::connector::Connector::~Connector()
+{
+    if (peer_)
+        peer_->peer_ = 0;
+}
+
+prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
+{
+    BOOST_ASSERT( ! peer_ && ! target.peer_ );
+    peer_ = & target;
+    target.peer_ = this;
+}
+
 ////////////////////////////////////////
 // private members
 
@@ -59,18 +74,40 @@ prefix_ void senf::ppi::connector::Connector::setModule(module::Module & module)
 }
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveConnector
+// senf::ppi::connector::PassiveConnector
 
-////////////////////////////////////////
-// protected members
+prefix_ bool senf::ppi::connector::PassiveConnector::throttled()
+    const
+{
+    return nativeThrottled_ || remoteThrottled_;
+}
 
-prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
-{}
+prefix_ bool senf::ppi::connector::PassiveConnector::nativeThrottled()
+    const
+{
+    return nativeThrottled_;
+}
 
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::PassiveConnector
+prefix_ void senf::ppi::connector::PassiveConnector::throttle()
+{
+    if (!throttled()) {
+        nativeThrottled_ = true;
+        emitThrottle();
+    }
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::unthrottle()
+{
+    if (throttled() && ! remoteThrottled_) {
+        nativeThrottled_ = false;
+        emitUnthrottle();
+    } else
+        nativeThrottled_ = false;
+
+}
 
 prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveConnector::peer()
+    const
 {
     return dynamic_cast<ActiveConnector&>(Connector::peer());
 }
@@ -79,61 +116,124 @@ prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveCon
 // protected members
 
 prefix_ senf::ppi::connector::PassiveConnector::PassiveConnector()
-    : callback_()
+    : callback_(), remoteThrottled_(), nativeThrottled_()
 {}
 
 prefix_ void senf::ppi::connector::PassiveConnector::emit()
 {
     BOOST_ASSERT(callback_);
-    callback_();
+    if (!throttled())
+        callback_();
+}
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::PassiveConnector::notifyThrottle()
+{
+    if (!throttled()) {
+        remoteThrottled_ = true;
+        emitThrottle();
+    } 
+    else 
+        remoteThrottled_ = true;
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::emitThrottle()
+{
+    peer().notifyThrottle();
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::emitUnthrottle()
+{
+    peer().notifyUnthrottle();
+    v_unthrottleEvent();
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
+{}
+
+prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
+{
+    routes_.push_back(&route);
 }
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::InputConnector
+// senf::ppi::connector::ActiveConnector
 
-prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()()
+prefix_ senf::ppi::connector::PassiveConnector & senf::ppi::connector::ActiveConnector::peer()
+    const
 {
-    Packet p (queue_.back());
-    queue_.pop_back();
-    v_dequeueEvent();
-    return p;
+    return dynamic_cast<PassiveConnector&>(Connector::peer());
 }
 
-prefix_ bool senf::ppi::connector::InputConnector::boolean_test()
+prefix_ void senf::ppi::connector::ActiveConnector::onThrottle()
 {
-    ///\fixme Add additional active/passive throttle/unthrottle conditions (make virtual ?)
-    return ! empty();
+    throttleCallback_ = Callback();
+}
+
+prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle()
+{
+    unthrottleCallback_ = Callback();
+}
+
+prefix_ bool senf::ppi::connector::ActiveConnector::throttled()
+    const
+{
+    return peer().throttled();
+}
+
+////////////////////////////////////////
+// protected members
+
+prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
+    : throttleCallback_(), unthrottleCallback_(), notifyRoutes_()
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::InputConnector
+
+prefix_ senf::Packet senf::ppi::connector::InputConnector::read()
+{
+    return operator()();
 }
 
 prefix_ senf::ppi::connector::OutputConnector & senf::ppi::connector::InputConnector::peer()
+    const
 {
     return dynamic_cast<OutputConnector &>(Connector::peer());
 }
 
 prefix_ senf::ppi::connector::InputConnector::queue_iterator
 senf::ppi::connector::InputConnector::begin()
+    const
 {
     return queue_.begin();
 }
 
 prefix_ senf::ppi::connector::InputConnector::queue_iterator
 senf::ppi::connector::InputConnector::end()
+    const
 {
     return queue_.end();
 }
 
 prefix_ senf::Packet senf::ppi::connector::InputConnector::peek()
+    const
 {
+    BOOST_ASSERT( ! queue_.empty() );
     return queue_.back();
 }
 
 prefix_ senf::ppi::connector::InputConnector::size_type
 senf::ppi::connector::InputConnector::queueSize()
+    const
 {
     return queue_.size();
 }
 
 prefix_ bool senf::ppi::connector::InputConnector::empty()
+    const
 {
     return queue_.empty();
 }
@@ -161,7 +261,13 @@ prefix_ void senf::ppi::connector::OutputConnector::operator()(Packet p)
     peer().enqueue(p);
 }
 
+prefix_ void senf::ppi::connector::OutputConnector::write(Packet p)
+{
+    operator()(p);
+}
+
 prefix_ senf::ppi::connector::InputConnector & senf::ppi::connector::OutputConnector::peer()
+    const
 {
     return dynamic_cast<InputConnector&>(Connector::peer());
 }
@@ -173,26 +279,80 @@ prefix_ senf::ppi::connector::OutputConnector::OutputConnector()
 {}
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::PassiveInput
+// senf::ppi::connector::GenericPassiveInput
 
-prefix_ senf::ppi::connector::PassiveInput::PassiveInput()
-    : qdisc_(), qstate_(QueueingDiscipline::UNTHROTTLED)
+prefix_ senf::ppi::connector::GenericPassiveInput::GenericPassiveInput()
+    : qdisc_(new ThresholdQueueing(1,0))
 {}
 
-prefix_ senf::ppi::connector::ActiveOutput & senf::ppi::connector::PassiveInput::peer()
+prefix_ senf::ppi::connector::GenericActiveOutput & senf::ppi::connector::GenericPassiveInput::peer()
+    const
+{
+    return dynamic_cast<GenericActiveOutput&>(Connector::peer());
+}
+
+prefix_ bool senf::ppi::connector::GenericPassiveInput::boolean_test()
+    const
 {
-    return dynamic_cast<ActiveOutput&>(Connector::peer());
+    return ! empty();
 }
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveOutput
+// senf::ppi::connector::GenericPassiveOutput
+
+prefix_ senf::ppi::connector::GenericActiveInput & senf::ppi::connector::GenericPassiveOutput::peer()
+    const
+{
+    return dynamic_cast<GenericActiveInput&>(Connector::peer());
+}
+
+prefix_ bool senf::ppi::connector::GenericPassiveOutput::boolean_test()
+    const
+{
+    return  true;
+}
+
+prefix_ void senf::ppi::connector::GenericPassiveOutput::connect(GenericActiveInput & target)
+{
+    Connector::connect(target);
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::GenericActiveInput
+
+prefix_ senf::ppi::connector::GenericPassiveOutput & senf::ppi::connector::GenericActiveInput::peer()
+    const
+{
+    return dynamic_cast<GenericPassiveOutput&>(Connector::peer());
+}
+
+prefix_ bool senf::ppi::connector::GenericActiveInput::boolean_test()
+    const
+{
+    return ! empty() || ! peer().throttled();
+}
+
+prefix_ void senf::ppi::connector::GenericActiveInput::request()
+{
+    peer().emit();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::GenericActiveOutput
+
+prefix_ senf::ppi::connector::GenericPassiveInput & senf::ppi::connector::GenericActiveOutput::peer()
+    const
+{
+    return dynamic_cast<GenericPassiveInput&>(Connector::peer());
+}
 
-prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::ActiveOutput::peer()
+prefix_ bool senf::ppi::connector::GenericActiveOutput::boolean_test()
+    const
 {
-    return dynamic_cast<ActiveInput&>(Connector::peer());
+    return ! peer().throttled();
 }
 
-prefix_ void senf::ppi::connector::ActiveOutput::connect(PassiveInput & target)
+prefix_ void senf::ppi::connector::GenericActiveOutput::connect(GenericPassiveInput & target)
 {
     Connector::connect(target);
 }