PPI: added QueueingDiscipline::NONE
tho [Fri, 6 Aug 2010 09:05:15 +0000 (09:05 +0000)]
git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@1663 270642c3-0616-0410-b53a-bc976706d245

senf/PPI/Connectors.cc
senf/PPI/Connectors.cci
senf/PPI/Connectors.ct [new file with mode: 0644]
senf/PPI/Connectors.cti
senf/PPI/Connectors.hh
senf/PPI/Joins.cc
senf/PPI/Joins.cci
senf/PPI/Queueing.hh
senf/PPI/Queueing.test.cc
senf/Socket/ProtocolClientSocketHandle.cti

index a6daa13..eca5ecc 100644 (file)
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::connector::Connector
 
+prefix_ senf::ppi::connector::Connector::~Connector()
+{
+    if (connected()) {
+        Connector & peer (*peer_);
+        peer_->peer_ = 0;
+        if (! peer.initializationScheduled())
+            peer.enqueueInitializable();
+        peer.v_disconnected();
+    }
+}
+
 prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
 {
     // The connector is not registered -> route() or noroute() statement missing
@@ -215,6 +226,18 @@ prefix_ void senf::ppi::connector::PassiveConnector::v_init()
         emitUnthrottle();
 }
 
+prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
+{
+    routes_.push_back(&route);
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::unregisterRoute(ForwardingRoute & route)
+{
+    Routes::iterator i (std::find(routes_.begin(), routes_.end(), &route));
+    if (i != routes_.end())
+        routes_.erase(i);
+}
+
 prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
 {}
 
@@ -332,12 +355,19 @@ prefix_ void senf::ppi::connector::GenericActiveInput::v_requestEvent()
 prefix_ void senf::ppi::connector::GenericPassiveInput::v_enqueueEvent()
 {
     emit();
-    qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
+    if (qdisc_)
+        qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
 }
 
 prefix_ void senf::ppi::connector::GenericPassiveInput::v_dequeueEvent()
 {
-    qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
+    if (qdisc_)
+        qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
+}
+
+prefix_ void senf::ppi::connector::GenericPassiveInput::qdisc(QueueingDiscipline::None_t)
+{
+    qdisc_.reset( 0);
 }
 
 prefix_ void senf::ppi::connector::GenericPassiveInput::v_unthrottleEvent()
index 16948c0..10d3399 100644 (file)
@@ -76,17 +76,6 @@ prefix_ senf::ppi::connector::Connector::Connector()
     : peer_(), module_()
 {}
 
-prefix_ senf::ppi::connector::Connector::~Connector()
-{
-    if (connected()) {
-        Connector & peer (*peer_);
-        peer_->peer_ = 0;
-        if (! peer.initializationScheduled())
-            peer.enqueueInitializable();
-        peer.v_disconnected();
-    }
-}
-
 prefix_ bool senf::ppi::connector::Connector::connected()
     const
 {
@@ -151,18 +140,6 @@ prefix_ void senf::ppi::connector::PassiveConnector::notifyThrottle()
         remoteThrottled_ = true;
 }
 
-prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
-{
-    routes_.push_back(&route);
-}
-
-prefix_ void senf::ppi::connector::PassiveConnector::unregisterRoute(ForwardingRoute & route)
-{
-    Routes::iterator i (std::find(routes_.begin(), routes_.end(), &route));
-    if (i != routes_.end())
-        routes_.erase(i);
-}
-
 // public members
 
 prefix_ bool senf::ppi::connector::PassiveConnector::nativeThrottled()
diff --git a/senf/PPI/Connectors.ct b/senf/PPI/Connectors.ct
new file mode 100644 (file)
index 0000000..8868e9c
--- /dev/null
@@ -0,0 +1,90 @@
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief Connectors non-inline template implementation */
+
+//#include "Connectors.ih"
+
+// Custom includes
+
+#define prefix_ inline
+///////////////////////////////ct.p////////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::PassiveConnector
+
+prefix_ senf::ppi::connector::PassiveConnector::~PassiveConnector()
+{
+    // Must be here and NOT in base so it is called before destructing the routes_ member
+    unregisterConnector();
+}
+
+template <class Handler>
+prefix_ void senf::ppi::connector::PassiveConnector::onRequest(Handler handler)
+{
+    callback_ = ppi::detail::Callback<>::make(handler, module());
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveConnector
+
+prefix_ senf::ppi::connector::ActiveConnector::~ActiveConnector()
+{
+    // Must be here and NOT in base so it is called before destructing the routes_ member
+    unregisterConnector();
+}
+
+template <class Handler>
+prefix_ void senf::ppi::connector::ActiveConnector::onThrottle(Handler handler)
+{
+    throttleCallback_ = ppi::detail::Callback<>::make(handler, module());
+}
+
+template <class Handler>
+prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle(Handler handler)
+{
+    unthrottleCallback_ = ppi::detail::Callback<>::make(handler, module());
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::GenericPassiveInput
+
+template <class QDisc>
+prefix_ void senf::ppi::connector::GenericPassiveInput::qdisc(QDisc const & disc)
+{
+    qdisc_.reset(new QDisc(disc));
+}
+
+///////////////////////////////ct.e////////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
index 598e7f1..9a756b9 100644 (file)
@@ -63,51 +63,6 @@ prefix_ void senf::ppi::connector::detail::TypedOutputMixin<Self,PacketType>::wr
     static_cast<Self*>(this)->OutputConnector::write(p);
 }
 
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::PassiveConnector
-
-prefix_ senf::ppi::connector::PassiveConnector::~PassiveConnector()
-{
-    // Must be here and NOT in base so it is called before destructing the routes_ member
-    unregisterConnector();
-}
-
-template <class Handler>
-prefix_ void senf::ppi::connector::PassiveConnector::onRequest(Handler handler)
-{
-    callback_ = ppi::detail::Callback<>::make(handler, module());
-}
-
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::ActiveConnector
-
-prefix_ senf::ppi::connector::ActiveConnector::~ActiveConnector()
-{
-    // Must be here and NOT in base so it is called before destructing the routes_ member
-    unregisterConnector();
-}
-
-template <class Handler>
-prefix_ void senf::ppi::connector::ActiveConnector::onThrottle(Handler handler)
-{
-    throttleCallback_ = ppi::detail::Callback<>::make(handler, module());
-}
-
-template <class Handler>
-prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle(Handler handler)
-{
-    unthrottleCallback_ = ppi::detail::Callback<>::make(handler, module());
-}
-
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::connector::GenericPassiveInput
-
-template <class QDisc>
-prefix_ void senf::ppi::connector::GenericPassiveInput::qdisc(QDisc const & disc)
-{
-    qdisc_.reset(new QDisc(disc));
-}
-
 ///////////////////////////////cti.e///////////////////////////////////////
 #undef prefix_
 
index cd6e1e1..2ca48be 100644 (file)
@@ -463,6 +463,9 @@ namespace connector {
                                              QueueingDiscipline interface.
 
                                              \param[in] disc New queueing discipline */
+        void qdisc(QueueingDiscipline::None_t);
+                                        ///< Disable queueing discipline
+
 
     protected:
         GenericPassiveInput();
@@ -658,7 +661,7 @@ namespace connector {
 
 ///////////////////////////////hh.e////////////////////////////////////////
 #include "Connectors.cci"
-//#include "Connectors.ct"
+#include "Connectors.ct"
 #include "Connectors.cti"
 #endif
 
index 9338347..bad31fa 100644 (file)
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::module::PassiveJoin
 
+prefix_ senf::ppi::module::PassiveJoin::PassiveJoin()
+{
+    noroute(output);
+    output.onThrottle(&PassiveJoin::onThrottle);
+    output.onUnthrottle(&PassiveJoin::onUnthrottle);
+}
+
 ////////////////////////////////////////
 // private members
 
@@ -48,11 +55,6 @@ prefix_ void senf::ppi::module::PassiveJoin::connectorSetup(connector::PassiveIn
     conn.onRequest(boost::bind(&PassiveJoin::request,this,boost::ref(conn)));
 }
 
-prefix_ void senf::ppi::module::PassiveJoin::request(connector::GenericPassiveInput & input)
-{
-    output(input());
-}
-
 prefix_ void senf::ppi::module::PassiveJoin::onThrottle()
 {
     using boost::lambda::_1;
@@ -72,6 +74,12 @@ prefix_ void senf::ppi::module::PassiveJoin::onUnthrottle()
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::module::PriorityJoin
 
+prefix_ senf::ppi::module::PriorityJoin::PriorityJoin()
+{
+    noroute(output);
+    output.onRequest(&PriorityJoin::request);
+}
+
 ////////////////////////////////////////
 // private members
 
index 2bb3449..4b4f4e5 100644 (file)
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::module::PassiveJoin
 
-prefix_ senf::ppi::module::PassiveJoin::PassiveJoin()
+prefix_ void senf::ppi::module::PassiveJoin::request(connector::GenericPassiveInput & input)
 {
-    noroute(output);
-    output.onThrottle(&PassiveJoin::onThrottle);
-    output.onUnthrottle(&PassiveJoin::onUnthrottle);
-}
-
-///////////////////////////////////////////////////////////////////////////
-// senf::ppi::module::PriorityJoin
-
-prefix_ senf::ppi::module::PriorityJoin::PriorityJoin()
-{
-    noroute(output);
-    output.onRequest(&PriorityJoin::request);
+    output(input());
 }
 
 ///////////////////////////////cci.e///////////////////////////////////////
index 233d9a5..9407cae 100644 (file)
@@ -58,6 +58,7 @@ namespace ppi {
         virtual ~QueueingDiscipline();
 
         enum Event { ENQUEUE, DEQUEUE }; ///< Possible queueing events
+        enum None_t { NONE };            ///< Type to disable the queueing discipline
 
         virtual void update(connector::GenericPassiveInput & input, Event event) = 0;
                                         ///< Calculate new queueing state
index 3c10045..839d150 100644 (file)
@@ -69,7 +69,7 @@ namespace {
     };
 }
 
-SENF_AUTO_UNIT_TEST(thresholdQueueing)
+SENF_AUTO_UNIT_TEST(PPI_Queueing)
 {
     debug::ActiveSource source;
     QueueTester tester;
@@ -80,18 +80,36 @@ SENF_AUTO_UNIT_TEST(thresholdQueueing)
     ppi::init();
 
     senf::Packet p (senf::DataPacket::create());
-    BOOST_CHECK( source );
-    source.submit(p);
-    BOOST_CHECK( source );
-    source.submit(p);
-    BOOST_CHECK( ! source );
-    BOOST_CHECK_EQUAL( tester.input.queueSize(), 2u );
-    tester.forward();
-    BOOST_CHECK_EQUAL( tester.input.queueSize(), 1u );
-    BOOST_CHECK( source );
-    tester.forward();
-    BOOST_CHECK_EQUAL( tester.input.queueSize(), 0u );
-    BOOST_CHECK( source );
+    {
+        BOOST_CHECK( source );
+        source.submit(p);
+        BOOST_CHECK( source );
+        source.submit(p);
+        BOOST_CHECK( ! source );
+        BOOST_CHECK_EQUAL( tester.input.queueSize(), 2u );
+        tester.forward();
+        BOOST_CHECK_EQUAL( tester.input.queueSize(), 1u );
+        BOOST_CHECK( source );
+        tester.forward();
+        BOOST_CHECK_EQUAL( tester.input.queueSize(), 0u );
+        BOOST_CHECK( source );
+        BOOST_CHECK_EQUAL( sink.size(), 2u);
+        sink.clear();
+    }
+    {
+        tester.input.qdisc(ppi::QueueingDiscipline::NONE);
+        BOOST_CHECK( source );
+        source.submit(p);
+        BOOST_CHECK( source );
+        source.submit(p);
+        BOOST_CHECK( source );
+        BOOST_CHECK_EQUAL( tester.input.queueSize(), 2u );
+        tester.forward();
+        tester.forward();
+        BOOST_CHECK( source );
+        BOOST_CHECK_EQUAL( tester.input.queueSize(), 0u );
+        BOOST_CHECK_EQUAL( sink.size(), 2u);
+    }
 }
 
 ///////////////////////////////cc.e////////////////////////////////////////
index dc6b543..0c4a427 100644 (file)
@@ -63,7 +63,7 @@ prefix_ SocketProtocol &
 senf::ProtocolClientSocketHandle<SocketProtocol>::protocol()
 {
     SENF_ASSERT( dynamic_cast<SocketProtocol *>(&this->body().protocol()),
-                 "Internal failure: Incompatible protocol class fount it's way into this handle");
+                 "Internal failure: Incompatible protocol class found it's way into this handle");
     // Need dynamic_cast here, since senf::SocketProtocol is a
     // virtual base
     return dynamic_cast<SocketProtocol &>(this->body().protocol());