PPI: PassiveQueueingSocketSink: added registry for QueueingAlgorithms
tho [Wed, 7 Jul 2010 12:03:47 +0000 (12:03 +0000)]
git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@1644 270642c3-0616-0410-b53a-bc976706d245

senf/PPI/QueueingSocketSink.cc [new file with mode: 0644]
senf/PPI/QueueingSocketSink.cci [new file with mode: 0644]
senf/PPI/QueueingSocketSink.ct
senf/PPI/QueueingSocketSink.cti
senf/PPI/QueueingSocketSink.hh
senf/PPI/QueueingSocketSink.test.cc
senf/Utils/Console/Mainpage.dox

diff --git a/senf/PPI/QueueingSocketSink.cc b/senf/PPI/QueueingSocketSink.cc
new file mode 100644 (file)
index 0000000..4267bb8
--- /dev/null
@@ -0,0 +1,113 @@
+// $Id$
+//
+// Copyright (C) 2010
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+//     Thorsten Horstmann <tho@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+//
+
+/** \file
+    \brief QueueingSocketSink non-inline non-template implementation */
+
+#include "QueueingSocketSink.hh"
+//#include "QueueingSocketSink.ih"
+
+// Custom includes
+
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+SENF_PPI_REGISTER_QALGORITHM( "FIFOQueueingAlgorithm", senf::ppi::FIFOQueueingAlgorithm);
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::QueueingAlgorithm
+
+prefix_ senf::ppi::QueueingAlgorithm::QueueingAlgorithm()
+    : dir_(this)
+{
+    namespace fty = console::factory;
+    dir_.add("size", fty::Command( &QueueingAlgorithm::size, this));
+    dir_.add("clear", fty::Command( &QueueingAlgorithm::clear, this));
+}
+
+prefix_ senf::console::DirectoryNode & senf::ppi::QueueingAlgorithm::consoleDir()
+{
+    return dir_;
+}
+
+/////////////////////////////////////////////////////////////////////////////
+// senf::ppi::QueueingAlgorithmRegistry
+
+prefix_ void senf::ppi::QueueingAlgorithmRegistry::dump(std::ostream & os)
+    const
+{
+    for (QAlgoMap::const_iterator i = qAlgoMap_.begin(); i != qAlgoMap_.end(); ++i) {
+        os << i->first << std::endl;
+    }
+}
+
+prefix_ senf::ppi::QueueingAlgorithm::ptr senf::ppi::QueueingAlgorithmRegistry::createQAlgorithm(std::string const & key)
+    const
+{
+    QAlgoMap::const_iterator i (qAlgoMap_.find( key));
+    if (i != qAlgoMap_.end())
+        return i->second->create();
+    else
+        throw Exception("QueueingAlgorithm not registered: ") << key;
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::FIFOQueueingAlgorithm
+
+prefix_ senf::ppi::FIFOQueueingAlgorithm::FIFOQueueingAlgorithm()
+    : max_size_( 64)
+{
+    consoleDir().add("max-size", console::factory::Variable(max_size_) );
+}
+
+prefix_ senf::Packet senf::ppi::FIFOQueueingAlgorithm::v_dequeue()
+{
+    if (queue_.size() > 0) {
+        Packet const & p = queue_.front();
+        queue_.pop();
+        return p;
+    }
+    return Packet();
+}
+
+prefix_ bool senf::ppi::FIFOQueueingAlgorithm::v_enqueue(Packet const & packet)
+{
+    if (queue_.size() == max_size_)
+        queue_.pop();
+    queue_.push( packet);
+    return true;
+}
+
+prefix_ void senf::ppi::FIFOQueueingAlgorithm::v_clear()
+{
+    while (! queue_.empty())
+        queue_.pop();
+}
+
+prefix_ senf::ppi::QueueingAlgorithm::ptr senf::ppi::FIFOQueueingAlgorithm::create()
+{
+    return new FIFOQueueingAlgorithm();
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
diff --git a/senf/PPI/QueueingSocketSink.cci b/senf/PPI/QueueingSocketSink.cci
new file mode 100644 (file)
index 0000000..965302c
--- /dev/null
@@ -0,0 +1,79 @@
+// $Id$
+//
+// Copyright (C) 2010
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+//     Thorsten Horstmann <tho@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+
+/** \file
+    \brief QueueingSocketSink inline non-template implementation */
+
+//#include "QueueingSocketSink.ih"
+
+// Custom includes
+
+#define prefix_ inline
+///////////////////////////////cci.p///////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::QueueingAlgorithm
+
+prefix_ unsigned senf::ppi::QueueingAlgorithm::size()
+{
+    return v_size();
+}
+
+prefix_ void senf::ppi::QueueingAlgorithm::clear()
+{
+    v_clear();
+}
+
+prefix_ senf::Packet senf::ppi::QueueingAlgorithm::dequeue()
+{
+    return v_dequeue();
+}
+
+prefix_ bool senf::ppi::QueueingAlgorithm::enqueue(Packet const & packet)
+{
+    return v_enqueue( packet);
+}
+
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::FIFOQueueingAlgorithm
+
+prefix_ unsigned senf::ppi::FIFOQueueingAlgorithm::v_size()
+    const
+{
+    return queue_.size();
+}
+
+
+///////////////////////////////cci.e///////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
index 027673d..3ae8cbf 100644 (file)
 ///////////////////////////////ct.p////////////////////////////////////////
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::FIFOQueueingAlgorithm<PacketType>
+// senf::ppi::QueueingAlgorithmRegistry
 
-template <typename PacketType>
-prefix_ senf::ppi::FIFOQueueingAlgorithm<PacketType>::FIFOQueueingAlgorithm(unsigned size)
-    : size_( size)
-{ }
-
-template <typename PacketType>
-prefix_ PacketType senf::ppi::FIFOQueueingAlgorithm<PacketType>::dequeue()
+template <class QAlgorithm>
+prefix_ void senf::ppi::QueueingAlgorithmRegistry::registerQAlgorithm(std::string key)
 {
-    if (queue_.size() > 0) {
-        PacketType const & p = queue_.front();
-        queue_.pop();
-        return p;
-    }
-    return PacketType();
+    if (qAlgoMap_.find( key) == qAlgoMap_.end() )
+        qAlgoMap_.insert(key, new detail::QueueingAlgorithmRegistry_Entry<QAlgorithm>() );
+    else
+        throw Exception("Duplicated QAlgorithm Registration ") << key;
 }
 
-template <typename PacketType>
-prefix_ bool senf::ppi::FIFOQueueingAlgorithm<PacketType>::enqueue(PacketType const & packet)
-{
-    if (queue_.size() < size_) {
-        queue_.push( packet);
-        return true;
-    }
-    return false;
-}
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::QueueingAlgorithmRegistry_Entry<QAlgorithm>
 
-template <typename PacketType>
-prefix_ void senf::ppi::FIFOQueueingAlgorithm<PacketType>::clear()
+template <class QAlgorithm>
+prefix_ senf::ppi::QueueingAlgorithm::ptr senf::ppi::detail::QueueingAlgorithmRegistry_Entry<QAlgorithm>::create()
+    const
 {
-    while (! queue_.empty())
-        queue_.pop();
+    return QAlgorithm::create();
 }
 
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::module::PassiveQueueingSocketSink<Writer>
 
 template <class Writer>
-template <class QAlgorithm>
-prefix_ senf::ppi::module::PassiveQueueingSocketSink<Writer>::PassiveQueueingSocketSink(Handle handle, QAlgorithm const & qAlgorithm)
-    : handle_( handle), writer_( ),
-      qAlgo_( new QAlgorithm(qAlgorithm)),
+prefix_ senf::ppi::module::PassiveQueueingSocketSink<Writer>::PassiveQueueingSocketSink(Handle handle, QueueingAlgorithm::ptr qAlgorithm)
+    : dir( this),
+      handle_( handle), writer_( ),
+      qAlgo_( qAlgorithm),
       event_( handle_, IOEvent::Write)
 {
+    namespace fty = console::factory;
+    dir.add( "active", qAlgo_->consoleDir());
+    dir.add( "set", fty::Command(
+            &PassiveQueueingSocketSink<Writer>::setQAlgorithm, this) );
+    dir.add( "list", fty::Command(
+            &QueueingAlgorithmRegistry::dump, &QueueingAlgorithmRegistry::instance()));
     registerEvent( event_, &PassiveQueueingSocketSink::writable );
     event_.enabled( false);
     noroute(input);
@@ -118,6 +110,22 @@ prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::checkThrottle
         input.throttle();
 }
 
+template <class Writer>
+prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::qAlgorithm(QueueingAlgorithm::ptr qAlgorithm)
+{
+//    dir.remove( "active");
+    qAlgo_.reset( qAlgorithm);
+    dir.add( "active", qAlgo_->consoleDir());
+    if (event_.enabled())
+        event_.enabled( false);
+}
+
+template <class Writer>
+prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::setQAlgorithm(std::string const & key)
+{
+    qAlgorithm( QueueingAlgorithmRegistry::instance().createQAlgorithm( key));
+}
+
 ///////////////////////////////ct.e////////////////////////////////////////
 #undef prefix_
 
index 1fb6bc4..f1ae028 100644 (file)
 ///////////////////////////////cti.p///////////////////////////////////////
 
 ///////////////////////////////////////////////////////////////////////////
-// senf::ppi::FIFOQueueingAlgorithm<PacketType>
+// senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy<QAlgorithm>
 
-template <typename PacketType>
-prefix_ unsigned senf::ppi::FIFOQueueingAlgorithm<PacketType>::size()
-    const
+template <class QAlgorithm>
+prefix_ senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy<QAlgorithm>::RegistrationProxy(std::string const & key)
 {
-    return queue_.size();
-}
+    QueueingAlgorithmRegistry::instance().registerQAlgorithm<QAlgorithm>( key);
+};
+
 
 ///////////////////////////////////////////////////////////////////////////
 // senf::ppi::module::PassiveQueueingSocketSink<Writer>
@@ -64,8 +64,7 @@ prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::handle(Handle
 }
 
 template <class Writer>
-prefix_ senf::ppi::QueueingAlgorithm<typename senf::ppi::module::PassiveQueueingSocketSink<Writer>::PacketType> &
-senf::ppi::module::PassiveQueueingSocketSink<Writer>::qAlgorithm()
+prefix_ senf::ppi::QueueingAlgorithm & senf::ppi::module::PassiveQueueingSocketSink<Writer>::qAlgorithm()
 {
     return *qAlgo_;
 }
index 6c5c15c..83de20f 100644 (file)
@@ -29,6 +29,7 @@
 // Custom includes
 #include <queue>
 #include "SocketSink.hh"
+#include <senf/Utils/Console/Console.hh>
 
 //#include "QueueingSocketSink.mpp"
 ///////////////////////////////hh.p////////////////////////////////////////
 namespace senf {
 namespace ppi {
 
-    template <typename PacketType=Packet>
     class QueueingAlgorithm
+        : private boost::noncopyable
     {
+        console::ScopedDirectory<QueueingAlgorithm> dir_;
+
     public:
+        typedef QueueingAlgorithm * ptr;
+
         virtual ~QueueingAlgorithm() {};
-        virtual PacketType dequeue() = 0;
-        virtual bool enqueue(PacketType const & packet) = 0;
-        virtual unsigned size() const = 0;
-        virtual void clear() = 0;
+
+        console::DirectoryNode & consoleDir();
+        Packet dequeue();
+        bool enqueue(Packet const & packet);
+        unsigned size();
+        void clear();
+
+    protected:
+        QueueingAlgorithm();
+
+        virtual Packet v_dequeue() = 0;
+        virtual bool v_enqueue(Packet const & packet) = 0;
+        virtual unsigned v_size() const = 0;
+        virtual void v_clear() = 0;
     };
 
-    template <typename PacketType=Packet>
-    class FIFOQueueingAlgorithm : public QueueingAlgorithm<PacketType>
+
+    namespace detail {
+        struct QueueingAlgorithmRegistry_EntryBase
+        {
+            virtual QueueingAlgorithm::ptr create() const = 0;
+        };
+
+        template <class QAlgorithm>
+        struct QueueingAlgorithmRegistry_Entry : QueueingAlgorithmRegistry_EntryBase
+        {
+            virtual QueueingAlgorithm::ptr create() const;
+        };
+    }
+
+    class QueueingAlgorithmRegistry
+        : public senf::singleton<QueueingAlgorithmRegistry>
     {
+        typedef boost::ptr_map<std::string, detail::QueueingAlgorithmRegistry_EntryBase> QAlgoMap;
+        QAlgoMap qAlgoMap_;
+
+        QueueingAlgorithmRegistry() {};
     public:
-        FIFOQueueingAlgorithm(unsigned size);
+        using senf::singleton<QueueingAlgorithmRegistry>::instance;
+        friend class senf::singleton<QueueingAlgorithmRegistry>;
 
-        virtual PacketType dequeue();
-        virtual bool enqueue(PacketType const & packet);
-        virtual unsigned size() const;
-        virtual void clear();
+        struct Exception : public senf::Exception {
+            Exception(std::string const & descr) : senf::Exception(descr) {}
+        };
 
-    private:
-        std::queue<PacketType> queue_;
-        unsigned size_;
+        template <class QAlgorithm>
+        struct RegistrationProxy {
+            RegistrationProxy(std::string const & key);
+        };
+
+        template <class QAlgorithm>
+        void registerQAlgorithm(std::string key);
+
+        QueueingAlgorithm::ptr createQAlgorithm(std::string const & key) const;
+        void dump(std::ostream & os) const;
+    };
+
+
+#   define SENF_PPI_REGISTER_QALGORITHM( key, QAlgorithm )                          \
+        namespace {                                                                 \
+            senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy<QAlgorithm>     \
+                BOOST_PP_CAT(qAlgorithmRegistration_, __LINE__)( key);              \
+        }
+
+
+    class FIFOQueueingAlgorithm : public QueueingAlgorithm
+    {
+        std::queue<Packet> queue_;
+        unsigned max_size_;
+
+        FIFOQueueingAlgorithm();
+
+        virtual Packet v_dequeue();
+        virtual bool v_enqueue(Packet const & packet);
+        virtual unsigned v_size() const;
+        virtual void v_clear();
+
+    public:
+        static QueueingAlgorithm::ptr create();
     };
 
 
@@ -80,32 +144,34 @@ namespace module {
         typedef typename Writer::PacketType PacketType;
 
         connector::PassiveInput<PacketType> input; ///< Input connector from which data is received
+        console::ScopedDirectory<PassiveQueueingSocketSink<Writer> > dir;
 
-        template <class QAlgorithm>
-        explicit PassiveQueueingSocketSink(Handle handle, QAlgorithm const & qAlgorithm);
+        explicit PassiveQueueingSocketSink(Handle handle, QueueingAlgorithm::ptr qAlgorithm);
 
         Writer & writer();              ///< Access the Writer
         Handle & handle();              ///< Access handle
         void handle(Handle handle);     ///< Set handle
                                         /**< Assigning an empty or in-valid() handle will disable
                                              the module until a new valid handle is assigned. */
-        QueueingAlgorithm<PacketType> & qAlgorithm();
+        QueueingAlgorithm & qAlgorithm();
+        void qAlgorithm(QueueingAlgorithm::ptr qAlgorithm);
 
     private:
         void write();
         void writable();
         void checkThrottle();
+        void setQAlgorithm(std::string const & key);
 
         Handle handle_;
         Writer writer_;
-        boost::scoped_ptr<QueueingAlgorithm<PacketType> > qAlgo_;
+        boost::scoped_ptr<QueueingAlgorithm> qAlgo_;
         IOEvent event_;
     };
 
 }}}
 
 ///////////////////////////////hh.e////////////////////////////////////////
-//#include "QueueingSocketSink.cci"
+#include "QueueingSocketSink.cci"
 #include "QueueingSocketSink.ct"
 #include "QueueingSocketSink.cti"
 #endif
index 180af5b..38cfcff 100644 (file)
@@ -82,9 +82,8 @@ SENF_AUTO_UNIT_TEST(passiveQueueingSocketSink)
 {
     senf::ConnectedUDPv4ClientSocketHandle outputSocket (
             senf::INet4SocketAddress( localhost4str(0)));
-    ppi::FIFOQueueingAlgorithm<TestingConnectedDgramWriter::PacketType> queueingAlgorithm ( 100);
     module::PassiveQueueingSocketSink<TestingConnectedDgramWriter> udpSink (
-            outputSocket, queueingAlgorithm);
+            outputSocket, ppi::FIFOQueueingAlgorithm::create());
     udpSink.writer().throttled = false;
     debug::ActiveSource source;
     ppi::connect(source, udpSink);
index 9b25f2e..48341bc 100644 (file)
         senf::console::ScopedDirectory<Test2> dir;
 
         Test2() : dir(this), var_(0)
-            { dir.add("var", fty::Variabl(var_) ); }
+            { dir.add("var", fty::Variable(var_) ); }
 
     private:
         int var_;