From: tho Date: Wed, 7 Jul 2010 12:03:47 +0000 (+0000) Subject: PPI: PassiveQueueingSocketSink: added registry for QueueingAlgorithms X-Git-Url: http://g0dil.de/git?a=commitdiff_plain;h=fc1569e67cfda71bc8c406617a5e0b05f28f1b47;p=senf.git PPI: PassiveQueueingSocketSink: added registry for QueueingAlgorithms git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@1644 270642c3-0616-0410-b53a-bc976706d245 --- diff --git a/senf/PPI/QueueingSocketSink.cc b/senf/PPI/QueueingSocketSink.cc new file mode 100644 index 0000000..4267bb8 --- /dev/null +++ b/senf/PPI/QueueingSocketSink.cc @@ -0,0 +1,113 @@ +// $Id$ +// +// Copyright (C) 2010 +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY +// Thorsten Horstmann +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +// + +/** \file + \brief QueueingSocketSink non-inline non-template implementation */ + +#include "QueueingSocketSink.hh" +//#include "QueueingSocketSink.ih" + +// Custom includes + +#define prefix_ +///////////////////////////////cc.p//////////////////////////////////////// + +SENF_PPI_REGISTER_QALGORITHM( "FIFOQueueingAlgorithm", senf::ppi::FIFOQueueingAlgorithm); + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::QueueingAlgorithm + +prefix_ senf::ppi::QueueingAlgorithm::QueueingAlgorithm() + : dir_(this) +{ + namespace fty = console::factory; + dir_.add("size", fty::Command( &QueueingAlgorithm::size, this)); + dir_.add("clear", fty::Command( &QueueingAlgorithm::clear, this)); +} + +prefix_ senf::console::DirectoryNode & senf::ppi::QueueingAlgorithm::consoleDir() +{ + return dir_; +} + +///////////////////////////////////////////////////////////////////////////// +// senf::ppi::QueueingAlgorithmRegistry + +prefix_ void senf::ppi::QueueingAlgorithmRegistry::dump(std::ostream & os) + const +{ + for (QAlgoMap::const_iterator i = qAlgoMap_.begin(); i != qAlgoMap_.end(); ++i) { + os << i->first << std::endl; + } +} + +prefix_ senf::ppi::QueueingAlgorithm::ptr senf::ppi::QueueingAlgorithmRegistry::createQAlgorithm(std::string const & key) + const +{ + QAlgoMap::const_iterator i (qAlgoMap_.find( key)); + if (i != qAlgoMap_.end()) + return i->second->create(); + else + throw Exception("QueueingAlgorithm not registered: ") << key; +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::FIFOQueueingAlgorithm + +prefix_ senf::ppi::FIFOQueueingAlgorithm::FIFOQueueingAlgorithm() + : max_size_( 64) +{ + consoleDir().add("max-size", console::factory::Variable(max_size_) ); +} + +prefix_ senf::Packet senf::ppi::FIFOQueueingAlgorithm::v_dequeue() +{ + if (queue_.size() > 0) { + Packet const & p = queue_.front(); + queue_.pop(); + return p; + } + return Packet(); +} + +prefix_ bool senf::ppi::FIFOQueueingAlgorithm::v_enqueue(Packet const & packet) +{ + if (queue_.size() == max_size_) + queue_.pop(); + queue_.push( packet); + return true; +} + +prefix_ void senf::ppi::FIFOQueueingAlgorithm::v_clear() +{ + while (! queue_.empty()) + queue_.pop(); +} + +prefix_ senf::ppi::QueueingAlgorithm::ptr senf::ppi::FIFOQueueingAlgorithm::create() +{ + return new FIFOQueueingAlgorithm(); +} + +///////////////////////////////cc.e//////////////////////////////////////// +#undef prefix_ diff --git a/senf/PPI/QueueingSocketSink.cci b/senf/PPI/QueueingSocketSink.cci new file mode 100644 index 0000000..965302c --- /dev/null +++ b/senf/PPI/QueueingSocketSink.cci @@ -0,0 +1,79 @@ +// $Id$ +// +// Copyright (C) 2010 +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY +// Thorsten Horstmann +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief QueueingSocketSink inline non-template implementation */ + +//#include "QueueingSocketSink.ih" + +// Custom includes + +#define prefix_ inline +///////////////////////////////cci.p/////////////////////////////////////// + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::QueueingAlgorithm + +prefix_ unsigned senf::ppi::QueueingAlgorithm::size() +{ + return v_size(); +} + +prefix_ void senf::ppi::QueueingAlgorithm::clear() +{ + v_clear(); +} + +prefix_ senf::Packet senf::ppi::QueueingAlgorithm::dequeue() +{ + return v_dequeue(); +} + +prefix_ bool senf::ppi::QueueingAlgorithm::enqueue(Packet const & packet) +{ + return v_enqueue( packet); +} + + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::FIFOQueueingAlgorithm + +prefix_ unsigned senf::ppi::FIFOQueueingAlgorithm::v_size() + const +{ + return queue_.size(); +} + + +///////////////////////////////cci.e/////////////////////////////////////// +#undef prefix_ + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/senf/PPI/QueueingSocketSink.ct b/senf/PPI/QueueingSocketSink.ct index 027673d..3ae8cbf 100644 --- a/senf/PPI/QueueingSocketSink.ct +++ b/senf/PPI/QueueingSocketSink.ct @@ -31,51 +31,43 @@ ///////////////////////////////ct.p//////////////////////////////////////// /////////////////////////////////////////////////////////////////////////// -// senf::ppi::FIFOQueueingAlgorithm +// senf::ppi::QueueingAlgorithmRegistry -template -prefix_ senf::ppi::FIFOQueueingAlgorithm::FIFOQueueingAlgorithm(unsigned size) - : size_( size) -{ } - -template -prefix_ PacketType senf::ppi::FIFOQueueingAlgorithm::dequeue() +template +prefix_ void senf::ppi::QueueingAlgorithmRegistry::registerQAlgorithm(std::string key) { - if (queue_.size() > 0) { - PacketType const & p = queue_.front(); - queue_.pop(); - return p; - } - return PacketType(); + if (qAlgoMap_.find( key) == qAlgoMap_.end() ) + qAlgoMap_.insert(key, new detail::QueueingAlgorithmRegistry_Entry() ); + else + throw Exception("Duplicated QAlgorithm Registration ") << key; } -template -prefix_ bool senf::ppi::FIFOQueueingAlgorithm::enqueue(PacketType const & packet) -{ - if (queue_.size() < size_) { - queue_.push( packet); - return true; - } - return false; -} +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::detail::QueueingAlgorithmRegistry_Entry -template -prefix_ void senf::ppi::FIFOQueueingAlgorithm::clear() +template +prefix_ senf::ppi::QueueingAlgorithm::ptr senf::ppi::detail::QueueingAlgorithmRegistry_Entry::create() + const { - while (! queue_.empty()) - queue_.pop(); + return QAlgorithm::create(); } /////////////////////////////////////////////////////////////////////////// // senf::ppi::module::PassiveQueueingSocketSink template -template -prefix_ senf::ppi::module::PassiveQueueingSocketSink::PassiveQueueingSocketSink(Handle handle, QAlgorithm const & qAlgorithm) - : handle_( handle), writer_( ), - qAlgo_( new QAlgorithm(qAlgorithm)), +prefix_ senf::ppi::module::PassiveQueueingSocketSink::PassiveQueueingSocketSink(Handle handle, QueueingAlgorithm::ptr qAlgorithm) + : dir( this), + handle_( handle), writer_( ), + qAlgo_( qAlgorithm), event_( handle_, IOEvent::Write) { + namespace fty = console::factory; + dir.add( "active", qAlgo_->consoleDir()); + dir.add( "set", fty::Command( + &PassiveQueueingSocketSink::setQAlgorithm, this) ); + dir.add( "list", fty::Command( + &QueueingAlgorithmRegistry::dump, &QueueingAlgorithmRegistry::instance())); registerEvent( event_, &PassiveQueueingSocketSink::writable ); event_.enabled( false); noroute(input); @@ -118,6 +110,22 @@ prefix_ void senf::ppi::module::PassiveQueueingSocketSink::checkThrottle input.throttle(); } +template +prefix_ void senf::ppi::module::PassiveQueueingSocketSink::qAlgorithm(QueueingAlgorithm::ptr qAlgorithm) +{ +// dir.remove( "active"); + qAlgo_.reset( qAlgorithm); + dir.add( "active", qAlgo_->consoleDir()); + if (event_.enabled()) + event_.enabled( false); +} + +template +prefix_ void senf::ppi::module::PassiveQueueingSocketSink::setQAlgorithm(std::string const & key) +{ + qAlgorithm( QueueingAlgorithmRegistry::instance().createQAlgorithm( key)); +} + ///////////////////////////////ct.e//////////////////////////////////////// #undef prefix_ diff --git a/senf/PPI/QueueingSocketSink.cti b/senf/PPI/QueueingSocketSink.cti index 1fb6bc4..f1ae028 100644 --- a/senf/PPI/QueueingSocketSink.cti +++ b/senf/PPI/QueueingSocketSink.cti @@ -31,14 +31,14 @@ ///////////////////////////////cti.p/////////////////////////////////////// /////////////////////////////////////////////////////////////////////////// -// senf::ppi::FIFOQueueingAlgorithm +// senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy -template -prefix_ unsigned senf::ppi::FIFOQueueingAlgorithm::size() - const +template +prefix_ senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy::RegistrationProxy(std::string const & key) { - return queue_.size(); -} + QueueingAlgorithmRegistry::instance().registerQAlgorithm( key); +}; + /////////////////////////////////////////////////////////////////////////// // senf::ppi::module::PassiveQueueingSocketSink @@ -64,8 +64,7 @@ prefix_ void senf::ppi::module::PassiveQueueingSocketSink::handle(Handle } template -prefix_ senf::ppi::QueueingAlgorithm::PacketType> & -senf::ppi::module::PassiveQueueingSocketSink::qAlgorithm() +prefix_ senf::ppi::QueueingAlgorithm & senf::ppi::module::PassiveQueueingSocketSink::qAlgorithm() { return *qAlgo_; } diff --git a/senf/PPI/QueueingSocketSink.hh b/senf/PPI/QueueingSocketSink.hh index 6c5c15c..83de20f 100644 --- a/senf/PPI/QueueingSocketSink.hh +++ b/senf/PPI/QueueingSocketSink.hh @@ -29,6 +29,7 @@ // Custom includes #include #include "SocketSink.hh" +#include //#include "QueueingSocketSink.mpp" ///////////////////////////////hh.p//////////////////////////////////////// @@ -36,31 +37,94 @@ namespace senf { namespace ppi { - template class QueueingAlgorithm + : private boost::noncopyable { + console::ScopedDirectory dir_; + public: + typedef QueueingAlgorithm * ptr; + virtual ~QueueingAlgorithm() {}; - virtual PacketType dequeue() = 0; - virtual bool enqueue(PacketType const & packet) = 0; - virtual unsigned size() const = 0; - virtual void clear() = 0; + + console::DirectoryNode & consoleDir(); + Packet dequeue(); + bool enqueue(Packet const & packet); + unsigned size(); + void clear(); + + protected: + QueueingAlgorithm(); + + virtual Packet v_dequeue() = 0; + virtual bool v_enqueue(Packet const & packet) = 0; + virtual unsigned v_size() const = 0; + virtual void v_clear() = 0; }; - template - class FIFOQueueingAlgorithm : public QueueingAlgorithm + + namespace detail { + struct QueueingAlgorithmRegistry_EntryBase + { + virtual QueueingAlgorithm::ptr create() const = 0; + }; + + template + struct QueueingAlgorithmRegistry_Entry : QueueingAlgorithmRegistry_EntryBase + { + virtual QueueingAlgorithm::ptr create() const; + }; + } + + class QueueingAlgorithmRegistry + : public senf::singleton { + typedef boost::ptr_map QAlgoMap; + QAlgoMap qAlgoMap_; + + QueueingAlgorithmRegistry() {}; public: - FIFOQueueingAlgorithm(unsigned size); + using senf::singleton::instance; + friend class senf::singleton; - virtual PacketType dequeue(); - virtual bool enqueue(PacketType const & packet); - virtual unsigned size() const; - virtual void clear(); + struct Exception : public senf::Exception { + Exception(std::string const & descr) : senf::Exception(descr) {} + }; - private: - std::queue queue_; - unsigned size_; + template + struct RegistrationProxy { + RegistrationProxy(std::string const & key); + }; + + template + void registerQAlgorithm(std::string key); + + QueueingAlgorithm::ptr createQAlgorithm(std::string const & key) const; + void dump(std::ostream & os) const; + }; + + +# define SENF_PPI_REGISTER_QALGORITHM( key, QAlgorithm ) \ + namespace { \ + senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy \ + BOOST_PP_CAT(qAlgorithmRegistration_, __LINE__)( key); \ + } + + + class FIFOQueueingAlgorithm : public QueueingAlgorithm + { + std::queue queue_; + unsigned max_size_; + + FIFOQueueingAlgorithm(); + + virtual Packet v_dequeue(); + virtual bool v_enqueue(Packet const & packet); + virtual unsigned v_size() const; + virtual void v_clear(); + + public: + static QueueingAlgorithm::ptr create(); }; @@ -80,32 +144,34 @@ namespace module { typedef typename Writer::PacketType PacketType; connector::PassiveInput input; ///< Input connector from which data is received + console::ScopedDirectory > dir; - template - explicit PassiveQueueingSocketSink(Handle handle, QAlgorithm const & qAlgorithm); + explicit PassiveQueueingSocketSink(Handle handle, QueueingAlgorithm::ptr qAlgorithm); Writer & writer(); ///< Access the Writer Handle & handle(); ///< Access handle void handle(Handle handle); ///< Set handle /**< Assigning an empty or in-valid() handle will disable the module until a new valid handle is assigned. */ - QueueingAlgorithm & qAlgorithm(); + QueueingAlgorithm & qAlgorithm(); + void qAlgorithm(QueueingAlgorithm::ptr qAlgorithm); private: void write(); void writable(); void checkThrottle(); + void setQAlgorithm(std::string const & key); Handle handle_; Writer writer_; - boost::scoped_ptr > qAlgo_; + boost::scoped_ptr qAlgo_; IOEvent event_; }; }}} ///////////////////////////////hh.e//////////////////////////////////////// -//#include "QueueingSocketSink.cci" +#include "QueueingSocketSink.cci" #include "QueueingSocketSink.ct" #include "QueueingSocketSink.cti" #endif diff --git a/senf/PPI/QueueingSocketSink.test.cc b/senf/PPI/QueueingSocketSink.test.cc index 180af5b..38cfcff 100644 --- a/senf/PPI/QueueingSocketSink.test.cc +++ b/senf/PPI/QueueingSocketSink.test.cc @@ -82,9 +82,8 @@ SENF_AUTO_UNIT_TEST(passiveQueueingSocketSink) { senf::ConnectedUDPv4ClientSocketHandle outputSocket ( senf::INet4SocketAddress( localhost4str(0))); - ppi::FIFOQueueingAlgorithm queueingAlgorithm ( 100); module::PassiveQueueingSocketSink udpSink ( - outputSocket, queueingAlgorithm); + outputSocket, ppi::FIFOQueueingAlgorithm::create()); udpSink.writer().throttled = false; debug::ActiveSource source; ppi::connect(source, udpSink); diff --git a/senf/Utils/Console/Mainpage.dox b/senf/Utils/Console/Mainpage.dox index 9b25f2e..48341bc 100644 --- a/senf/Utils/Console/Mainpage.dox +++ b/senf/Utils/Console/Mainpage.dox @@ -1085,7 +1085,7 @@ senf::console::ScopedDirectory dir; Test2() : dir(this), var_(0) - { dir.add("var", fty::Variabl(var_) ); } + { dir.add("var", fty::Variable(var_) ); } private: int var_;