PPI: Implement PassiveOuput <-> ActiveInput functionality
g0dil [Thu, 9 Aug 2007 12:25:07 +0000 (12:25 +0000)]
PPI: Some small bugfixes to make ppitest work (not only compile :-) )
PPI: Add DebugModules

git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@386 270642c3-0616-0410-b53a-bc976706d245

PPI/Connectors.cc
PPI/Connectors.cci
PPI/Connectors.hh
PPI/DebugModules.cci [new file with mode: 0644]
PPI/DebugModules.hh [new file with mode: 0644]
PPI/DebugModules.test.cc [new file with mode: 0644]
PPI/Module.ct
PPI/Route.ih
PPI/SocketReader.ct
PPI/SocketReader.hh
PPI/ppitest/ppitest.cc

index d8e74dc..8b02972 100644 (file)
@@ -50,6 +50,9 @@ prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
 ////////////////////////////////////////
 // private members
 
+prefix_ void senf::ppi::connector::InputConnector::v_requestEvent()
+{}
+
 prefix_ void senf::ppi::connector::InputConnector::v_enqueueEvent()
 {}
 
@@ -57,6 +60,17 @@ prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent()
 {}
 
 ///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveInput
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::connector::ActiveInput::v_requestEvent()
+{
+    request();
+}
+
+///////////////////////////////////////////////////////////////////////////
 // senf::ppi::connector::PassiveInput
 
 ////////////////////////////////////////
@@ -65,14 +79,20 @@ prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent()
 prefix_ void senf::ppi::connector::PassiveInput::v_enqueueEvent()
 {
     ///\fixme Emit notifications when qstate_ changes
-    qstate_ = qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
+    if (qdisc_)
+        qstate_ = qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
+    else
+        qstate_ = empty()?QueueingDiscipline::UNTHROTTLED:QueueingDiscipline::THROTTLED;
     emit();
 }
 
 prefix_ void senf::ppi::connector::PassiveInput::v_dequeueEvent()
 {
     ///\fixme Emit notifications when qstate_ changes
-    qstate_ = qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
+    if (qdisc_)
+        qstate_ = qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
+    else
+        qstate_ = empty()?QueueingDiscipline::UNTHROTTLED:QueueingDiscipline::THROTTLED;
 }
 
 ///////////////////////////////cc.e////////////////////////////////////////
index 60be4bd..8d24f26 100644 (file)
@@ -93,7 +93,8 @@ prefix_ void senf::ppi::connector::PassiveConnector::emit()
 
 prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()()
 {
-    Packet p (queue_.back());
+    v_requestEvent();
+    Packet p (peek());
     queue_.pop_back();
     v_dequeueEvent();
     return p;
@@ -124,6 +125,7 @@ senf::ppi::connector::InputConnector::end()
 
 prefix_ senf::Packet senf::ppi::connector::InputConnector::peek()
 {
+    BOOST_ASSERT( ! queue_.empty() );
     return queue_.back();
 }
 
@@ -185,6 +187,32 @@ prefix_ senf::ppi::connector::ActiveOutput & senf::ppi::connector::PassiveInput:
 }
 
 ///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::PassiveOutput
+
+prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::PassiveOutput::peer()
+{
+    return dynamic_cast<ActiveInput&>(Connector::peer());
+}
+
+prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target)
+{
+    Connector::connect(target);
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::connector::ActiveInput
+
+prefix_ senf::ppi::connector::PassiveOutput & senf::ppi::connector::ActiveInput::peer()
+{
+    return dynamic_cast<PassiveOutput&>(Connector::peer());
+}
+
+prefix_ void senf::ppi::connector::ActiveInput::request()
+{
+    peer().emit();
+}
+
+///////////////////////////////////////////////////////////////////////////
 // senf::ppi::connector::ActiveOutput
 
 prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::ActiveOutput::peer()
index 655e225..553ff96 100644 (file)
@@ -252,6 +252,7 @@ namespace connector {
     private:
         void enqueue(Packet p);
         
+        virtual void v_requestEvent();
         virtual void v_enqueueEvent();
         virtual void v_dequeueEvent();
 
@@ -323,6 +324,8 @@ namespace connector {
         ActiveInput & peer();
 
         void connect(ActiveInput & target);
+
+        friend class ActiveInput;
     };
 
     /** \brief Combination of ActiveConnector and InputConnector
@@ -334,6 +337,9 @@ namespace connector {
         PassiveOutput & peer();
 
         void request();                 ///< request more packets without dequeuing any packet
+
+    private:
+        void v_requestEvent();
     };
 
     /** \brief Combination of ActiveConnector and OutputConnector
diff --git a/PPI/DebugModules.cci b/PPI/DebugModules.cci
new file mode 100644 (file)
index 0000000..6accafa
--- /dev/null
@@ -0,0 +1,164 @@
+// $Id$
+//
+// Copyright (C) 2007 
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief DebugModules inline non-template implementation */
+
+// Custom includes
+
+#define prefix_ inline
+///////////////////////////////cci.p///////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::module::debug::ActivePacketSource
+
+prefix_ senf::ppi::module::debug::ActivePacketSource::ActivePacketSource()
+{
+    noroute(output);
+}
+
+prefix_ void senf::ppi::module::debug::ActivePacketSource::submit(Packet packet)
+{
+    output(packet);
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::module::debug::PassivePacketSource
+
+prefix_ senf::ppi::module::debug::PassivePacketSource::PassivePacketSource()
+{
+    noroute(output);
+    output.onRequest(&PassivePacketSource::request);
+}
+
+prefix_ void senf::ppi::module::debug::PassivePacketSource::submit(Packet packet)
+{
+    packets_.push_back(packet);
+}
+
+prefix_ bool senf::ppi::module::debug::PassivePacketSource::empty()
+{
+    return packets_.empty();
+}
+
+prefix_ senf::ppi::module::debug::PassivePacketSource::size_type
+senf::ppi::module::debug::PassivePacketSource::size()
+{
+    return packets_.size();
+}
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::module::debug::PassivePacketSource::request()
+{
+    output(packets_.front());
+    packets_.pop_front();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::module::debug::ActivePacketSink
+
+prefix_ senf::ppi::module::debug::ActivePacketSink::ActivePacketSink()
+{
+    noroute(input);
+}
+
+prefix_ senf::Packet senf::ppi::module::debug::ActivePacketSink::request()
+{
+    return input();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::module::debug::PassivePacketSink
+
+prefix_ senf::ppi::module::debug::PassivePacketSink::PassivePacketSink()
+{
+    noroute(input);
+    input.onRequest(&PassivePacketSink::request);
+}
+
+prefix_ bool senf::ppi::module::debug::PassivePacketSink::empty()
+{
+    return packets_.empty();
+}
+
+prefix_ senf::ppi::module::debug::PassivePacketSink::size_type
+senf::ppi::module::debug::PassivePacketSink::size()
+{
+    return packets_.size();
+}
+
+prefix_ senf::ppi::module::debug::PassivePacketSink::iterator
+senf::ppi::module::debug::PassivePacketSink::begin()
+{
+    return packets_.begin();
+}
+
+prefix_ senf::ppi::module::debug::PassivePacketSink::iterator
+senf::ppi::module::debug::PassivePacketSink::end()
+{
+    return packets_.end();
+}
+
+prefix_ senf::Packet senf::ppi::module::debug::PassivePacketSink::back()
+{
+    if (empty())
+        return Packet();
+    else
+        return packets_.back();
+}
+
+prefix_ senf::Packet senf::ppi::module::debug::PassivePacketSink::pop_back()
+{
+    Packet p (back());
+    if (p) 
+        packets_.pop_back();
+    return p;
+}
+
+prefix_ void senf::ppi::module::debug::PassivePacketSink::clear()
+{
+    packets_.erase(packets_.begin(), packets_.end());
+}
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::module::debug::PassivePacketSink::request()
+{
+    packets_.push_back(input());
+}
+
+///////////////////////////////cci.e///////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
diff --git a/PPI/DebugModules.hh b/PPI/DebugModules.hh
new file mode 100644 (file)
index 0000000..5545871
--- /dev/null
@@ -0,0 +1,133 @@
+// $Id$
+//
+// Copyright (C) 2007 
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief DebugModules public header */
+
+#ifndef HH_DebugModules_
+#define HH_DebugModules_ 1
+
+// Custom includes
+#include <deque>
+#include "Packets/Packets.hh"
+#include "Module.hh"
+
+//#include "DebugModules.mpp"
+///////////////////////////////hh.p////////////////////////////////////////
+
+namespace senf {
+namespace ppi {
+namespace module {
+namespace debug {
+    
+    class ActivePacketSource
+        : public Module
+    {
+    public:
+        connector::ActiveOutput output;
+
+        ActivePacketSource();
+
+        void submit(Packet packet);
+    };
+
+    class PassivePacketSource
+        : public Module
+    {
+        typedef std::deque<Packet> Queue;
+
+    public:
+        typedef Queue::size_type size_type;
+        
+        PassivePacketSource();
+        
+        connector::PassiveOutput output;
+        
+        void submit(Packet packet);
+
+        bool empty();
+        size_type size();
+
+    private:
+        void request();
+        
+        Queue packets_;
+    };
+
+    class ActivePacketSink
+        : public Module
+    {
+    public:
+        connector::ActiveInput input;
+
+        ActivePacketSink();
+
+        Packet request();
+    };
+
+    class PassivePacketSink
+        : public Module
+    {
+        typedef std::deque<Packet> Queue;
+
+    public:
+        typedef Queue::size_type size_type;
+        typedef Queue::const_iterator iterator;
+
+        connector::PassiveInput input;
+        
+        PassivePacketSink();
+
+        bool empty();
+        size_type size();
+        iterator begin();
+        iterator end();
+
+        Packet back();
+        Packet pop_back();
+
+        void clear();
+
+    private:
+        void request();
+        
+        Queue packets_;
+    };
+
+}}}}
+
+///////////////////////////////hh.e////////////////////////////////////////
+#include "DebugModules.cci"
+//#include "DebugModules.ct"
+//#include "DebugModules.cti"
+#endif
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
diff --git a/PPI/DebugModules.test.cc b/PPI/DebugModules.test.cc
new file mode 100644 (file)
index 0000000..fd95c32
--- /dev/null
@@ -0,0 +1,101 @@
+// $Id$
+//
+// Copyright (C) 2007 
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+//     Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief DebubgModules.test unit tests */
+
+//#include "DebubgModules.test.hh"
+//#include "DebubgModules.test.ih"
+
+// Custom includes
+#include <algorithm>
+#include "Packets/Packets.hh"
+#include "DebugModules.hh"
+#include "Setup.hh"
+
+#include <boost/test/auto_unit_test.hpp>
+#include <boost/test/test_tools.hpp>
+
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+BOOST_AUTO_UNIT_TEST(debugModules)
+{
+    namespace debug = senf::ppi::module::debug;
+    namespace ppi = senf::ppi;
+
+    {
+        debug::ActivePacketSource source;
+        debug::PassivePacketSink sink;
+
+        ppi::connect(source.output, sink.input);
+        
+        senf::PacketData::byte data[] = { 0x13u, 0x24u, 0x35u };
+        senf::Packet p (senf::DataPacket::create(data));
+
+        source.submit(p);
+        
+        BOOST_CHECK_EQUAL( sink.size(), 1u );
+        BOOST_CHECK( ! sink.empty() );
+        BOOST_CHECK_EQUAL( 
+            debug::PassivePacketSink::size_type(std::distance(sink.begin(),sink.end())),
+            sink.size() );
+        BOOST_CHECK( *sink.begin() == p );
+        BOOST_CHECK( sink.back() == p );
+
+        sink.clear();
+
+        BOOST_CHECK( ! sink.back() );
+        BOOST_CHECK( sink.empty() );
+    }
+
+    {
+        debug::PassivePacketSource source;
+        debug::ActivePacketSink sink;
+
+        ppi::connect(source.output, sink.input);
+
+        senf::PacketData::byte data[] = { 0x13u, 0x24u, 0x35u };
+        senf::Packet p (senf::DataPacket::create(data));
+
+        source.submit(p);
+        
+        BOOST_CHECK_EQUAL( source.size(), 1u );
+        BOOST_CHECK_EQUAL( sink.request(), p );
+        BOOST_CHECK_EQUAL( source.size(), 0u );
+        BOOST_CHECK( source.empty() );
+    }
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
index 0fc3339..b88a98c 100644 (file)
@@ -48,6 +48,7 @@ prefix_ void senf::ppi::module::Module::registerEvent(Target target, Descriptor
         *this,
         EventManager::Callback<Descriptor>::make(target,*this),
         descriptor);
+    descriptor.enabled(true);
 }
 
 ///////////////////////////////////////////////////////////////////////////
index b3de6ab..143a7a4 100644 (file)
@@ -48,6 +48,8 @@ namespace detail {
         connector::OutputConnector * target_;
     };
 
+#   ifndef DOXYGEN
+
     template <>
     class RouteImplementation<true,false>
         : public RouteBase
@@ -76,6 +78,8 @@ namespace detail {
         EventDescriptor * target_;
     };
 
+#   endif
+
 }}}
 
 ///////////////////////////////ih.e////////////////////////////////////////
index b56cf21..63faed0 100644 (file)
@@ -45,7 +45,8 @@ prefix_ Packet senf::ppi::PacketReader<Packet>::operator()(Handle handle)
 // senf::ppi::module::ActiveSocketReader<Reader>
 
 template <class Reader>
-prefix_ senf::ppi::module::ActiveSocketReader<Reader>::ActiveSocketReader(Handle handle)
+prefix_ senf::ppi::module::ActiveSocketReader<Reader>::
+ActiveSocketReader(Handle handle)
     : handle_(handle), event_(handle_, IOEvent::Read), reader_()
 {
     registerEvent( &ActiveSocketReader::read, event_ );
index 723daef..7c348f2 100644 (file)
@@ -88,7 +88,8 @@ namespace module {
         \endcode
      */
     template <class Reader=PacketReader<> >
-    class ActiveSocketReader : public Module
+    class ActiveSocketReader 
+        : public Module
     {
     public:
         typedef typename Reader::Handle Handle; ///< Handle type requested by the reader
index 205db94..f38ad88 100644 (file)
@@ -29,6 +29,7 @@
 // Custom includes
 #include "Socket/Protocols/INet/UDPSocketHandle.hh"
 #include "Socket/Protocols/INet/ConnectedUDPSocketHandle.hh"
+#include "Socket/Protocols/INet/INetAddressing.hh"
 #include "PPI/SocketReader.hh"
 #include "PPI/SocketWriter.hh"
 #include "PPI/Setup.hh"
@@ -43,9 +44,11 @@ int main(int argc, char * argv[])
     namespace ppi = senf::ppi;
 
     senf::UDPv4ClientSocketHandle inputSocket;
+    inputSocket.bind(senf::INet4SocketAddress("0.0.0.0:44344"));
     module::ActiveSocketReader<> udpReader (inputSocket);
 
-    senf::ConnectedUDPv4ClientSocketHandle outputSocket;
+    senf::ConnectedUDPv4ClientSocketHandle outputSocket(
+        senf::INet4SocketAddress("localhost:44345"));
     module::PassiveSocketWriter<> udpWriter (outputSocket);
     
     ppi::connect(udpReader.output, udpWriter.input);