X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=PPI%2FConnectors.cc;h=058a85c1d0b57265a5939a5124f0cd9f7ae2a254;hb=44c966bc9d744d0926cffd5184fdb77a62564c16;hp=d8e74dcaf77819bd478937b731c0897370aafaaa;hpb=81ffa1c459b96dd44472bcef37e1e373934ee138;p=senf.git diff --git a/PPI/Connectors.cc b/PPI/Connectors.cc index d8e74dc..058a85c 100644 --- a/PPI/Connectors.cc +++ b/PPI/Connectors.cc @@ -27,29 +27,88 @@ //#include "Connectors.ih" // Custom includes +#include "Route.hh" //#include "Connectors.mpp" #define prefix_ ///////////////////////////////cc.p//////////////////////////////////////// /////////////////////////////////////////////////////////////////////////// -// protected members +// senf::ppi::connector::PassiveConnector -prefix_ senf::ppi::connector::Connector::~Connector() -{} +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::connector::PassiveConnector::notifyUnthrottle() +{ + if (throttled() && !nativeThrottled_) { + Routes::const_iterator i (routes_.begin()); + Routes::const_iterator const i_end (routes_.end()); + for (; i != i_end; ++i) + if ((*i)->throttled()) + break; + if (i == i_end) { + remoteThrottled_ = false; + emitUnthrottle(); + } + } + else + remoteThrottled_ = false; +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::ActiveConnector + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle() +{ + if (throttleCallback_) + throttleCallback_(); + NotifyRoutes::const_iterator i (notifyRoutes_.begin()); + NotifyRoutes::const_iterator const i_end (notifyRoutes_.end()); + for (; i != i_end; ++i) + (*i)->notifyThrottle(); +} + +prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle() +{ + if (unthrottleCallback_) + unthrottleCallback_(); + NotifyRoutes::const_iterator i (notifyRoutes_.begin()); + NotifyRoutes::const_iterator const i_end (notifyRoutes_.end()); + for (; i != i_end; ++i) + (*i)->notifyUnthrottle(); +} -prefix_ void senf::ppi::connector::Connector::connect(Connector & target) +prefix_ void senf::ppi::connector::ActiveConnector::registerRoute(ForwardingRoute & route) { - peer_ = & target; - target.peer_ = this; + notifyRoutes_.push_back(&route); } /////////////////////////////////////////////////////////////////////////// // senf::ppi::connector::InputConnector +prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()() +{ + if (empty()) + v_requestEvent(); + Packet p; + if (! empty()) { + p = peek(); + queue_.pop_back(); + v_dequeueEvent(); + } + return p; +} + //////////////////////////////////////// // private members +prefix_ void senf::ppi::connector::InputConnector::v_requestEvent() +{} + prefix_ void senf::ppi::connector::InputConnector::v_enqueueEvent() {} @@ -57,6 +116,17 @@ prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent() {} /////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::ActiveInput + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::connector::ActiveInput::v_requestEvent() +{ + request(); +} + +/////////////////////////////////////////////////////////////////////////// // senf::ppi::connector::PassiveInput //////////////////////////////////////// @@ -64,15 +134,25 @@ prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent() prefix_ void senf::ppi::connector::PassiveInput::v_enqueueEvent() { - ///\fixme Emit notifications when qstate_ changes - qstate_ = qdisc_->update(*this, QueueingDiscipline::ENQUEUE); emit(); + qdisc_->update(*this, QueueingDiscipline::ENQUEUE); } prefix_ void senf::ppi::connector::PassiveInput::v_dequeueEvent() { - ///\fixme Emit notifications when qstate_ changes - qstate_ = qdisc_->update(*this, QueueingDiscipline::DEQUEUE); + qdisc_->update(*this, QueueingDiscipline::DEQUEUE); +} + +prefix_ void senf::ppi::connector::PassiveInput::v_unthrottleEvent() +{ + size_type n (queueSize()); + while (n) { + emit(); + size_type nn (queueSize()); + if (n == nn) + break; + n = nn; + } } ///////////////////////////////cc.e////////////////////////////////////////