From: g0dil Date: Thu, 9 Aug 2007 12:25:07 +0000 (+0000) Subject: PPI: Implement PassiveOuput <-> ActiveInput functionality X-Git-Url: http://g0dil.de/git?a=commitdiff_plain;h=026887c8f93c5dc89fb68446ae014205de559451;p=senf.git PPI: Implement PassiveOuput <-> ActiveInput functionality PPI: Some small bugfixes to make ppitest work (not only compile :-) ) PPI: Add DebugModules git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@386 270642c3-0616-0410-b53a-bc976706d245 --- diff --git a/PPI/Connectors.cc b/PPI/Connectors.cc index d8e74dc..8b02972 100644 --- a/PPI/Connectors.cc +++ b/PPI/Connectors.cc @@ -50,6 +50,9 @@ prefix_ void senf::ppi::connector::Connector::connect(Connector & target) //////////////////////////////////////// // private members +prefix_ void senf::ppi::connector::InputConnector::v_requestEvent() +{} + prefix_ void senf::ppi::connector::InputConnector::v_enqueueEvent() {} @@ -57,6 +60,17 @@ prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent() {} /////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::ActiveInput + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::connector::ActiveInput::v_requestEvent() +{ + request(); +} + +/////////////////////////////////////////////////////////////////////////// // senf::ppi::connector::PassiveInput //////////////////////////////////////// @@ -65,14 +79,20 @@ prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent() prefix_ void senf::ppi::connector::PassiveInput::v_enqueueEvent() { ///\fixme Emit notifications when qstate_ changes - qstate_ = qdisc_->update(*this, QueueingDiscipline::ENQUEUE); + if (qdisc_) + qstate_ = qdisc_->update(*this, QueueingDiscipline::ENQUEUE); + else + qstate_ = empty()?QueueingDiscipline::UNTHROTTLED:QueueingDiscipline::THROTTLED; emit(); } prefix_ void senf::ppi::connector::PassiveInput::v_dequeueEvent() { ///\fixme Emit notifications when qstate_ changes - qstate_ = qdisc_->update(*this, QueueingDiscipline::DEQUEUE); + if (qdisc_) + qstate_ = qdisc_->update(*this, QueueingDiscipline::DEQUEUE); + else + qstate_ = empty()?QueueingDiscipline::UNTHROTTLED:QueueingDiscipline::THROTTLED; } ///////////////////////////////cc.e//////////////////////////////////////// diff --git a/PPI/Connectors.cci b/PPI/Connectors.cci index 60be4bd..8d24f26 100644 --- a/PPI/Connectors.cci +++ b/PPI/Connectors.cci @@ -93,7 +93,8 @@ prefix_ void senf::ppi::connector::PassiveConnector::emit() prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()() { - Packet p (queue_.back()); + v_requestEvent(); + Packet p (peek()); queue_.pop_back(); v_dequeueEvent(); return p; @@ -124,6 +125,7 @@ senf::ppi::connector::InputConnector::end() prefix_ senf::Packet senf::ppi::connector::InputConnector::peek() { + BOOST_ASSERT( ! queue_.empty() ); return queue_.back(); } @@ -185,6 +187,32 @@ prefix_ senf::ppi::connector::ActiveOutput & senf::ppi::connector::PassiveInput: } /////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::PassiveOutput + +prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::PassiveOutput::peer() +{ + return dynamic_cast(Connector::peer()); +} + +prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target) +{ + Connector::connect(target); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::ActiveInput + +prefix_ senf::ppi::connector::PassiveOutput & senf::ppi::connector::ActiveInput::peer() +{ + return dynamic_cast(Connector::peer()); +} + +prefix_ void senf::ppi::connector::ActiveInput::request() +{ + peer().emit(); +} + +/////////////////////////////////////////////////////////////////////////// // senf::ppi::connector::ActiveOutput prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::ActiveOutput::peer() diff --git a/PPI/Connectors.hh b/PPI/Connectors.hh index 655e225..553ff96 100644 --- a/PPI/Connectors.hh +++ b/PPI/Connectors.hh @@ -252,6 +252,7 @@ namespace connector { private: void enqueue(Packet p); + virtual void v_requestEvent(); virtual void v_enqueueEvent(); virtual void v_dequeueEvent(); @@ -323,6 +324,8 @@ namespace connector { ActiveInput & peer(); void connect(ActiveInput & target); + + friend class ActiveInput; }; /** \brief Combination of ActiveConnector and InputConnector @@ -334,6 +337,9 @@ namespace connector { PassiveOutput & peer(); void request(); ///< request more packets without dequeuing any packet + + private: + void v_requestEvent(); }; /** \brief Combination of ActiveConnector and OutputConnector diff --git a/PPI/DebugModules.cci b/PPI/DebugModules.cci new file mode 100644 index 0000000..6accafa --- /dev/null +++ b/PPI/DebugModules.cci @@ -0,0 +1,164 @@ +// $Id$ +// +// Copyright (C) 2007 +// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) +// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Stefan Bund +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief DebugModules inline non-template implementation */ + +// Custom includes + +#define prefix_ inline +///////////////////////////////cci.p/////////////////////////////////////// + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::module::debug::ActivePacketSource + +prefix_ senf::ppi::module::debug::ActivePacketSource::ActivePacketSource() +{ + noroute(output); +} + +prefix_ void senf::ppi::module::debug::ActivePacketSource::submit(Packet packet) +{ + output(packet); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::module::debug::PassivePacketSource + +prefix_ senf::ppi::module::debug::PassivePacketSource::PassivePacketSource() +{ + noroute(output); + output.onRequest(&PassivePacketSource::request); +} + +prefix_ void senf::ppi::module::debug::PassivePacketSource::submit(Packet packet) +{ + packets_.push_back(packet); +} + +prefix_ bool senf::ppi::module::debug::PassivePacketSource::empty() +{ + return packets_.empty(); +} + +prefix_ senf::ppi::module::debug::PassivePacketSource::size_type +senf::ppi::module::debug::PassivePacketSource::size() +{ + return packets_.size(); +} + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::module::debug::PassivePacketSource::request() +{ + output(packets_.front()); + packets_.pop_front(); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::module::debug::ActivePacketSink + +prefix_ senf::ppi::module::debug::ActivePacketSink::ActivePacketSink() +{ + noroute(input); +} + +prefix_ senf::Packet senf::ppi::module::debug::ActivePacketSink::request() +{ + return input(); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::module::debug::PassivePacketSink + +prefix_ senf::ppi::module::debug::PassivePacketSink::PassivePacketSink() +{ + noroute(input); + input.onRequest(&PassivePacketSink::request); +} + +prefix_ bool senf::ppi::module::debug::PassivePacketSink::empty() +{ + return packets_.empty(); +} + +prefix_ senf::ppi::module::debug::PassivePacketSink::size_type +senf::ppi::module::debug::PassivePacketSink::size() +{ + return packets_.size(); +} + +prefix_ senf::ppi::module::debug::PassivePacketSink::iterator +senf::ppi::module::debug::PassivePacketSink::begin() +{ + return packets_.begin(); +} + +prefix_ senf::ppi::module::debug::PassivePacketSink::iterator +senf::ppi::module::debug::PassivePacketSink::end() +{ + return packets_.end(); +} + +prefix_ senf::Packet senf::ppi::module::debug::PassivePacketSink::back() +{ + if (empty()) + return Packet(); + else + return packets_.back(); +} + +prefix_ senf::Packet senf::ppi::module::debug::PassivePacketSink::pop_back() +{ + Packet p (back()); + if (p) + packets_.pop_back(); + return p; +} + +prefix_ void senf::ppi::module::debug::PassivePacketSink::clear() +{ + packets_.erase(packets_.begin(), packets_.end()); +} + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::module::debug::PassivePacketSink::request() +{ + packets_.push_back(input()); +} + +///////////////////////////////cci.e/////////////////////////////////////// +#undef prefix_ + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/PPI/DebugModules.hh b/PPI/DebugModules.hh new file mode 100644 index 0000000..5545871 --- /dev/null +++ b/PPI/DebugModules.hh @@ -0,0 +1,133 @@ +// $Id$ +// +// Copyright (C) 2007 +// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) +// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Stefan Bund +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief DebugModules public header */ + +#ifndef HH_DebugModules_ +#define HH_DebugModules_ 1 + +// Custom includes +#include +#include "Packets/Packets.hh" +#include "Module.hh" + +//#include "DebugModules.mpp" +///////////////////////////////hh.p//////////////////////////////////////// + +namespace senf { +namespace ppi { +namespace module { +namespace debug { + + class ActivePacketSource + : public Module + { + public: + connector::ActiveOutput output; + + ActivePacketSource(); + + void submit(Packet packet); + }; + + class PassivePacketSource + : public Module + { + typedef std::deque Queue; + + public: + typedef Queue::size_type size_type; + + PassivePacketSource(); + + connector::PassiveOutput output; + + void submit(Packet packet); + + bool empty(); + size_type size(); + + private: + void request(); + + Queue packets_; + }; + + class ActivePacketSink + : public Module + { + public: + connector::ActiveInput input; + + ActivePacketSink(); + + Packet request(); + }; + + class PassivePacketSink + : public Module + { + typedef std::deque Queue; + + public: + typedef Queue::size_type size_type; + typedef Queue::const_iterator iterator; + + connector::PassiveInput input; + + PassivePacketSink(); + + bool empty(); + size_type size(); + iterator begin(); + iterator end(); + + Packet back(); + Packet pop_back(); + + void clear(); + + private: + void request(); + + Queue packets_; + }; + +}}}} + +///////////////////////////////hh.e//////////////////////////////////////// +#include "DebugModules.cci" +//#include "DebugModules.ct" +//#include "DebugModules.cti" +#endif + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/PPI/DebugModules.test.cc b/PPI/DebugModules.test.cc new file mode 100644 index 0000000..fd95c32 --- /dev/null +++ b/PPI/DebugModules.test.cc @@ -0,0 +1,101 @@ +// $Id$ +// +// Copyright (C) 2007 +// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) +// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Stefan Bund +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief DebubgModules.test unit tests */ + +//#include "DebubgModules.test.hh" +//#include "DebubgModules.test.ih" + +// Custom includes +#include +#include "Packets/Packets.hh" +#include "DebugModules.hh" +#include "Setup.hh" + +#include +#include + +#define prefix_ +///////////////////////////////cc.p//////////////////////////////////////// + +BOOST_AUTO_UNIT_TEST(debugModules) +{ + namespace debug = senf::ppi::module::debug; + namespace ppi = senf::ppi; + + { + debug::ActivePacketSource source; + debug::PassivePacketSink sink; + + ppi::connect(source.output, sink.input); + + senf::PacketData::byte data[] = { 0x13u, 0x24u, 0x35u }; + senf::Packet p (senf::DataPacket::create(data)); + + source.submit(p); + + BOOST_CHECK_EQUAL( sink.size(), 1u ); + BOOST_CHECK( ! sink.empty() ); + BOOST_CHECK_EQUAL( + debug::PassivePacketSink::size_type(std::distance(sink.begin(),sink.end())), + sink.size() ); + BOOST_CHECK( *sink.begin() == p ); + BOOST_CHECK( sink.back() == p ); + + sink.clear(); + + BOOST_CHECK( ! sink.back() ); + BOOST_CHECK( sink.empty() ); + } + + { + debug::PassivePacketSource source; + debug::ActivePacketSink sink; + + ppi::connect(source.output, sink.input); + + senf::PacketData::byte data[] = { 0x13u, 0x24u, 0x35u }; + senf::Packet p (senf::DataPacket::create(data)); + + source.submit(p); + + BOOST_CHECK_EQUAL( source.size(), 1u ); + BOOST_CHECK_EQUAL( sink.request(), p ); + BOOST_CHECK_EQUAL( source.size(), 0u ); + BOOST_CHECK( source.empty() ); + } +} + +///////////////////////////////cc.e//////////////////////////////////////// +#undef prefix_ + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/PPI/Module.ct b/PPI/Module.ct index 0fc3339..b88a98c 100644 --- a/PPI/Module.ct +++ b/PPI/Module.ct @@ -48,6 +48,7 @@ prefix_ void senf::ppi::module::Module::registerEvent(Target target, Descriptor *this, EventManager::Callback::make(target,*this), descriptor); + descriptor.enabled(true); } /////////////////////////////////////////////////////////////////////////// diff --git a/PPI/Route.ih b/PPI/Route.ih index b3de6ab..143a7a4 100644 --- a/PPI/Route.ih +++ b/PPI/Route.ih @@ -48,6 +48,8 @@ namespace detail { connector::OutputConnector * target_; }; +# ifndef DOXYGEN + template <> class RouteImplementation : public RouteBase @@ -76,6 +78,8 @@ namespace detail { EventDescriptor * target_; }; +# endif + }}} ///////////////////////////////ih.e//////////////////////////////////////// diff --git a/PPI/SocketReader.ct b/PPI/SocketReader.ct index b56cf21..63faed0 100644 --- a/PPI/SocketReader.ct +++ b/PPI/SocketReader.ct @@ -45,7 +45,8 @@ prefix_ Packet senf::ppi::PacketReader::operator()(Handle handle) // senf::ppi::module::ActiveSocketReader template -prefix_ senf::ppi::module::ActiveSocketReader::ActiveSocketReader(Handle handle) +prefix_ senf::ppi::module::ActiveSocketReader:: +ActiveSocketReader(Handle handle) : handle_(handle), event_(handle_, IOEvent::Read), reader_() { registerEvent( &ActiveSocketReader::read, event_ ); diff --git a/PPI/SocketReader.hh b/PPI/SocketReader.hh index 723daef..7c348f2 100644 --- a/PPI/SocketReader.hh +++ b/PPI/SocketReader.hh @@ -88,7 +88,8 @@ namespace module { \endcode */ template > - class ActiveSocketReader : public Module + class ActiveSocketReader + : public Module { public: typedef typename Reader::Handle Handle; ///< Handle type requested by the reader diff --git a/PPI/ppitest/ppitest.cc b/PPI/ppitest/ppitest.cc index 205db94..f38ad88 100644 --- a/PPI/ppitest/ppitest.cc +++ b/PPI/ppitest/ppitest.cc @@ -29,6 +29,7 @@ // Custom includes #include "Socket/Protocols/INet/UDPSocketHandle.hh" #include "Socket/Protocols/INet/ConnectedUDPSocketHandle.hh" +#include "Socket/Protocols/INet/INetAddressing.hh" #include "PPI/SocketReader.hh" #include "PPI/SocketWriter.hh" #include "PPI/Setup.hh" @@ -43,9 +44,11 @@ int main(int argc, char * argv[]) namespace ppi = senf::ppi; senf::UDPv4ClientSocketHandle inputSocket; + inputSocket.bind(senf::INet4SocketAddress("0.0.0.0:44344")); module::ActiveSocketReader<> udpReader (inputSocket); - senf::ConnectedUDPv4ClientSocketHandle outputSocket; + senf::ConnectedUDPv4ClientSocketHandle outputSocket( + senf::INet4SocketAddress("localhost:44345")); module::PassiveSocketWriter<> udpWriter (outputSocket); ppi::connect(udpReader.output, udpWriter.input);