From: tho Date: Tue, 29 Jun 2010 09:57:16 +0000 (+0000) Subject: added PassiveQueueingSocketSink X-Git-Url: http://g0dil.de/git?a=commitdiff_plain;h=ed55b722ff15975fb0b090a1b132c9b830829124;p=senf.git added PassiveQueueingSocketSink git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@1642 270642c3-0616-0410-b53a-bc976706d245 --- diff --git a/senf/PPI/Jack.test.cc b/senf/PPI/Jack.test.cc index a85beca..0625db3 100644 --- a/senf/PPI/Jack.test.cc +++ b/senf/PPI/Jack.test.cc @@ -27,7 +27,8 @@ //#include "Jack.test.ih" // Custom includes -#include "PPI.hh" +#include "Jack.hh" +#include "DebugModules.hh" #include #include diff --git a/senf/PPI/MultiConnectorMixin.test.cc b/senf/PPI/MultiConnectorMixin.test.cc index 4847285..b387e24 100644 --- a/senf/PPI/MultiConnectorMixin.test.cc +++ b/senf/PPI/MultiConnectorMixin.test.cc @@ -27,7 +27,10 @@ //#include "MultiConnectorMixin.test.ih" // Custom includes -#include "PPI.hh" +#include "MultiConnectorMixin.hh" +#include "DebugModules.hh" +#include "Joins.hh" +#include "AnnotationRouter.hh" #include #include diff --git a/senf/PPI/QueueingSocketSink.ct b/senf/PPI/QueueingSocketSink.ct new file mode 100644 index 0000000..027673d --- /dev/null +++ b/senf/PPI/QueueingSocketSink.ct @@ -0,0 +1,133 @@ +// $Id$ +// +// Copyright (C) 2010 +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY +// Thorsten Horstmann +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief QueueingSocketSink non-inline template implementation */ + +//#include "QueueingSocketSink.ih" + +// Custom includes + +#define prefix_ +///////////////////////////////ct.p//////////////////////////////////////// + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::FIFOQueueingAlgorithm + +template +prefix_ senf::ppi::FIFOQueueingAlgorithm::FIFOQueueingAlgorithm(unsigned size) + : size_( size) +{ } + +template +prefix_ PacketType senf::ppi::FIFOQueueingAlgorithm::dequeue() +{ + if (queue_.size() > 0) { + PacketType const & p = queue_.front(); + queue_.pop(); + return p; + } + return PacketType(); +} + +template +prefix_ bool senf::ppi::FIFOQueueingAlgorithm::enqueue(PacketType const & packet) +{ + if (queue_.size() < size_) { + queue_.push( packet); + return true; + } + return false; +} + +template +prefix_ void senf::ppi::FIFOQueueingAlgorithm::clear() +{ + while (! queue_.empty()) + queue_.pop(); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::module::PassiveQueueingSocketSink + +template +template +prefix_ senf::ppi::module::PassiveQueueingSocketSink::PassiveQueueingSocketSink(Handle handle, QAlgorithm const & qAlgorithm) + : handle_( handle), writer_( ), + qAlgo_( new QAlgorithm(qAlgorithm)), + event_( handle_, IOEvent::Write) +{ + registerEvent( event_, &PassiveQueueingSocketSink::writable ); + event_.enabled( false); + noroute(input); + input.onRequest( &PassiveQueueingSocketSink::write); + checkThrottle(); +} + +template +prefix_ void senf::ppi::module::PassiveQueueingSocketSink::write() +{ + PacketType p ( input.read()); + if (qAlgo_->size() > 0) { + qAlgo_->enqueue( p); + return; + } + if (! writer_( handle_, p)) { + if (qAlgo_->enqueue( p) && !event_.enabled()) { + event_.enabled( true); + } + } +} + +template +prefix_ void senf::ppi::module::PassiveQueueingSocketSink::writable() +{ + PacketType p (qAlgo_->dequeue()); + if (p) + writer_( handle_, p); + if (qAlgo_->size() == 0) { + event_.enabled( false); + } +} + +template +prefix_ void senf::ppi::module::PassiveQueueingSocketSink::checkThrottle() +{ + if (handle_.valid()) + input.unthrottle(); + else + input.throttle(); +} + +///////////////////////////////ct.e//////////////////////////////////////// +#undef prefix_ + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/senf/PPI/QueueingSocketSink.cti b/senf/PPI/QueueingSocketSink.cti new file mode 100644 index 0000000..1fb6bc4 --- /dev/null +++ b/senf/PPI/QueueingSocketSink.cti @@ -0,0 +1,86 @@ +// $Id$ +// +// Copyright (C) 2010 +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY +// Thorsten Horstmann +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief QueueingSocketSink inline template implementation */ + +//#include "QueueingSocketSink.ih" + +// Custom includes + +#define prefix_ inline +///////////////////////////////cti.p/////////////////////////////////////// + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::FIFOQueueingAlgorithm + +template +prefix_ unsigned senf::ppi::FIFOQueueingAlgorithm::size() + const +{ + return queue_.size(); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::module::PassiveQueueingSocketSink + +template +prefix_ Writer & senf::ppi::module::PassiveQueueingSocketSink::writer() +{ + return writer_; +} + +template +prefix_ typename Writer::Handle & senf::ppi::module::PassiveQueueingSocketSink::handle() +{ + return handle_; +} + +template +prefix_ void senf::ppi::module::PassiveQueueingSocketSink::handle(Handle handle) +{ + handle_ = handle; + qAlgo_->clear(); + checkThrottle(); +} + +template +prefix_ senf::ppi::QueueingAlgorithm::PacketType> & +senf::ppi::module::PassiveQueueingSocketSink::qAlgorithm() +{ + return *qAlgo_; +} + + +///////////////////////////////cti.e/////////////////////////////////////// +#undef prefix_ + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/senf/PPI/QueueingSocketSink.hh b/senf/PPI/QueueingSocketSink.hh new file mode 100644 index 0000000..6c5c15c --- /dev/null +++ b/senf/PPI/QueueingSocketSink.hh @@ -0,0 +1,122 @@ +// $Id$ +// +// Copyright (C) 2010 +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY +// Thorsten Horstmann +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief QueueingSocketSink public header */ + +#ifndef HH_SENF_PPI_QueueingSocketSink_ +#define HH_SENF_PPI_QueueingSocketSink_ 1 + +// Custom includes +#include +#include "SocketSink.hh" + +//#include "QueueingSocketSink.mpp" +///////////////////////////////hh.p//////////////////////////////////////// + +namespace senf { +namespace ppi { + + template + class QueueingAlgorithm + { + public: + virtual ~QueueingAlgorithm() {}; + virtual PacketType dequeue() = 0; + virtual bool enqueue(PacketType const & packet) = 0; + virtual unsigned size() const = 0; + virtual void clear() = 0; + }; + + template + class FIFOQueueingAlgorithm : public QueueingAlgorithm + { + public: + FIFOQueueingAlgorithm(unsigned size); + + virtual PacketType dequeue(); + virtual bool enqueue(PacketType const & packet); + virtual unsigned size() const; + virtual void clear(); + + private: + std::queue queue_; + unsigned size_; + }; + + +namespace module { + + /** \brief QueueingSocketSink + + \ingroup io_modules + */ + template + class PassiveQueueingSocketSink : public Module + { + SENF_PPI_MODULE(PassiveQueueingSocketSink); + + public: + typedef typename Writer::Handle Handle; ///< Handle type requested by writer + typedef typename Writer::PacketType PacketType; + + connector::PassiveInput input; ///< Input connector from which data is received + + template + explicit PassiveQueueingSocketSink(Handle handle, QAlgorithm const & qAlgorithm); + + Writer & writer(); ///< Access the Writer + Handle & handle(); ///< Access handle + void handle(Handle handle); ///< Set handle + /**< Assigning an empty or in-valid() handle will disable + the module until a new valid handle is assigned. */ + QueueingAlgorithm & qAlgorithm(); + + private: + void write(); + void writable(); + void checkThrottle(); + + Handle handle_; + Writer writer_; + boost::scoped_ptr > qAlgo_; + IOEvent event_; + }; + +}}} + +///////////////////////////////hh.e//////////////////////////////////////// +//#include "QueueingSocketSink.cci" +#include "QueueingSocketSink.ct" +#include "QueueingSocketSink.cti" +#endif + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// comment-column: 40 +// End: diff --git a/senf/PPI/QueueingSocketSink.test.cc b/senf/PPI/QueueingSocketSink.test.cc new file mode 100644 index 0000000..180af5b --- /dev/null +++ b/senf/PPI/QueueingSocketSink.test.cc @@ -0,0 +1,132 @@ +// $Id$ +// +// Copyright (C) 2010 +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY +// Thorsten Horstmann +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief QueueingSocketSink unit tests */ + +#include "QueueingSocketSink.hh" + +// Custom includes +#include +#include +#include "DebugModules.hh" +#include "SocketSink.hh" +#include "Setup.hh" + +#include +#include + +#define prefix_ +///////////////////////////////cc.p//////////////////////////////////////// +namespace ppi = senf::ppi; +namespace module = ppi::module; +namespace debug = module::debug; +namespace scheduler = senf::scheduler; + +namespace { + void runPPI(senf::ClockService::clock_type t) + { + scheduler::TimerEvent timeout( + "test-timeout", &scheduler::terminate, scheduler::now() + t); + ppi::run(); + } + + int base_pid = 0; + + unsigned port(unsigned i) + { + if (! base_pid) + base_pid = ::getpid(); + return 23456u + (((base_pid^(base_pid>>8)^(base_pid>>16)^(base_pid>>24))&0xff)<<2) + i; + } + + std::string localhost4str(unsigned i) + { + return (boost::format("localhost:%d") % port(i)).str(); + } + + struct TestingConnectedDgramWriter + : public ppi::ConnectedDgramWriter + { + bool throttled; + + bool operator()(Handle handle, PacketType const & packet) + { + if (throttled) + return false; + return ConnectedDgramWriter::operator()( handle, packet); + } + }; +} + +SENF_AUTO_UNIT_TEST(passiveQueueingSocketSink) +{ + senf::ConnectedUDPv4ClientSocketHandle outputSocket ( + senf::INet4SocketAddress( localhost4str(0))); + ppi::FIFOQueueingAlgorithm queueingAlgorithm ( 100); + module::PassiveQueueingSocketSink udpSink ( + outputSocket, queueingAlgorithm); + udpSink.writer().throttled = false; + debug::ActiveSource source; + ppi::connect(source, udpSink); + senf::ppi::init(); + + std::string data ("TEST"); + senf::Packet p (senf::DataPacket::create(data)); + + senf::UDPv4ClientSocketHandle inputSocket; + inputSocket.bind(senf::INet4SocketAddress(localhost4str(0))); + + source.submit(p); + + std::string input (inputSocket.read()); + BOOST_CHECK_EQUAL( data, input ); + BOOST_CHECK_EQUAL( udpSink.qAlgorithm().size(), 0); + + udpSink.writer().throttled = true; + + source.submit(p); + BOOST_CHECK_EQUAL( udpSink.qAlgorithm().size(), 1); + + udpSink.writer().throttled = false; + + runPPI( senf::ClockService::milliseconds(200)); + + input = inputSocket.read(); + BOOST_CHECK_EQUAL( data, input ); + BOOST_CHECK_EQUAL( udpSink.qAlgorithm().size(), 0); +} + + +///////////////////////////////cc.e//////////////////////////////////////// +#undef prefix_ + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/senf/PPI/RateAnalyzer.test.cc b/senf/PPI/RateAnalyzer.test.cc index 5cad705..bcf6e54 100644 --- a/senf/PPI/RateAnalyzer.test.cc +++ b/senf/PPI/RateAnalyzer.test.cc @@ -28,7 +28,9 @@ // Custom includes #include "RateAnalyzer.hh" -#include "PPI.hh" +#include "CloneSource.hh" +#include "RateFilter.hh" +#include "Setup.hh" #include #include diff --git a/senf/PPI/SocketSink.cc b/senf/PPI/SocketSink.cc index bd7b42e..b02f7e9 100644 --- a/senf/PPI/SocketSink.cc +++ b/senf/PPI/SocketSink.cc @@ -54,16 +54,16 @@ prefix_ void senf::ppi::IPv4SourceForcingDgramWriter::destination(senf::INet4Soc protocolId_ = dest.port(); } -prefix_ void senf::ppi::IPv4SourceForcingDgramWriter::operator()(Handle handle, +prefix_ bool senf::ppi::IPv4SourceForcingDgramWriter::operator()(Handle handle, Packet const & packet) { - sendtoandfrom( + return sendtoandfrom( handle.fd(), reinterpret_cast (&*packet.data().begin()), packet.size(), reinterpret_cast (&destination_), protocolId_, - reinterpret_cast (&source_)); + reinterpret_cast (&source_)) > 0; } prefix_ int senf::ppi::IPv4SourceForcingDgramWriter::sendtoandfrom( @@ -133,16 +133,16 @@ prefix_ void senf::ppi::IPv6SourceForcingDgramWriter::destination(senf::INet6Soc protocolId_ = dest.port(); } -prefix_ void senf::ppi::IPv6SourceForcingDgramWriter::operator()(Handle handle, +prefix_ bool senf::ppi::IPv6SourceForcingDgramWriter::operator()(Handle handle, Packet const & packet) { - sendtoandfrom( + return sendtoandfrom( handle.fd(), reinterpret_cast (&*packet.data().begin()), packet.size(), reinterpret_cast (&destination_), protocolId_, - reinterpret_cast (&source_)); + reinterpret_cast (&source_)) > 0; } prefix_ int senf::ppi::IPv6SourceForcingDgramWriter::sendtoandfrom( diff --git a/senf/PPI/SocketSink.cci b/senf/PPI/SocketSink.cci index c4f4802..b137100 100644 --- a/senf/PPI/SocketSink.cci +++ b/senf/PPI/SocketSink.cci @@ -33,10 +33,10 @@ /////////////////////////////////////////////////////////////////////////// // senf::ppi::ConnectedDgramWriter -prefix_ void senf::ppi::ConnectedDgramWriter::operator()(Handle handle, +prefix_ bool senf::ppi::ConnectedDgramWriter::operator()(Handle handle, Packet const & packet) { - handle.write(packet.data()); + return handle.write(packet.data()) != packet.data().begin(); } ///////////////////////////////cci.e/////////////////////////////////////// diff --git a/senf/PPI/SocketSink.cti b/senf/PPI/SocketSink.cti index 07dba5b..4e2ebf7 100644 --- a/senf/PPI/SocketSink.cti +++ b/senf/PPI/SocketSink.cti @@ -59,11 +59,12 @@ senf::ppi::TargetDgramWriter::target(typename Handle::Address const } template -prefix_ void senf::ppi::TargetDgramWriter::operator()(Handle handle, +prefix_ bool senf::ppi::TargetDgramWriter::operator()(Handle handle, Packet const & packet) { if (target_) - handle.writeto(target_, packet.data()); + return handle.writeto(target_, packet.data()) != packet.data().begin(); + return false; } /////////////////////////////////////////////////////////////////////////// diff --git a/senf/PPI/SocketSink.hh b/senf/PPI/SocketSink.hh index 03b64c1..ab68990 100644 --- a/senf/PPI/SocketSink.hh +++ b/senf/PPI/SocketSink.hh @@ -58,7 +58,7 @@ namespace ppi { ///< Handle type supported by this writer typedef Packet PacketType; - void operator()(Handle handle, Packet const & packet); + bool operator()(Handle handle, Packet const & packet); ///< Write \a packet to \a handle /**< Write the complete \a packet as a datagram to \a handle. @@ -87,7 +87,7 @@ namespace ppi { typename Handle::Address target() const; ///< Get current target address void target(typename Handle::Address const & target); ///< Set target address - void operator()(Handle handle, Packet const & packet); ///< Write \a packet to \a handle + bool operator()(Handle handle, Packet const & packet); ///< Write \a packet to \a handle /**< Write the complete \a packet as a datagram to \a handle. \param[in] handle Handle to write data to @@ -113,7 +113,7 @@ namespace ppi { void destination(senf::INet4SocketAddress & dest); senf::INet4SocketAddress destination(); - void operator()(Handle handle, Packet const & packet); + bool operator()(Handle handle, Packet const & packet); ///< Write \a packet to \a handle /**< Write the complete \a packet as a datagram to \a handle. @@ -142,7 +142,7 @@ namespace ppi { void destination(senf::INet6SocketAddress & dest); senf::INet6SocketAddress destination(); - void operator()(Handle handle, Packet const & packet); + bool operator()(Handle handle, Packet const & packet); ///< Write \a packet to \a handle /**< Write the complete \a packet as a datagram to \a handle. @@ -184,7 +184,7 @@ namespace module { SomeWriter(); // EITHER default constructible OR SomeWriter(SomeWriter const & other); // copy constructible - void operator()(Handle handle, Packet packet); // insertion function + bool operator()(Handle handle, Packet packet); // insertion function }; \endcode Whenever a packet is received for sending, the \a Writer's \c operator() is called. @@ -260,7 +260,7 @@ namespace module { SomeWriter(); // EITHER default constructible SomeWriter(SomeWriter const & other); // OR copy constructible - void operator()(Handle handle, Packet packet); // insertion function + bool operator()(Handle handle, Packet packet); // insertion function }; \endcode Whenever a packet is received for sending, the \a Writer's \c operator() is called.