// $Id$
//
-// Copyright (C) 2007
-// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
-// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Copyright (C) 2007
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
// Stefan Bund <g0dil@berlios.de>
//
// This program is free software; you can redistribute it and/or modify
\brief Connectors inline non-template implementation */
// Custom includes
+#include "../Utils/TypeInfo.hh"
+#include "../Utils/senfassert.hh"
#define prefix_ inline
///////////////////////////////cci.p///////////////////////////////////////
// senf::ppi::connector::Connector
prefix_ senf::ppi::connector::Connector & senf::ppi::connector::Connector::peer()
+ const
{
- BOOST_ASSERT(peer_);
+ SENF_ASSERT(peer_);
return *peer_;
}
prefix_ senf::ppi::module::Module & senf::ppi::connector::Connector::module()
+ const
{
- BOOST_ASSERT(module_);
+ SENF_ASSERT(module_);
return *module_;
}
: peer_(), module_()
{}
+prefix_ senf::ppi::connector::Connector::~Connector()
+{
+ if (peer_)
+ peer_->peer_ = 0;
+}
+
////////////////////////////////////////
// private members
}
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveConnector
+// senf::ppi::connector::PassiveConnector
-////////////////////////////////////////
-// protected members
+prefix_ bool senf::ppi::connector::PassiveConnector::throttled()
+ const
+{
+ return nativeThrottled_ || remoteThrottled_;
+}
-prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
-{}
+prefix_ bool senf::ppi::connector::PassiveConnector::nativeThrottled()
+ const
+{
+ return nativeThrottled_;
+}
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::PassiveConnector
+prefix_ void senf::ppi::connector::PassiveConnector::throttle()
+{
+ if (!throttled()) {
+ nativeThrottled_ = true;
+ emitThrottle();
+ }
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::unthrottle()
+{
+ if (throttled() && ! remoteThrottled_) {
+ nativeThrottled_ = false;
+ emitUnthrottle();
+ } else
+ nativeThrottled_ = false;
+
+}
prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveConnector::peer()
+ const
{
return dynamic_cast<ActiveConnector&>(Connector::peer());
}
// protected members
prefix_ senf::ppi::connector::PassiveConnector::PassiveConnector()
- : callback_()
+ : callback_(), remoteThrottled_(), nativeThrottled_()
{}
prefix_ void senf::ppi::connector::PassiveConnector::emit()
{
- BOOST_ASSERT(callback_);
- callback_();
+ SENF_ASSERT(callback_);
+ if (!throttled())
+ callback_();
+}
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::PassiveConnector::notifyThrottle()
+{
+ if (!throttled()) {
+ remoteThrottled_ = true;
+ emitThrottle();
+ }
+ else
+ remoteThrottled_ = true;
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::emitThrottle()
+{
+ peer().notifyThrottle();
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::emitUnthrottle()
+{
+ peer().notifyUnthrottle();
+ v_unthrottleEvent();
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
+{}
+
+prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
+{
+ routes_.push_back(&route);
}
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::InputConnector
+// senf::ppi::connector::ActiveConnector
-prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()()
+prefix_ senf::ppi::connector::PassiveConnector & senf::ppi::connector::ActiveConnector::peer()
+ const
{
- v_requestEvent();
- Packet p (peek());
- queue_.pop_back();
- v_dequeueEvent();
- return p;
+ return dynamic_cast<PassiveConnector&>(Connector::peer());
}
-prefix_ bool senf::ppi::connector::InputConnector::boolean_test()
+prefix_ void senf::ppi::connector::ActiveConnector::onThrottle()
{
- ///\fixme Add additional active/passive throttle/unthrottle conditions (make virtual ?)
- return ! empty();
+ throttleCallback_ = Callback();
+}
+
+prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle()
+{
+ unthrottleCallback_ = Callback();
+}
+
+prefix_ bool senf::ppi::connector::ActiveConnector::throttled()
+ const
+{
+ return peer().throttled();
+}
+
+////////////////////////////////////////
+// protected members
+
+prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
+ : throttleCallback_(), unthrottleCallback_(), notifyRoutes_()
+{}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::InputConnector
+
+prefix_ senf::Packet senf::ppi::connector::InputConnector::read()
+{
+ return operator()();
}
prefix_ senf::ppi::connector::OutputConnector & senf::ppi::connector::InputConnector::peer()
+ const
{
return dynamic_cast<OutputConnector &>(Connector::peer());
}
prefix_ senf::ppi::connector::InputConnector::queue_iterator
senf::ppi::connector::InputConnector::begin()
+ const
{
return queue_.begin();
}
prefix_ senf::ppi::connector::InputConnector::queue_iterator
senf::ppi::connector::InputConnector::end()
+ const
{
return queue_.end();
}
prefix_ senf::Packet senf::ppi::connector::InputConnector::peek()
+ const
{
- BOOST_ASSERT( ! queue_.empty() );
+ SENF_ASSERT( ! queue_.empty() );
return queue_.back();
}
prefix_ senf::ppi::connector::InputConnector::size_type
senf::ppi::connector::InputConnector::queueSize()
+ const
{
return queue_.size();
}
prefix_ bool senf::ppi::connector::InputConnector::empty()
+ const
{
return queue_.empty();
}
peer().enqueue(p);
}
+prefix_ void senf::ppi::connector::OutputConnector::write(Packet p)
+{
+ operator()(p);
+}
+
prefix_ senf::ppi::connector::InputConnector & senf::ppi::connector::OutputConnector::peer()
+ const
{
return dynamic_cast<InputConnector&>(Connector::peer());
}
{}
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::PassiveInput
+// senf::ppi::connector::GenericPassiveInput
-prefix_ senf::ppi::connector::PassiveInput::PassiveInput()
- : qdisc_(), qstate_(QueueingDiscipline::UNTHROTTLED)
+prefix_ senf::ppi::connector::GenericPassiveInput::GenericPassiveInput()
+ : qdisc_(new ThresholdQueueing(1,0))
{}
-prefix_ senf::ppi::connector::ActiveOutput & senf::ppi::connector::PassiveInput::peer()
+prefix_ senf::ppi::connector::GenericActiveOutput & senf::ppi::connector::GenericPassiveInput::peer()
+ const
{
- return dynamic_cast<ActiveOutput&>(Connector::peer());
+ return dynamic_cast<GenericActiveOutput&>(Connector::peer());
+}
+
+prefix_ bool senf::ppi::connector::GenericPassiveInput::boolean_test()
+ const
+{
+ return ! empty();
}
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::PassiveOutput
+// senf::ppi::connector::GenericPassiveOutput
-prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::PassiveOutput::peer()
+prefix_ senf::ppi::connector::GenericActiveInput & senf::ppi::connector::GenericPassiveOutput::peer()
+ const
{
- return dynamic_cast<ActiveInput&>(Connector::peer());
+ return dynamic_cast<GenericActiveInput&>(Connector::peer());
}
-prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target)
+prefix_ bool senf::ppi::connector::GenericPassiveOutput::boolean_test()
+ const
+{
+ return true;
+}
+
+prefix_ void senf::ppi::connector::GenericPassiveOutput::connect(GenericActiveInput & target)
{
Connector::connect(target);
}
+prefix_ senf::ppi::connector::GenericPassiveOutput::GenericPassiveOutput()
+{}
+
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveInput
+// senf::ppi::connector::GenericActiveInput
-prefix_ senf::ppi::connector::PassiveOutput & senf::ppi::connector::ActiveInput::peer()
+prefix_ senf::ppi::connector::GenericPassiveOutput & senf::ppi::connector::GenericActiveInput::peer()
+ const
{
- return dynamic_cast<PassiveOutput&>(Connector::peer());
+ return dynamic_cast<GenericPassiveOutput&>(Connector::peer());
}
-prefix_ void senf::ppi::connector::ActiveInput::request()
+prefix_ bool senf::ppi::connector::GenericActiveInput::boolean_test()
+ const
+{
+ return ! empty() || ! peer().throttled();
+}
+
+prefix_ void senf::ppi::connector::GenericActiveInput::request()
{
peer().emit();
}
+prefix_ senf::ppi::connector::GenericActiveInput::GenericActiveInput()
+{}
+
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveOutput
+// senf::ppi::connector::GenericActiveOutput
-prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::ActiveOutput::peer()
+prefix_ senf::ppi::connector::GenericPassiveInput & senf::ppi::connector::GenericActiveOutput::peer()
+ const
{
- return dynamic_cast<ActiveInput&>(Connector::peer());
+ return dynamic_cast<GenericPassiveInput&>(Connector::peer());
}
-prefix_ void senf::ppi::connector::ActiveOutput::connect(PassiveInput & target)
+prefix_ bool senf::ppi::connector::GenericActiveOutput::boolean_test()
+ const
+{
+ return ! peer().throttled();
+}
+
+prefix_ void senf::ppi::connector::GenericActiveOutput::connect(GenericPassiveInput & target)
{
Connector::connect(target);
}
+prefix_ senf::ppi::connector::GenericActiveOutput::GenericActiveOutput()
+{}
+
///////////////////////////////cci.e///////////////////////////////////////
#undef prefix_