From: g0dil Date: Fri, 9 Nov 2007 22:23:26 +0000 (+0000) Subject: Scheduler: Hack suppoer for ordinary files into the scheduler (epoll does *not* suppo... X-Git-Url: http://g0dil.de/git?a=commitdiff_plain;h=9fabfe82429b675009a8109b03ccbd5e13f6ee0a;p=senf.git Scheduler: Hack suppoer for ordinary files into the scheduler (epoll does *not* support ordinary files ...) Scheduler: Daemon class: Add consoleLog (log file) support git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@505 270642c3-0616-0410-b53a-bc976706d245 --- diff --git a/Scheduler/Daemon.cc b/Scheduler/Daemon.cc index 5c94256..534b333 100644 --- a/Scheduler/Daemon.cc +++ b/Scheduler/Daemon.cc @@ -35,6 +35,7 @@ #include #include #include +#include #include "../Utils/Exception.hh" #include "../Utils/membind.hh" @@ -64,11 +65,11 @@ prefix_ bool senf::Daemon::daemon() return daemonize_; } -prefix_ void senf::Daemon::consoleLog(std::string path, StdStream which) +prefix_ void senf::Daemon::consoleLog(std::string const & path, StdStream which) { int fd (-1); if (! path.empty()) { - int fd (::open(path.c_str(), O_WRONLY | O_APPEND)); + fd = ::open(path.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666); if (fd < 0) throwErrno("::open()"); } @@ -86,7 +87,7 @@ prefix_ void senf::Daemon::consoleLog(std::string path, StdStream which) } } -prefix_ void senf::Daemon::pidFile(std::string f) +prefix_ void senf::Daemon::pidFile(std::string const & f) { pidfile_ = f; } @@ -95,9 +96,15 @@ prefix_ void senf::Daemon::detach() { if (daemonize_) { LIBC_CALL_RV( nul, ::open, ("/dev/null", O_WRONLY) ); - LIBC_CALL( ::dup2, (nul, 1) ); - LIBC_CALL( ::dup2, (nul, 2) ); + LIBC_CALL( ::dup2, (stdout_ == -1 ? nul : stdout_, 1) ); + LIBC_CALL( ::dup2, (stderr_ == -1 ? nul : stderr_, 2) ); LIBC_CALL( ::close, (nul) ); + + // We need to wait here to give the daemon watcher time to flush all data to the log file. + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 100 * 1000000ul; + while (::nanosleep(&ts,&ts) < 0 && errno == EINTR) ; } } @@ -107,7 +114,9 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv) argv_ = argv; # ifdef NDEBUG + try { + # endif configure(); @@ -123,6 +132,7 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv) main(); # ifdef NDEBUG + } catch (std::exception & e) { std::cerr << "\n*** Fatal exception: " << e.what() << std::endl; @@ -132,6 +142,7 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv) std::cerr << "\n*** Fatal exception: (unknown)" << std::endl; return 1; } + # endif return 0; @@ -205,7 +216,7 @@ prefix_ void senf::Daemon::fork() LIBC_CALL( ::close, (coutpipe[1]) ); LIBC_CALL( ::close, (cerrpipe[1]) ); - detail::DaemonWatcher watcher (pid, coutpipe[0], cerrpipe[0]); + detail::DaemonWatcher watcher (pid, coutpipe[0], cerrpipe[0], stdout_, stderr_); watcher.run(); ::_exit(0); @@ -294,11 +305,20 @@ prefix_ bool senf::Daemon::pidfileCreate() /////////////////////////////////////////////////////////////////////////// // senf::detail::DaemonWatcher -prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe) - : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), sigChld_(false), - coutForwarder_(coutpipe_, 1, boost::bind(&DaemonWatcher::pipeClosed, this, 1)), - cerrForwarder_(cerrpipe_, 2, boost::bind(&DaemonWatcher::pipeClosed, this, 2)) -{} +prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe, + int stdout, int stderr) + : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), stdout_(stdout), + stderr_(stderr), sigChld_(false), + coutForwarder_(coutpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 1)), + cerrForwarder_(cerrpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 2)) +{ + coutForwarder_.addTarget(1); + if (stdout_ >= 0) + coutForwarder_.addTarget(stdout_); + cerrForwarder_.addTarget(2); + if (stderr_ >= 0) + cerrForwarder_.addTarget(stderr_); +} prefix_ void senf::detail::DaemonWatcher::run() { @@ -355,8 +375,8 @@ prefix_ void senf::detail::DaemonWatcher::childOk() /////////////////////////////////////////////////////////////////////////// // senf::detail::DaemonWatcher::Forwarder -prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, int dst, Callback cb) - : src_(src), dst_(dst), cb_(cb) +prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, Callback cb) + : src_(src), cb_(cb) { Scheduler::instance().add(src_, senf::membind(&Forwarder::readData, this), Scheduler::EV_READ); @@ -366,14 +386,23 @@ prefix_ senf::detail::DaemonWatcher::Forwarder::~Forwarder() { if (src_ != -1) Scheduler::instance().remove(src_); - if (dst_ != -1 && ! buffer_.empty()) - Scheduler::instance().remove(dst_); + + for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i) + if (i->offset >= buffer_.size()) + Scheduler::instance().remove(i->fd); +} + +prefix_ void senf::detail::DaemonWatcher::Forwarder::addTarget(int fd) +{ + Target target = { fd, 0 }; + targets_.push_back(target); } prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId event) { char buf[1024]; int n (0); + while (1) { n = ::read(src_,buf,1024); if (n<0) { @@ -381,6 +410,7 @@ prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId } else break; } + if (n == 0) { // Hangup Scheduler::instance().remove(src_); @@ -389,38 +419,55 @@ prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId src_ = -1; return; } - if (dst_ == -1) - // There was an error writing data -> drop it + + if (targets_.empty()) return; - if (buffer_.empty()) - Scheduler::instance().add(dst_, senf::membind(&Forwarder::writeData, this), - Scheduler::EV_WRITE); + + for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i) + if (i->offset >= buffer_.size()) + Scheduler::instance().add( i->fd, + boost::bind(&Forwarder::writeData, this, _1, i), + Scheduler::EV_WRITE ); + buffer_.insert(buffer_.end(), buf, buf+n); } -prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event) +prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event, + Targets::iterator target) { if (event != Scheduler::EV_WRITE) { // Broken pipe while writing data ? Not much, we can do here, we just drop the data - Scheduler::instance().remove(dst_); - dst_ = -1; - if (src_ == -1) cb_(); + Scheduler::instance().remove(target->fd); + targets_.erase(target); + if (targets_.empty() && src_ == -1) + cb_(); return; } + char buf[1024]; - int n (buffer_.size() > 1024 ? 1024 : buffer_.size()); - std::copy(buffer_.begin(), buffer_.begin() + n, buf); - int w (::write(dst_, buf, n)); + int n (buffer_.size() - target->offset > 1024 ? 1024 : buffer_.size() - target->offset); + std::copy(buffer_.begin() + target->offset, buffer_.begin() + target->offset + n, buf); + + int w (::write(target->fd, buf, n)); if (w < 0) { if (errno != EINTR) throwErrno("::write()"); return; } - buffer_.erase(buffer_.begin(), buffer_.begin()+w); - if (buffer_.empty()) { - Scheduler::instance().remove(dst_); - if (src_ == -1) - cb_(); - } + target->offset += w; + + n = std::min_element( + targets_.begin(), targets_.end(), + boost::bind(&Target::offset, _1) < boost::bind(&Target::offset, _2))->offset; + + buffer_.erase(buffer_.begin(), buffer_.begin()+n); + + for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i) + i->offset -= n; + + if (target->offset >= buffer_.size()) + Scheduler::instance().remove(target->fd); + if (src_ == -1 && (buffer_.empty() || targets_.empty())) + cb_(); } #undef LIBC_CALL diff --git a/Scheduler/Daemon.hh b/Scheduler/Daemon.hh index 38aed48..ab70527 100644 --- a/Scheduler/Daemon.hh +++ b/Scheduler/Daemon.hh @@ -97,7 +97,8 @@ namespace senf { void daemonize(bool); ///< Configure whether to run in fore- or background bool daemon(); ///< \c true, if running as daemon - void consoleLog(std::string, StdStream which = Both); ///< Configure console log file + void consoleLog(std::string const &, StdStream which = Both); + ///< Configure console log file /**< May be called multiple times to set the log file for stdout and stderr seperately. Any standard stream not assigned to a log file will be redirected to @@ -106,7 +107,7 @@ namespace senf { When running in the foreground, the log files will be ignored. */ - void pidFile(std::string); ///< Configure pid file + void pidFile(std::string const &); ///< Configure pid file ///\} ///\name Auxiliary helpers diff --git a/Scheduler/Daemon.ih b/Scheduler/Daemon.ih index 6f49232..5988bca 100644 --- a/Scheduler/Daemon.ih +++ b/Scheduler/Daemon.ih @@ -28,6 +28,7 @@ // Custom includes #include +#include #include #include #include "../Scheduler/Scheduler.hh" @@ -43,7 +44,7 @@ namespace detail { { public: - DaemonWatcher(int pid, int coutpipe, int cerrpipe); + DaemonWatcher(int pid, int coutpipe, int cerrpipe, int stdout, int stderr); void run(); @@ -54,18 +55,29 @@ namespace detail { public: typedef boost::function Callback; - Forwarder(int src, int dst, Callback cb); + Forwarder(int src, Callback cb); ~Forwarder(); + void addTarget(int fd); + private: + typedef std::deque Buffer; + struct Target + { + int fd; + Buffer::size_type offset; + }; + typedef std::list Targets; + void readData(Scheduler::EventId event); - void writeData(Scheduler::EventId event); + void writeData(Scheduler::EventId event, Targets::iterator target); - typedef std::deque Buffer; Buffer buffer_; int src_; - int dst_; + + Targets targets_; + Callback cb_; }; @@ -77,6 +89,8 @@ namespace detail { int childPid_; int coutpipe_; int cerrpipe_; + int stdout_; + int stderr_; bool sigChld_; Forwarder coutForwarder_; diff --git a/Scheduler/Daemon.test.cc b/Scheduler/Daemon.test.cc index 3e44b3e..ff44b22 100644 --- a/Scheduler/Daemon.test.cc +++ b/Scheduler/Daemon.test.cc @@ -31,6 +31,7 @@ #include #include #include +#include #include #include "Daemon.hh" #include "../Utils/Exception.hh" @@ -56,6 +57,7 @@ namespace { void configure() { std::cout << "Running configure()" << std::endl; pidFile("testDaemon.pid"); + consoleLog("testDaemon.log"); } void init() { @@ -83,7 +85,7 @@ namespace { } int status; if (::waitpid(pid, &status, 0) < 0) senf::throwErrno("::waitpid()"); - return WIFEXITED(status) ? WEXITSTATUS(status) : -1; + return WIFEXITED(status) ? WEXITSTATUS(status) : -1; } } @@ -96,6 +98,13 @@ BOOST_AUTO_UNIT_TEST(testDaemon) BOOST_CHECK( boost::filesystem::exists("testDaemon.pid") ); delay(1000); BOOST_CHECK( ! boost::filesystem::exists("testDaemon.pid") ); + BOOST_REQUIRE( boost::filesystem::exists("testDaemon.log") ); + + std::ifstream log ("testDaemon.log"); + std::stringstream data; + data << log.rdbuf(); + BOOST_CHECK_EQUAL( data.str(), "Running init()\nRunning run()\n" ); + BOOST_CHECK_NO_THROW( boost::filesystem::remove("testDaemon.log") ); } ///////////////////////////////cc.e//////////////////////////////////////// diff --git a/Scheduler/Scheduler.cc b/Scheduler/Scheduler.cc index 9f222bf..c720c55 100644 --- a/Scheduler/Scheduler.cc +++ b/Scheduler/Scheduler.cc @@ -54,7 +54,7 @@ static const int EPollInitialSize = 16; ///////////////////////////////cc.p//////////////////////////////////////// prefix_ senf::Scheduler::Scheduler() - : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false), + : files_(0), timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false), eventTime_(0), eventEarly_(ClockService::milliseconds(11)), eventAdjust_(0) { if (epollFd_<0) @@ -120,8 +120,15 @@ prefix_ void senf::Scheduler::do_add(int fd, FdCallback const & cb, int eventMas ev.events = i->second.epollMask(); ev.data.fd = fd; - if (epoll_ctl(epollFd_, action, fd, &ev)<0) - throw SystemException(errno); + if (! i->second.file && epoll_ctl(epollFd_, action, fd, &ev) < 0) { + if (errno == EPERM) { + // Argh ... epoll does not support ordinary files :-( :-( + i->second.file = true; + ++ files_; + } + else + throwErrno("::epoll_ctl()"); + } } prefix_ void senf::Scheduler::do_remove(int fd, int eventMask) @@ -140,13 +147,16 @@ prefix_ void senf::Scheduler::do_remove(int fd, int eventMask) ev.data.fd = fd; int action (EPOLL_CTL_MOD); + bool file (i->second.file); if (ev.events==0) { action = EPOLL_CTL_DEL; fdTable_.erase(i); } - if (epoll_ctl(epollFd_, action, fd, &ev)<0) - throw SystemException(errno); + if (! file && epoll_ctl(epollFd_, action, fd, &ev) < 0) + throwErrno("::epoll_ctl()"); + if (file) + -- files_; } prefix_ void senf::Scheduler::registerSigHandlers() @@ -205,14 +215,18 @@ prefix_ void senf::Scheduler::process() } int timeout (-1); - if (timerQueue_.empty()) { - if (fdTable_.empty()) - break; - } + if (files_ > 0) + timeout = 0; else { - ClockService::clock_type delta ( - (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL); - timeout = delta < 0 ? 0 : delta; + if (timerQueue_.empty()) { + if (fdTable_.empty()) + break; + } + else { + ClockService::clock_type delta ( + (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL); + timeout = delta < 0 ? 0 : delta; + } } ///\todo Handle more than one epoll_event per call @@ -261,37 +275,41 @@ prefix_ void senf::Scheduler::process() continue; } - FdTable::iterator i = fdTable_.find(ev.data.fd); - BOOST_ASSERT (i != fdTable_.end() ); - EventSpec spec (i->second); + for (FdTable::iterator i = fdTable_.begin(); i != fdTable_.end(); ++i) { + EventSpec spec (i->second); + unsigned extraFlags (0); + unsigned events (spec.file ? spec.epollMask() : ev.events); - unsigned extraFlags (0); - if (ev.events & EPOLLHUP) extraFlags |= EV_HUP; - if (ev.events & EPOLLERR) extraFlags |= EV_ERR; + if (! (spec.file || i->first == ev.data.fd)) + continue; + + if (events & EPOLLHUP) extraFlags |= EV_HUP; + if (events & EPOLLERR) extraFlags |= EV_ERR; - if (ev.events & EPOLLIN) { - BOOST_ASSERT(spec.cb_read); - spec.cb_read(EventId(EV_READ | extraFlags)); - } - else if (ev.events & EPOLLPRI) { - BOOST_ASSERT(spec.cb_prio); - spec.cb_prio(EventId(EV_PRIO | extraFlags)); - } - else if (ev.events & EPOLLOUT) { - BOOST_ASSERT(spec.cb_write); - spec.cb_write(EventId(EV_WRITE | extraFlags)); - } - else { - // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI. - // In this case we will signal all registered callbacks. The callbacks must be - // prepared to be called multiple times if they are registered to more than - // one event. - if (spec.cb_write) - spec.cb_write(EventId(extraFlags)); - if (spec.cb_prio) - spec.cb_prio(EventId(extraFlags)); - if (spec.cb_read) - spec.cb_read(EventId(extraFlags)); + if (events & EPOLLIN) { + BOOST_ASSERT(spec.cb_read); + spec.cb_read(EventId(EV_READ | extraFlags)); + } + else if (events & EPOLLPRI) { + BOOST_ASSERT(spec.cb_prio); + spec.cb_prio(EventId(EV_PRIO | extraFlags)); + } + else if (events & EPOLLOUT) { + BOOST_ASSERT(spec.cb_write); + spec.cb_write(EventId(EV_WRITE | extraFlags)); + } + else { + // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI. + // In this case we will signal all registered callbacks. The callbacks must be + // prepared to be called multiple times if they are registered to more than + // one event. + if (spec.cb_write) + spec.cb_write(EventId(extraFlags)); + if (spec.cb_prio) + spec.cb_prio(EventId(extraFlags)); + if (spec.cb_read) + spec.cb_read(EventId(extraFlags)); + } } } } diff --git a/Scheduler/Scheduler.hh b/Scheduler/Scheduler.hh index 1ad6896..4691e4a 100644 --- a/Scheduler/Scheduler.hh +++ b/Scheduler/Scheduler.hh @@ -153,6 +153,9 @@ namespace senf { wait for signals \e only. \todo Fix EventId parameter (probably to int) to allow |-ing without casting ... + + \todo Fix the file support to use threads (?) fork (?) and a pipe so it works reliably even + over e.g. NFS. */ class Scheduler : boost::noncopyable @@ -320,7 +323,11 @@ namespace senf { FdCallback cb_prio; FdCallback cb_write; + EventSpec() : file(false) {} + int epollMask() const; + + bool file; }; /** \brief Timer event specification @@ -364,6 +371,7 @@ namespace senf { typedef std::vector SigHandlers; FdTable fdTable_; + unsigned files_; unsigned timerIdCounter_; TimerQueue timerQueue_;