return manager;
}
-prefix_ boost::posix_time::ptime senf::ppi::EventManager::eventTime()
+prefix_ senf::ClockService::clock_type senf::ppi::EventManager::eventTime()
{
return eventTime_;
}
////////////////////////////////////////
// private members
-prefix_ void senf::ppi::EventManager::eventTime(boost::posix_time::ptime time)
+prefix_ void senf::ppi::EventManager::eventTime(ClockService::clock_type time)
{
eventTime_ = time;
}
#define HH_EventManager_ 1
// Custom includes
-#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
+#include "Scheduler/ClockService.hh"
#include "predecl.hh"
#include "detail/Callback.hh"
#include "detail/EventBinding.hh"
typename Callback<Descriptor>::type callback,
Descriptor & descriptor);
- boost::posix_time::ptime eventTime();
+ ClockService::clock_type eventTime();
protected:
typedef boost::ptr_vector<detail::EventBindingBase> EventRegistrations;
EventRegistrations registrations_;
- void eventTime(boost::posix_time::ptime time);
+ void eventTime(ClockService::clock_type time);
- boost::posix_time::ptime eventTime_;
+ ClockService::clock_type eventTime_;
friend class detail::EventBindingBase;
friend class module::Module;
template <class EventType, class Self>
prefix_ void
senf::ppi::EventImplementationHelper<EventType,Self>::callback(EventArg event,
- boost::posix_time::ptime time)
+ ClockService::clock_type time)
{
binding().callback(event,time);
}
template <class Self>
prefix_ void
-senf::ppi::EventImplementationHelper<void,Self>::callback(boost::posix_time::ptime time)
+senf::ppi::EventImplementationHelper<void,Self>::callback(ClockService::clock_type time)
{
binding().callback(time);
}
// Custom includes
#include <vector>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include "Scheduler/ClockService.hh"
#include "predecl.hh"
//#include "Events.mpp"
protected:
typedef typename detail::EventArgType<EventType>::type EventArg;
- void callback(EventArg event, boost::posix_time::ptime time);
+ void callback(EventArg event, ClockService::clock_type time);
void callback(EventArg event);
private:
class EventImplementationHelper<void,Self>
{
protected:
- void callback(boost::posix_time::ptime time);
+ void callback(ClockService::clock_type time);
void callback();
private:
#define HH_IOEvent_ 1
// Custom includes
-#include "Events.hh"
#include "Scheduler/Scheduler.hh"
+#include "Events.hh"
//#include "IOEvent.mpp"
///////////////////////////////hh.p////////////////////////////////////////
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief IntervalTimer non-inline non-template implementation */
+
+#include "IntervalTimer.hh"
+//#include "IntervalTimer.ih"
+
+// Custom includes
+#include "Scheduler/Scheduler.hh"
+
+//#include "IntervalTimer.mpp"
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::IntervalTimer
+
+////////////////////////////////////////
+// private members
+
+prefix_ void senf::ppi::IntervalTimer::v_enable()
+{
+ info_.intervalStart = ClockService::now();
+ info_.number = 0;
+ schedule();
+}
+
+prefix_ void senf::ppi::IntervalTimer::v_disable()
+{
+ Scheduler::instance().cancelTimeout(id_);
+}
+
+prefix_ void senf::ppi::IntervalTimer::schedule()
+{
+ info_.expected = info_.intervalStart + ( interval_ * (info_.number+1) ) / eventsPerInterval_;
+ id_ = Scheduler::instance().timeout(info_.expected, boost::bind(&IntervalTimer::cb,this));
+}
+
+prefix_ void senf::ppi::IntervalTimer::cb()
+{
+ callback(info_, info_.expected);
+ ++ info_.number;
+ if (info_.number >= eventsPerInterval_) {
+ info_.number = 0;
+ info_.intervalStart += interval_;
+ }
+ schedule();
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
+//#include "IntervalTimer.mpp"
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief IntervalTimer inline non-template implementation */
+
+// Custom includes
+
+#define prefix_ inline
+///////////////////////////////cci.p///////////////////////////////////////
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ppi::IntervalTimer
+
+prefix_ senf::ppi::IntervalTimer::IntervalTimer(ClockService::clock_type interval,
+ unsigned eventsPerInterval)
+ : interval_ (interval), eventsPerInterval_ (eventsPerInterval)
+{}
+
+///////////////////////////////cci.e///////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief IntervalTimer public header */
+
+#ifndef HH_IntervalTimer_
+#define HH_IntervalTimer_ 1
+
+// Custom includes
+#include "Scheduler/ClockService.hh"
+#include "Events.hh"
+
+//#include "IntervalTimer.mpp"
+///////////////////////////////hh.p////////////////////////////////////////
+
+namespace senf {
+namespace ppi {
+
+ struct IntervalTimerEventInfo
+ {
+ ClockService::clock_type expected;
+ ClockService::clock_type intervalStart;
+ unsigned number;
+ };
+
+ /** \brief
+ */
+ class IntervalTimer
+ : public EventImplementation<IntervalTimerEventInfo>
+ {
+ public:
+ ///////////////////////////////////////////////////////////////////////////
+ // Types
+
+ ///////////////////////////////////////////////////////////////////////////
+ ///\name Structors and default members
+ ///@{
+
+ explicit IntervalTimer(ClockService::clock_type interval,
+ unsigned eventsPerInterval=1);
+
+ ///@}
+ ///////////////////////////////////////////////////////////////////////////
+
+ protected:
+
+ private:
+ virtual void v_enable();
+ virtual void v_disable();
+
+ void schedule();
+ void cb();
+
+ ClockService::clock_type interval_;
+ unsigned eventsPerInterval_;
+ IntervalTimerEventInfo info_;
+ unsigned id_;
+ };
+
+}}
+
+///////////////////////////////hh.e////////////////////////////////////////
+#include "IntervalTimer.cci"
+//#include "IntervalTimer.ct"
+//#include "IntervalTimer.cti"
+#endif
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
moduleManager().unregisterModule(*this);
}
-prefix_ boost::posix_time::ptime senf::ppi::module::Module::eventTime()
+prefix_ senf::ClockService::clock_type senf::ppi::module::Module::eventTime()
{
return eventManager().eventTime();
}
// Custom includes
#include <vector>
#include <boost/utility.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
+#include "Scheduler/ClockService.hh"
#include "predecl.hh"
//#include "Module.mpp"
event is signaled
\param[in] descriptor The type of event to register */
- boost::posix_time::ptime eventTime(); ///< Return timestamp of the currently processing
- ///< event
+ ClockService::clock_type eventTime(); ///< Return timestamp of the currently processing
+ ///< event
void destroy();
tester.event.trigger();
BOOST_CHECK_EQUAL( sink.size(), 1u );
- BOOST_CHECK_EQUAL( (boost::posix_time::microsec_clock::universal_time() -
- tester.eventTime()).total_seconds(), 0 );
+ BOOST_CHECK( senf::ClockService::now() - tester.eventTime() < 1000000000L );
}
///////////////////////////////cc.e////////////////////////////////////////
senf::UDPv4ClientSocketHandle outputSocket;
outputSocket.writeto(senf::INet4SocketAddress("localhost:44344"),data);
- senf::Scheduler::instance().timeout(100000, &timeout);
+ senf::Scheduler::instance().timeout(senf::ClockService::now() + 100000000, &timeout);
senf::ppi::run();
BOOST_REQUIRE( ! sink.empty() );
senf::UDPv4ClientSocketHandle inputSocket;
inputSocket.bind(senf::INet4SocketAddress("localhost:44344"));
- senf::Scheduler::instance().timeout(100000, &timeout);
+ senf::Scheduler::instance().timeout(
+ senf::ClockService::now() + 100000000, &timeout);
source.submit(p);
senf::ppi::run();
#define prefix_
///////////////////////////////cc.p////////////////////////////////////////
-prefix_ void senf::ppi::detail::EventBindingBase::eventTime(boost::posix_time::ptime time)
+prefix_ void senf::ppi::detail::EventBindingBase::eventTime(ClockService::clock_type time)
{
// It's hard to make this inline because of a circular header dependency ...
manager_->eventTime(time);
template <class EventType, class Self>
prefix_ void
senf::ppi::detail::EventBindingHelper<EventType,Self>::callback(EventArg event,
- boost::posix_time::ptime time)
+ ClockService::clock_type time)
{
self().eventTime(time);
self().callback_(event);
template <class EventType, class Self>
prefix_ void senf::ppi::detail::EventBindingHelper<EventType,Self>::callback(EventArg event)
{
- callback(event, boost::posix_time::microsec_clock::universal_time());
+ callback(event, ClockService::now());
}
////////////////////////////////////////
template <class Self>
prefix_ void
-senf::ppi::detail::EventBindingHelper<void,Self>::callback(boost::posix_time::ptime time)
+senf::ppi::detail::EventBindingHelper<void,Self>::callback(ClockService::clock_type time)
{
self().eventTime(time);
self().callback_();
template <class Self>
prefix_ void senf::ppi::detail::EventBindingHelper<void,Self>::callback()
{
- callback(boost::posix_time::microsec_clock::universal_time());
+ callback(ClockService::now());
}
////////////////////////////////////////
#define HH_EventBinding_ 1
// Custom includes
-#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include "Scheduler/ClockService.hh"
#include "../predecl.hh"
#include "Callback.hh"
EventBindingBase(EventManager & manager, module::Module & module,
EventDescriptor & descriptor);
- void eventTime(boost::posix_time::ptime time);
+ void eventTime(ClockService::clock_type time);
private:
EventManager * manager_;
public:
typedef typename detail::EventArgType<EventType>::type EventArg;
- void callback(EventArg event, boost::posix_time::ptime time);
+ void callback(EventArg event, ClockService::clock_type time);
void callback(EventArg event);
private:
class EventBindingHelper<void,Self>
{
public:
- void callback(boost::posix_time::ptime time);
+ void callback(ClockService::clock_type time);
void callback();
private:
// $Id$
//
-// Copyright (C) 2007
+// Copyright (C) 2007
// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
// Stefan Bund <g0dil@berlios.de>
#include "Socket/Protocols/INet/INetAddressing.hh"
#include "PPI/SocketReader.hh"
#include "PPI/SocketWriter.hh"
+#include "PPI/Module.hh"
+#include "PPI/IntervalTimer.hh"
+#include "PPI/Joins.hh"
+#include "PPI/PassiveQueue.hh"
#include "PPI/Setup.hh"
//#include "ppitest.mpp"
#define prefix_
///////////////////////////////cc.p////////////////////////////////////////
+namespace module = senf::ppi::module;
+namespace connector = senf::ppi::connector;
+namespace ppi = senf::ppi;
+
+namespace {
+
+ class RateFilter
+ : public module::Module
+ {
+ SENF_PPI_MODULE(RateFilter);
+ public:
+
+ connector::ActiveInput input;
+ connector::ActiveOutput output;
+
+ RateFilter(senf::ClockService::clock_type interval) : timer(interval) {
+ route(input,output);
+ route(input,timer);
+ registerEvent(&RateFilter::timeout, timer);
+ }
+
+ private:
+ void timeout() {
+ output(input());
+ }
+
+ ppi::IntervalTimer timer;
+ };
+
+ class CopyPacketGenerator
+ : public module::Module
+ {
+ SENF_PPI_MODULE(CopyPacketGenerator);
+ public:
+
+ connector::PassiveOutput output;
+
+ CopyPacketGenerator(senf::Packet p) : packet(p) {
+ noroute(output);
+ output.onRequest(&CopyPacketGenerator::request);
+ }
+
+ private:
+ void request() {
+ output(packet);
+ }
+
+ senf::Packet packet;
+ };
+}
+
+// Module setup:
+//
+// 'O' = active connector
+// '>' or '<' = input connector
+//
+// [ udpReader ] O--> [ queue ] -->O [ ]
+// [ join ] -->O [ rateFilter] O--> [ udpWriter ]
+// [ generator ] -->O [ ]
+
int main(int argc, char * argv[])
{
- namespace module = senf::ppi::module;
- namespace ppi = senf::ppi;
-
senf::UDPv4ClientSocketHandle inputSocket;
inputSocket.bind(senf::INet4SocketAddress("0.0.0.0:44344"));
- module::ActiveSocketReader<> udpReader (inputSocket);
senf::ConnectedUDPv4ClientSocketHandle outputSocket(
senf::INet4SocketAddress("localhost:44345"));
- module::PassiveSocketWriter<> udpWriter (outputSocket);
-
- ppi::connect(udpReader.output, udpWriter.input);
+
+ module::ActiveSocketReader<> udpReader (inputSocket);
+ module::PassiveQueue queue;
+ CopyPacketGenerator generator (senf::DataPacket::create(std::string("<idle>\n")));
+ module::PriorityJoin join;
+ RateFilter rateFilter (1000000000ul);
+ module::PassiveSocketWriter<> udpWriter (outputSocket);
+
+ ppi::connect( udpReader, queue );
+ ppi::connect( queue, join );
+ ppi::connect( generator, join );
+ ppi::connect( join, rateFilter );
+ ppi::connect( rateFilter, udpWriter );
ppi::run();
-
+
return 0;
}
#define prefix_
///////////////////////////////cc.p////////////////////////////////////////
-prefix_ senf::Scheduler::Scheduler & senf::Scheduler::instance()
-{
- static Scheduler instance;
- return instance;
-}
-
-prefix_ void senf::Scheduler::timeout(ClockService::clock_type timeout, TimerCallback const & cb)
-{
- timerQueue_.push(TimerSpec(ClockService::now()+timeout,cb));
-}
-
prefix_ senf::Scheduler::Scheduler()
- : epollFd_ (epoll_create(EPollInitialSize))
+ : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false)
{
if (epollFd_<0)
throw SystemException(errno);
while (! terminate_) {
ClockService::clock_type timeNow = ClockService::now();
- while ( ! timerQueue_.empty() && timerQueue_.top().timeout <= timeNow ) {
- timerQueue_.top().cb();
+ while ( ! timerQueue_.empty() && timerQueue_.top()->second.timeout <= timeNow ) {
+ TimerMap::iterator i (timerQueue_.top());
+ if (! i->second.canceled)
+ i->second.cb();
+ timerMap_.erase(i);
timerQueue_.pop();
}
int timeout (MinTimeout);
if (! timerQueue_.empty()) {
- ClockService::clock_type delta ((timerQueue_.top().timeout - timeNow)/1000000UL);
+ ClockService::clock_type delta (
+ (timerQueue_.top()->second.timeout - timeNow)/1000000UL);
if (delta<MinTimeout)
timeout = int(delta);
}
#define prefix_ inline
///////////////////////////////cci.p///////////////////////////////////////
+prefix_ senf::Scheduler::Scheduler & senf::Scheduler::instance()
+{
+ static Scheduler instance;
+ return instance;
+}
+
+prefix_ unsigned senf::Scheduler::timeout(ClockService::clock_type timeout,
+ TimerCallback const & cb)
+{
+ TimerMap::iterator i (
+ timerMap_.insert(std::make_pair(timerIdCounter_,
+ TimerSpec(timeout,cb,timerIdCounter_))).first);
+ timerQueue_.push(i);
+ return timerIdCounter_++;
+}
+
+prefix_ void senf::Scheduler::cancelTimeout(unsigned id)
+{
+ TimerMap::iterator i (timerMap_.find(id));
+ if (i != timerMap_.end())
+ i->second.canceled = true;
+}
+
prefix_ void senf::Scheduler::terminate()
{
terminate_ = true;
return fd;
}
+prefix_ senf::Scheduler::TimerSpecCompare::result_type
+senf::Scheduler::TimerSpecCompare::operator()(first_argument_type a, second_argument_type b)
+{
+ return a->second < b->second;
+}
+
///////////////////////////////cci.e///////////////////////////////////////
#undef prefix_
\param[in] eventMask arbitrary combination via '|'
operator of EventId designators. */
- void timeout(ClockService::clock_type timeout, TimerCallback const & cb);
+ unsigned timeout(ClockService::clock_type timeout, TimerCallback const & cb);
///< Add timeout event
/**< \param[in] timeout timeout in nanoseconds
\param[in] cb callback to call after \a timeout
\todo Return some kind of handle/pointer and add
support to update or revoke a timeout */
+ void cancelTimeout(unsigned id);
+
void process(); ///< Event handler main loop
/**< This member must be called at some time to enter the
event handler main loop. Only while this function is
struct TimerSpec
{
TimerSpec() : timeout(), cb() {}
- TimerSpec(ClockService::clock_type timeout_, TimerCallback cb_)
- : timeout(timeout_), cb(cb_) {}
+ TimerSpec(ClockService::clock_type timeout_, TimerCallback cb_, unsigned id_)
+ : timeout(timeout_), cb(cb_), id(id_), canceled(false) {}
bool operator< (TimerSpec const & other) const
{ return timeout > other.timeout; }
ClockService::clock_type timeout;
TimerCallback cb;
+ unsigned id;
+ bool canceled;
};
typedef std::map<int,EventSpec> FdTable;
- typedef std::priority_queue<TimerSpec> TimerQueue;
+ typedef std::map<unsigned,TimerSpec> TimerMap;
+
+ struct TimerSpecCompare
+ {
+ typedef TimerMap::iterator first_argument_type;
+ typedef TimerMap::iterator second_argument_type;
+ typedef bool result_type;
+
+ result_type operator()(first_argument_type a, second_argument_type b);
+ };
+
+ typedef std::priority_queue<TimerMap::iterator, std::vector<TimerMap::iterator>,
+ TimerSpecCompare> TimerQueue;
FdTable fdTable_;
+ unsigned timerIdCounter_;
TimerQueue timerQueue_;
+ TimerMap timerMap_;
int epollFd_;
bool terminate_;
};
buffer[size]=0;
BOOST_CHECK_EQUAL( buffer, "READ" );
- BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(100000000UL,&timeout) );
- BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(200000000UL,&timeout) );
+ BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(
+ ClockService::now()+100000000UL,&timeout) );
+ BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(
+ ClockService::now()+200000000UL,&timeout) );
ClockService::clock_type t (ClockService::now());
BOOST_CHECK_NO_THROW( Scheduler::instance().process() );
BOOST_CHECK_PREDICATE( is_close, (ClockService::now()) (t+100000000UL) );