// senf::ppi::connector::Connector
prefix_ senf::ppi::connector::Connector & senf::ppi::connector::Connector::peer()
+ const
{
BOOST_ASSERT(peer_);
return *peer_;
}
prefix_ senf::ppi::module::Module & senf::ppi::connector::Connector::module()
+ const
{
BOOST_ASSERT(module_);
return *module_;
: peer_(), module_()
{}
+prefix_ senf::ppi::connector::Connector::~Connector()
+{
+ if (peer_)
+ peer_->peer_ = 0;
+}
+
+prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
+{
+ BOOST_ASSERT( ! peer_ && ! target.peer_ );
+ peer_ = & target;
+ target.peer_ = this;
+}
+
////////////////////////////////////////
// private members
}
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveConnector
+// senf::ppi::connector::PassiveConnector
-////////////////////////////////////////
-// protected members
+prefix_ bool senf::ppi::connector::PassiveConnector::throttled()
+ const
+{
+ return nativeThrottled_ || remoteThrottled_;
+}
-prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
-{}
+prefix_ bool senf::ppi::connector::PassiveConnector::nativeThrottled()
+ const
+{
+ return nativeThrottled_;
+}
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::PassiveConnector
+prefix_ void senf::ppi::connector::PassiveConnector::throttle()
+{
+ if (!throttled()) {
+ nativeThrottled_ = true;
+ emitThrottle();
+ }
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::unthrottle()
+{
+ if (throttled() && ! remoteThrottled_) {
+ nativeThrottled_ = false;
+ emitUnthrottle();
+ } else
+ nativeThrottled_ = false;
+
+}
prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveConnector::peer()
+ const
{
return dynamic_cast<ActiveConnector&>(Connector::peer());
}
// protected members
prefix_ senf::ppi::connector::PassiveConnector::PassiveConnector()
- : callback_()
+ : callback_(), remoteThrottled_(), nativeThrottled_()
{}
prefix_ void senf::ppi::connector::PassiveConnector::emit()
{
BOOST_ASSERT(callback_);
- callback_();
+ if (!throttled())
+ callback_();
+}
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::PassiveConnector::notifyThrottle()
+{
+ if (!throttled()) {
+ remoteThrottled_ = true;
+ emitThrottle();
+ }
+ else
+ remoteThrottled_ = true;
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::emitThrottle()
+{
+ peer().notifyThrottle();
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::emitUnthrottle()
+{
+ peer().notifyUnthrottle();
+ v_unthrottleEvent();
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
+{}
+
+prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
+{
+ routes_.push_back(&route);
}
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::InputConnector
+// senf::ppi::connector::ActiveConnector
-prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()()
+prefix_ senf::ppi::connector::PassiveConnector & senf::ppi::connector::ActiveConnector::peer()
+ const
{
- Packet p (queue_.back());
- queue_.pop_back();
- v_dequeueEvent();
- return p;
+ return dynamic_cast<PassiveConnector&>(Connector::peer());
}
-prefix_ bool senf::ppi::connector::InputConnector::boolean_test()
+prefix_ void senf::ppi::connector::ActiveConnector::onThrottle()
{
- ///\fixme Add additional active/passive throttle/unthrottle conditions (make virtual ?)
- return ! empty();
+ throttleCallback_ = Callback();
+}
+
+prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle()
+{
+ unthrottleCallback_ = Callback();
+}
+
+prefix_ bool senf::ppi::connector::ActiveConnector::throttled()
+ const
+{
+ return peer().throttled();
+}
+
+////////////////////////////////////////
+// protected members
+
+prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
+ : throttleCallback_(), unthrottleCallback_(), notifyRoutes_()
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::InputConnector
+
+prefix_ senf::Packet senf::ppi::connector::InputConnector::read()
+{
+ return operator()();
}
prefix_ senf::ppi::connector::OutputConnector & senf::ppi::connector::InputConnector::peer()
+ const
{
return dynamic_cast<OutputConnector &>(Connector::peer());
}
prefix_ senf::ppi::connector::InputConnector::queue_iterator
senf::ppi::connector::InputConnector::begin()
+ const
{
return queue_.begin();
}
prefix_ senf::ppi::connector::InputConnector::queue_iterator
senf::ppi::connector::InputConnector::end()
+ const
{
return queue_.end();
}
prefix_ senf::Packet senf::ppi::connector::InputConnector::peek()
+ const
{
+ BOOST_ASSERT( ! queue_.empty() );
return queue_.back();
}
prefix_ senf::ppi::connector::InputConnector::size_type
senf::ppi::connector::InputConnector::queueSize()
+ const
{
return queue_.size();
}
prefix_ bool senf::ppi::connector::InputConnector::empty()
+ const
{
return queue_.empty();
}
peer().enqueue(p);
}
+prefix_ void senf::ppi::connector::OutputConnector::write(Packet p)
+{
+ operator()(p);
+}
+
prefix_ senf::ppi::connector::InputConnector & senf::ppi::connector::OutputConnector::peer()
+ const
{
return dynamic_cast<InputConnector&>(Connector::peer());
}
// senf::ppi::connector::PassiveInput
prefix_ senf::ppi::connector::PassiveInput::PassiveInput()
- : qdisc_(), qstate_(QueueingDiscipline::UNTHROTTLED)
+ : qdisc_(new ThresholdQueueing(1,0))
{}
prefix_ senf::ppi::connector::ActiveOutput & senf::ppi::connector::PassiveInput::peer()
+ const
{
return dynamic_cast<ActiveOutput&>(Connector::peer());
}
+prefix_ bool senf::ppi::connector::PassiveInput::boolean_test()
+ const
+{
+ return ! empty();
+}
+
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveOutput
+// senf::ppi::connector::PassiveOutput
-prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::ActiveOutput::peer()
+prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::PassiveOutput::peer()
+ const
{
return dynamic_cast<ActiveInput&>(Connector::peer());
}
+prefix_ bool senf::ppi::connector::PassiveOutput::boolean_test()
+ const
+{
+ return true;
+}
+
+prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target)
+{
+ Connector::connect(target);
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveInput
+
+prefix_ senf::ppi::connector::PassiveOutput & senf::ppi::connector::ActiveInput::peer()
+ const
+{
+ return dynamic_cast<PassiveOutput&>(Connector::peer());
+}
+
+prefix_ bool senf::ppi::connector::ActiveInput::boolean_test()
+ const
+{
+ return ! empty() || ! peer().throttled();
+}
+
+prefix_ void senf::ppi::connector::ActiveInput::request()
+{
+ peer().emit();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveOutput
+
+prefix_ senf::ppi::connector::PassiveInput & senf::ppi::connector::ActiveOutput::peer()
+ const
+{
+ return dynamic_cast<PassiveInput&>(Connector::peer());
+}
+
+prefix_ bool senf::ppi::connector::ActiveOutput::boolean_test()
+ const
+{
+ return ! peer().throttled();
+}
+
prefix_ void senf::ppi::connector::ActiveOutput::connect(PassiveInput & target)
{
Connector::connect(target);