\until 44345
The \a inputSocket is listening on port 44344 while the \a outputSocket will send packets to
- port 44345 on localhost. The \a outputSocket uses a ConnectedUDPv4SocketHandle which is
- compatible with the senf::ppi::module::PassiveSocketWriter module.
+ port 44345 on localhost. The \a outputSocket uses the senf::ConnectedUDPv4SocketProtocol which
+ is compatible with the senf::ppi::module::PassiveSocketWriter module.
\until udpWriter
//#include "EventManager.ih"
// Custom includes
+#include "Scheduler/Scheduler.hh"
#define prefix_ inline
///////////////////////////////cci.p///////////////////////////////////////
return manager;
}
-prefix_ senf::ClockService::clock_type senf::ppi::EventManager::eventTime()
+prefix_ senf::ClockService::clock_type senf::ppi::EventManager::now()
+{
+ return Scheduler::instance().eventTime();
+}
+
+prefix_ senf::ClockService::clock_type senf::ppi::EventManager::time()
{
return eventTime_;
}
typename Callback<Descriptor>::type callback,
Descriptor & descriptor);
- ClockService::clock_type eventTime();
+ ClockService::clock_type now();
+ ClockService::clock_type time();
protected:
///////////////////////////////////////////////////////////////////////////
// senf::ppi::EventImplementation<EventType>
+template <class EventType>
+prefix_ senf::ppi::module::Module & senf::ppi::EventImplementation<EventType>::module()
+ const
+{
+ return binding_->module();
+}
+
+template <class EventType>
+prefix_ senf::ppi::EventManager & senf::ppi::EventImplementation<EventType>::manager()
+ const
+{
+ return binding_->manager();
+}
+
////////////////////////////////////////
// protected members
typedef EventType Event;
typedef typename detail::EventArgType<EventType>::type EventArg;
+ module::Module & module() const;
+ EventManager & manager() const;
+
protected:
EventImplementation();
// Custom includes
#include "Scheduler/Scheduler.hh"
+#include "EventManager.hh"
//#include "IntervalTimer.mpp"
#define prefix_
prefix_ void senf::ppi::IntervalTimer::v_enable()
{
- info_.intervalStart = ClockService::now();
+ info_.intervalStart = manager().now();
info_.number = 0;
schedule();
}
moduleManager().unregisterModule(*this);
}
-prefix_ senf::ClockService::clock_type senf::ppi::module::Module::eventTime()
+prefix_ senf::ClockService::clock_type senf::ppi::module::Module::time()
+ const
{
- return eventManager().eventTime();
+ return eventManager().time();
+}
+
+prefix_ senf::ClockService::clock_type senf::ppi::module::Module::now()
+ const
+{
+ return eventManager().now();
}
////////////////////////////////////////
{}
prefix_ senf::ppi::EventManager & senf::ppi::module::Module::eventManager()
+ const
{
return EventManager::instance();
}
prefix_ senf::ppi::ModuleManager & senf::ppi::module::Module::moduleManager()
+ const
{
return ModuleManager::instance();
}
event is signaled
\param[in] descriptor The type of event to register */
- ClockService::clock_type eventTime(); ///< Return timestamp of the currently processing
- ///< event
+ ClockService::clock_type time() const; ///< Return timestamp of the currently processing
+ ///< event
+
+ ClockService::clock_type now() const;
void destroy();
private:
virtual void init();
- EventManager & eventManager();
- ModuleManager & moduleManager();
+ EventManager & eventManager() const;
+ ModuleManager & moduleManager() const;
void registerConnector(connector::Connector & connector);
RouteBase & addRoute(std::auto_ptr<RouteBase> route);
output(senf::DataPacket::create());
}
- using ppi::module::Module::eventTime;
+ using ppi::module::Module::time;
};
}
tester.event.trigger();
BOOST_CHECK_EQUAL( sink.size(), 1u );
- BOOST_CHECK( senf::ClockService::now() - tester.eventTime() < 1000000000L );
+ BOOST_CHECK( senf::ClockService::now() - tester.time() < senf::ClockService::seconds(1) );
}
///////////////////////////////cc.e////////////////////////////////////////
senf::UDPv4ClientSocketHandle outputSocket;
outputSocket.writeto(senf::INet4SocketAddress("localhost:44344"),data);
- senf::Scheduler::instance().timeout(senf::ClockService::now() + 100000000, &timeout);
+ senf::Scheduler::instance().timeout(
+ senf::ClockService::now() + senf::ClockService::milliseconds(100), &timeout);
senf::ppi::run();
BOOST_REQUIRE( ! sink.empty() );
senf::UDPv4ClientSocketHandle inputSocket;
inputSocket.bind(senf::INet4SocketAddress("localhost:44344"));
senf::Scheduler::instance().timeout(
- senf::ClockService::now() + 100000000, &timeout);
+ senf::ClockService::now() + senf::ClockService::milliseconds(100), &timeout);
source.submit(p);
senf::ppi::run();
descriptor_->enabled(false);
}
+prefix_ senf::ppi::EventManager & senf::ppi::detail::EventBindingBase::manager()
+ const
+{
+ return *manager_;
+}
+
+prefix_ senf::ppi::module::Module & senf::ppi::detail::EventBindingBase::module()
+ const
+{
+ return *module_;
+}
+
////////////////////////////////////////
// protected members
//#include "EventBinding.ih"
// Custom includes
+#include "../EventManager.hh"
#define prefix_ inline
///////////////////////////////cti.p///////////////////////////////////////
template <class EventType, class Self>
prefix_ void senf::ppi::detail::EventBindingHelper<EventType,Self>::callback(EventArg event)
{
- callback(event, ClockService::now());
+ callback(event, self().manager().now());
}
////////////////////////////////////////
template <class Self>
prefix_ void senf::ppi::detail::EventBindingHelper<void,Self>::callback()
{
- callback(ClockService::now());
+ callback(self().manager().now());
}
////////////////////////////////////////
{
public:
~EventBindingBase();
+
+ EventManager & manager() const;
+ module::Module & module() const;
protected:
EventBindingBase(EventManager & manager, module::Module & module,
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief ClockService non-inline non-template implementation */
+
+#include "ClockService.hh"
+//#include "ClockService.ih"
+
+// Custom includes
+#include <errno.h>
+#include <signal.h>
+#include <sys/time.h>
+#include <pthread.h>
+#include "Utils/Exception.hh"
+
+//#include "ClockService.mpp"
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+#define CheckErrno(op,args) if (op args < 0) throw SystemException(# op, errno)
+
+struct senf::ClockService::Impl
+{
+ Impl();
+
+ void block();
+ void unblock();
+
+ struct Blocker {
+ Blocker(Impl * i) : impl(i) { impl->block(); }
+ ~Blocker() { impl->unblock(); }
+ Impl * impl;
+ };
+
+ static void timer(int);
+
+ struct sigaction oldaction;
+ struct itimerval olditimer;
+ sigset_t alrm_set;
+};
+
+prefix_ senf::ClockService::Impl::Impl()
+{
+ CheckErrno( sigemptyset, (&alrm_set) );
+ CheckErrno( sigaddset, (&alrm_set, SIGALRM) );
+}
+
+prefix_ void senf::ClockService::Impl::block()
+{
+ CheckErrno( sigprocmask, (SIG_BLOCK, &alrm_set, 0) );
+}
+
+prefix_ void senf::ClockService::Impl::unblock()
+{
+ CheckErrno( sigprocmask, (SIG_UNBLOCK, &alrm_set, 0) );
+}
+
+prefix_ void senf::ClockService::Impl::timer(int)
+{
+ boost::posix_time::ptime time (boost::posix_time::microsec_clock::universal_time());
+ if (ClockService::instance().checkSkew(time))
+ ClockService::instance().clockSkew(
+ time, ClockService::instance().heartbeat_ + boost::posix_time::seconds(
+ ClockService::CheckInterval));
+ ClockService::instance().heartbeat_ = time;
+}
+
+///////////////////////////////////////////////////////////////////////////
+// senf::ClockService
+
+prefix_ senf::ClockService::~ClockService()
+{
+ setitimer(ITIMER_REAL, &impl_->olditimer, 0);
+ sigaction(SIGALRM, &impl_->oldaction, 0);
+}
+
+////////////////////////////////////////
+// private members
+
+prefix_ senf::ClockService::ClockService()
+ : base_ (boost::posix_time::microsec_clock::universal_time()),
+ heartbeat_ (base_), impl_(new ClockService::Impl())
+{
+ struct sigaction action;
+ action.sa_handler = & senf::ClockService::Impl::timer;
+ CheckErrno( sigemptyset, (&action.sa_mask) );
+ action.sa_flags = SA_RESTART;
+ CheckErrno( sigaction, (SIGALRM, &action, &impl_->oldaction) );
+
+ struct itimerval itimer;
+ itimer.it_interval.tv_sec = CheckInterval;
+ itimer.it_interval.tv_usec = 0;
+ itimer.it_value.tv_sec = CheckInterval;
+ itimer.it_value.tv_usec = 0;
+ CheckErrno( setitimer, (ITIMER_REAL, &itimer, &impl_->olditimer) );
+ impl_->unblock();
+}
+
+prefix_ void senf::ClockService::restart_i()
+{
+ impl_->block(); // if any syscall fails, the alarm signal stays blocked which is correct
+ base_ = boost::posix_time::microsec_clock::universal_time();
+ heartbeat_ = base_;
+
+ struct sigaction action;
+ action.sa_handler = & senf::ClockService::Impl::timer;
+ CheckErrno( sigemptyset, (&action.sa_mask) );
+ action.sa_flags = SA_RESTART;
+ CheckErrno( sigaction, (SIGALRM, &action, 0) );
+
+ struct itimerval itimer;
+ itimer.it_interval.tv_sec = CheckInterval;
+ itimer.it_interval.tv_usec = 0;
+ itimer.it_value.tv_sec = CheckInterval;
+ itimer.it_value.tv_usec = 0;
+ CheckErrno( setitimer, (ITIMER_REAL, &itimer, 0) );
+ impl_->unblock();
+}
+
+prefix_ void senf::ClockService::updateSkew(boost::posix_time::ptime time)
+{
+ Impl::Blocker alrmBlocker (impl_.get());
+ struct itimerval itimer;
+ CheckErrno( getitimer, (ITIMER_REAL, &itimer) );
+ clockSkew(time, (heartbeat_
+ + boost::posix_time::seconds(CheckInterval)
+ - boost::posix_time::seconds(itimer.it_value.tv_sec)
+ - boost::posix_time::microseconds(itimer.it_value.tv_usec)));
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
+//#include "ClockService.mpp"
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
///////////////////////////////////////////////////////////////////////////
// senf::ClockService
-prefix_ senf::ClockService::~ClockService()
-{
-
-}
-
prefix_ senf::ClockService::clock_type senf::ClockService::now()
{
- return clock(boost::posix_time::microsec_clock::universal_time());
+ // We must make sure to call instance() before fetching the current time since the first call to
+ // instance() will construct the singleton and initialize heartbeat_. If this happens *after*
+ // fetching the current time checkSkew() will detect clock skew since heartbeat_ will be <
+ // current time.
+ return instance().now_i();
}
prefix_ senf::ClockService::abstime_type senf::ClockService::abstime(clock_type clock)
* clock_type( 1000000000UL / boost::posix_time::time_duration::ticks_per_second() );
}
+prefix_ senf::ClockService::clock_type senf::ClockService::nanoseconds(clock_type v)
+{
+ return v;
+}
+
+prefix_ senf::ClockService::clock_type senf::ClockService::microseconds(clock_type v)
+{
+ return nanoseconds(1000*v);
+}
+
+prefix_ senf::ClockService::clock_type senf::ClockService::milliseconds(clock_type v)
+{
+ return microseconds(1000*v);
+}
+
+prefix_ senf::ClockService::clock_type senf::ClockService::seconds(clock_type v)
+{
+ return milliseconds(1000*v);
+}
+
+prefix_ senf::ClockService::clock_type senf::ClockService::minutes(clock_type v)
+{
+ return seconds(60*v);
+}
+
+prefix_ senf::ClockService::clock_type senf::ClockService::hours(clock_type v)
+{
+ return minutes(60*v);
+}
+
+prefix_ senf::ClockService::clock_type senf::ClockService::days(clock_type v)
+{
+ return hours(24*v);
+}
+
+prefix_ void senf::ClockService::restart()
+{
+ instance().restart_i();
+}
+
////////////////////////////////////////
// private members
-prefix_ senf::ClockService::ClockService()
- : base_ (boost::posix_time::microsec_clock::universal_time())
-{}
-
prefix_ senf::ClockService & senf::ClockService::instance()
{
static ClockService instance;
return instance;
}
+prefix_ senf::ClockService::clock_type senf::ClockService::now_i()
+{
+ boost::posix_time::ptime time (boost::posix_time::microsec_clock::universal_time());
+ if (checkSkew(time))
+ updateSkew(time);
+ return clock(time);
+}
+
+prefix_ bool senf::ClockService::checkSkew(boost::posix_time::ptime time)
+{
+ return time < heartbeat_ || (time - heartbeat_) > boost::posix_time::seconds(2*CheckInterval);
+}
+
+prefix_ void senf::ClockService::clockSkew(boost::posix_time::ptime time,
+ boost::posix_time::ptime expected)
+{
+ base_ += (time - expected);
+}
+
///////////////////////////////cci.e///////////////////////////////////////
#undef prefix_
// Custom includes
#include <boost/utility.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/scoped_ptr.hpp>
//#include "ClockService.mpp"
///////////////////////////////hh.p////////////////////////////////////////
namespace senf {
+#ifndef DOXYGEN
+ namespace detail { class ClockServiceTest; }
+#endif
+
// Implementation note:
//
// The clock value is represented as a 64bit unsigned integer number of nanosecods elapsed since
The ClockService provides a highly accurate monotonous clock source based on
gettimeofday(). However, it takes additional precautions to detect clock skew.
-
- \fixme Implement the clock-skew detection
*/
class ClockService
: boost::noncopyable
corresponding clock value.
\see abstime */
+ static clock_type nanoseconds(clock_type v);
+ static clock_type microseconds(clock_type v);
+ static clock_type milliseconds(clock_type v);
+ static clock_type seconds(clock_type v);
+ static clock_type minutes(clock_type v);
+ static clock_type hours(clock_type v);
+ static clock_type days(clock_type v);
+
+ static void restart();
+
protected:
private:
+
ClockService();
static ClockService & instance();
+ clock_type now_i();
+ void restart_i();
+
+ bool checkSkew(boost::posix_time::ptime time);
+ void clockSkew(boost::posix_time::ptime time, boost::posix_time::ptime expected);
+ void updateSkew(boost::posix_time::ptime time);
boost::posix_time::ptime base_;
+ boost::posix_time::ptime heartbeat_;
+
+ struct Impl;
+ boost::scoped_ptr<Impl> impl_;
+
+ friend class Impl;
+#ifndef DOXYGEN
+ friend class senf::detail::ClockServiceTest;
+#endif
};
--- /dev/null
+// $Id$
+//
+// Copyright (C) 2007
+// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS)
+// Kompetenzzentrum fuer Satelitenkommunikation (SatCom)
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief ClockService.test unit tests */
+
+//#include "ClockService.test.hh"
+//#include "ClockService.test.ih"
+
+// Custom includes
+#include "ClockService.hh"
+#include <errno.h>
+
+#include <boost/test/auto_unit_test.hpp>
+#include <boost/test/test_tools.hpp>
+
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+namespace senf {
+namespace detail {
+
+ struct ClockServiceTest
+ {
+ static boost::posix_time::ptime & base()
+ { return senf::ClockService::instance().base_; }
+ static boost::posix_time::ptime & heartbeat()
+ { return senf::ClockService::instance().heartbeat_; }
+ };
+
+}}
+
+namespace {
+
+ bool is_close_clock(senf::ClockService::clock_type a, senf::ClockService::clock_type b,
+ unsigned long delta = 10100000ul)
+ {
+ return (a<b ? b-a : a-b ) < delta;
+ }
+
+ bool is_close_pt(boost::posix_time::ptime a, boost::posix_time::ptime b,
+ boost::posix_time::time_duration delta = boost::posix_time::milliseconds(10) )
+ {
+ return (a<b ? b-a : a-b ) < delta;
+ }
+
+ void delay(unsigned long milliseconds)
+ {
+ struct timespec ts;
+ ts.tv_sec = milliseconds / 1000;
+ ts.tv_nsec = (milliseconds % 1000) * 1000000;
+ while (nanosleep(&ts,&ts) < 0 && errno == EINTR) ;
+ }
+
+}
+
+BOOST_AUTO_UNIT_TEST(clockService)
+{
+ senf::ClockService::restart(); // So we know, when the signal will be delivered
+
+ senf::ClockService::clock_type t (senf::ClockService::now());
+ delay(100);
+ BOOST_CHECK_PREDICATE( is_close_clock,
+ (t + senf::ClockService::milliseconds(100))
+ (senf::ClockService::now()) );
+
+ // We shift both heartbeat() and base() back 1 minute. This is the same as
+ // moving the current time forward 1 minute.
+ boost::posix_time::ptime b (senf::detail::ClockServiceTest::base());
+ boost::posix_time::ptime h (senf::detail::ClockServiceTest::heartbeat());
+ senf::detail::ClockServiceTest::heartbeat() -= boost::posix_time::minutes(1);
+ senf::detail::ClockServiceTest::base() -= boost::posix_time::minutes(1);
+
+ // Wait for SIGALRM and let the signal handler do the clock-skew detection
+ delay(senf::ClockService::CheckInterval*1000);
+
+ BOOST_CHECK_PREDICATE( is_close_pt,
+ (b)
+ (senf::detail::ClockServiceTest::base()) );
+ BOOST_CHECK_PREDICATE( is_close_pt,
+ (h+boost::posix_time::seconds(senf::ClockService::CheckInterval))
+ (senf::detail::ClockServiceTest::heartbeat()) );
+
+ BOOST_CHECK_PREDICATE( is_close_clock,
+ (t + senf::ClockService::milliseconds(1100))
+ (senf::ClockService::now()) );
+
+ senf::detail::ClockServiceTest::heartbeat() -= boost::posix_time::minutes(1);
+ senf::detail::ClockServiceTest::base() -= boost::posix_time::minutes(1);
+
+ // Let now() do the clock skew detection using getitimer() ...
+ delay(100);
+ BOOST_CHECK_PREDICATE( is_close_clock,
+ (t + senf::ClockService::milliseconds(1200))
+ (senf::ClockService::now()) );
+
+}
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u test"
+// End:
///////////////////////////////cc.p////////////////////////////////////////
prefix_ senf::Scheduler::Scheduler()
- : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false)
+ : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false),
+ eventTime_(0)
{
if (epollFd_<0)
throw SystemException(errno);
prefix_ void senf::Scheduler::process()
{
terminate_ = false;
+ eventTime_ = ClockService::now();
while (! terminate_) {
- ClockService::clock_type timeNow = ClockService::now();
-
- while ( ! timerQueue_.empty() && timerQueue_.top()->second.timeout <= timeNow ) {
+ while ( ! timerQueue_.empty() && timerQueue_.top()->second.timeout <= eventTime_ ) {
TimerMap::iterator i (timerQueue_.top());
if (! i->second.canceled)
i->second.cb();
int timeout (MinTimeout);
if (! timerQueue_.empty()) {
ClockService::clock_type delta (
- (timerQueue_.top()->second.timeout - timeNow)/1000000UL);
+ (timerQueue_.top()->second.timeout - eventTime_)/1000000UL);
if (delta<MinTimeout)
timeout = int(delta);
}
if (events<0)
// 'man epoll' says, epoll will not return with EINTR.
throw SystemException(errno);
+
+ /// \fixme Fix unneeded timer delays
+ // Hmm ... I remember, I purposely moved the timeout-handlers to the loop top ... but why?
+ // This delays possible time-critical handlers even further ...
+
+ eventTime_ = ClockService::now();
if (events==0)
// Timeout .. the handler will be run when going back to the loop top
continue;
terminate_ = true;
}
+prefix_ senf::ClockService::clock_type senf::Scheduler::eventTime()
+ const
+{
+ return eventTime_;
+}
+
prefix_ int senf::retrieve_filehandle(int fd)
{
return fd;
TimerMap timerMap_;
int epollFd_;
bool terminate_;
+ ClockService::clock_type eventTime_;
};
/** \brief Default file descriptor accessor
BOOST_CHECK_EQUAL( buffer, "READ" );
BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(
- ClockService::now()+100000000UL,&timeout) );
+ ClockService::now()+ClockService::milliseconds(100),&timeout) );
BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(
- ClockService::now()+200000000UL,&timeout) );
+ ClockService::now()+ClockService::milliseconds(200),&timeout) );
ClockService::clock_type t (ClockService::now());
BOOST_CHECK_NO_THROW( Scheduler::instance().process() );
- BOOST_CHECK_PREDICATE( is_close, (ClockService::now()) (t+100000000UL) );
+ BOOST_CHECK_PREDICATE( is_close, (ClockService::now()) (t+ClockService::milliseconds(100)) );
BOOST_CHECK_NO_THROW( Scheduler::instance().process() );
- BOOST_CHECK_PREDICATE( is_close, (ClockService::now()) (t+200000000UL) );
+ BOOST_CHECK_PREDICATE( is_close, (ClockService::now()) (t+ClockService::milliseconds(200)) );
HandleWrapper handle(sock,"TheTag");
BOOST_CHECK_NO_THROW( Scheduler::instance().add(handle,&handleCallback,Scheduler::EV_WRITE) );