namespace senf {
namespace ppi {
-
- ///@{
- ///\addtogroup connectors
+namespace connector {
/** \brief Connector baseclass
{
public:
template <class Handler>
- void onThrottle(Handler); ///< Register throttle notification handler
+ void onThrottle(Handler handle); ///< Register throttle notification handler
/**< The handler register here will be called, whenever a
throttle notification comes in. The \a handler argument
is either an arbitrary callable object or it is a
notifications. */
template <class Handler>
- void onUnthrottle(Handler); ///< Register unthrottle notification handler
+ void onUnthrottle(Handler handler); ///< Register unthrottle notification handler
/**< The handler register here will be called, whenever an
unthrottle notification comes in. The \a handler
argument is either an arbitrary callable object or it
PassiveConnector();
~PassiveConnector();
};
+
+ ///@{
+ ///\addtogroup connectors
/** \brief Combination of PassiveConnector and InputConnector
///@}
-}}
+}}}
///////////////////////////////hh.e////////////////////////////////////////
//#include "Conenctors.cci"
\li The PPI is built around reusable \ref modules. Each module is completely independent.
\li Each module has an arbitrary number of \ref connectors, inputs and outputs.
\li The modules are connected to each other using flexible \ref connections.
- \li Data flow throughout the network is governed via flexible automatic or manual \ref throttling.
+ \li Data flow throughout the network is governed via flexible automatic or manual \ref
+ throttling.
\li Modules may register additional external \ref events (file descriptor events or timers).
The PPI thereby builds on the facilities provided by the other components of the SENF
members so they can be accessed from the outside. This is important as we will see below.
\code
- class RateStuffer
- : public senf::ppi::Module
+ class RateStuffer
+ : public senf::ppi::module::Module
{
senf::ppi::IntervalTimer timer_;
public:
- senf::ppi::ActiveInput payload;
- senf::ppi::ActiveInput stuffing;
- senf::ppi::ActiveOutput output;
+ senf::ppi::connector::ActiveInput payload;
+ senf::ppi::connector::ActiveInput stuffing;
+ senf::ppi::connector::ActiveOutput output;
RateStuffer(unsigned packetsPerSecond)
: timer_(1000u, packetsPerSecond)
};
\endcode
- On module instantiation, it will declare it's flow information with <tt>route</tt> (which
- is inherited from <tt>senf::ppi::Module</tt>). Then the module registers an interval timer which
- will fire <tt>packetsPerSecond</tt> times every <tt>1000</tt> milliseconds.
+ On module instantiation, it will declare it's flow information with <tt>route</tt> (which is
+ inherited from <tt>senf::ppi::module::Module</tt>). Then the module registers an interval timer
+ which will fire <tt>packetsPerSecond</tt> times every <tt>1000</tt> milliseconds.
The processing of the module is very simple: Whenever a timer tick arrives a packet is sent. If
the <tt>payload</tt> input is ready (see throttling below), a payload packet is sent, otherwise
\code
class CopyPacketGenerator
- : public senf::ppi::Module
+ : public senf::ppi::module::Module
{
public:
- senf::ppi::PassiveOutput output;
+ senf::ppi::connector::PassiveOutput output;
CopyPacketGenerator(Packet::ptr template)
: template_ (template)
CopyPacketGenerator generator (stuffingPacket);
senf::UDPv4ClientSocketHandle inputSocket (1111);
- senf::ppi::ActiveSocketInput udpInput (inputSocket);
+ senf::ppi::module::ActiveSocketReader udpInput (inputSocket);
senf::UDPv4ClientSocketHandle outputSocket ("2.3.4.5:2222");
- senf::ppi::PassiveSocketOutput udpOutput (outputSocket);
+ senf::ppi::module::PassiveSocketWriter udpOutput (outputSocket);
- senf::ppi::connect(udpInput.output, rateStuffer.payload,
- dynamicModule<PassiveQueue>()
- -> qdisc(ThresholdQueueing(10,5)) );
+ senf::ppi::module::PassiveQueue adaptor;
+
+ senf::ppi::connect(udpInput.output, adaptor.input);
+ senf::ppi::connect(adaptor.output, rateStuffer.payload);
+ adaptor.qdisc(ThresholdQueueing(10,5));
senf::ppi::connect(generator.output, rateStuffer.stuffing);
senf::ppi::connect(rateStuffer.output, udpOutput.input);
<em>forward throttling</em> since it is forwarded along the \e same direction the data
is. Forward throttling notifications are therefore sent towards the output modules.
- Since throttling a passive input may not disable all further packet delivery immediately, any
- passive input contains an input queue. In it's default configuration, the queue will send out
- throttle notifications when it becomes non-empty and unthrottle notifications when it becomes
- empty again. This automatic behavior may however be disabled. This allows a module to collect
- incoming packets in it's input queue before processing a bunch of them in one go.
+ Since throttling a passive input may not disable all further packet delivery immediately, all
+ inputs contains an input queue. In it's default configuration, the queue will send out throttle
+ notifications when it becomes non-empty and unthrottle notifications when it becomes empty
+ again. This automatic behavior may however be disabled. This allows a module to collect incoming
+ packets in it's input queue before processing a bunch of them in one go.
\section events Events
- Modules may register additional events. These external events are very important since the drive
- the PPI framework. Possible event sources are
+ Modules may register additional events. These external events are very important since they
+ drive the PPI framework. Possible event sources are
\li time based events
\li file descriptors.
+ \li internal events (e.g. IdleEvent)
Here some example code implementing the ActiveSocketInput Module:
\code
- class ActiveSocketInput
- : public senf::ppi::Module
+ class ActiveSocketReader
+ : public senf::ppi::module::Module
{
typedef senf::ClientSocketHandle<
senf::MakeSocketPolicy< senf::ReadablePolicy,
- senf::DatagramFramingPolicy > > Socket;
- static PacketParser<senf::DataPacket> defaultParser_;
-
- Socket socket_;
+ senf::DatagramFramingPolicy > > SocketHandle;
+ SocketHandle socket_;
DataParser const & parser_;
senf::ppi:IOSignaler event_;
+ static PacketParser<senf::DataPacket> defaultParser_;
+
public:
- senf::ppi::ActiveOutput output;
+ senf::ppi::connector::ActiveOutput output;
// I hestitate taking parser by const & since a const & can be bound to
// a temporary even though a const & is all we need. The real implementation
// will probably make this a template arg. This simplifies the memory management
// from the users pov.
- ActiveSocketInput(Socket socket, DataParser & parser = SocketInput::defaultParser_)
+ ActiveSocketReader(SocketHandle socket,
+ DataParser & parser = ActiveSocketReader::defaultParser_)
: socket_ (socket),
parser_ (parser)
event_ (socket, senf::ppi::IOSignaler::Read)
{
- registerEvent( &ActiveSocketInput::data, event_ );
+ registerEvent( &ActiveSocketReader::data, event_ );
route(event_, output);
}
Processing arriving packets happens in the \c data() member: This member simple reads a packet
from the socket. It passes this packet to the \c parser_ and sends the generated packet out.
- \note Open Issues
- \li Exception handling. It would be great to have a sane default exception handling freeing us
- from most manual work. However, I don't think this is feasible.
+ \section flows Information Flow
+
+ The above description conceptually introduces three different flow levels:
+
+ \li The <em>data flow</em> is, where the packets are flowing. This flow always goes from output
+ to input connector.
+ \li The <em>execution flow</em> describes the flow of execution from one module to another. This
+ flow always proceeds from active to passive connector.
+ \li The <em>control flow</em> is the flow of throttling notifications. This flow always proceeds
+ \e opposite to the execution flow, from passive to active connector.
+
+ This is the outside view, from without any module. These flows are set up using
+ senf::ppi::connect() statements.
+
+ Within a module, the different flow levels are defined differently depending on the type of
+ flow:
+
+ \li The <em>data flow</em> is defined by how data is processed. The different event and
+ connector callbacks will pass packets around and thereby define the data flow
+ \li Likewise, the <em>execution flow</em> is defined parallel to the data flow (however possible
+ in opposite direction) by how the handler of one connector calls other connectors.
+ \li The <em>control flow</em> is set up using senf::ppi::Module::route statements (as long as
+ automatic throttling is used. Manual throttling defines the control flow within the
+ respective callbacks).
+
+ In nearly all cases, these flows will be parallel. Therefore it makes sense to define the \c
+ route statement as defining the 'conceptual data flow' since this is also how control messages
+ should flow (sans the direction, which is defined by the connectors active/passive property).
\see \ref ppi_implementation \n
<a href="http://openfacts.berlios.de/index-en.phtml?title=SENF:_Packet_Processing_Infrastructure">Implementation plan</a>
\section processing Data Processing
- The processing in the PPI is driven by external events. Without external events <em>nothing will
- happen</em>. When an external event is generated, the module called will probably either send or
- receive data from an active connector.
+ The processing in the PPI is driven by events. Without events <em>nothing will happen</em>. When
+ an event is generated, the called module will probably call one of it's active connectors.
Calling an active connector will directly call the handler registered at the connected passive
connector. This way the call and data are handed across the connections until an I/O module will
finally handle the request (by not calling any other connectors).
Throttling is handled in the same way: Throttling a passive connector will call a corresponding
- (internal) method of the connector active connector. This method will call registered handlers
+ (internal) method of the connected active connector. This method will call registered handlers
and will analyze the routing information of the module for other (passive) connectors to call
and throttle. This will again create a call chain which terminates at the I/O modules. An event
which is called to be throttled will disable the event temporarily. Unthrottling works in the
// mode: flyspell
// mode: auto-fill
// End:
+
+// LocalWords: callbacks
namespace senf {
namespace ppi {
+namespace module {
/** \brief Module baseclass
may be a timer event or some type of I/O event on a
file descriptor or socket.
- \param[in] target The handler to call whenever the event
- is signaled
+ \param[in] target The handler to call whenever the
+ event is signaled
\param[in] descriptor The type of event to register */
- boost::posix_time::ptime eventTime(); ///< Return timestamp of the currently processing event
+ boost::posix_time::ptime eventTime(); ///< Return timestamp of the currently processing
+ ///< event
};
- /** \brief Automatically manage dynamic module deallocation
-
- The dynamicModule helper will create a new dynamically managed module instance.
-
- The \a args template parameter is only a placeholder. All arguments to dynamicModule will be
- passed to the Module constructor.
-
- \implementation dynamicModule should just register the Instance in a different way with the
- Infrastructure and return a reference to the new module.
- */
- template <class Module, class Args>
- unspecified dynamicModule(Args args);
-
-
/** \brief Connect compatible connectors
connect() will connect two compatible connectors: One connector must be active, the other
template <class Source, class Target>
void connect(Source const & source, Target const & target);
- /** \brief Connect connectors via an adaptor module
-
- This connect() overload will insert an additional adaptor module into the connection. The
- Adaptor module must have two connectors, \a input and \a output. The call will setup the
- connections \a source to \a input and \a output to \a target. Each connector pair must be
- compatible.
- */
- template <class Source, class Target, class Adaptor)
- Adaptor const & connect(Source const & source, Target const & target,
- Adaptor const & adaptor);
-
-}}
+}}}
///////////////////////////////hh.e////////////////////////////////////////
//#include "Module.cci"
--- /dev/null
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief PassiveQueue public header */
+
+#ifndef HH_PassiveQueue_
+#define HH_PassiveQueue_ 1
+
+// Custom includes
+
+//#include "PassiveQueue.mpp"
+///////////////////////////////hh.p////////////////////////////////////////
+
+namespace senf {
+namespace ppi {
+namespace module {
+
+ /** \brief Adaptor to connect active a pair of active connectors.
+
+ This adaptor queue is used to connect two active connectors to each other. The queue
+ receives data in it's passive input and places them in it's queue. Whenever data is
+ requested from the passive output, a packet is dequeued.
+
+ The PassiveQueue will automatically throttle in both directions. Throttling on the input
+ connector is the standard throttling as implemented in connector::PassiveInput. Additional,
+ forward throttling notifications are sent out whenever the queue is empty.
+ */
+ class PassiveQueue
+ {
+ public:
+ connector::PassiveInput input;
+ connector::PassiveOutput output;
+
+ PassiveQueue();
+
+ void qdisc(QueueingDiscipline const & disc); ///< Change the queueing discipline
+ /**< This call changs the queueing discipline of the queue. This call is just forwarded to
+ the \a input connector.
+
+ \see connector::PassiveInput::qdisc() */
+ };
+
+}}}
+
+///////////////////////////////hh.e////////////////////////////////////////
+//#include "PassiveQueue.cci"
+//#include "PassiveQueue.ct"
+//#include "PassiveQueue.cti"
+#endif
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// End:
QueueingDiscipline is called whenever the packets are entered or removed from the queue. The
queueing discipline then determines the new throttling state of the queue.
- \important The QueueingDiscipline will \e never drop packets explicitly. This is left to the
+ \note The QueueingDiscipline will \e never drop packets explicitly. This is left to the
operating system by sending throttling events. The PPI will never loose a packet internally
(if not a module explicitly does so), however it may disable reception of new incoming
- packets which will then probably dropped by the operating system.
+ packets which will then probably be dropped by the operating system.
*/
class QueueingDiscipline
{
be disabled by setting the authoThrottling state to \c
false.
- This member only exists if
- \li \a Source or \a Target is an event
- \li one of \a Source and \a Target is an active
- connector and the other is a passive connector.
-
Routing from/to an event to/from a passive connector
will automatically create throttling notifications on
the connector whenever the event is disabled. Routing
--- /dev/null
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief SocketReader public header */
+
+#ifndef HH_SocketReader_
+#define HH_SocketReader_ 1
+
+// Custom includes
+#include "Packets/Packet.hh"
+#include "Packets/DataPacket.hh"
+#include "Module.hh"
+#include "Connector.hh"
+
+//#include "SocketReader.mpp"
+///////////////////////////////hh.p////////////////////////////////////////
+
+namespace senf {
+namespace ppi {
+
+ /** \brief Read helper for module::ActiveSocketReader
+
+ This read helper will read a datagram from a datagram socket. This datagram will then be
+ interpreted as a packet of type \a Packet as defined in the packet library. \a Packet
+ defaults to \ref DataPacket, which will place the data uninterpreted into a packet data
+ structure.
+ */
+ template <class Packet=DataPacket>
+ class PacketReader
+ {
+ public:
+ typedef senf::ClientSocketHandle<
+ senf::MakeSocketPolicy< senf::ReadablePolicy,
+ senf::DatagramFramingPolicy > > Handle;
+ ///< Handle type supported by this reader
+
+ Packet::ptr operator()(Handle handle);
+ ///< Read packet from \a handle
+ /**< Read a datagram from \a handle and interpret is as
+ packet of type \c Packet.
+ \param[in] handle Handle to read data from
+ \returns Pointer to new packet instance or 0, if no
+ packet could be read */
+ };
+
+}}
+
+namespace senf {
+namespace ppi {
+namespace module {
+
+ /** \brief Input module reading data from an arbitrary FileHandle
+
+ This input module will read data from a FileHandle object and parse the data according to
+ the \a Reader.
+
+ The \a Reader must fulfill the following interface:
+ \code
+ class SomeReader
+ {
+ public:
+ typedef unspecified_type Handle; // type of handle reqeusted
+ SomeReader(); // default constructible
+ Packet::ptr operator()(Handle handle); // extraction function
+ };
+ \endcode
+ */
+ template <class Reader=PacketReader<> >
+ class ActiveSocketReader : public Module
+ {
+ public:
+ typedef typename Reader::Handle Handle; ///< Handle type requested by the reader
+
+ connector::ActiveOutput output; ///< Output connector to which the data recevied is writtten
+
+ ActiveSocketReader(Handle handle); ///< Create new reader for the given handle
+ /**< Data will be read from \a handle and be parsed by \a
+ Reader.
+ \param[in] handle Handle to read data from */
+ };
+
+}}}
+
+///////////////////////////////hh.e////////////////////////////////////////
+//#include "SocketReader.cci"
+//#include "SocketReader.ct"
+//#include "SocketReader.cti"
+#endif
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// End:
--- /dev/null
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief SocketWriter public header */
+
+#ifndef HH_SocketWriter_
+#define HH_SocketWriter_ 1
+
+// Custom includes
+#include "Packets/Packet.hh"
+#include "Module.hh"
+#include "Connector.hh"
+
+//#include "SocketWriter.mpp"
+///////////////////////////////hh.p////////////////////////////////////////
+
+namespace senf {
+namespace ppi {
+
+ /** \brief Write helper for module::ActiveSocketWriter / module::PassiveSocketWriter
+
+ This write helper will write the packets completely as datagrmas to the given socket.
+ */
+ class PacketWriter
+ {
+ public:
+ typedef senf::ClientSocketHandle<
+ senf::MakeSocketPolicy< senf::WriteablePolicy,
+ senf::DatagramFramingPolicy > > Handle;
+ ///< Handle type supported by this writer
+
+ void operator()(Handle handle, Packet::ptr packet);
+ ///< Write \a packet to \a handle
+ /**< Write the complete \a packet as a datagram to \a
+ handle.
+ \param[in] handle Handle to write data to
+ \param[in] packet Packet to write */
+ };
+
+}}
+
+namespace senf {
+namespace ppi {
+namespace module {
+
+ /** \brief Output module writing data to arbitrary FileHandle
+
+ This output module will write data to a FileHandle object using a given \a Writer. This
+ output module is active. This requires the file handle to be able to signal its readiness to
+ accept more data via the Scheduler.
+
+ The \a Writer must fulfill the following interface:
+ \code
+ class SomeWriter
+ {
+ public:
+ typedef unspecified Handle; // type of handle requested
+ SomeWriter(); // default constructible
+ void operator()(Handle handle, Packet::ptr packet); // insertion function
+ };
+ \endcode
+ */
+ template <class Writer=PacketWriter>
+ class ActiveSocketWriter : public Module
+ {
+ public:
+ typedef typename Writer:Handle Handle; ///< Handle type requested by writer
+
+ connector::ActiveInput input; ///< Input connector from which data is received
+
+ ActiveSocketWriter(Handle handle); ///< Create new writer for the given handle
+ /**< Data will be written to \a handle using \a Writer.
+ \param[in] handle Handle to write data to */
+ };
+
+ /** \brief Output module writing data to arbitrary FileHandle
+
+ This output module will write data to a FileHandle object using a given \a Writer. This
+ output module is passive. This implies, that the output handle may not block. This also
+ implies, that data will probably get lost if written to fast for the underlying transport
+ mechanism. Either this is desired (like for a UDP socket) or some additional bandwidth
+ shaping needs to be used.
+
+ The \a Writer must fulfill the following interface:
+ \code
+ class SomeWriter
+ {
+ public:
+ typedef unspecified Handle; // type of handle requested
+ SomeWriter(); // default constructible
+ void operator()(Handle handle, Packet::ptr packet); // insertion function
+ };
+ \endcode
+ */
+ template <class Writer=PacketWriter>
+ class PassiveSocketWriter : public Module
+ {
+ public:
+ typedef typename Writer:Handle Handle; ///< Handle type requested by writer
+
+ connector::PassiveInput input; ///< Input connector from which data is received
+
+ ActiveSocketWriter(Handle handle); ///< Create new writer for the given handle
+ /**< Data will be written to \a handle using \a Writer.
+ \param[in] handle Handle to write data to */
+ };
+
+}}}
+
+
+///////////////////////////////hh.e////////////////////////////////////////
+//#include "SocketWriter.cci"
+//#include "SocketWriter.ct"
+//#include "SocketWriter.cti"
+#endif
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// End: