Scheduler: Daemon class: Better IPC in daemonize()
[senf.git] / Scheduler / Daemon.cc
index 5c94256..3782974 100644 (file)
@@ -35,6 +35,9 @@
 #include <errno.h>
 #include <signal.h>
 #include <sstream>
+#include <algorithm>
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/algorithm/string/trim.hpp>
 #include "../Utils/Exception.hh"
 #include "../Utils/membind.hh"
 
@@ -64,40 +67,83 @@ prefix_ bool senf::Daemon::daemon()
     return daemonize_;
 }
 
-prefix_ void senf::Daemon::consoleLog(std::string path, StdStream which)
+prefix_ void senf::Daemon::consoleLog(std::string const & path, StdStream which)
+{
+    switch (which) {
+    case StdOut : stdoutLog_ = path; break;
+    case StdErr : stderrLog_ = path; break;
+    case Both : stdoutLog_ = path; stderrLog_ = path; break;
+    }
+}
+
+
+prefix_ void senf::Daemon::openLog()
 {
     int fd (-1);
-    if (! path.empty()) {
-        int fd (::open(path.c_str(), O_WRONLY | O_APPEND));
+    if (! stdoutLog_.empty()) {
+        fd = ::open(stdoutLog_.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);
         if (fd < 0)
             throwErrno("::open()");
-    }
-    switch (which) {
-    case StdOut:
         stdout_ = fd;
-        break;
-    case StdErr:
+    }
+    if (stderrLog_ == stdoutLog_)
         stderr_ = fd;
-        break;
-    case Both:
-        stdout_ = fd;
+    else if (! stderrLog_.empty()) {
+        fd = ::open(stdoutLog_.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);
+        if (fd < 0)
+            throwErrno("::open()");
         stderr_ = fd;
-        break;
     }
 }
 
-prefix_ void senf::Daemon::pidFile(std::string f)
+prefix_ void senf::Daemon::pidFile(std::string const & f)
 {
     pidfile_ = f;
 }
 
+namespace {
+    bool signaled (false);
+    void waitusr(int) {
+        signaled = true;
+    }
+}
+
 prefix_ void senf::Daemon::detach()
 {
     if (daemonize_) {
+        // Wow .. ouch .. 
+        // To ensure all data is written to the console log file in the correct order, we suspend
+        // execution here until the parent process tells us to continue via SIGUSR1: We block
+        // SIGUSR1 and install our own signal handler saving the old handler and signal mask. Then
+        // we close stdin/stderr which will send a HUP condition to the parent process. We wait for
+        // SIGUSR1 and reinstall the old signal mask and action.
+        ::sigset_t oldsig;
+        ::sigset_t usrsig;
+        ::sigemptyset(&usrsig);
+        LIBC_CALL( ::sigaddset, (&usrsig, SIGUSR1) );
+        LIBC_CALL( ::sigprocmask, (SIG_BLOCK, &usrsig, &oldsig) );
+        struct ::sigaction oldact;
+        struct ::sigaction usract;
+        ::memset(&usract, 0, sizeof(usract));
+        usract.sa_handler = &waitusr;
+        LIBC_CALL( ::sigaction, (SIGUSR1, &usract, &oldact) );
+        ::sigset_t waitsig (oldsig);
+        LIBC_CALL( ::sigdelset, (&waitsig, SIGUSR1) );
+
         LIBC_CALL_RV( nul, ::open, ("/dev/null", O_WRONLY) );
-        LIBC_CALL( ::dup2, (nul, 1) );
-        LIBC_CALL( ::dup2, (nul, 2) );
+        LIBC_CALL( ::dup2, (stdout_ == -1 ? nul : stdout_, 1) );
+        LIBC_CALL( ::dup2, (stderr_ == -1 ? nul : stderr_, 2) );
         LIBC_CALL( ::close, (nul) );
+
+        signaled = false;
+        while (! signaled) {
+            ::sigsuspend(&waitsig);
+            if (errno != EINTR)
+                throwErrno("::sigsuspend()");
+        }
+
+        LIBC_CALL( ::sigaction, (SIGUSR1, &oldact, 0) );
+        LIBC_CALL( ::sigprocmask, (SIG_SETMASK, &oldsig, 0) );
     }
 }
 
@@ -107,13 +153,17 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv)
     argv_ = argv;
 
 #   ifdef NDEBUG
+
     try {
+
 #   endif
 
         configure();
 
-        if (daemonize_)
+        if (daemonize_) {
+            openLog();
             fork();
+        }
         if (! pidfile_.empty() && ! pidfileCreate()) {
             std::cerr << "\n*** PID file '" << pidfile_ << "' creation failed. Daemon running ?" 
                       << std::endl;
@@ -123,6 +173,7 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv)
         main();
 
 #   ifdef NDEBUG
+
     }
     catch (std::exception & e) {
         std::cerr << "\n*** Fatal exception: " << e.what() << std::endl;
@@ -132,6 +183,7 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv)
         std::cerr << "\n*** Fatal exception: (unknown)" << std::endl;
         return 1;
     }
+
 #   endif
 
     return 0;
@@ -149,7 +201,31 @@ prefix_ senf::Daemon::Daemon()
 // private members
 
 prefix_ void senf::Daemon::configure()
-{}
+{
+    for (int i (1); i<argc_; ++i) {
+        if (argv_[i] == std::string("--no-daemon"))
+            daemonize(false);
+        else if (boost::starts_with(argv_[i], std::string("--console-log="))) {
+            std::string arg (std::string(argv_[i]), 14u);
+            std::string::size_type komma (arg.find(','));
+            if (komma == std::string::npos) {
+                boost::trim(arg);
+                consoleLog(arg);
+            } else {
+                std::string arg1 (arg,0,komma);
+                std::string arg2 (arg,komma+1);
+                boost::trim(arg1);
+                boost::trim(arg2);
+                if (arg1 == std::string("none")) consoleLog("",StdOut);
+                else if (! arg1.empty() )        consoleLog(arg1, StdOut);
+                if (arg2 == std::string("none")) consoleLog("",StdErr);
+                else if (! arg2.empty() )        consoleLog(arg2, StdErr);
+            }
+        }
+        else if (boost::starts_with(argv_[i], std::string("--pid-file="))) 
+            pidFile(std::string(std::string(argv_[i]), 11u));
+    }
+}
 
 prefix_ void senf::Daemon::main()
 {
@@ -205,7 +281,7 @@ prefix_ void senf::Daemon::fork()
     LIBC_CALL( ::close, (coutpipe[1]) );
     LIBC_CALL( ::close, (cerrpipe[1]) );
 
-    detail::DaemonWatcher watcher (pid, coutpipe[0], cerrpipe[0]);
+    detail::DaemonWatcher watcher (pid, coutpipe[0], cerrpipe[0], stdout_, stderr_);
     watcher.run();
 
     ::_exit(0);
@@ -294,11 +370,20 @@ prefix_ bool senf::Daemon::pidfileCreate()
 ///////////////////////////////////////////////////////////////////////////
 // senf::detail::DaemonWatcher
 
-prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe)
-    : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), sigChld_(false),
-      coutForwarder_(coutpipe_, 1, boost::bind(&DaemonWatcher::pipeClosed, this, 1)),
-      cerrForwarder_(cerrpipe_, 2, boost::bind(&DaemonWatcher::pipeClosed, this, 2))
-{}
+prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe,
+                                                   int stdout, int stderr)
+    : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), stdout_(stdout),
+      stderr_(stderr), sigChld_(false),
+      coutForwarder_(coutpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 1)), 
+      cerrForwarder_(cerrpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 2)) 
+{
+    coutForwarder_.addTarget(1);
+    if (stdout_ >= 0)
+        coutForwarder_.addTarget(stdout_);
+    cerrForwarder_.addTarget(2);
+    if (stderr_ >= 0)
+        cerrForwarder_.addTarget(stderr_);
+}
 
 prefix_ void senf::detail::DaemonWatcher::run()
 {
@@ -319,6 +404,8 @@ prefix_ void senf::detail::DaemonWatcher::pipeClosed(int id)
     if (coutpipe_ == -1 && cerrpipe_ == -1) {
         if (sigChld_)
             childDied(); // does not return
+        if (::kill(childPid_, SIGUSR1) < 0)
+            if (errno != ESRCH) throwErrno("::kill()");
         Scheduler::instance().timeout(
             Scheduler::instance().eventTime() + ClockService::seconds(1),
             senf::membind(&DaemonWatcher::childOk, this));
@@ -355,8 +442,8 @@ prefix_ void senf::detail::DaemonWatcher::childOk()
 ///////////////////////////////////////////////////////////////////////////
 // senf::detail::DaemonWatcher::Forwarder
 
-prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, int dst, Callback cb)
-    : src_(src), dst_(dst), cb_(cb)
+prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, Callback cb)
+    : src_(src), cb_(cb)
 {
     Scheduler::instance().add(src_, senf::membind(&Forwarder::readData, this),
                               Scheduler::EV_READ);
@@ -366,14 +453,23 @@ prefix_ senf::detail::DaemonWatcher::Forwarder::~Forwarder()
 {
     if (src_ != -1)
         Scheduler::instance().remove(src_);
-    if (dst_ != -1 && ! buffer_.empty())
-        Scheduler::instance().remove(dst_);
+    
+    for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+        if (i->offset >= buffer_.size())
+            Scheduler::instance().remove(i->fd);
+}
+
+prefix_ void senf::detail::DaemonWatcher::Forwarder::addTarget(int fd)
+{
+    Target target = { fd, 0 };
+    targets_.push_back(target);
 }
 
 prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId event)
 {
     char buf[1024];
     int n (0);
+
     while (1) {
         n = ::read(src_,buf,1024);
         if (n<0) {
@@ -381,6 +477,7 @@ prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId
         } else 
             break;
     }
+
     if (n == 0) {
         // Hangup
         Scheduler::instance().remove(src_);
@@ -389,38 +486,55 @@ prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId
         src_ = -1;
         return;
     }
-    if (dst_ == -1)
-        // There was an error writing data -> drop it
+
+    if (targets_.empty())
         return;
-    if (buffer_.empty())
-        Scheduler::instance().add(dst_, senf::membind(&Forwarder::writeData, this),
-                                  Scheduler::EV_WRITE);
+
+    for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+        if (i->offset >= buffer_.size())
+            Scheduler::instance().add( i->fd, 
+                                       boost::bind(&Forwarder::writeData, this, _1, i),
+                                       Scheduler::EV_WRITE );
+
     buffer_.insert(buffer_.end(), buf, buf+n);
 }
 
-prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event)
+prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event,
+                                                               Targets::iterator target)
 {    
     if (event != Scheduler::EV_WRITE) {
         // Broken pipe while writing data ? Not much, we can do here, we just drop the data
-        Scheduler::instance().remove(dst_);
-        dst_ = -1;
-        if (src_ == -1) cb_();
+        Scheduler::instance().remove(target->fd);
+        targets_.erase(target);
+        if (targets_.empty() && src_ == -1)
+            cb_();
         return;
     }
+
     char buf[1024];
-    int n (buffer_.size() > 1024 ? 1024 : buffer_.size());
-    std::copy(buffer_.begin(), buffer_.begin() + n, buf);
-    int w (::write(dst_, buf, n));
+    int n (buffer_.size() - target->offset > 1024 ? 1024 : buffer_.size() - target->offset);
+    std::copy(buffer_.begin() + target->offset, buffer_.begin() + target->offset + n, buf);
+
+    int w (::write(target->fd, buf, n));
     if (w < 0) {
         if (errno != EINTR) throwErrno("::write()");
         return;
     }
-    buffer_.erase(buffer_.begin(), buffer_.begin()+w);
-    if (buffer_.empty()) {
-        Scheduler::instance().remove(dst_);
-        if (src_ == -1)
-            cb_();
-    }
+    target->offset += w;
+
+    n = std::min_element(
+        targets_.begin(), targets_.end(),
+        boost::bind(&Target::offset, _1) < boost::bind(&Target::offset, _2))->offset;
+
+    buffer_.erase(buffer_.begin(), buffer_.begin()+n);
+
+    for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+        i->offset -= n;
+
+    if (target->offset >= buffer_.size())
+        Scheduler::instance().remove(target->fd);
+    if (src_ == -1 && (buffer_.empty() || targets_.empty()))
+        cb_();
 }
 
 #undef LIBC_CALL