#include <errno.h>
#include <signal.h>
#include <sstream>
+#include <algorithm>
#include "../Utils/Exception.hh"
#include "../Utils/membind.hh"
return daemonize_;
}
-prefix_ void senf::Daemon::consoleLog(std::string path, StdStream which)
+prefix_ void senf::Daemon::consoleLog(std::string const & path, StdStream which)
{
int fd (-1);
if (! path.empty()) {
- int fd (::open(path.c_str(), O_WRONLY | O_APPEND));
+ fd = ::open(path.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);
if (fd < 0)
throwErrno("::open()");
}
}
}
-prefix_ void senf::Daemon::pidFile(std::string f)
+prefix_ void senf::Daemon::pidFile(std::string const & f)
{
pidfile_ = f;
}
{
if (daemonize_) {
LIBC_CALL_RV( nul, ::open, ("/dev/null", O_WRONLY) );
- LIBC_CALL( ::dup2, (nul, 1) );
- LIBC_CALL( ::dup2, (nul, 2) );
+ LIBC_CALL( ::dup2, (stdout_ == -1 ? nul : stdout_, 1) );
+ LIBC_CALL( ::dup2, (stderr_ == -1 ? nul : stderr_, 2) );
LIBC_CALL( ::close, (nul) );
+
+ // We need to wait here to give the daemon watcher time to flush all data to the log file.
+ struct timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 100 * 1000000ul;
+ while (::nanosleep(&ts,&ts) < 0 && errno == EINTR) ;
}
}
argv_ = argv;
# ifdef NDEBUG
+
try {
+
# endif
configure();
main();
# ifdef NDEBUG
+
}
catch (std::exception & e) {
std::cerr << "\n*** Fatal exception: " << e.what() << std::endl;
std::cerr << "\n*** Fatal exception: (unknown)" << std::endl;
return 1;
}
+
# endif
return 0;
LIBC_CALL( ::close, (coutpipe[1]) );
LIBC_CALL( ::close, (cerrpipe[1]) );
- detail::DaemonWatcher watcher (pid, coutpipe[0], cerrpipe[0]);
+ detail::DaemonWatcher watcher (pid, coutpipe[0], cerrpipe[0], stdout_, stderr_);
watcher.run();
::_exit(0);
///////////////////////////////////////////////////////////////////////////
// senf::detail::DaemonWatcher
-prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe)
- : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), sigChld_(false),
- coutForwarder_(coutpipe_, 1, boost::bind(&DaemonWatcher::pipeClosed, this, 1)),
- cerrForwarder_(cerrpipe_, 2, boost::bind(&DaemonWatcher::pipeClosed, this, 2))
-{}
+prefix_ senf::detail::DaemonWatcher::DaemonWatcher(int pid, int coutpipe, int cerrpipe,
+ int stdout, int stderr)
+ : childPid_(pid), coutpipe_(coutpipe), cerrpipe_(cerrpipe), stdout_(stdout),
+ stderr_(stderr), sigChld_(false),
+ coutForwarder_(coutpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 1)),
+ cerrForwarder_(cerrpipe_, boost::bind(&DaemonWatcher::pipeClosed, this, 2))
+{
+ coutForwarder_.addTarget(1);
+ if (stdout_ >= 0)
+ coutForwarder_.addTarget(stdout_);
+ cerrForwarder_.addTarget(2);
+ if (stderr_ >= 0)
+ cerrForwarder_.addTarget(stderr_);
+}
prefix_ void senf::detail::DaemonWatcher::run()
{
///////////////////////////////////////////////////////////////////////////
// senf::detail::DaemonWatcher::Forwarder
-prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, int dst, Callback cb)
- : src_(src), dst_(dst), cb_(cb)
+prefix_ senf::detail::DaemonWatcher::Forwarder::Forwarder(int src, Callback cb)
+ : src_(src), cb_(cb)
{
Scheduler::instance().add(src_, senf::membind(&Forwarder::readData, this),
Scheduler::EV_READ);
{
if (src_ != -1)
Scheduler::instance().remove(src_);
- if (dst_ != -1 && ! buffer_.empty())
- Scheduler::instance().remove(dst_);
+
+ for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+ if (i->offset >= buffer_.size())
+ Scheduler::instance().remove(i->fd);
+}
+
+prefix_ void senf::detail::DaemonWatcher::Forwarder::addTarget(int fd)
+{
+ Target target = { fd, 0 };
+ targets_.push_back(target);
}
prefix_ void senf::detail::DaemonWatcher::Forwarder::readData(Scheduler::EventId event)
{
char buf[1024];
int n (0);
+
while (1) {
n = ::read(src_,buf,1024);
if (n<0) {
} else
break;
}
+
if (n == 0) {
// Hangup
Scheduler::instance().remove(src_);
src_ = -1;
return;
}
- if (dst_ == -1)
- // There was an error writing data -> drop it
+
+ if (targets_.empty())
return;
- if (buffer_.empty())
- Scheduler::instance().add(dst_, senf::membind(&Forwarder::writeData, this),
- Scheduler::EV_WRITE);
+
+ for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+ if (i->offset >= buffer_.size())
+ Scheduler::instance().add( i->fd,
+ boost::bind(&Forwarder::writeData, this, _1, i),
+ Scheduler::EV_WRITE );
+
buffer_.insert(buffer_.end(), buf, buf+n);
}
-prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event)
+prefix_ void senf::detail::DaemonWatcher::Forwarder::writeData(Scheduler::EventId event,
+ Targets::iterator target)
{
if (event != Scheduler::EV_WRITE) {
// Broken pipe while writing data ? Not much, we can do here, we just drop the data
- Scheduler::instance().remove(dst_);
- dst_ = -1;
- if (src_ == -1) cb_();
+ Scheduler::instance().remove(target->fd);
+ targets_.erase(target);
+ if (targets_.empty() && src_ == -1)
+ cb_();
return;
}
+
char buf[1024];
- int n (buffer_.size() > 1024 ? 1024 : buffer_.size());
- std::copy(buffer_.begin(), buffer_.begin() + n, buf);
- int w (::write(dst_, buf, n));
+ int n (buffer_.size() - target->offset > 1024 ? 1024 : buffer_.size() - target->offset);
+ std::copy(buffer_.begin() + target->offset, buffer_.begin() + target->offset + n, buf);
+
+ int w (::write(target->fd, buf, n));
if (w < 0) {
if (errno != EINTR) throwErrno("::write()");
return;
}
- buffer_.erase(buffer_.begin(), buffer_.begin()+w);
- if (buffer_.empty()) {
- Scheduler::instance().remove(dst_);
- if (src_ == -1)
- cb_();
- }
+ target->offset += w;
+
+ n = std::min_element(
+ targets_.begin(), targets_.end(),
+ boost::bind(&Target::offset, _1) < boost::bind(&Target::offset, _2))->offset;
+
+ buffer_.erase(buffer_.begin(), buffer_.begin()+n);
+
+ for (Targets::iterator i (targets_.begin()); i != targets_.end(); ++i)
+ i->offset -= n;
+
+ if (target->offset >= buffer_.size())
+ Scheduler::instance().remove(target->fd);
+ if (src_ == -1 && (buffer_.empty() || targets_.empty()))
+ cb_();
}
#undef LIBC_CALL
void daemonize(bool); ///< Configure whether to run in fore- or background
bool daemon(); ///< \c true, if running as daemon
- void consoleLog(std::string, StdStream which = Both); ///< Configure console log file
+ void consoleLog(std::string const &, StdStream which = Both);
+ ///< Configure console log file
/**< May be called multiple times to set the log file
for stdout and stderr seperately. Any standard stream
not assigned to a log file will be redirected to
When running in the foreground, the log files will be
ignored. */
- void pidFile(std::string); ///< Configure pid file
+ void pidFile(std::string const &); ///< Configure pid file
///\}
///\name Auxiliary helpers
// Custom includes
#include <deque>
+#include <list>
#include <boost/utility.hpp>
#include <boost/function.hpp>
#include "../Scheduler/Scheduler.hh"
{
public:
- DaemonWatcher(int pid, int coutpipe, int cerrpipe);
+ DaemonWatcher(int pid, int coutpipe, int cerrpipe, int stdout, int stderr);
void run();
public:
typedef boost::function<void ()> Callback;
- Forwarder(int src, int dst, Callback cb);
+ Forwarder(int src, Callback cb);
~Forwarder();
+ void addTarget(int fd);
+
private:
+ typedef std::deque<char> Buffer;
+ struct Target
+ {
+ int fd;
+ Buffer::size_type offset;
+ };
+ typedef std::list<Target> Targets;
+
void readData(Scheduler::EventId event);
- void writeData(Scheduler::EventId event);
+ void writeData(Scheduler::EventId event, Targets::iterator target);
- typedef std::deque<char> Buffer;
Buffer buffer_;
int src_;
- int dst_;
+
+ Targets targets_;
+
Callback cb_;
};
int childPid_;
int coutpipe_;
int cerrpipe_;
+ int stdout_;
+ int stderr_;
bool sigChld_;
Forwarder coutForwarder_;
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>
+#include <fstream>
#include <boost/filesystem/operations.hpp>
#include "Daemon.hh"
#include "../Utils/Exception.hh"
void configure() {
std::cout << "Running configure()" << std::endl;
pidFile("testDaemon.pid");
+ consoleLog("testDaemon.log");
}
void init() {
}
int status;
if (::waitpid(pid, &status, 0) < 0) senf::throwErrno("::waitpid()");
- return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
+ return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
}
BOOST_CHECK( boost::filesystem::exists("testDaemon.pid") );
delay(1000);
BOOST_CHECK( ! boost::filesystem::exists("testDaemon.pid") );
+ BOOST_REQUIRE( boost::filesystem::exists("testDaemon.log") );
+
+ std::ifstream log ("testDaemon.log");
+ std::stringstream data;
+ data << log.rdbuf();
+ BOOST_CHECK_EQUAL( data.str(), "Running init()\nRunning run()\n" );
+ BOOST_CHECK_NO_THROW( boost::filesystem::remove("testDaemon.log") );
}
///////////////////////////////cc.e////////////////////////////////////////
///////////////////////////////cc.p////////////////////////////////////////
prefix_ senf::Scheduler::Scheduler()
- : timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false),
+ : files_(0), timerIdCounter_(0), epollFd_ (epoll_create(EPollInitialSize)), terminate_(false),
eventTime_(0), eventEarly_(ClockService::milliseconds(11)), eventAdjust_(0)
{
if (epollFd_<0)
ev.events = i->second.epollMask();
ev.data.fd = fd;
- if (epoll_ctl(epollFd_, action, fd, &ev)<0)
- throw SystemException(errno);
+ if (! i->second.file && epoll_ctl(epollFd_, action, fd, &ev) < 0) {
+ if (errno == EPERM) {
+ // Argh ... epoll does not support ordinary files :-( :-(
+ i->second.file = true;
+ ++ files_;
+ }
+ else
+ throwErrno("::epoll_ctl()");
+ }
}
prefix_ void senf::Scheduler::do_remove(int fd, int eventMask)
ev.data.fd = fd;
int action (EPOLL_CTL_MOD);
+ bool file (i->second.file);
if (ev.events==0) {
action = EPOLL_CTL_DEL;
fdTable_.erase(i);
}
- if (epoll_ctl(epollFd_, action, fd, &ev)<0)
- throw SystemException(errno);
+ if (! file && epoll_ctl(epollFd_, action, fd, &ev) < 0)
+ throwErrno("::epoll_ctl()");
+ if (file)
+ -- files_;
}
prefix_ void senf::Scheduler::registerSigHandlers()
}
int timeout (-1);
- if (timerQueue_.empty()) {
- if (fdTable_.empty())
- break;
- }
+ if (files_ > 0)
+ timeout = 0;
else {
- ClockService::clock_type delta (
- (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL);
- timeout = delta < 0 ? 0 : delta;
+ if (timerQueue_.empty()) {
+ if (fdTable_.empty())
+ break;
+ }
+ else {
+ ClockService::clock_type delta (
+ (timerQueue_.top()->second.timeout - eventTime_ + eventAdjust_)/1000000UL);
+ timeout = delta < 0 ? 0 : delta;
+ }
}
///\todo Handle more than one epoll_event per call
continue;
}
- FdTable::iterator i = fdTable_.find(ev.data.fd);
- BOOST_ASSERT (i != fdTable_.end() );
- EventSpec spec (i->second);
+ for (FdTable::iterator i = fdTable_.begin(); i != fdTable_.end(); ++i) {
+ EventSpec spec (i->second);
+ unsigned extraFlags (0);
+ unsigned events (spec.file ? spec.epollMask() : ev.events);
- unsigned extraFlags (0);
- if (ev.events & EPOLLHUP) extraFlags |= EV_HUP;
- if (ev.events & EPOLLERR) extraFlags |= EV_ERR;
+ if (! (spec.file || i->first == ev.data.fd))
+ continue;
+
+ if (events & EPOLLHUP) extraFlags |= EV_HUP;
+ if (events & EPOLLERR) extraFlags |= EV_ERR;
- if (ev.events & EPOLLIN) {
- BOOST_ASSERT(spec.cb_read);
- spec.cb_read(EventId(EV_READ | extraFlags));
- }
- else if (ev.events & EPOLLPRI) {
- BOOST_ASSERT(spec.cb_prio);
- spec.cb_prio(EventId(EV_PRIO | extraFlags));
- }
- else if (ev.events & EPOLLOUT) {
- BOOST_ASSERT(spec.cb_write);
- spec.cb_write(EventId(EV_WRITE | extraFlags));
- }
- else {
- // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI.
- // In this case we will signal all registered callbacks. The callbacks must be
- // prepared to be called multiple times if they are registered to more than
- // one event.
- if (spec.cb_write)
- spec.cb_write(EventId(extraFlags));
- if (spec.cb_prio)
- spec.cb_prio(EventId(extraFlags));
- if (spec.cb_read)
- spec.cb_read(EventId(extraFlags));
+ if (events & EPOLLIN) {
+ BOOST_ASSERT(spec.cb_read);
+ spec.cb_read(EventId(EV_READ | extraFlags));
+ }
+ else if (events & EPOLLPRI) {
+ BOOST_ASSERT(spec.cb_prio);
+ spec.cb_prio(EventId(EV_PRIO | extraFlags));
+ }
+ else if (events & EPOLLOUT) {
+ BOOST_ASSERT(spec.cb_write);
+ spec.cb_write(EventId(EV_WRITE | extraFlags));
+ }
+ else {
+ // This branch is only taken, if HUP or ERR is signaled but none of IN/OUT/PRI.
+ // In this case we will signal all registered callbacks. The callbacks must be
+ // prepared to be called multiple times if they are registered to more than
+ // one event.
+ if (spec.cb_write)
+ spec.cb_write(EventId(extraFlags));
+ if (spec.cb_prio)
+ spec.cb_prio(EventId(extraFlags));
+ if (spec.cb_read)
+ spec.cb_read(EventId(extraFlags));
+ }
}
}
}
wait for signals \e only.
\todo Fix EventId parameter (probably to int) to allow |-ing without casting ...
+
+ \todo Fix the file support to use threads (?) fork (?) and a pipe so it works reliably even
+ over e.g. NFS.
*/
class Scheduler
: boost::noncopyable
FdCallback cb_prio;
FdCallback cb_write;
+ EventSpec() : file(false) {}
+
int epollMask() const;
+
+ bool file;
};
/** \brief Timer event specification
typedef std::vector<SimpleCallback> SigHandlers;
FdTable fdTable_;
+ unsigned files_;
unsigned timerIdCounter_;
TimerQueue timerQueue_;