Scheduler: Hack suppoer for ordinary files into the scheduler (epoll does *not* suppo...
g0dil [Fri, 9 Nov 2007 22:23:26 +0000 (22:23 +0000)]
Scheduler: Daemon class: Add consoleLog (log file) support

git-svn-id: https://svn.berlios.de/svnroot/repos/senf/trunk@505 270642c3-0616-0410-b53a-bc976706d245

Scheduler/Daemon.cc
Scheduler/Daemon.hh
Scheduler/Daemon.ih
Scheduler/Daemon.test.cc
Scheduler/Scheduler.cc
Scheduler/Scheduler.hh

index 5c94256..534b333 100644 (file)
@@ -35,6 +35,7 @@
 #include <errno.h>
 #include <signal.h>
 #include <sstream>
+#include <algorithm>
 #include "../Utils/Exception.hh"
 #include "../Utils/membind.hh"
 
@@ -64,11 +65,11 @@ prefix_ bool senf::Daemon::daemon()
     return daemonize_;
 }
 
-prefix_ void senf::Daemon::consoleLog(std::string path, StdStream which)
+prefix_ void senf::Daemon::consoleLog(std::string const & path, StdStream which)
 {
     int fd (-1);
     if (! path.empty()) {
-        int fd (::open(path.c_str(), O_WRONLY | O_APPEND));
+        fd = ::open(path.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);
         if (fd < 0)
             throwErrno("::open()");
     }
@@ -86,7 +87,7 @@ prefix_ void senf::Daemon::consoleLog(std::string path, StdStream which)
     }
 }
 
-prefix_ void senf::Daemon::pidFile(std::string f)
+prefix_ void senf::Daemon::pidFile(std::string const & f)
 {
     pidfile_ = f;
 }
@@ -95,9 +96,15 @@ prefix_ void senf::Daemon::detach()
 {
     if (daemonize_) {
         LIBC_CALL_RV( nul, ::open, ("/dev/null", O_WRONLY) );
-        LIBC_CALL( ::dup2, (nul, 1) );
-        LIBC_CALL( ::dup2, (nul, 2) );
+        LIBC_CALL( ::dup2, (stdout_ == -1 ? nul : stdout_, 1) );
+        LIBC_CALL( ::dup2, (stderr_ == -1 ? nul : stderr_, 2) );
         LIBC_CALL( ::close, (nul) );
+
+        // We need to wait here to give the daemon watcher time to flush all data to the log file.
+        struct timespec ts;
+        ts.tv_sec = 0;
+        ts.tv_nsec = 100 * 1000000ul;
+        while (::nanosleep(&ts,&ts) < 0 && errno == EINTR) ;
     }
 }
 
@@ -107,7 +114,9 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv)
     argv_ = argv;
 
 #   ifdef NDEBUG
+
     try {
+
 #   endif
 
         configure();
@@ -123,6 +132,7 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv)
         main();
 
 #   ifdef NDEBUG
+
     }
     catch (std::exception & e) {
         std::cerr << "\n*** Fatal exception: " << e.what() << std::endl;
@@ -132,6 +142,7 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv)
         std::cerr << "\n*** Fatal exception: (unknown)" << std::endl;
         return 1;
     }
+
 #   endif
 
     return 0;
@@ -205,7 +216,7 @@ prefix_ void senf::Daemon::fork()
     LIBC_CALL( ::close, (coutpipe[1]) );
     LIBC_CALL( ::close, (cerrpipe[1]) );
 
-    detail::DaemonWatcher watcher (pid, coutpipe[0], cerrpipe[0]);
+    detail::DaemonWatcher watcher (pid, coutpipe[0], cerrpipe[0], stdout_, stderr_);
     watcher.run();
 
     ::_exit(0);
@@ -294,11 +305,20 @@ prefix_ bool senf::Daemon::pidfileCreate()
 ///////////////////////////////////////////////////////////////////////////
 // senf::detail::DaemonWatcher
 
-prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe)
-    : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), sigChld_(false),
-      coutForwarder_(coutpipe_, 1, boost::bind(&DaemonWatcher::pipeClosed, this, 1)),
-      cerrForwarder_(cerrpipe_, 2, boost::bind(&DaemonWatcher::pipeClosed, this, 2))
-{}
+prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe,
+                                                   int stdout, int stderr)
+    : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), stdout_(stdout),
+      stderr_(stderr), sigChld_(false),
+      coutForwarder_(coutpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 1)), 
+      cerrForwarder_(cerrpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 2)) 
+{
+    coutForwarder_.addTarget(1);
+    if (stdout_ >= 0)
+        coutForwarder_.addTarget(stdout_);
+    cerrForwarder_.addTarget(2);
+    if (stderr_ >= 0)
+        cerrForwarder_.addTarget(stderr_);
+}
 
 prefix_ void senf::detail::DaemonWatcher::run()
 {
@@ -355,8 +375,8 @@ prefix_ void senf::detail::DaemonWatcher::childOk()
 ///////////////////////////////////////////////////////////////////////////
 // senf::detail::DaemonWatcher::Forwarder
 
-prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, int dst, Callback cb)
-    : src_(src), dst_(dst), cb_(cb)
+prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, Callback cb)
+    : src_(src), cb_(cb)
 {
     Scheduler::instance().add(src_, senf::membind(&Forwarder::readData, this),
                               Scheduler::EV_READ);
@@ -366,14 +386,23 @@ prefix_ senf::detail::DaemonWatcher::Forwarder::~Forwarder()
 {
     if (src_ != -1)
         Scheduler::instance().remove(src_);
-    if (dst_ != -1 && ! buffer_.empty())
-        Scheduler::instance().remove(dst_);
+    
+    for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+        if (i->offset >= buffer_.size())
+            Scheduler::instance().remove(i->fd);
+}
+
+prefix_ void senf::detail::DaemonWatcher::Forwarder::addTarget(int fd)
+{
+    Target target = { fd, 0 };
+    targets_.push_back(target);
 }
 
 prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId event)
 {
     char buf[1024];
     int n (0);
+
     while (1) {
         n = ::read(src_,buf,1024);
         if (n<0) {
@@ -381,6 +410,7 @@ prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId
         } else 
             break;
     }
+
     if (n == 0) {
         // Hangup
         Scheduler::instance().remove(src_);
@@ -389,38 +419,55 @@ prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId
         src_ = -1;
         return;
     }
-    if (dst_ == -1)
-        // There was an error writing data -> drop it
+
+    if (targets_.empty())
         return;
-    if (buffer_.empty())
-        Scheduler::instance().add(dst_, senf::membind(&Forwarder::writeData, this),
-                                  Scheduler::EV_WRITE);
+
+    for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+        if (i->offset >= buffer_.size())
+            Scheduler::instance().add( i->fd, 
+                                       boost::bind(&Forwarder::writeData, this, _1, i),
+                                       Scheduler::EV_WRITE );
+
     buffer_.insert(buffer_.end(), buf, buf+n);
 }
 
-prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event)
+prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event,
+                                                               Targets::iterator target)
 {    
     if (event != Scheduler::EV_WRITE) {
         // Broken pipe while writing data ? Not much, we can do here, we just drop the data
-        Scheduler::instance().remove(dst_);
-        dst_ = -1;
-        if (src_ == -1) cb_();
+        Scheduler::instance().remove(target->fd);
+        targets_.erase(target);
+        if (targets_.empty() && src_ == -1)
+            cb_();
         return;
     }
+
     char buf[1024];
-    int n (buffer_.size() > 1024 ? 1024 : buffer_.size());
-    std::copy(buffer_.begin(), buffer_.begin() + n, buf);
-    int w (::write(dst_, buf, n));
+    int n (buffer_.size() - target->offset > 1024 ? 1024 : buffer_.size() - target->offset);
+    std::copy(buffer_.begin() + target->offset, buffer_.begin() + target->offset + n, buf);
+
+    int w (::write(target->fd, buf, n));
     if (w < 0) {
         if (errno != EINTR) throwErrno("::write()");
         return;
     }
-    buffer_.erase(buffer_.begin(), buffer_.begin()+w);
-    if (buffer_.empty()) {
-        Scheduler::instance().remove(dst_);
-        if (src_ == -1)
-            cb_();
-    }
+    target->offset += w;
+
+    n = std::min_element(
+        targets_.begin(), targets_.end(),
+        boost::bind(&Target::offset, _1) < boost::bind(&Target::offset, _2))->offset;
+
+    buffer_.erase(buffer_.begin(), buffer_.begin()+n);
+
+    for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+        i->offset -= n;
+
+    if (target->offset >= buffer_.size())
+        Scheduler::instance().remove(target->fd);
+    if (src_ == -1 && (buffer_.empty() || targets_.empty()))
+        cb_();
 }
 
 #undef LIBC_CALL
index 38aed48..ab70527 100644 (file)
@@ -97,7 +97,8 @@ namespace senf {
         void daemonize(bool);           ///< Configure whether to run in fore- or background
         bool daemon();                  ///< \c true, if running as daemon
 
-        void consoleLog(std::string, StdStream which = Both); ///< Configure console log file
+        void consoleLog(std::string const &, StdStream which = Both); 
+                                        ///< Configure console log file
                                         /**< May be called multiple times to set the log file
                                              for stdout and stderr seperately. Any standard stream
                                              not assigned to a log file will be redirected to
@@ -106,7 +107,7 @@ namespace senf {
                                              When running in the foreground, the log files will be
                                              ignored. */
 
-        void pidFile(std::string);      ///< Configure pid file
+        void pidFile(std::string const &); ///< Configure pid file
 
         ///\}
         ///\name Auxiliary helpers
index 6f49232..5988bca 100644 (file)
@@ -28,6 +28,7 @@
 
 // Custom includes
 #include <deque>
+#include <list>
 #include <boost/utility.hpp>
 #include <boost/function.hpp>
 #include "../Scheduler/Scheduler.hh"
@@ -43,7 +44,7 @@ namespace detail {
     {
     public:
 
-        DaemonWatcher(int pid, int coutpipe, int cerrpipe);
+        DaemonWatcher(int pid, int coutpipe, int cerrpipe, int stdout, int stderr);
 
         void run();
 
@@ -54,18 +55,29 @@ namespace detail {
         public:
             typedef boost::function<void ()> Callback;
 
-            Forwarder(int src, int dst, Callback cb);
+            Forwarder(int src, Callback cb);
             ~Forwarder();
 
+            void addTarget(int fd);
+
         private:
 
+            typedef std::deque<char> Buffer;
+            struct Target
+            {
+                int fd;
+                Buffer::size_type offset;
+            };
+            typedef std::list<Target> Targets;
+
             void readData(Scheduler::EventId event);
-            void writeData(Scheduler::EventId event);
+            void writeData(Scheduler::EventId event, Targets::iterator target);
 
-            typedef std::deque<char> Buffer;
             Buffer buffer_;
             int src_;
-            int dst_;
+
+            Targets targets_;
+
             Callback cb_;
         };
         
@@ -77,6 +89,8 @@ namespace detail {
         int childPid_;
         int coutpipe_;
         int cerrpipe_;
+        int stdout_;
+        int stderr_;
         bool sigChld_;
 
         Forwarder coutForwarder_;
index 3e44b3e..ff44b22 100644 (file)
@@ -31,6 +31,7 @@
 #include <sys/types.h>
 #include <sys/wait.h>
 #include <iostream>
+#include <fstream>
 #include <boost/filesystem/operations.hpp>
 #include "Daemon.hh"
 #include "../Utils/Exception.hh"
@@ -56,6 +57,7 @@ namespace {
         void configure() { 
             std::cout << "Running configure()" << std::endl; 
             pidFile("testDaemon.pid");
+            consoleLog("testDaemon.log");
         }
 
         void init() { 
@@ -83,7 +85,7 @@ namespace {
         }
         int status;
         if (::waitpid(pid, &status, 0) < 0) senf::throwErrno("::waitpid()");
-         return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
+        return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
     }
 
 }
@@ -96,6 +98,13 @@ BOOST_AUTO_UNIT_TEST(testDaemon)
     BOOST_CHECK( boost::filesystem::exists("testDaemon.pid") );
     delay(1000);
     BOOST_CHECK( ! boost::filesystem::exists("testDaemon.pid") );
+    BOOST_REQUIRE( boost::filesystem::exists("testDaemon.log") );
+    
+    std::ifstream log ("testDaemon.log");
+    std::stringstream data;
+    data << log.rdbuf();
+    BOOST_CHECK_EQUAL( data.str(), "Running init()\nRunning run()\n" );
+    BOOST_CHECK_NO_THROW( boost::filesystem::remove("testDaemon.log") );
 }
 
 ///////////////////////////////cc.e////////////////////////////////////////
index 9f222bf..c720c55 100644 (file)
@@ -54,7 +54,7 @@ static const int EPollInitialSize = 16;
 ///////////////////////////////cc.p////////////////////////////////////////
 
 prefix_ senf::Scheduler::Scheduler()
-    : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false),
+    : files_(0), timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false),
       eventTime_(0), eventEarly_(ClockService::milliseconds(11)), eventAdjust_(0)
 {
     if (epollFd_<0)
@@ -120,8 +120,15 @@ prefix_ void senf::Scheduler::do_add(int fd, FdCallback const & cb, int eventMas
     ev.events = i->second.epollMask();
     ev.data.fd = fd;
 
-    if (epoll_ctl(epollFd_, action, fd, &ev)<0)
-        throw SystemException(errno);
+    if (! i->second.file && epoll_ctl(epollFd_, action, fd, &ev) < 0) {
+        if (errno == EPERM) {
+            // Argh ... epoll does not support ordinary files :-( :-(
+            i->second.file = true;
+            ++ files_;
+        }
+        else
+            throwErrno("::epoll_ctl()");
+    }
 }
 
 prefix_ void senf::Scheduler::do_remove(int fd, int eventMask)
@@ -140,13 +147,16 @@ prefix_ void senf::Scheduler::do_remove(int fd, int eventMask)
     ev.data.fd = fd;
 
     int action (EPOLL_CTL_MOD);
+    bool file (i->second.file);
     if (ev.events==0) {
         action = EPOLL_CTL_DEL;
         fdTable_.erase(i);
     }
 
-    if (epoll_ctl(epollFd_, action, fd, &ev)<0)
-        throw SystemException(errno);
+    if (! file && epoll_ctl(epollFd_, action, fd, &ev) < 0)
+        throwErrno("::epoll_ctl()");
+    if (file)
+        -- files_;
 }
 
 prefix_ void senf::Scheduler::registerSigHandlers()
@@ -205,14 +215,18 @@ prefix_ void senf::Scheduler::process()
         }
 
         int timeout (-1);
-        if (timerQueue_.empty()) {
-            if (fdTable_.empty())
-                break;
-        }
+        if (files_ > 0)
+            timeout = 0;
         else {
-            ClockService::clock_type delta (
-                (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL);
-            timeout = delta < 0 ? 0 : delta;
+            if (timerQueue_.empty()) {
+                if (fdTable_.empty())
+                    break;
+            }
+            else {
+                ClockService::clock_type delta (
+                    (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL);
+                timeout = delta < 0 ? 0 : delta;
+            }
         }
 
         ///\todo Handle more than one epoll_event per call
@@ -261,37 +275,41 @@ prefix_ void senf::Scheduler::process()
             continue;
         }
 
-        FdTable::iterator i = fdTable_.find(ev.data.fd);
-        BOOST_ASSERT (i != fdTable_.end() );
-        EventSpec spec (i->second);
+        for (FdTable::iterator i = fdTable_.begin(); i != fdTable_.end(); ++i) {
+            EventSpec spec (i->second);
+            unsigned extraFlags (0);
+            unsigned events (spec.file ? spec.epollMask() : ev.events);
 
-        unsigned extraFlags (0);
-        if (ev.events & EPOLLHUP) extraFlags |= EV_HUP;
-        if (ev.events & EPOLLERR) extraFlags |= EV_ERR;
+            if (! (spec.file || i->first == ev.data.fd))
+                continue;
+                
+            if (events & EPOLLHUP) extraFlags |= EV_HUP;
+            if (events & EPOLLERR) extraFlags |= EV_ERR;
 
-        if (ev.events & EPOLLIN) {
-            BOOST_ASSERT(spec.cb_read);
-            spec.cb_read(EventId(EV_READ | extraFlags));
-        }
-        else if (ev.events & EPOLLPRI) {
-            BOOST_ASSERT(spec.cb_prio);
-            spec.cb_prio(EventId(EV_PRIO | extraFlags));
-        }
-        else if (ev.events & EPOLLOUT) {
-            BOOST_ASSERT(spec.cb_write);
-            spec.cb_write(EventId(EV_WRITE | extraFlags));
-        }
-        else {
-            // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI. 
-            // In this case we will signal all registered callbacks. The callbacks must be
-            // prepared to be called multiple times if they are registered to more than
-            // one event.
-            if (spec.cb_write) 
-                spec.cb_write(EventId(extraFlags));
-            if (spec.cb_prio) 
-                spec.cb_prio(EventId(extraFlags));
-            if (spec.cb_read) 
-                spec.cb_read(EventId(extraFlags));
+            if (events & EPOLLIN) {
+                BOOST_ASSERT(spec.cb_read);
+                spec.cb_read(EventId(EV_READ | extraFlags));
+            }
+            else if (events & EPOLLPRI) {
+                BOOST_ASSERT(spec.cb_prio);
+                spec.cb_prio(EventId(EV_PRIO | extraFlags));
+            }
+            else if (events & EPOLLOUT) {
+                BOOST_ASSERT(spec.cb_write);
+                spec.cb_write(EventId(EV_WRITE | extraFlags));
+            }
+            else {
+                // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI. 
+                // In this case we will signal all registered callbacks. The callbacks must be
+                // prepared to be called multiple times if they are registered to more than
+                // one event.
+                if (spec.cb_write) 
+                    spec.cb_write(EventId(extraFlags));
+                if (spec.cb_prio) 
+                    spec.cb_prio(EventId(extraFlags));
+                if (spec.cb_read) 
+                    spec.cb_read(EventId(extraFlags));
+            }
         }
     }
 }
index 1ad6896..4691e4a 100644 (file)
@@ -153,6 +153,9 @@ namespace senf {
         wait for signals \e only.
 
         \todo Fix EventId parameter (probably to int) to allow |-ing without casting ...
+        
+        \todo Fix the file support to use threads (?) fork (?) and a pipe so it works reliably even
+            over e.g. NFS.
       */
     class Scheduler
         : boost::noncopyable
@@ -320,7 +323,11 @@ namespace senf {
             FdCallback cb_prio;
             FdCallback cb_write;
 
+            EventSpec() : file(false) {}
+
             int epollMask() const;
+
+            bool file;
         };
 
         /** \brief Timer event specification
@@ -364,6 +371,7 @@ namespace senf {
         typedef std::vector<SimpleCallback> SigHandlers;
 
         FdTable fdTable_;
+        unsigned files_;
 
         unsigned timerIdCounter_;
         TimerQueue timerQueue_;