--- /dev/null
+// $Id$
+//
+// Copyright (C) 2010
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+// Thorsten Horstmann <tho@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+//
+
+/** \file
+ \brief QueueingSocketSink non-inline non-template implementation */
+
+#include "QueueingSocketSink.hh"
+//#include "QueueingSocketSink.ih"
+
+// Custom includes
+
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+SENF_PPI_REGISTER_QALGORITHM( "FIFOQueueingAlgorithm", senf::ppi::FIFOQueueingAlgorithm);
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::QueueingAlgorithm
+
+prefix_ senf::ppi::QueueingAlgorithm::QueueingAlgorithm()
+ : dir_(this)
+{
+ namespace fty = console::factory;
+ dir_.add("size", fty::Command( &QueueingAlgorithm::size, this));
+ dir_.add("clear", fty::Command( &QueueingAlgorithm::clear, this));
+}
+
+prefix_ senf::console::DirectoryNode & senf::ppi::QueueingAlgorithm::consoleDir()
+{
+ return dir_;
+}
+
+/////////////////////////////////////////////////////////////////////////////
+// senf::ppi::QueueingAlgorithmRegistry
+
+prefix_ void senf::ppi::QueueingAlgorithmRegistry::dump(std::ostream & os)
+ const
+{
+ for (QAlgoMap::const_iterator i = qAlgoMap_.begin(); i != qAlgoMap_.end(); ++i) {
+ os << i->first << std::endl;
+ }
+}
+
+prefix_ senf::ppi::QueueingAlgorithm::ptr senf::ppi::QueueingAlgorithmRegistry::createQAlgorithm(std::string const & key)
+ const
+{
+ QAlgoMap::const_iterator i (qAlgoMap_.find( key));
+ if (i != qAlgoMap_.end())
+ return i->second->create();
+ else
+ throw Exception("QueueingAlgorithm not registered: ") << key;
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::FIFOQueueingAlgorithm
+
+prefix_ senf::ppi::FIFOQueueingAlgorithm::FIFOQueueingAlgorithm()
+ : max_size_( 64)
+{
+ consoleDir().add("max-size", console::factory::Variable(max_size_) );
+}
+
+prefix_ senf::Packet senf::ppi::FIFOQueueingAlgorithm::v_dequeue()
+{
+ if (queue_.size() > 0) {
+ Packet const & p = queue_.front();
+ queue_.pop();
+ return p;
+ }
+ return Packet();
+}
+
+prefix_ bool senf::ppi::FIFOQueueingAlgorithm::v_enqueue(Packet const & packet)
+{
+ if (queue_.size() == max_size_)
+ queue_.pop();
+ queue_.push( packet);
+ return true;
+}
+
+prefix_ void senf::ppi::FIFOQueueingAlgorithm::v_clear()
+{
+ while (! queue_.empty())
+ queue_.pop();
+}
+
+prefix_ senf::ppi::QueueingAlgorithm::ptr senf::ppi::FIFOQueueingAlgorithm::create()
+{
+ return new FIFOQueueingAlgorithm();
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2010
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+// Thorsten Horstmann <tho@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief QueueingSocketSink inline non-template implementation */
+
+//#include "QueueingSocketSink.ih"
+
+// Custom includes
+
+#define prefix_ inline
+///////////////////////////////cci.p///////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::QueueingAlgorithm
+
+prefix_ unsigned senf::ppi::QueueingAlgorithm::size()
+{
+ return v_size();
+}
+
+prefix_ void senf::ppi::QueueingAlgorithm::clear()
+{
+ v_clear();
+}
+
+prefix_ senf::Packet senf::ppi::QueueingAlgorithm::dequeue()
+{
+ return v_dequeue();
+}
+
+prefix_ bool senf::ppi::QueueingAlgorithm::enqueue(Packet const & packet)
+{
+ return v_enqueue( packet);
+}
+
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::FIFOQueueingAlgorithm
+
+prefix_ unsigned senf::ppi::FIFOQueueingAlgorithm::v_size()
+ const
+{
+ return queue_.size();
+}
+
+
+///////////////////////////////cci.e///////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
///////////////////////////////ct.p////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::FIFOQueueingAlgorithm<PacketType>
+// senf::ppi::QueueingAlgorithmRegistry
-template <typename PacketType>
-prefix_ senf::ppi::FIFOQueueingAlgorithm<PacketType>::FIFOQueueingAlgorithm(unsigned size)
- : size_( size)
-{ }
-
-template <typename PacketType>
-prefix_ PacketType senf::ppi::FIFOQueueingAlgorithm<PacketType>::dequeue()
+template <class QAlgorithm>
+prefix_ void senf::ppi::QueueingAlgorithmRegistry::registerQAlgorithm(std::string key)
{
- if (queue_.size() > 0) {
- PacketType const & p = queue_.front();
- queue_.pop();
- return p;
- }
- return PacketType();
+ if (qAlgoMap_.find( key) == qAlgoMap_.end() )
+ qAlgoMap_.insert(key, new detail::QueueingAlgorithmRegistry_Entry<QAlgorithm>() );
+ else
+ throw Exception("Duplicated QAlgorithm Registration ") << key;
}
-template <typename PacketType>
-prefix_ bool senf::ppi::FIFOQueueingAlgorithm<PacketType>::enqueue(PacketType const & packet)
-{
- if (queue_.size() < size_) {
- queue_.push( packet);
- return true;
- }
- return false;
-}
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::detail::QueueingAlgorithmRegistry_Entry<QAlgorithm>
-template <typename PacketType>
-prefix_ void senf::ppi::FIFOQueueingAlgorithm<PacketType>::clear()
+template <class QAlgorithm>
+prefix_ senf::ppi::QueueingAlgorithm::ptr senf::ppi::detail::QueueingAlgorithmRegistry_Entry<QAlgorithm>::create()
+ const
{
- while (! queue_.empty())
- queue_.pop();
+ return QAlgorithm::create();
}
///////////////////////////////////////////////////////////////////////////
// senf::ppi::module::PassiveQueueingSocketSink<Writer>
template <class Writer>
-template <class QAlgorithm>
-prefix_ senf::ppi::module::PassiveQueueingSocketSink<Writer>::PassiveQueueingSocketSink(Handle handle, QAlgorithm const & qAlgorithm)
- : handle_( handle), writer_( ),
- qAlgo_( new QAlgorithm(qAlgorithm)),
+prefix_ senf::ppi::module::PassiveQueueingSocketSink<Writer>::PassiveQueueingSocketSink(Handle handle, QueueingAlgorithm::ptr qAlgorithm)
+ : dir( this),
+ handle_( handle), writer_( ),
+ qAlgo_( qAlgorithm),
event_( handle_, IOEvent::Write)
{
+ namespace fty = console::factory;
+ dir.add( "active", qAlgo_->consoleDir());
+ dir.add( "set", fty::Command(
+ &PassiveQueueingSocketSink<Writer>::setQAlgorithm, this) );
+ dir.add( "list", fty::Command(
+ &QueueingAlgorithmRegistry::dump, &QueueingAlgorithmRegistry::instance()));
registerEvent( event_, &PassiveQueueingSocketSink::writable );
event_.enabled( false);
noroute(input);
input.throttle();
}
+template <class Writer>
+prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::qAlgorithm(QueueingAlgorithm::ptr qAlgorithm)
+{
+// dir.remove( "active");
+ qAlgo_.reset( qAlgorithm);
+ dir.add( "active", qAlgo_->consoleDir());
+ if (event_.enabled())
+ event_.enabled( false);
+}
+
+template <class Writer>
+prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::setQAlgorithm(std::string const & key)
+{
+ qAlgorithm( QueueingAlgorithmRegistry::instance().createQAlgorithm( key));
+}
+
///////////////////////////////ct.e////////////////////////////////////////
#undef prefix_
///////////////////////////////cti.p///////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
-// senf::ppi::FIFOQueueingAlgorithm<PacketType>
+// senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy<QAlgorithm>
-template <typename PacketType>
-prefix_ unsigned senf::ppi::FIFOQueueingAlgorithm<PacketType>::size()
- const
+template <class QAlgorithm>
+prefix_ senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy<QAlgorithm>::RegistrationProxy(std::string const & key)
{
- return queue_.size();
-}
+ QueueingAlgorithmRegistry::instance().registerQAlgorithm<QAlgorithm>( key);
+};
+
///////////////////////////////////////////////////////////////////////////
// senf::ppi::module::PassiveQueueingSocketSink<Writer>
}
template <class Writer>
-prefix_ senf::ppi::QueueingAlgorithm<typename senf::ppi::module::PassiveQueueingSocketSink<Writer>::PacketType> &
-senf::ppi::module::PassiveQueueingSocketSink<Writer>::qAlgorithm()
+prefix_ senf::ppi::QueueingAlgorithm & senf::ppi::module::PassiveQueueingSocketSink<Writer>::qAlgorithm()
{
return *qAlgo_;
}
// Custom includes
#include <queue>
#include "SocketSink.hh"
+#include <senf/Utils/Console/Console.hh>
//#include "QueueingSocketSink.mpp"
///////////////////////////////hh.p////////////////////////////////////////
namespace senf {
namespace ppi {
- template <typename PacketType=Packet>
class QueueingAlgorithm
+ : private boost::noncopyable
{
+ console::ScopedDirectory<QueueingAlgorithm> dir_;
+
public:
+ typedef QueueingAlgorithm * ptr;
+
virtual ~QueueingAlgorithm() {};
- virtual PacketType dequeue() = 0;
- virtual bool enqueue(PacketType const & packet) = 0;
- virtual unsigned size() const = 0;
- virtual void clear() = 0;
+
+ console::DirectoryNode & consoleDir();
+ Packet dequeue();
+ bool enqueue(Packet const & packet);
+ unsigned size();
+ void clear();
+
+ protected:
+ QueueingAlgorithm();
+
+ virtual Packet v_dequeue() = 0;
+ virtual bool v_enqueue(Packet const & packet) = 0;
+ virtual unsigned v_size() const = 0;
+ virtual void v_clear() = 0;
};
- template <typename PacketType=Packet>
- class FIFOQueueingAlgorithm : public QueueingAlgorithm<PacketType>
+
+ namespace detail {
+ struct QueueingAlgorithmRegistry_EntryBase
+ {
+ virtual QueueingAlgorithm::ptr create() const = 0;
+ };
+
+ template <class QAlgorithm>
+ struct QueueingAlgorithmRegistry_Entry : QueueingAlgorithmRegistry_EntryBase
+ {
+ virtual QueueingAlgorithm::ptr create() const;
+ };
+ }
+
+ class QueueingAlgorithmRegistry
+ : public senf::singleton<QueueingAlgorithmRegistry>
{
+ typedef boost::ptr_map<std::string, detail::QueueingAlgorithmRegistry_EntryBase> QAlgoMap;
+ QAlgoMap qAlgoMap_;
+
+ QueueingAlgorithmRegistry() {};
public:
- FIFOQueueingAlgorithm(unsigned size);
+ using senf::singleton<QueueingAlgorithmRegistry>::instance;
+ friend class senf::singleton<QueueingAlgorithmRegistry>;
- virtual PacketType dequeue();
- virtual bool enqueue(PacketType const & packet);
- virtual unsigned size() const;
- virtual void clear();
+ struct Exception : public senf::Exception {
+ Exception(std::string const & descr) : senf::Exception(descr) {}
+ };
- private:
- std::queue<PacketType> queue_;
- unsigned size_;
+ template <class QAlgorithm>
+ struct RegistrationProxy {
+ RegistrationProxy(std::string const & key);
+ };
+
+ template <class QAlgorithm>
+ void registerQAlgorithm(std::string key);
+
+ QueueingAlgorithm::ptr createQAlgorithm(std::string const & key) const;
+ void dump(std::ostream & os) const;
+ };
+
+
+# define SENF_PPI_REGISTER_QALGORITHM( key, QAlgorithm ) \
+ namespace { \
+ senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy<QAlgorithm> \
+ BOOST_PP_CAT(qAlgorithmRegistration_, __LINE__)( key); \
+ }
+
+
+ class FIFOQueueingAlgorithm : public QueueingAlgorithm
+ {
+ std::queue<Packet> queue_;
+ unsigned max_size_;
+
+ FIFOQueueingAlgorithm();
+
+ virtual Packet v_dequeue();
+ virtual bool v_enqueue(Packet const & packet);
+ virtual unsigned v_size() const;
+ virtual void v_clear();
+
+ public:
+ static QueueingAlgorithm::ptr create();
};
typedef typename Writer::PacketType PacketType;
connector::PassiveInput<PacketType> input; ///< Input connector from which data is received
+ console::ScopedDirectory<PassiveQueueingSocketSink<Writer> > dir;
- template <class QAlgorithm>
- explicit PassiveQueueingSocketSink(Handle handle, QAlgorithm const & qAlgorithm);
+ explicit PassiveQueueingSocketSink(Handle handle, QueueingAlgorithm::ptr qAlgorithm);
Writer & writer(); ///< Access the Writer
Handle & handle(); ///< Access handle
void handle(Handle handle); ///< Set handle
/**< Assigning an empty or in-valid() handle will disable
the module until a new valid handle is assigned. */
- QueueingAlgorithm<PacketType> & qAlgorithm();
+ QueueingAlgorithm & qAlgorithm();
+ void qAlgorithm(QueueingAlgorithm::ptr qAlgorithm);
private:
void write();
void writable();
void checkThrottle();
+ void setQAlgorithm(std::string const & key);
Handle handle_;
Writer writer_;
- boost::scoped_ptr<QueueingAlgorithm<PacketType> > qAlgo_;
+ boost::scoped_ptr<QueueingAlgorithm> qAlgo_;
IOEvent event_;
};
}}}
///////////////////////////////hh.e////////////////////////////////////////
-//#include "QueueingSocketSink.cci"
+#include "QueueingSocketSink.cci"
#include "QueueingSocketSink.ct"
#include "QueueingSocketSink.cti"
#endif
{
senf::ConnectedUDPv4ClientSocketHandle outputSocket (
senf::INet4SocketAddress( localhost4str(0)));
- ppi::FIFOQueueingAlgorithm<TestingConnectedDgramWriter::PacketType> queueingAlgorithm ( 100);
module::PassiveQueueingSocketSink<TestingConnectedDgramWriter> udpSink (
- outputSocket, queueingAlgorithm);
+ outputSocket, ppi::FIFOQueueingAlgorithm::create());
udpSink.writer().throttled = false;
debug::ActiveSource source;
ppi::connect(source, udpSink);
senf::console::ScopedDirectory<Test2> dir;
Test2() : dir(this), var_(0)
- { dir.add("var", fty::Variabl(var_) ); }
+ { dir.add("var", fty::Variable(var_) ); }
private:
int var_;