X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=PPI%2FConnectors.cci;h=ec587ceffa54580e0a2c862ea3c5498d8b5fadbc;hb=d5a72d0b3f6fee56dba6de1c54cafb448ebe3457;hp=8d24f267cb7f2214fea2b4bd9e412c97fbf367cc;hpb=026887c8f93c5dc89fb68446ae014205de559451;p=senf.git diff --git a/PPI/Connectors.cci b/PPI/Connectors.cci index 8d24f26..ec587ce 100644 --- a/PPI/Connectors.cci +++ b/PPI/Connectors.cci @@ -1,8 +1,8 @@ // $Id$ // -// Copyright (C) 2007 -// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) -// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Copyright (C) 2007 +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY // Stefan Bund // // This program is free software; you can redistribute it and/or modify @@ -24,6 +24,8 @@ \brief Connectors inline non-template implementation */ // Custom includes +#include "../Utils/TypeInfo.hh" +#include "../Utils/senfassert.hh" #define prefix_ inline ///////////////////////////////cci.p/////////////////////////////////////// @@ -32,17 +34,32 @@ // senf::ppi::connector::Connector prefix_ senf::ppi::connector::Connector & senf::ppi::connector::Connector::peer() + const { - BOOST_ASSERT(peer_); + // The connector is not connected + SENF_ASSERT(peer_ && "senf::ppi::connect() call missing"); return *peer_; } prefix_ senf::ppi::module::Module & senf::ppi::connector::Connector::module() + const { - BOOST_ASSERT(module_); + // The connector is not registered in the module -> probably a route() or noroute() statement is + // missing. + SENF_ASSERT(module_ && "Connector not registered: Missing route() or noroute()"); return *module_; } +prefix_ void senf::ppi::connector::Connector::tracing(TraceState state) +{ + traceState_ = state; +} + +prefix_ senf::ppi::connector::Connector::TraceState senf::ppi::connector::Connector::tracing() +{ + return traceState_; +} + //////////////////////////////////////// // protected members @@ -50,6 +67,21 @@ prefix_ senf::ppi::connector::Connector::Connector() : peer_(), module_() {} +prefix_ senf::ppi::connector::Connector::~Connector() +{ + if (connected()) { + Connector & peer (*peer_); + peer_->peer_ = 0; + peer.v_init(); + } +} + +prefix_ bool senf::ppi::connector::Connector::connected() + const +{ + return peer_; +} + //////////////////////////////////////// // private members @@ -59,83 +91,176 @@ prefix_ void senf::ppi::connector::Connector::setModule(module::Module & module) } /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::ActiveConnector +// senf::ppi::connector::PassiveConnector + +prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveConnector::peer() + const +{ + return dynamic_cast(Connector::peer()); +} + +prefix_ bool senf::ppi::connector::PassiveConnector::throttled() + const +{ + return nativeThrottled_ || remoteThrottled_; +} //////////////////////////////////////// -// protected members +// private members -prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector() -{} +prefix_ void senf::ppi::connector::PassiveConnector::emitThrottle() +{ + throttleTrace("OUT", "throttle"); + if (connected()) + peer().notifyThrottle(); +} -/////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::PassiveConnector +prefix_ void senf::ppi::connector::PassiveConnector::emitUnthrottle() +{ + throttleTrace("OUT", "unthrottle"); + if (connected()) { + peer().notifyUnthrottle(); + v_unthrottleEvent(); + } +} -prefix_ senf::ppi::connector::ActiveConnector & senf::ppi::connector::PassiveConnector::peer() +prefix_ void senf::ppi::connector::PassiveConnector::notifyThrottle() { - return dynamic_cast(Connector::peer()); + if (!throttled()) { + remoteThrottled_ = true; + emitThrottle(); + } + else + remoteThrottled_ = true; +} + +prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route) +{ + routes_.push_back(&route); +} + +// public members + +prefix_ bool senf::ppi::connector::PassiveConnector::nativeThrottled() + const +{ + return nativeThrottled_; +} + +prefix_ void senf::ppi::connector::PassiveConnector::throttle() +{ + if (!throttled()) { + nativeThrottled_ = true; + emitThrottle(); + } else + nativeThrottled_ = true; +} + +prefix_ void senf::ppi::connector::PassiveConnector::unthrottle() +{ + if (throttled() && ! remoteThrottled_) { + nativeThrottled_ = false; + emitUnthrottle(); + } else + nativeThrottled_ = false; + } //////////////////////////////////////// // protected members prefix_ senf::ppi::connector::PassiveConnector::PassiveConnector() - : callback_() + : callback_(), remoteThrottled_(), nativeThrottled_() {} prefix_ void senf::ppi::connector::PassiveConnector::emit() { - BOOST_ASSERT(callback_); - callback_(); + // No event callback has been registered (onRequest() call missing) + SENF_ASSERT(callback_ && "senf::ppi::connector::PassiveConnector: missing onRequest()"); + if (!throttled()) + callback_(); + else + throttleTrace("IN ", "queueing packet"); } /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::InputConnector +// senf::ppi::connector::ActiveConnector -prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()() +prefix_ senf::ppi::connector::PassiveConnector & senf::ppi::connector::ActiveConnector::peer() + const { - v_requestEvent(); - Packet p (peek()); - queue_.pop_back(); - v_dequeueEvent(); - return p; + return dynamic_cast(Connector::peer()); } -prefix_ bool senf::ppi::connector::InputConnector::boolean_test() +prefix_ void senf::ppi::connector::ActiveConnector::onThrottle() { - ///\fixme Add additional active/passive throttle/unthrottle conditions (make virtual ?) - return ! empty(); + throttleCallback_ = Callback(); +} + +prefix_ void senf::ppi::connector::ActiveConnector::onUnthrottle() +{ + unthrottleCallback_ = Callback(); +} + +prefix_ bool senf::ppi::connector::ActiveConnector::throttled() + const +{ + return ! connected() || peer().throttled(); +} + +//////////////////////////////////////// +// protected members + +prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector() + : throttleCallback_(), unthrottleCallback_(), notifyRoutes_(), throttled_(false) +{} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::InputConnector + +prefix_ senf::Packet senf::ppi::connector::InputConnector::read() +{ + return operator()(); } prefix_ senf::ppi::connector::OutputConnector & senf::ppi::connector::InputConnector::peer() + const { return dynamic_cast(Connector::peer()); } prefix_ senf::ppi::connector::InputConnector::queue_iterator senf::ppi::connector::InputConnector::begin() + const { return queue_.begin(); } prefix_ senf::ppi::connector::InputConnector::queue_iterator senf::ppi::connector::InputConnector::end() + const { return queue_.end(); } prefix_ senf::Packet senf::ppi::connector::InputConnector::peek() + const { - BOOST_ASSERT( ! queue_.empty() ); + // Cannot peek() head of empty queue + SENF_ASSERT( ! queue_.empty() && + "senf::ppi::connector::InputConnector: cannot call peek() on empty queue" ); return queue_.back(); } prefix_ senf::ppi::connector::InputConnector::size_type senf::ppi::connector::InputConnector::queueSize() + const { return queue_.size(); } prefix_ bool senf::ppi::connector::InputConnector::empty() + const { return queue_.empty(); } @@ -149,7 +274,7 @@ prefix_ senf::ppi::connector::InputConnector::InputConnector() //////////////////////////////////////// // private members -prefix_ void senf::ppi::connector::InputConnector::enqueue(Packet p) +prefix_ void senf::ppi::connector::InputConnector::enqueue(Packet const & p) { queue_.push_front(p); v_enqueueEvent(); @@ -158,14 +283,22 @@ prefix_ void senf::ppi::connector::InputConnector::enqueue(Packet p) /////////////////////////////////////////////////////////////////////////// // senf::ppi::connector::OutputConnector -prefix_ void senf::ppi::connector::OutputConnector::operator()(Packet p) +prefix_ senf::ppi::connector::InputConnector & senf::ppi::connector::OutputConnector::peer() + const { - peer().enqueue(p); + return dynamic_cast(Connector::peer()); } -prefix_ senf::ppi::connector::InputConnector & senf::ppi::connector::OutputConnector::peer() +prefix_ void senf::ppi::connector::OutputConnector::operator()(Packet const & p) { - return dynamic_cast(Connector::peer()); + trace(p, "OUT"); + if (connected()) + peer().enqueue(p); +} + +prefix_ void senf::ppi::connector::OutputConnector::write(Packet const & p) +{ + operator()(p); } //////////////////////////////////////// @@ -175,56 +308,94 @@ prefix_ senf::ppi::connector::OutputConnector::OutputConnector() {} /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::PassiveInput +// senf::ppi::connector::GenericPassiveInput -prefix_ senf::ppi::connector::PassiveInput::PassiveInput() - : qdisc_(), qstate_(QueueingDiscipline::UNTHROTTLED) +prefix_ senf::ppi::connector::GenericPassiveInput::GenericPassiveInput() + : qdisc_(new ThresholdQueueing(1,0)) {} -prefix_ senf::ppi::connector::ActiveOutput & senf::ppi::connector::PassiveInput::peer() +prefix_ senf::ppi::connector::GenericActiveOutput & senf::ppi::connector::GenericPassiveInput::peer() + const +{ + return dynamic_cast(Connector::peer()); +} + +prefix_ bool senf::ppi::connector::GenericPassiveInput::boolean_test() + const { - return dynamic_cast(Connector::peer()); + return ! empty(); } /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::PassiveOutput +// senf::ppi::connector::GenericPassiveOutput + +prefix_ senf::ppi::connector::GenericActiveInput & senf::ppi::connector::GenericPassiveOutput::peer() + const +{ + return dynamic_cast(Connector::peer()); +} -prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::PassiveOutput::peer() +prefix_ bool senf::ppi::connector::GenericPassiveOutput::boolean_test() + const { - return dynamic_cast(Connector::peer()); + return true; } -prefix_ void senf::ppi::connector::PassiveOutput::connect(ActiveInput & target) +prefix_ void senf::ppi::connector::GenericPassiveOutput::connect(GenericActiveInput & target) { Connector::connect(target); } +prefix_ senf::ppi::connector::GenericPassiveOutput::GenericPassiveOutput() +{} + /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::ActiveInput +// senf::ppi::connector::GenericActiveInput + +prefix_ senf::ppi::connector::GenericPassiveOutput & senf::ppi::connector::GenericActiveInput::peer() + const +{ + return dynamic_cast(Connector::peer()); +} -prefix_ senf::ppi::connector::PassiveOutput & senf::ppi::connector::ActiveInput::peer() +prefix_ bool senf::ppi::connector::GenericActiveInput::boolean_test() + const { - return dynamic_cast(Connector::peer()); + return ! empty() || (connected() && ! peer().throttled()); } -prefix_ void senf::ppi::connector::ActiveInput::request() +prefix_ void senf::ppi::connector::GenericActiveInput::request() { - peer().emit(); + if (connected()) + peer().emit(); } +prefix_ senf::ppi::connector::GenericActiveInput::GenericActiveInput() +{} + /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::ActiveOutput +// senf::ppi::connector::GenericActiveOutput -prefix_ senf::ppi::connector::ActiveInput & senf::ppi::connector::ActiveOutput::peer() +prefix_ senf::ppi::connector::GenericPassiveInput & senf::ppi::connector::GenericActiveOutput::peer() + const { - return dynamic_cast(Connector::peer()); + return dynamic_cast(Connector::peer()); } -prefix_ void senf::ppi::connector::ActiveOutput::connect(PassiveInput & target) +prefix_ bool senf::ppi::connector::GenericActiveOutput::boolean_test() + const +{ + return connected() && ! peer().throttled(); +} + +prefix_ void senf::ppi::connector::GenericActiveOutput::connect(GenericPassiveInput & target) { Connector::connect(target); } +prefix_ senf::ppi::connector::GenericActiveOutput::GenericActiveOutput() +{} + ///////////////////////////////cci.e/////////////////////////////////////// #undef prefix_