Scheduler: Hack suppoer for ordinary files into the scheduler (epoll does *not* suppo...
[senf.git] / Scheduler / Daemon.cc
index 5c94256..534b333 100644 (file)
@@ -35,6 +35,7 @@
 #include <errno.h>
 #include <signal.h>
 #include <sstream>
+#include <algorithm>
 #include "../Utils/Exception.hh"
 #include "../Utils/membind.hh"
 
@@ -64,11 +65,11 @@ prefix_ bool senf::Daemon::daemon()
     return daemonize_;
 }
 
-prefix_ void senf::Daemon::consoleLog(std::string path, StdStream which)
+prefix_ void senf::Daemon::consoleLog(std::string const & path, StdStream which)
 {
     int fd (-1);
     if (! path.empty()) {
-        int fd (::open(path.c_str(), O_WRONLY | O_APPEND));
+        fd = ::open(path.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);
         if (fd < 0)
             throwErrno("::open()");
     }
@@ -86,7 +87,7 @@ prefix_ void senf::Daemon::consoleLog(std::string path, StdStream which)
     }
 }
 
-prefix_ void senf::Daemon::pidFile(std::string f)
+prefix_ void senf::Daemon::pidFile(std::string const & f)
 {
     pidfile_ = f;
 }
@@ -95,9 +96,15 @@ prefix_ void senf::Daemon::detach()
 {
     if (daemonize_) {
         LIBC_CALL_RV( nul, ::open, ("/dev/null", O_WRONLY) );
-        LIBC_CALL( ::dup2, (nul, 1) );
-        LIBC_CALL( ::dup2, (nul, 2) );
+        LIBC_CALL( ::dup2, (stdout_ == -1 ? nul : stdout_, 1) );
+        LIBC_CALL( ::dup2, (stderr_ == -1 ? nul : stderr_, 2) );
         LIBC_CALL( ::close, (nul) );
+
+        // We need to wait here to give the daemon watcher time to flush all data to the log file.
+        struct timespec ts;
+        ts.tv_sec = 0;
+        ts.tv_nsec = 100 * 1000000ul;
+        while (::nanosleep(&ts,&ts) < 0 && errno == EINTR) ;
     }
 }
 
@@ -107,7 +114,9 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv)
     argv_ = argv;
 
 #   ifdef NDEBUG
+
     try {
+
 #   endif
 
         configure();
@@ -123,6 +132,7 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv)
         main();
 
 #   ifdef NDEBUG
+
     }
     catch (std::exception & e) {
         std::cerr << "\n*** Fatal exception: " << e.what() << std::endl;
@@ -132,6 +142,7 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv)
         std::cerr << "\n*** Fatal exception: (unknown)" << std::endl;
         return 1;
     }
+
 #   endif
 
     return 0;
@@ -205,7 +216,7 @@ prefix_ void senf::Daemon::fork()
     LIBC_CALL( ::close, (coutpipe[1]) );
     LIBC_CALL( ::close, (cerrpipe[1]) );
 
-    detail::DaemonWatcher watcher (pid, coutpipe[0], cerrpipe[0]);
+    detail::DaemonWatcher watcher (pid, coutpipe[0], cerrpipe[0], stdout_, stderr_);
     watcher.run();
 
     ::_exit(0);
@@ -294,11 +305,20 @@ prefix_ bool senf::Daemon::pidfileCreate()
 ///////////////////////////////////////////////////////////////////////////
 // senf::detail::DaemonWatcher
 
-prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe)
-    : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), sigChld_(false),
-      coutForwarder_(coutpipe_, 1, boost::bind(&DaemonWatcher::pipeClosed, this, 1)),
-      cerrForwarder_(cerrpipe_, 2, boost::bind(&DaemonWatcher::pipeClosed, this, 2))
-{}
+prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe,
+                                                   int stdout, int stderr)
+    : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), stdout_(stdout),
+      stderr_(stderr), sigChld_(false),
+      coutForwarder_(coutpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 1)), 
+      cerrForwarder_(cerrpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 2)) 
+{
+    coutForwarder_.addTarget(1);
+    if (stdout_ >= 0)
+        coutForwarder_.addTarget(stdout_);
+    cerrForwarder_.addTarget(2);
+    if (stderr_ >= 0)
+        cerrForwarder_.addTarget(stderr_);
+}
 
 prefix_ void senf::detail::DaemonWatcher::run()
 {
@@ -355,8 +375,8 @@ prefix_ void senf::detail::DaemonWatcher::childOk()
 ///////////////////////////////////////////////////////////////////////////
 // senf::detail::DaemonWatcher::Forwarder
 
-prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, int dst, Callback cb)
-    : src_(src), dst_(dst), cb_(cb)
+prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, Callback cb)
+    : src_(src), cb_(cb)
 {
     Scheduler::instance().add(src_, senf::membind(&Forwarder::readData, this),
                               Scheduler::EV_READ);
@@ -366,14 +386,23 @@ prefix_ senf::detail::DaemonWatcher::Forwarder::~Forwarder()
 {
     if (src_ != -1)
         Scheduler::instance().remove(src_);
-    if (dst_ != -1 && ! buffer_.empty())
-        Scheduler::instance().remove(dst_);
+    
+    for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+        if (i->offset >= buffer_.size())
+            Scheduler::instance().remove(i->fd);
+}
+
+prefix_ void senf::detail::DaemonWatcher::Forwarder::addTarget(int fd)
+{
+    Target target = { fd, 0 };
+    targets_.push_back(target);
 }
 
 prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId event)
 {
     char buf[1024];
     int n (0);
+
     while (1) {
         n = ::read(src_,buf,1024);
         if (n<0) {
@@ -381,6 +410,7 @@ prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId
         } else 
             break;
     }
+
     if (n == 0) {
         // Hangup
         Scheduler::instance().remove(src_);
@@ -389,38 +419,55 @@ prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId
         src_ = -1;
         return;
     }
-    if (dst_ == -1)
-        // There was an error writing data -> drop it
+
+    if (targets_.empty())
         return;
-    if (buffer_.empty())
-        Scheduler::instance().add(dst_, senf::membind(&Forwarder::writeData, this),
-                                  Scheduler::EV_WRITE);
+
+    for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+        if (i->offset >= buffer_.size())
+            Scheduler::instance().add( i->fd, 
+                                       boost::bind(&Forwarder::writeData, this, _1, i),
+                                       Scheduler::EV_WRITE );
+
     buffer_.insert(buffer_.end(), buf, buf+n);
 }
 
-prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event)
+prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event,
+                                                               Targets::iterator target)
 {    
     if (event != Scheduler::EV_WRITE) {
         // Broken pipe while writing data ? Not much, we can do here, we just drop the data
-        Scheduler::instance().remove(dst_);
-        dst_ = -1;
-        if (src_ == -1) cb_();
+        Scheduler::instance().remove(target->fd);
+        targets_.erase(target);
+        if (targets_.empty() && src_ == -1)
+            cb_();
         return;
     }
+
     char buf[1024];
-    int n (buffer_.size() > 1024 ? 1024 : buffer_.size());
-    std::copy(buffer_.begin(), buffer_.begin() + n, buf);
-    int w (::write(dst_, buf, n));
+    int n (buffer_.size() - target->offset > 1024 ? 1024 : buffer_.size() - target->offset);
+    std::copy(buffer_.begin() + target->offset, buffer_.begin() + target->offset + n, buf);
+
+    int w (::write(target->fd, buf, n));
     if (w < 0) {
         if (errno != EINTR) throwErrno("::write()");
         return;
     }
-    buffer_.erase(buffer_.begin(), buffer_.begin()+w);
-    if (buffer_.empty()) {
-        Scheduler::instance().remove(dst_);
-        if (src_ == -1)
-            cb_();
-    }
+    target->offset += w;
+
+    n = std::min_element(
+        targets_.begin(), targets_.end(),
+        boost::bind(&Target::offset, _1) < boost::bind(&Target::offset, _2))->offset;
+
+    buffer_.erase(buffer_.begin(), buffer_.begin()+n);
+
+    for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+        i->offset -= n;
+
+    if (target->offset >= buffer_.size())
+        Scheduler::instance().remove(target->fd);
+    if (src_ == -1 && (buffer_.empty() || targets_.empty()))
+        cb_();
 }
 
 #undef LIBC_CALL