X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=Scheduler%2FDaemon.cc;h=378297441d5bea4a3d69b1445e104c8dccdaecea;hb=44c966bc9d744d0926cffd5184fdb77a62564c16;hp=5252dcfbe883e609c0a72278a66373a56601f981;hpb=17e24d84603667395e9ffa786a9cdbb722bf9c1f;p=senf.git diff --git a/Scheduler/Daemon.cc b/Scheduler/Daemon.cc index 5252dcf..3782974 100644 --- a/Scheduler/Daemon.cc +++ b/Scheduler/Daemon.cc @@ -34,6 +34,10 @@ #include #include #include +#include +#include +#include +#include #include "../Utils/Exception.hh" #include "../Utils/membind.hh" @@ -48,7 +52,10 @@ // senf::Daemon prefix_ senf::Daemon::~Daemon() -{} +{ + if (! pidfile_.empty()) + LIBC_CALL( ::unlink, (pidfile_.c_str()) ); +} prefix_ void senf::Daemon::daemonize(bool v) { @@ -60,40 +67,84 @@ prefix_ bool senf::Daemon::daemon() return daemonize_; } -prefix_ void senf::Daemon::consoleLog(std::string path, StdStream which) +prefix_ void senf::Daemon::consoleLog(std::string const & path, StdStream which) +{ + switch (which) { + case StdOut : stdoutLog_ = path; break; + case StdErr : stderrLog_ = path; break; + case Both : stdoutLog_ = path; stderrLog_ = path; break; + } +} + + +prefix_ void senf::Daemon::openLog() { int fd (-1); - if (! path.empty()) { - int fd (::open(path.c_str(), O_WRONLY | O_APPEND)); + if (! stdoutLog_.empty()) { + fd = ::open(stdoutLog_.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666); if (fd < 0) throwErrno("::open()"); - } - switch (which) { - case StdOut: stdout_ = fd; - break; - case StdErr: + } + if (stderrLog_ == stdoutLog_) stderr_ = fd; - break; - case Both: - stdout_ = fd; + else if (! stderrLog_.empty()) { + fd = ::open(stdoutLog_.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666); + if (fd < 0) + throwErrno("::open()"); stderr_ = fd; - break; } } -prefix_ void senf::Daemon::pidFile(std::string f, bool unique) +prefix_ void senf::Daemon::pidFile(std::string const & f) { pidfile_ = f; - unique_ = unique; +} + +namespace { + bool signaled (false); + void waitusr(int) { + signaled = true; + } } prefix_ void senf::Daemon::detach() { - LIBC_CALL_RV( nul, ::open, ("/dev/null", O_WRONLY) ); - LIBC_CALL( ::dup2, (nul, 1) ); - LIBC_CALL( ::dup2, (nul, 2) ); - LIBC_CALL( ::close, (nul) ); + if (daemonize_) { + // Wow .. ouch .. + // To ensure all data is written to the console log file in the correct order, we suspend + // execution here until the parent process tells us to continue via SIGUSR1: We block + // SIGUSR1 and install our own signal handler saving the old handler and signal mask. Then + // we close stdin/stderr which will send a HUP condition to the parent process. We wait for + // SIGUSR1 and reinstall the old signal mask and action. + ::sigset_t oldsig; + ::sigset_t usrsig; + ::sigemptyset(&usrsig); + LIBC_CALL( ::sigaddset, (&usrsig, SIGUSR1) ); + LIBC_CALL( ::sigprocmask, (SIG_BLOCK, &usrsig, &oldsig) ); + struct ::sigaction oldact; + struct ::sigaction usract; + ::memset(&usract, 0, sizeof(usract)); + usract.sa_handler = &waitusr; + LIBC_CALL( ::sigaction, (SIGUSR1, &usract, &oldact) ); + ::sigset_t waitsig (oldsig); + LIBC_CALL( ::sigdelset, (&waitsig, SIGUSR1) ); + + LIBC_CALL_RV( nul, ::open, ("/dev/null", O_WRONLY) ); + LIBC_CALL( ::dup2, (stdout_ == -1 ? nul : stdout_, 1) ); + LIBC_CALL( ::dup2, (stderr_ == -1 ? nul : stderr_, 2) ); + LIBC_CALL( ::close, (nul) ); + + signaled = false; + while (! signaled) { + ::sigsuspend(&waitsig); + if (errno != EINTR) + throwErrno("::sigsuspend()"); + } + + LIBC_CALL( ::sigaction, (SIGUSR1, &oldact, 0) ); + LIBC_CALL( ::sigprocmask, (SIG_SETMASK, &oldsig, 0) ); + } } prefix_ int senf::Daemon::start(int argc, char const ** argv) @@ -102,17 +153,27 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv) argv_ = argv; # ifdef NDEBUG + try { + # endif - configure(); - if (daemonize_) - fork(); - if (! pidfile_.empty()) - pidfileCreate(); - main(); + configure(); + + if (daemonize_) { + openLog(); + fork(); + } + if (! pidfile_.empty() && ! pidfileCreate()) { + std::cerr << "\n*** PID file '" << pidfile_ << "' creation failed. Daemon running ?" + << std::endl; + return 1; + } + + main(); # ifdef NDEBUG + } catch (std::exception & e) { std::cerr << "\n*** Fatal exception: " << e.what() << std::endl; @@ -122,6 +183,7 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv) std::cerr << "\n*** Fatal exception: (unknown)" << std::endl; return 1; } + # endif return 0; @@ -131,7 +193,7 @@ prefix_ int senf::Daemon::start(int argc, char const ** argv) // protected members prefix_ senf::Daemon::Daemon() - : argc_(0), argv_(0), daemonize_(true), stdout_(-1), stderr_(-1), pidfile_(""), unique_(true), + : argc_(0), argv_(0), daemonize_(true), stdout_(-1), stderr_(-1), pidfile_(""), detached_(false) {} @@ -139,7 +201,31 @@ prefix_ senf::Daemon::Daemon() // private members prefix_ void senf::Daemon::configure() -{} +{ + for (int i (1); i> old_pid) + || old_pid < 0 + || ::kill(old_pid, 0) >= 0 + || errno == EPERM ) + return false; + } + + // If we reach this point, the pid file exists but the process mentioned within the + // pid file does *not* exists. We assume, the pid file to be stale. + + // I hope, the following procedure is without race condition: We remove our generated + // temporary pid file and recreate it as hard-link to the old pid file. Now we check, that + // the hard-link count of this file is 2. If it is not, we terminate, since someone else + // must have already created his hardlink. We then truncate the file and write our pid. + + LIBC_CALL( ::unlink, (tempname.c_str() )); + if (::link(pidfile_.c_str(), tempname.c_str()) < 0) { + if (errno != ENOENT) throwErrno("::link()"); + // Hmm ... the pidfile mysteriously disappeared ... try again. + continue; + } + + { + struct ::stat s; + LIBC_CALL( ::stat, (tempname.c_str(), &s) ); + if (s.st_nlink != 2) { + LIBC_CALL( ::unlink, (tempname.c_str()) ); + return false; + } + } + + { + std::ofstream pidf (tempname.c_str()); + pidf << ::getpid() << std::endl; + } + + LIBC_CALL( ::unlink, (tempname.c_str()) ); + break; + } + return true; +} /////////////////////////////////////////////////////////////////////////// // senf::detail::DaemonWatcher -prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe) - : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), - coutForwarder_(coutpipe_, 1, senf::membind(&DaemonWatcher::pipeClosed, this)), - cerrForwarder_(cerrpipe_, 2, senf::membind(&DaemonWatcher::pipeClosed, this)) -{} +prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe, + int stdout, int stderr) + : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), stdout_(stdout), + stderr_(stderr), sigChld_(false), + coutForwarder_(coutpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 1)), + cerrForwarder_(cerrpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 2)) +{ + coutForwarder_.addTarget(1); + if (stdout_ >= 0) + coutForwarder_.addTarget(stdout_); + cerrForwarder_.addTarget(2); + if (stderr_ >= 0) + cerrForwarder_.addTarget(stderr_); +} prefix_ void senf::detail::DaemonWatcher::run() { - Scheduler::instance().registerSignal(SIGCHLD, senf::membind(&DaemonWatcher::childDied, this)); + Scheduler::instance().registerSignal(SIGCHLD, senf::membind(&DaemonWatcher::sigChld, this)); Scheduler::instance().process(); } //////////////////////////////////////// // private members -prefix_ void senf::detail::DaemonWatcher::pipeClosed() +prefix_ void senf::detail::DaemonWatcher::pipeClosed(int id) { - if (! timerRunning_) { - Scheduler::instance().timeout(Scheduler::instance().eventTime() + ClockService::seconds(1), - senf::membind(&DaemonWatcher::childOk, this)); - timerRunning_ = true; + switch (id) { + case 1 : coutpipe_ = -1; break; + case 2 : cerrpipe_ = -1; break; } + + if (coutpipe_ == -1 && cerrpipe_ == -1) { + if (sigChld_) + childDied(); // does not return + if (::kill(childPid_, SIGUSR1) < 0) + if (errno != ESRCH) throwErrno("::kill()"); + Scheduler::instance().timeout( + Scheduler::instance().eventTime() + ClockService::seconds(1), + senf::membind(&DaemonWatcher::childOk, this)); + } +} + +prefix_ void senf::detail::DaemonWatcher::sigChld() +{ + sigChld_ = true; + if (coutpipe_ == -1 && cerrpipe_ == -1) + childDied(); // does not return } prefix_ void senf::detail::DaemonWatcher::childDied() @@ -237,11 +427,11 @@ prefix_ void senf::detail::DaemonWatcher::childDied() ::signal(WTERMSIG(status),SIG_DFL); ::kill(::getpid(), WTERMSIG(status)); // should not be reached - ::exit(1); + ::_exit(1); } if (WEXITSTATUS(status) == 0) - ::exit(1); - ::exit(WEXITSTATUS(status)); + ::_exit(1); + ::_exit(WEXITSTATUS(status)); } prefix_ void senf::detail::DaemonWatcher::childOk() @@ -252,8 +442,8 @@ prefix_ void senf::detail::DaemonWatcher::childOk() /////////////////////////////////////////////////////////////////////////// // senf::detail::DaemonWatcher::Forwarder -prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, int dst, Callback cb) - : src_(src), dst_(dst), cb_(cb) +prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, Callback cb) + : src_(src), cb_(cb) { Scheduler::instance().add(src_, senf::membind(&Forwarder::readData, this), Scheduler::EV_READ); @@ -263,14 +453,23 @@ prefix_ senf::detail::DaemonWatcher::Forwarder::~Forwarder() { if (src_ != -1) Scheduler::instance().remove(src_); - if (dst_ != -1 && ! buffer_.empty()) - Scheduler::instance().remove(dst_); + + for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i) + if (i->offset >= buffer_.size()) + Scheduler::instance().remove(i->fd); +} + +prefix_ void senf::detail::DaemonWatcher::Forwarder::addTarget(int fd) +{ + Target target = { fd, 0 }; + targets_.push_back(target); } prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId event) { char buf[1024]; int n (0); + while (1) { n = ::read(src_,buf,1024); if (n<0) { @@ -278,6 +477,7 @@ prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId } else break; } + if (n == 0) { // Hangup Scheduler::instance().remove(src_); @@ -286,38 +486,55 @@ prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId src_ = -1; return; } - if (dst_ == -1) - // There was an error writing data -> drop it + + if (targets_.empty()) return; - if (buffer_.empty()) - Scheduler::instance().add(dst_, senf::membind(&Forwarder::writeData, this), - Scheduler::EV_WRITE); + + for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i) + if (i->offset >= buffer_.size()) + Scheduler::instance().add( i->fd, + boost::bind(&Forwarder::writeData, this, _1, i), + Scheduler::EV_WRITE ); + buffer_.insert(buffer_.end(), buf, buf+n); } -prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event) +prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event, + Targets::iterator target) { if (event != Scheduler::EV_WRITE) { // Broken pipe while writing data ? Not much, we can do here, we just drop the data - Scheduler::instance().remove(dst_); - dst_ = -1; - if (src_ == -1) cb_(); + Scheduler::instance().remove(target->fd); + targets_.erase(target); + if (targets_.empty() && src_ == -1) + cb_(); return; } + char buf[1024]; - int n (buffer_.size() > 1024 ? 1024 : buffer_.size()); - std::copy(buffer_.begin(), buffer_.begin() + n, buf); - int w (::write(dst_, buf, n)); + int n (buffer_.size() - target->offset > 1024 ? 1024 : buffer_.size() - target->offset); + std::copy(buffer_.begin() + target->offset, buffer_.begin() + target->offset + n, buf); + + int w (::write(target->fd, buf, n)); if (w < 0) { if (errno != EINTR) throwErrno("::write()"); return; } - buffer_.erase(buffer_.begin(), buffer_.begin()+w); - if (buffer_.empty()) { - Scheduler::instance().remove(dst_); - if (src_ == -1) - cb_(); - } + target->offset += w; + + n = std::min_element( + targets_.begin(), targets_.end(), + boost::bind(&Target::offset, _1) < boost::bind(&Target::offset, _2))->offset; + + buffer_.erase(buffer_.begin(), buffer_.begin()+n); + + for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i) + i->offset -= n; + + if (target->offset >= buffer_.size()) + Scheduler::instance().remove(target->fd); + if (src_ == -1 && (buffer_.empty() || targets_.empty())) + cb_(); } #undef LIBC_CALL