X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=PPI%2FConnectors.cci;h=5dcb08ed6f9c7b3d2855cc2b0ee2ca46f6b48706;hb=53a3d02e7fde841badf42555eba87ccef4566073;hp=60be4bde3b785b22a9d2beefb678978fe6be4d92;hpb=81ffa1c459b96dd44472bcef37e1e373934ee138;p=senf.git diff --git a/PPI/Connectors.cci b/PPI/Connectors.cci index 60be4bd..5dcb08e 100644 --- a/PPI/Connectors.cci +++ b/PPI/Connectors.cci @@ -32,12 +32,14 @@ // senf::ppi::connector::Connector prefix_ senf::ppi::connector::Connector & senf::ppi::connector::Connector::peer() + const { BOOST_ASSERT(peer_); return *peer_; } prefix_ senf::ppi::module::Module & senf::ppi::connector::Connector::module() + const { BOOST_ASSERT(module_); return *module_; @@ -50,6 +52,19 @@ prefix_ senf::ppi::connector::Connector::Connector() : peer_(), module_() {} +prefix_ senf::ppi::connector::Connector::~Connector() +{ + if (peer_) + peer_->peer_ = 0; +} + +prefix_ void senf::ppi::connector::Connector::connect(Connector & target) +{ + BOOST_ASSERT( ! peer_ && ! target.peer_ ); + peer_ = & target; + target.peer_ = this; +} + //////////////////////////////////////// // private members @@ -59,18 +74,40 @@ prefix_ void senf::ppi::connector::Connector::setModule(module::Module & module) } /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::ActiveConnector +// senf::ppi::connector::PassiveConnector -//////////////////////////////////////// -// protected members +prefix_ bool senf::ppi::connector::PassiveConnector::throttled() + const +{ + return nativeThrottled_ || remoteThrottled_; +} -prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector() -{} +prefix_ bool senf::ppi::connector::PassiveConnector::nativeThrottled() + const +{ + return nativeThrottled_; +} -/////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::PassiveConnector +prefix_ void senf::ppi::connector::PassiveConnector::throttle() +{ + if (!throttled()) { + nativeThrottled_ = true; + emitThrottle(); + } +} + +prefix_ void senf::ppi::connector::PassiveConnector::unthrottle() +{ + if (throttled() && ! remoteThrottled_) { + nativeThrottled_ = false; + emitUnthrottle(); + } else + nativeThrottled_ = false; + +} prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveConnector::peer() + const { return dynamic_cast(Connector::peer()); } @@ -79,61 +116,119 @@ prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveCon // protected members prefix_ senf::ppi::connector::PassiveConnector::PassiveConnector() - : callback_() + : callback_(), remoteThrottled_(), nativeThrottled_() {} prefix_ void senf::ppi::connector::PassiveConnector::emit() { BOOST_ASSERT(callback_); - callback_(); + if (!throttled()) + callback_(); +} + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::connector::PassiveConnector::notifyThrottle() +{ + if (!throttled()) { + remoteThrottled_ = true; + emitThrottle(); + } + else + remoteThrottled_ = true; +} + +prefix_ void senf::ppi::connector::PassiveConnector::emitThrottle() +{ + peer().notifyThrottle(); +} + +prefix_ void senf::ppi::connector::PassiveConnector::emitUnthrottle() +{ + peer().notifyUnthrottle(); + v_unthrottleEvent(); +} + +prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent() +{} + +prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route) +{ + routes_.push_back(&route); } /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::InputConnector +// senf::ppi::connector::ActiveConnector -prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()() +prefix_ senf::ppi::connector::PassiveConnector & senf::ppi::connector::ActiveConnector::peer() + const { - Packet p (queue_.back()); - queue_.pop_back(); - v_dequeueEvent(); - return p; + return dynamic_cast(Connector::peer()); } -prefix_ bool senf::ppi::connector::InputConnector::boolean_test() +prefix_ void senf::ppi::connector::ActiveConnector::onThrottle() { - ///\fixme Add additional active/passive throttle/unthrottle conditions (make virtual ?) - return ! empty(); + throttleCallback_ = Callback(); } +prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle() +{ + unthrottleCallback_ = Callback(); +} + +prefix_ bool senf::ppi::connector::ActiveConnector::throttled() + const +{ + return peer().throttled(); +} + +//////////////////////////////////////// +// protected members + +prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector() + : throttleCallback_(), unthrottleCallback_(), notifyRoutes_() +{} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::InputConnector + prefix_ senf::ppi::connector::OutputConnector & senf::ppi::connector::InputConnector::peer() + const { return dynamic_cast(Connector::peer()); } prefix_ senf::ppi::connector::InputConnector::queue_iterator senf::ppi::connector::InputConnector::begin() + const { return queue_.begin(); } prefix_ senf::ppi::connector::InputConnector::queue_iterator senf::ppi::connector::InputConnector::end() + const { return queue_.end(); } prefix_ senf::Packet senf::ppi::connector::InputConnector::peek() + const { + BOOST_ASSERT( ! queue_.empty() ); return queue_.back(); } prefix_ senf::ppi::connector::InputConnector::size_type senf::ppi::connector::InputConnector::queueSize() + const { return queue_.size(); } prefix_ bool senf::ppi::connector::InputConnector::empty() + const { return queue_.empty(); } @@ -162,6 +257,7 @@ prefix_ void senf::ppi::connector::OutputConnector::operator()(Packet p) } prefix_ senf::ppi::connector::InputConnector & senf::ppi::connector::OutputConnector::peer() + const { return dynamic_cast(Connector::peer()); } @@ -176,22 +272,76 @@ prefix_ senf::ppi::connector::OutputConnector::OutputConnector() // senf::ppi::connector::PassiveInput prefix_ senf::ppi::connector::PassiveInput::PassiveInput() - : qdisc_(), qstate_(QueueingDiscipline::UNTHROTTLED) + : qdisc_(new ThresholdQueueing(1,0)) {} prefix_ senf::ppi::connector::ActiveOutput & senf::ppi::connector::PassiveInput::peer() + const { return dynamic_cast(Connector::peer()); } +prefix_ bool senf::ppi::connector::PassiveInput::boolean_test() + const +{ + return ! empty(); +} + /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::ActiveOutput +// senf::ppi::connector::PassiveOutput -prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::ActiveOutput::peer() +prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::PassiveOutput::peer() + const { return dynamic_cast(Connector::peer()); } +prefix_ bool senf::ppi::connector::PassiveOutput::boolean_test() + const +{ + return true; +} + +prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target) +{ + Connector::connect(target); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::ActiveInput + +prefix_ senf::ppi::connector::PassiveOutput & senf::ppi::connector::ActiveInput::peer() + const +{ + return dynamic_cast(Connector::peer()); +} + +prefix_ bool senf::ppi::connector::ActiveInput::boolean_test() + const +{ + return ! empty() || ! peer().throttled(); +} + +prefix_ void senf::ppi::connector::ActiveInput::request() +{ + peer().emit(); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::ActiveOutput + +prefix_ senf::ppi::connector::PassiveInput & senf::ppi::connector::ActiveOutput::peer() + const +{ + return dynamic_cast(Connector::peer()); +} + +prefix_ bool senf::ppi::connector::ActiveOutput::boolean_test() + const +{ + return ! peer().throttled(); +} + prefix_ void senf::ppi::connector::ActiveOutput::connect(PassiveInput & target) { Connector::connect(target);