From: g0dil Date: Tue, 21 Aug 2007 13:59:06 +0000 (+0000) Subject: PPI: Clean up time interface X-Git-Url: http://g0dil.de/git?a=commitdiff_plain;h=3d16600678b948ff3bd0e4fd2a1a800fcc629a03;p=senf.git PPI: Clean up time interface Scheduler: Implement ClockService clockSkew detection / fixup git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@403 270642c3-0616-0410-b53a-bc976706d245 --- diff --git a/Examples/RateStuffer/Mainpage.dox b/Examples/RateStuffer/Mainpage.dox index 9f51da2..85bbd9b 100644 --- a/Examples/RateStuffer/Mainpage.dox +++ b/Examples/RateStuffer/Mainpage.dox @@ -145,8 +145,8 @@ \until 44345 The \a inputSocket is listening on port 44344 while the \a outputSocket will send packets to - port 44345 on localhost. The \a outputSocket uses a ConnectedUDPv4SocketHandle which is - compatible with the senf::ppi::module::PassiveSocketWriter module. + port 44345 on localhost. The \a outputSocket uses the senf::ConnectedUDPv4SocketProtocol which + is compatible with the senf::ppi::module::PassiveSocketWriter module. \until udpWriter diff --git a/PPI/EventManager.cci b/PPI/EventManager.cci index 0829b38..f75fa38 100644 --- a/PPI/EventManager.cci +++ b/PPI/EventManager.cci @@ -26,6 +26,7 @@ //#include "EventManager.ih" // Custom includes +#include "Scheduler/Scheduler.hh" #define prefix_ inline ///////////////////////////////cci.p/////////////////////////////////////// @@ -39,7 +40,12 @@ prefix_ senf::ppi::EventManager & senf::ppi::EventManager::instance() return manager; } -prefix_ senf::ClockService::clock_type senf::ppi::EventManager::eventTime() +prefix_ senf::ClockService::clock_type senf::ppi::EventManager::now() +{ + return Scheduler::instance().eventTime(); +} + +prefix_ senf::ClockService::clock_type senf::ppi::EventManager::time() { return eventTime_; } diff --git a/PPI/EventManager.hh b/PPI/EventManager.hh index 5723de9..2df295f 100644 --- a/PPI/EventManager.hh +++ b/PPI/EventManager.hh @@ -79,7 +79,8 @@ namespace ppi { typename Callback::type callback, Descriptor & descriptor); - ClockService::clock_type eventTime(); + ClockService::clock_type now(); + ClockService::clock_type time(); protected: diff --git a/PPI/Events.cti b/PPI/Events.cti index 4a128b4..b1bd1fc 100644 --- a/PPI/Events.cti +++ b/PPI/Events.cti @@ -95,6 +95,20 @@ senf::ppi::EventImplementationHelper::binding() /////////////////////////////////////////////////////////////////////////// // senf::ppi::EventImplementation +template +prefix_ senf::ppi::module::Module & senf::ppi::EventImplementation::module() + const +{ + return binding_->module(); +} + +template +prefix_ senf::ppi::EventManager & senf::ppi::EventImplementation::manager() + const +{ + return binding_->manager(); +} + //////////////////////////////////////// // protected members diff --git a/PPI/Events.hh b/PPI/Events.hh index 160d1b7..9d518ff 100644 --- a/PPI/Events.hh +++ b/PPI/Events.hh @@ -111,6 +111,9 @@ namespace ppi { typedef EventType Event; typedef typename detail::EventArgType::type EventArg; + module::Module & module() const; + EventManager & manager() const; + protected: EventImplementation(); diff --git a/PPI/IntervalTimer.cc b/PPI/IntervalTimer.cc index ecda76d..766fc16 100644 --- a/PPI/IntervalTimer.cc +++ b/PPI/IntervalTimer.cc @@ -28,6 +28,7 @@ // Custom includes #include "Scheduler/Scheduler.hh" +#include "EventManager.hh" //#include "IntervalTimer.mpp" #define prefix_ @@ -41,7 +42,7 @@ prefix_ void senf::ppi::IntervalTimer::v_enable() { - info_.intervalStart = ClockService::now(); + info_.intervalStart = manager().now(); info_.number = 0; schedule(); } diff --git a/PPI/Module.cci b/PPI/Module.cci index f436070..9a852db 100644 --- a/PPI/Module.cci +++ b/PPI/Module.cci @@ -40,9 +40,16 @@ prefix_ senf::ppi::module::Module::~Module() moduleManager().unregisterModule(*this); } -prefix_ senf::ClockService::clock_type senf::ppi::module::Module::eventTime() +prefix_ senf::ClockService::clock_type senf::ppi::module::Module::time() + const { - return eventManager().eventTime(); + return eventManager().time(); +} + +prefix_ senf::ClockService::clock_type senf::ppi::module::Module::now() + const +{ + return eventManager().now(); } //////////////////////////////////////// @@ -71,11 +78,13 @@ prefix_ void senf::ppi::module::Module::init() {} prefix_ senf::ppi::EventManager & senf::ppi::module::Module::eventManager() + const { return EventManager::instance(); } prefix_ senf::ppi::ModuleManager & senf::ppi::module::Module::moduleManager() + const { return ModuleManager::instance(); } diff --git a/PPI/Module.hh b/PPI/Module.hh index 5ae07e9..24fdf0c 100644 --- a/PPI/Module.hh +++ b/PPI/Module.hh @@ -121,8 +121,10 @@ namespace module { event is signaled \param[in] descriptor The type of event to register */ - ClockService::clock_type eventTime(); ///< Return timestamp of the currently processing - ///< event + ClockService::clock_type time() const; ///< Return timestamp of the currently processing + ///< event + + ClockService::clock_type now() const; void destroy(); @@ -133,8 +135,8 @@ namespace module { private: virtual void init(); - EventManager & eventManager(); - ModuleManager & moduleManager(); + EventManager & eventManager() const; + ModuleManager & moduleManager() const; void registerConnector(connector::Connector & connector); RouteBase & addRoute(std::auto_ptr route); diff --git a/PPI/Module.test.cc b/PPI/Module.test.cc index 7c25134..96d65da 100644 --- a/PPI/Module.test.cc +++ b/PPI/Module.test.cc @@ -60,7 +60,7 @@ namespace { output(senf::DataPacket::create()); } - using ppi::module::Module::eventTime; + using ppi::module::Module::time; }; } @@ -75,7 +75,7 @@ BOOST_AUTO_UNIT_TEST(module) tester.event.trigger(); BOOST_CHECK_EQUAL( sink.size(), 1u ); - BOOST_CHECK( senf::ClockService::now() - tester.eventTime() < 1000000000L ); + BOOST_CHECK( senf::ClockService::now() - tester.time() < senf::ClockService::seconds(1) ); } ///////////////////////////////cc.e//////////////////////////////////////// diff --git a/PPI/SocketReader.test.cc b/PPI/SocketReader.test.cc index 14d283f..ca0631c 100644 --- a/PPI/SocketReader.test.cc +++ b/PPI/SocketReader.test.cc @@ -64,7 +64,8 @@ BOOST_AUTO_UNIT_TEST(socketReader) senf::UDPv4ClientSocketHandle outputSocket; outputSocket.writeto(senf::INet4SocketAddress("localhost:44344"),data); - senf::Scheduler::instance().timeout(senf::ClockService::now() + 100000000, &timeout); + senf::Scheduler::instance().timeout( + senf::ClockService::now() + senf::ClockService::milliseconds(100), &timeout); senf::ppi::run(); BOOST_REQUIRE( ! sink.empty() ); diff --git a/PPI/SocketWriter.test.cc b/PPI/SocketWriter.test.cc index e053d43..63dd2a8 100644 --- a/PPI/SocketWriter.test.cc +++ b/PPI/SocketWriter.test.cc @@ -85,7 +85,7 @@ BOOST_AUTO_UNIT_TEST(activeSocketWriter) senf::UDPv4ClientSocketHandle inputSocket; inputSocket.bind(senf::INet4SocketAddress("localhost:44344")); senf::Scheduler::instance().timeout( - senf::ClockService::now() + 100000000, &timeout); + senf::ClockService::now() + senf::ClockService::milliseconds(100), &timeout); source.submit(p); senf::ppi::run(); diff --git a/PPI/detail/EventBinding.cci b/PPI/detail/EventBinding.cci index 7e27b3a..9a4adc4 100644 --- a/PPI/detail/EventBinding.cci +++ b/PPI/detail/EventBinding.cci @@ -37,6 +37,18 @@ prefix_ senf::ppi::detail::EventBindingBase::~EventBindingBase() descriptor_->enabled(false); } +prefix_ senf::ppi::EventManager & senf::ppi::detail::EventBindingBase::manager() + const +{ + return *manager_; +} + +prefix_ senf::ppi::module::Module & senf::ppi::detail::EventBindingBase::module() + const +{ + return *module_; +} + //////////////////////////////////////// // protected members diff --git a/PPI/detail/EventBinding.cti b/PPI/detail/EventBinding.cti index b59906d..fb50ae4 100644 --- a/PPI/detail/EventBinding.cti +++ b/PPI/detail/EventBinding.cti @@ -26,6 +26,7 @@ //#include "EventBinding.ih" // Custom includes +#include "../EventManager.hh" #define prefix_ inline ///////////////////////////////cti.p/////////////////////////////////////// @@ -45,7 +46,7 @@ senf::ppi::detail::EventBindingHelper::callback(EventArg event, template prefix_ void senf::ppi::detail::EventBindingHelper::callback(EventArg event) { - callback(event, ClockService::now()); + callback(event, self().manager().now()); } //////////////////////////////////////// @@ -71,7 +72,7 @@ senf::ppi::detail::EventBindingHelper::callback(ClockService::clock_t template prefix_ void senf::ppi::detail::EventBindingHelper::callback() { - callback(ClockService::now()); + callback(self().manager().now()); } //////////////////////////////////////// diff --git a/PPI/detail/EventBinding.hh b/PPI/detail/EventBinding.hh index 4698c81..0840b03 100644 --- a/PPI/detail/EventBinding.hh +++ b/PPI/detail/EventBinding.hh @@ -42,6 +42,9 @@ namespace detail { { public: ~EventBindingBase(); + + EventManager & manager() const; + module::Module & module() const; protected: EventBindingBase(EventManager & manager, module::Module & module, diff --git a/Scheduler/ClockService.cc b/Scheduler/ClockService.cc new file mode 100644 index 0000000..010bd20 --- /dev/null +++ b/Scheduler/ClockService.cc @@ -0,0 +1,164 @@ +// $Id$ +// +// Copyright (C) 2007 +// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) +// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Stefan Bund +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief ClockService non-inline non-template implementation */ + +#include "ClockService.hh" +//#include "ClockService.ih" + +// Custom includes +#include +#include +#include +#include +#include "Utils/Exception.hh" + +//#include "ClockService.mpp" +#define prefix_ +///////////////////////////////cc.p//////////////////////////////////////// + +#define CheckErrno(op,args) if (op args < 0) throw SystemException(# op, errno) + +struct senf::ClockService::Impl +{ + Impl(); + + void block(); + void unblock(); + + struct Blocker { + Blocker(Impl * i) : impl(i) { impl->block(); } + ~Blocker() { impl->unblock(); } + Impl * impl; + }; + + static void timer(int); + + struct sigaction oldaction; + struct itimerval olditimer; + sigset_t alrm_set; +}; + +prefix_ senf::ClockService::Impl::Impl() +{ + CheckErrno( sigemptyset, (&alrm_set) ); + CheckErrno( sigaddset, (&alrm_set, SIGALRM) ); +} + +prefix_ void senf::ClockService::Impl::block() +{ + CheckErrno( sigprocmask, (SIG_BLOCK, &alrm_set, 0) ); +} + +prefix_ void senf::ClockService::Impl::unblock() +{ + CheckErrno( sigprocmask, (SIG_UNBLOCK, &alrm_set, 0) ); +} + +prefix_ void senf::ClockService::Impl::timer(int) +{ + boost::posix_time::ptime time (boost::posix_time::microsec_clock::universal_time()); + if (ClockService::instance().checkSkew(time)) + ClockService::instance().clockSkew( + time, ClockService::instance().heartbeat_ + boost::posix_time::seconds( + ClockService::CheckInterval)); + ClockService::instance().heartbeat_ = time; +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ClockService + +prefix_ senf::ClockService::~ClockService() +{ + setitimer(ITIMER_REAL, &impl_->olditimer, 0); + sigaction(SIGALRM, &impl_->oldaction, 0); +} + +//////////////////////////////////////// +// private members + +prefix_ senf::ClockService::ClockService() + : base_ (boost::posix_time::microsec_clock::universal_time()), + heartbeat_ (base_), impl_(new ClockService::Impl()) +{ + struct sigaction action; + action.sa_handler = & senf::ClockService::Impl::timer; + CheckErrno( sigemptyset, (&action.sa_mask) ); + action.sa_flags = SA_RESTART; + CheckErrno( sigaction, (SIGALRM, &action, &impl_->oldaction) ); + + struct itimerval itimer; + itimer.it_interval.tv_sec = CheckInterval; + itimer.it_interval.tv_usec = 0; + itimer.it_value.tv_sec = CheckInterval; + itimer.it_value.tv_usec = 0; + CheckErrno( setitimer, (ITIMER_REAL, &itimer, &impl_->olditimer) ); + impl_->unblock(); +} + +prefix_ void senf::ClockService::restart_i() +{ + impl_->block(); // if any syscall fails, the alarm signal stays blocked which is correct + base_ = boost::posix_time::microsec_clock::universal_time(); + heartbeat_ = base_; + + struct sigaction action; + action.sa_handler = & senf::ClockService::Impl::timer; + CheckErrno( sigemptyset, (&action.sa_mask) ); + action.sa_flags = SA_RESTART; + CheckErrno( sigaction, (SIGALRM, &action, 0) ); + + struct itimerval itimer; + itimer.it_interval.tv_sec = CheckInterval; + itimer.it_interval.tv_usec = 0; + itimer.it_value.tv_sec = CheckInterval; + itimer.it_value.tv_usec = 0; + CheckErrno( setitimer, (ITIMER_REAL, &itimer, 0) ); + impl_->unblock(); +} + +prefix_ void senf::ClockService::updateSkew(boost::posix_time::ptime time) +{ + Impl::Blocker alrmBlocker (impl_.get()); + struct itimerval itimer; + CheckErrno( getitimer, (ITIMER_REAL, &itimer) ); + clockSkew(time, (heartbeat_ + + boost::posix_time::seconds(CheckInterval) + - boost::posix_time::seconds(itimer.it_value.tv_sec) + - boost::posix_time::microseconds(itimer.it_value.tv_usec))); +} + +///////////////////////////////cc.e//////////////////////////////////////// +#undef prefix_ +//#include "ClockService.mpp" + + +// Local Variables: +// mode: c++ +// fill-column: 100 +// comment-column: 40 +// c-file-style: "senf" +// indent-tabs-mode: nil +// ispell-local-dictionary: "american" +// compile-command: "scons -u test" +// End: diff --git a/Scheduler/ClockService.cci b/Scheduler/ClockService.cci index 9628ec6..48f989d 100644 --- a/Scheduler/ClockService.cci +++ b/Scheduler/ClockService.cci @@ -32,14 +32,13 @@ /////////////////////////////////////////////////////////////////////////// // senf::ClockService -prefix_ senf::ClockService::~ClockService() -{ - -} - prefix_ senf::ClockService::clock_type senf::ClockService::now() { - return clock(boost::posix_time::microsec_clock::universal_time()); + // We must make sure to call instance() before fetching the current time since the first call to + // instance() will construct the singleton and initialize heartbeat_. If this happens *after* + // fetching the current time checkSkew() will detect clock skew since heartbeat_ will be < + // current time. + return instance().now_i(); } prefix_ senf::ClockService::abstime_type senf::ClockService::abstime(clock_type clock) @@ -58,19 +57,74 @@ prefix_ senf::ClockService::clock_type senf::ClockService::clock(abstime_type ti * clock_type( 1000000000UL / boost::posix_time::time_duration::ticks_per_second() ); } +prefix_ senf::ClockService::clock_type senf::ClockService::nanoseconds(clock_type v) +{ + return v; +} + +prefix_ senf::ClockService::clock_type senf::ClockService::microseconds(clock_type v) +{ + return nanoseconds(1000*v); +} + +prefix_ senf::ClockService::clock_type senf::ClockService::milliseconds(clock_type v) +{ + return microseconds(1000*v); +} + +prefix_ senf::ClockService::clock_type senf::ClockService::seconds(clock_type v) +{ + return milliseconds(1000*v); +} + +prefix_ senf::ClockService::clock_type senf::ClockService::minutes(clock_type v) +{ + return seconds(60*v); +} + +prefix_ senf::ClockService::clock_type senf::ClockService::hours(clock_type v) +{ + return minutes(60*v); +} + +prefix_ senf::ClockService::clock_type senf::ClockService::days(clock_type v) +{ + return hours(24*v); +} + +prefix_ void senf::ClockService::restart() +{ + instance().restart_i(); +} + //////////////////////////////////////// // private members -prefix_ senf::ClockService::ClockService() - : base_ (boost::posix_time::microsec_clock::universal_time()) -{} - prefix_ senf::ClockService & senf::ClockService::instance() { static ClockService instance; return instance; } +prefix_ senf::ClockService::clock_type senf::ClockService::now_i() +{ + boost::posix_time::ptime time (boost::posix_time::microsec_clock::universal_time()); + if (checkSkew(time)) + updateSkew(time); + return clock(time); +} + +prefix_ bool senf::ClockService::checkSkew(boost::posix_time::ptime time) +{ + return time < heartbeat_ || (time - heartbeat_) > boost::posix_time::seconds(2*CheckInterval); +} + +prefix_ void senf::ClockService::clockSkew(boost::posix_time::ptime time, + boost::posix_time::ptime expected) +{ + base_ += (time - expected); +} + ///////////////////////////////cci.e/////////////////////////////////////// #undef prefix_ diff --git a/Scheduler/ClockService.hh b/Scheduler/ClockService.hh index 6ad51b5..3d5f40f 100644 --- a/Scheduler/ClockService.hh +++ b/Scheduler/ClockService.hh @@ -28,13 +28,18 @@ // Custom includes #include -#include +#include +#include //#include "ClockService.mpp" ///////////////////////////////hh.p//////////////////////////////////////// namespace senf { +#ifndef DOXYGEN + namespace detail { class ClockServiceTest; } +#endif + // Implementation note: // // The clock value is represented as a 64bit unsigned integer number of nanosecods elapsed since @@ -72,8 +77,6 @@ namespace senf { The ClockService provides a highly accurate monotonous clock source based on gettimeofday(). However, it takes additional precautions to detect clock skew. - - \fixme Implement the clock-skew detection */ class ClockService : boost::noncopyable @@ -121,14 +124,40 @@ namespace senf { corresponding clock value. \see abstime */ + static clock_type nanoseconds(clock_type v); + static clock_type microseconds(clock_type v); + static clock_type milliseconds(clock_type v); + static clock_type seconds(clock_type v); + static clock_type minutes(clock_type v); + static clock_type hours(clock_type v); + static clock_type days(clock_type v); + + static void restart(); + protected: private: + ClockService(); static ClockService & instance(); + clock_type now_i(); + void restart_i(); + + bool checkSkew(boost::posix_time::ptime time); + void clockSkew(boost::posix_time::ptime time, boost::posix_time::ptime expected); + void updateSkew(boost::posix_time::ptime time); boost::posix_time::ptime base_; + boost::posix_time::ptime heartbeat_; + + struct Impl; + boost::scoped_ptr impl_; + + friend class Impl; +#ifndef DOXYGEN + friend class senf::detail::ClockServiceTest; +#endif }; diff --git a/Scheduler/ClockService.test.cc b/Scheduler/ClockService.test.cc new file mode 100644 index 0000000..3061229 --- /dev/null +++ b/Scheduler/ClockService.test.cc @@ -0,0 +1,130 @@ +// $Id$ +// +// Copyright (C) 2007 +// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) +// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Stefan Bund +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the +// Free Software Foundation, Inc., +// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +/** \file + \brief ClockService.test unit tests */ + +//#include "ClockService.test.hh" +//#include "ClockService.test.ih" + +// Custom includes +#include "ClockService.hh" +#include + +#include +#include + +#define prefix_ +///////////////////////////////cc.p//////////////////////////////////////// + +namespace senf { +namespace detail { + + struct ClockServiceTest + { + static boost::posix_time::ptime & base() + { return senf::ClockService::instance().base_; } + static boost::posix_time::ptime & heartbeat() + { return senf::ClockService::instance().heartbeat_; } + }; + +}} + +namespace { + + bool is_close_clock(senf::ClockService::clock_type a, senf::ClockService::clock_type b, + unsigned long delta = 10100000ul) + { + return (asecond.timeout <= timeNow ) { + while ( ! timerQueue_.empty() && timerQueue_.top()->second.timeout <= eventTime_ ) { TimerMap::iterator i (timerQueue_.top()); if (! i->second.canceled) i->second.cb(); @@ -170,7 +170,7 @@ prefix_ void senf::Scheduler::process() int timeout (MinTimeout); if (! timerQueue_.empty()) { ClockService::clock_type delta ( - (timerQueue_.top()->second.timeout - timeNow)/1000000UL); + (timerQueue_.top()->second.timeout - eventTime_)/1000000UL); if (delta