Socket/Protocols/Inet: Fix off-by-one error in INet6Address
[senf.git] / PPI / Connectors.cc
index d8e74dc..058a85c 100644 (file)
 //#include "Connectors.ih"
 
 // Custom includes
+#include "Route.hh"
 
 //#include "Connectors.mpp"
 #define prefix_
 ///////////////////////////////cc.p////////////////////////////////////////
 
 ///////////////////////////////////////////////////////////////////////////
-// protected members
+// senf::ppi::connector::PassiveConnector
 
-prefix_ senf::ppi::connector::Connector::~Connector()
-{}
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::PassiveConnector::notifyUnthrottle()
+{
+    if (throttled() && !nativeThrottled_) {
+        Routes::const_iterator i (routes_.begin());
+        Routes::const_iterator const i_end (routes_.end());
+        for (; i != i_end; ++i)
+            if ((*i)->throttled())
+                break;
+        if (i == i_end) {
+            remoteThrottled_ = false;
+            emitUnthrottle();
+        }
+    } 
+    else
+        remoteThrottled_ = false;
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveConnector
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle()
+{
+    if (throttleCallback_)
+        throttleCallback_();
+    NotifyRoutes::const_iterator i (notifyRoutes_.begin());
+    NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
+    for (; i != i_end; ++i)
+        (*i)->notifyThrottle();
+}
+
+prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle()
+{
+    if (unthrottleCallback_)
+        unthrottleCallback_();
+    NotifyRoutes::const_iterator i (notifyRoutes_.begin());
+    NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
+    for (; i != i_end; ++i)
+        (*i)->notifyUnthrottle();
+}
 
-prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
+prefix_ void senf::ppi::connector::ActiveConnector::registerRoute(ForwardingRoute & route)
 {
-    peer_ = & target;
-    target.peer_ = this;
+    notifyRoutes_.push_back(&route);
 }
 
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::connector::InputConnector
 
+prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()()
+{
+    if (empty())
+        v_requestEvent();
+    Packet p;
+    if (! empty()) {
+        p = peek();
+        queue_.pop_back();
+        v_dequeueEvent();
+    }
+    return p;
+}
+
 ////////////////////////////////////////
 // private members
 
+prefix_ void senf::ppi::connector::InputConnector::v_requestEvent()
+{}
+
 prefix_ void senf::ppi::connector::InputConnector::v_enqueueEvent()
 {}
 
@@ -57,6 +116,17 @@ prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent()
 {}
 
 ///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveInput
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::ActiveInput::v_requestEvent()
+{
+    request();
+}
+
+///////////////////////////////////////////////////////////////////////////
 // senf::ppi::connector::PassiveInput
 
 ////////////////////////////////////////
@@ -64,15 +134,25 @@ prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent()
 
 prefix_ void senf::ppi::connector::PassiveInput::v_enqueueEvent()
 {
-    ///\fixme Emit notifications when qstate_ changes
-    qstate_ = qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
     emit();
+    qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
 }
 
 prefix_ void senf::ppi::connector::PassiveInput::v_dequeueEvent()
 {
-    ///\fixme Emit notifications when qstate_ changes
-    qstate_ = qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
+    qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
+}
+
+prefix_ void senf::ppi::connector::PassiveInput::v_unthrottleEvent()
+{
+    size_type n (queueSize());
+    while (n) {
+        emit();
+        size_type nn (queueSize());
+        if (n == nn)
+            break;
+        n = nn;
+    }
 }
 
 ///////////////////////////////cc.e////////////////////////////////////////