return;
SENF_LOG_BLOCK(({
std::string type (prettyName(p.typeId().id()));
- log << "PPI trace: " << label << " 0x" << std::hex << p.id() << " "
+ log << "PPI packet trace: " << label << " 0x" << std::hex << p.id() << " "
<< type.substr(21, type.size()-22) << " on " << & module() << " "
<< prettyName(typeid(module())) << " connector 0x" << this << "\n";
if (traceState_ == TRACE_CONTENTS)
}));
}
+prefix_ void senf::ppi::connector::Connector::throttleTrace(char const * label,
+ char const * type)
+{
+ if (traceState_ == NO_TRACING)
+ return;
+ SENF_LOG_BLOCK(({
+ log << "PPI throttling trace: " << label << " " << type << " on " << & module()
+ << " " << prettyName(typeid(module())) << " connector 0x" << this << "\n";
+ }));
+}
+
namespace senf { namespace ppi { namespace connector {
SENF_CONSOLE_REGISTER_ENUM_MEMBER(
"A log message is generated whenever the packet traverses a connector. The\n"
"TRACE_IDS log message has the following format:\n"
"\n"
- " PPI trace: <packet-id> <packet-type> <direction>\n"
+ " PPI packet trace: <direction> <packet-id> <packet-type>\n"
+ " on <module-id> <module-type> connector <connector-id>\n"
+ " PPI throttling trace: <direction> <throttle-msg>\n"
" on <module-id> <module-type> connector <connector-id>\n"
"\n"
"The fields are:\n"
"\n"
+ " direction 'IN' for packets/throttle notifications entering the module,\n"
+ " 'OUT' for packets/throttle notifications leaving it\n"
" packet-id Numeric unique packet id. This value is unique for packets\n"
" alive at the same time, packets at different times may (and\n"
" will) share id's\n"
" packet-type The type of the packet header\n"
- " direction 'INCOMING' for packets entering the module, 'OUTGOING' for\n"
- " packets leaving it\n"
" module-id Unique module id\n"
" module-type Type of the module the packet is sent to/from\n"
- " connector-id Unique connector id\n");
+ " connector-id Unique connector id\n"
+ " throttle-msg Type of throttling event\n");
senf::ppi::ModuleManager::instance().consoleDir()
.add("tracing", SENF_FNP(void, senf::ppi::connector::Connector::tracing,
remoteThrottled_ = false;
if (!nativeThrottled_)
emitUnthrottle();
- }
+ } else
+ throttleTrace("OUT", "not forwarding unthrottle event");
}
///////////////////////////////////////////////////////////////////////////
prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle()
{
+ throttleTrace("IN ", "throttle");
if (! throttled_) {
throttled_ = true;
if (throttleCallback_)
prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle()
{
+ throttleTrace("IN ", "unthrottle");
if (throttled_) {
throttled_ = false;
if (unthrottleCallback_)
queue_.pop_back();
v_dequeueEvent();
}
- trace(p, "INCOMING");
+ trace(p, "IN ");
return p;
}
prefix_ void senf::ppi::connector::PassiveConnector::emitThrottle()
{
+ throttleTrace("OUT", "throttle");
if (connected())
peer().notifyThrottle();
}
prefix_ void senf::ppi::connector::PassiveConnector::emitUnthrottle()
{
+ throttleTrace("OUT", "unthrottle");
if (connected()) {
peer().notifyUnthrottle();
v_unthrottleEvent();
if (!throttled()) {
nativeThrottled_ = true;
emitThrottle();
- }
+ } else
+ nativeThrottled_ = true;
}
prefix_ void senf::ppi::connector::PassiveConnector::unthrottle()
SENF_ASSERT(callback_ && "senf::ppi::connector::PassiveConnector: missing onRequest()");
if (!throttled())
callback_();
+ else
+ throttleTrace("IN ", "queueing packet");
}
///////////////////////////////////////////////////////////////////////////
prefix_ void senf::ppi::connector::OutputConnector::operator()(Packet const & p)
{
- trace(p, "OUTGOING");
+ trace(p, "OUT");
if (connected())
peer().enqueue(p);
}
void connect(Connector & target);
void trace(Packet const & p, char const * label);
+ void throttleTrace(char const * label, char const * type);
private:
virtual std::type_info const & packetTypeID();
private:
virtual void v_init();
- // Called by the routing to change the remote throttling state
+ // Called by the routing to change the throttling state from forwarding routes
void notifyThrottle(); ///< Forward a throttle notification to this connector
void notifyUnthrottle(); ///< Forward an unthrottle notification to this connector
- // Internal members to emit throttling notifications
+ // Internal members to emit throttling notifications to the connected peer
void emitThrottle();
void emitUnthrottle();
for (; i != i_end; ++i)
if ((*i)->throttled())
break;
- if (i == i_end)
- enabled(true);
+ if (i != i_end)
+ return;
+ throttled_ = false;
+ enabled(true);
}
prefix_ void senf::ppi::EventDescriptor::enabled(bool v)
{
SENF_ASSERT(v_isRegistered() && "Module::registerEvent() call missing");
+ if (throttled_ && v)
+ return;
if (v && ! enabled_)
v_enable();
else if (! v && enabled_)
// protected members
prefix_ senf::ppi::EventDescriptor::EventDescriptor()
- : enabled_(false)
+ : enabled_(false), throttled_(false)
{}
////////////////////////////////////////
prefix_ void senf::ppi::EventDescriptor::notifyThrottle()
{
+ throttled_ = true;
enabled(false);
}
void registerRoute(ForwardingRoute & route);
bool enabled_;
+ bool throttled_;
typedef std::vector<ForwardingRoute*> Routes;
Routes routes_;
if (handle) {
fd_ = senf::scheduler::get_descriptor(handle);
event_.events(events).handle(fd_);
- event_.enable();
+ if (enabled())
+ event_.enable();
+ else
+ event_.disable();
}
else {
event_.disable();
.add("dump", senf::membind(&ModuleManager::dumpModules, this))
.doc("Dump complete PPI structure\n"
"The dump will contain one paragraph for each module. The first line gives module\n"
- "information, additional lines list all connectors and their peers (if connected).");
+ "information, additional lines list all connectors and their peers (if connected).\n"
+ "\n"
+ "This information can be processed by 'PPI/drawmodules.py' and 'dot' (from the\n"
+ "graphviz package) to generate a graphic representation of the module structure:\n"
+ "\n"
+ " $ echo /sys/ppi/dump | nc -q1 <host> <port> \\\n"
+ " | python PPI/drawmodules.py | dot -Tpng /dev/fd/0 >modules.png\n");
}
prefix_ void senf::ppi::ModuleManager::dumpModules(std::ostream & os)
template <class Writer>
prefix_ void senf::ppi::module::PassiveSocketSink<Writer>::checkThrottle()
{
- if (handle_)
+ if (handle_.valid())
input.unthrottle();
else
input.throttle();
prefix_ void senf::ClientSocketHandle<SPolicy>::state(SocketStateMap & map, unsigned lod)
{
map["handle"] = prettyName(typeid(*this));
- this->body().state(map, lod);
+ if (this->valid()) {
+ map["valid"] << "true";
+ this->body().state(map,lod);
+ } else
+ map["valid"] << "false";
}
template <class SPolicy>
unsigned lod)
{
map["handle"] = prettyName(typeid(*this));
- this->body().state(map,lod);
+ if (this->valid()) {
+ map["valid"] << "true";
+ this->body().state(map,lod);
+ } else
+ map["valid"] << "false";
}
template <class SocketProtocol>
unsigned lod)
{
map["handle"] = prettyName(typeid(*this));
- this->body().state(map,lod);
+ if (this->valid()) {
+ map["valid"] << "true";
+ this->body().state(map,lod);
+ } else
+ map["valid"] << "false";
}
template <class SocketProtocol>
prefix_ void senf::ServerSocketHandle<SPolicy>::state(SocketStateMap & map, unsigned lod)
{
map["handle"] = prettyName(typeid(*this));
- this->body().state(map,lod);
+ if (this->valid()) {
+ map["valid"] << "true";
+ this->body().state(map,lod);
+ } else
+ map["valid"] << "false";
}
template <class SPolicy>
// the type name and therefore show the \e static policy of the
// socket handle.
map["handle"] << prettyName(typeid(*this));
- body().state(map,lod);
+ if (valid()) {
+ map["valid"] << "true";
+ body().state(map,lod);
+ } else
+ map["valid"] << "false";
}
template <class SPolicy>
prefix_ void senf::console::Server::removeClient(Client & client)
{
- SENF_LOG(( "Disposing client " << client.handle().peer() ));
+ SENF_LOG_BLOCK(({
+ log << "Disposing client ";
+ try {
+ log << client.handle().peer();
+ }
+ catch (senf::SystemException ex) {
+ log << "(unknown)";
+ }
+ }));
// THIS DELETES THE CLIENT INSTANCE !!
clients_.erase(boost::intrusive_ptr<Client>(&client));
}