///////////////////////////////////////////////////////////////////////////
// senf::ppi::connector::Connector
+prefix_ senf::ppi::connector::Connector::~Connector()
+{
+ if (connected()) {
+ Connector & peer (*peer_);
+ peer_->peer_ = 0;
+ if (! peer.initializationScheduled())
+ peer.enqueueInitializable();
+ peer.v_disconnected();
+ }
+}
+
prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
{
// The connector is not registered -> route() or noroute() statement missing
emitUnthrottle();
}
+prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
+{
+ routes_.push_back(&route);
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::unregisterRoute(ForwardingRoute & route)
+{
+ Routes::iterator i (std::find(routes_.begin(), routes_.end(), &route));
+ if (i != routes_.end())
+ routes_.erase(i);
+}
+
prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
{}
prefix_ void senf::ppi::connector::GenericPassiveInput::v_enqueueEvent()
{
emit();
- qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
+ if (qdisc_)
+ qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
}
prefix_ void senf::ppi::connector::GenericPassiveInput::v_dequeueEvent()
{
- qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
+ if (qdisc_)
+ qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
+}
+
+prefix_ void senf::ppi::connector::GenericPassiveInput::qdisc(QueueingDiscipline::None_t)
+{
+ qdisc_.reset( 0);
}
prefix_ void senf::ppi::connector::GenericPassiveInput::v_unthrottleEvent()
: peer_(), module_()
{}
-prefix_ senf::ppi::connector::Connector::~Connector()
-{
- if (connected()) {
- Connector & peer (*peer_);
- peer_->peer_ = 0;
- if (! peer.initializationScheduled())
- peer.enqueueInitializable();
- peer.v_disconnected();
- }
-}
-
prefix_ bool senf::ppi::connector::Connector::connected()
const
{
remoteThrottled_ = true;
}
-prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
-{
- routes_.push_back(&route);
-}
-
-prefix_ void senf::ppi::connector::PassiveConnector::unregisterRoute(ForwardingRoute & route)
-{
- Routes::iterator i (std::find(routes_.begin(), routes_.end(), &route));
- if (i != routes_.end())
- routes_.erase(i);
-}
-
// public members
prefix_ bool senf::ppi::connector::PassiveConnector::nativeThrottled()
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief Connectors non-inline template implementation */
+
+//#include "Connectors.ih"
+
+// Custom includes
+
+#define prefix_ inline
+///////////////////////////////ct.p////////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::PassiveConnector
+
+prefix_ senf::ppi::connector::PassiveConnector::~PassiveConnector()
+{
+ // Must be here and NOT in base so it is called before destructing the routes_ member
+ unregisterConnector();
+}
+
+template <class Handler>
+prefix_ void senf::ppi::connector::PassiveConnector::onRequest(Handler handler)
+{
+ callback_ = ppi::detail::Callback<>::make(handler, module());
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveConnector
+
+prefix_ senf::ppi::connector::ActiveConnector::~ActiveConnector()
+{
+ // Must be here and NOT in base so it is called before destructing the routes_ member
+ unregisterConnector();
+}
+
+template <class Handler>
+prefix_ void senf::ppi::connector::ActiveConnector::onThrottle(Handler handler)
+{
+ throttleCallback_ = ppi::detail::Callback<>::make(handler, module());
+}
+
+template <class Handler>
+prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle(Handler handler)
+{
+ unthrottleCallback_ = ppi::detail::Callback<>::make(handler, module());
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::GenericPassiveInput
+
+template <class QDisc>
+prefix_ void senf::ppi::connector::GenericPassiveInput::qdisc(QDisc const & disc)
+{
+ qdisc_.reset(new QDisc(disc));
+}
+
+///////////////////////////////ct.e////////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
static_cast<Self*>(this)->OutputConnector::write(p);
}
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::PassiveConnector
-
-prefix_ senf::ppi::connector::PassiveConnector::~PassiveConnector()
-{
- // Must be here and NOT in base so it is called before destructing the routes_ member
- unregisterConnector();
-}
-
-template <class Handler>
-prefix_ void senf::ppi::connector::PassiveConnector::onRequest(Handler handler)
-{
- callback_ = ppi::detail::Callback<>::make(handler, module());
-}
-
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveConnector
-
-prefix_ senf::ppi::connector::ActiveConnector::~ActiveConnector()
-{
- // Must be here and NOT in base so it is called before destructing the routes_ member
- unregisterConnector();
-}
-
-template <class Handler>
-prefix_ void senf::ppi::connector::ActiveConnector::onThrottle(Handler handler)
-{
- throttleCallback_ = ppi::detail::Callback<>::make(handler, module());
-}
-
-template <class Handler>
-prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle(Handler handler)
-{
- unthrottleCallback_ = ppi::detail::Callback<>::make(handler, module());
-}
-
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::GenericPassiveInput
-
-template <class QDisc>
-prefix_ void senf::ppi::connector::GenericPassiveInput::qdisc(QDisc const & disc)
-{
- qdisc_.reset(new QDisc(disc));
-}
-
///////////////////////////////cti.e///////////////////////////////////////
#undef prefix_
QueueingDiscipline interface.
\param[in] disc New queueing discipline */
+ void qdisc(QueueingDiscipline::None_t);
+ ///< Disable queueing discipline
+
protected:
GenericPassiveInput();
///////////////////////////////hh.e////////////////////////////////////////
#include "Connectors.cci"
-//#include "Connectors.ct"
+#include "Connectors.ct"
#include "Connectors.cti"
#endif
///////////////////////////////////////////////////////////////////////////
// senf::ppi::module::PassiveJoin
+prefix_ senf::ppi::module::PassiveJoin::PassiveJoin()
+{
+ noroute(output);
+ output.onThrottle(&PassiveJoin::onThrottle);
+ output.onUnthrottle(&PassiveJoin::onUnthrottle);
+}
+
////////////////////////////////////////
// private members
conn.onRequest(boost::bind(&PassiveJoin::request,this,boost::ref(conn)));
}
-prefix_ void senf::ppi::module::PassiveJoin::request(connector::GenericPassiveInput & input)
-{
- output(input());
-}
-
prefix_ void senf::ppi::module::PassiveJoin::onThrottle()
{
using boost::lambda::_1;
///////////////////////////////////////////////////////////////////////////
// senf::ppi::module::PriorityJoin
+prefix_ senf::ppi::module::PriorityJoin::PriorityJoin()
+{
+ noroute(output);
+ output.onRequest(&PriorityJoin::request);
+}
+
////////////////////////////////////////
// private members
///////////////////////////////////////////////////////////////////////////
// senf::ppi::module::PassiveJoin
-prefix_ senf::ppi::module::PassiveJoin::PassiveJoin()
+prefix_ void senf::ppi::module::PassiveJoin::request(connector::GenericPassiveInput & input)
{
- noroute(output);
- output.onThrottle(&PassiveJoin::onThrottle);
- output.onUnthrottle(&PassiveJoin::onUnthrottle);
-}
-
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::module::PriorityJoin
-
-prefix_ senf::ppi::module::PriorityJoin::PriorityJoin()
-{
- noroute(output);
- output.onRequest(&PriorityJoin::request);
+ output(input());
}
///////////////////////////////cci.e///////////////////////////////////////
virtual ~QueueingDiscipline();
enum Event { ENQUEUE, DEQUEUE }; ///< Possible queueing events
+ enum None_t { NONE }; ///< Type to disable the queueing discipline
virtual void update(connector::GenericPassiveInput & input, Event event) = 0;
///< Calculate new queueing state
};
}
-SENF_AUTO_UNIT_TEST(thresholdQueueing)
+SENF_AUTO_UNIT_TEST(PPI_Queueing)
{
debug::ActiveSource source;
QueueTester tester;
ppi::init();
senf::Packet p (senf::DataPacket::create());
- BOOST_CHECK( source );
- source.submit(p);
- BOOST_CHECK( source );
- source.submit(p);
- BOOST_CHECK( ! source );
- BOOST_CHECK_EQUAL( tester.input.queueSize(), 2u );
- tester.forward();
- BOOST_CHECK_EQUAL( tester.input.queueSize(), 1u );
- BOOST_CHECK( source );
- tester.forward();
- BOOST_CHECK_EQUAL( tester.input.queueSize(), 0u );
- BOOST_CHECK( source );
+ {
+ BOOST_CHECK( source );
+ source.submit(p);
+ BOOST_CHECK( source );
+ source.submit(p);
+ BOOST_CHECK( ! source );
+ BOOST_CHECK_EQUAL( tester.input.queueSize(), 2u );
+ tester.forward();
+ BOOST_CHECK_EQUAL( tester.input.queueSize(), 1u );
+ BOOST_CHECK( source );
+ tester.forward();
+ BOOST_CHECK_EQUAL( tester.input.queueSize(), 0u );
+ BOOST_CHECK( source );
+ BOOST_CHECK_EQUAL( sink.size(), 2u);
+ sink.clear();
+ }
+ {
+ tester.input.qdisc(ppi::QueueingDiscipline::NONE);
+ BOOST_CHECK( source );
+ source.submit(p);
+ BOOST_CHECK( source );
+ source.submit(p);
+ BOOST_CHECK( source );
+ BOOST_CHECK_EQUAL( tester.input.queueSize(), 2u );
+ tester.forward();
+ tester.forward();
+ BOOST_CHECK( source );
+ BOOST_CHECK_EQUAL( tester.input.queueSize(), 0u );
+ BOOST_CHECK_EQUAL( sink.size(), 2u);
+ }
}
///////////////////////////////cc.e////////////////////////////////////////
senf::ProtocolClientSocketHandle<SocketProtocol>::protocol()
{
SENF_ASSERT( dynamic_cast<SocketProtocol *>(&this->body().protocol()),
- "Internal failure: Incompatible protocol class fount it's way into this handle");
+ "Internal failure: Incompatible protocol class found it's way into this handle");
// Need dynamic_cast here, since senf::SocketProtocol is a
// virtual base
return dynamic_cast<SocketProtocol &>(this->body().protocol());