X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=PPI%2FConnectors.hh;h=6e01ca36cfdbb1b6d78b247f4376f6186142c33b;hb=f539f4271d470794a773a92bacd8ba086c9bc1cd;hp=6ca4aa5fd12e66d4c83e531cf1c5a6e2a6866e93;hpb=ceba483d966a9fd884b30fb9dac07156b29a4b31;p=senf.git diff --git a/PPI/Connectors.hh b/PPI/Connectors.hh index 6ca4aa5..6e01ca3 100644 --- a/PPI/Connectors.hh +++ b/PPI/Connectors.hh @@ -67,8 +67,8 @@ namespace connector { : boost::noncopyable { public: - Connector & peer(); ///< Get peer connected to this connector - module::Module & module(); ///< Get this connectors containing module + Connector & peer() const; ///< Get peer connected to this connector + module::Module & module() const; ///< Get this connectors containing module protected: Connector(); @@ -119,13 +119,13 @@ namespace connector { operation is to be performed. */ - bool throttled(); ///< Get accumulative throttling state - bool nativeThrottled(); ///< Get native throttling state + bool throttled() const; ///< Get accumulative throttling state + bool nativeThrottled() const; ///< Get native throttling state void throttle(); ///< Set native throttling void unthrottle(); ///< Revoke native throttling - ActiveConnector & peer(); + ActiveConnector & peer() const; protected: PassiveConnector(); @@ -133,13 +133,25 @@ namespace connector { void emit(); private: + // Called by the routing to change the remote throttling state void notifyThrottle(); ///< Forward a throttling notification to this connector void notifyUnthrottle(); ///< Forward an unthrottling notification to this connector + // Internal members to emit throttling notifications + void emitThrottle(); + void emitUnthrottle(); + + // Called after unthrottling the connector + virtual void v_unthrottleEvent(); + typedef detail::Callback<>::type Callback; Callback callback_; - - friend class ActiveConnector; + + bool remoteThrottled_; + bool nativeThrottled_; + + friend class senf::ppi::detail::ForwardForwardingRouteImplementation; + friend class senf::ppi::detail::BackwardForwardingRouteImplementation; }; /** \brief Active connector baseclass @@ -155,9 +167,10 @@ namespace connector { class ActiveConnector : public virtual Connector { + typedef detail::Callback<>::type Callback; public: template - void onThrottle(Handler handle); ///< Register throttle notification handler + void onThrottle(Handler handler); ///< Register throttle notification handler /**< The handler register here will be called, whenever a throttle notification comes in. The \a handler argument is either an arbitrary callable object or it is a @@ -165,11 +178,12 @@ namespace connector { this input. In the second case, the pointer will automatically be bound to the containing instance. - \param[in] handle Handler to call on throttle + \param[in] handler Handler to call on throttle notifications. */ + void onThrottle(); template - void onUnthrottle(Handler handle); ///< Register unthrottle notification handler + void onUnthrottle(Handler handler); ///< Register unthrottle notification handler /**< The handler register here will be called, whenever an unthrottle notification comes in. The \a handler argument is either an arbitrary callable object or it @@ -179,11 +193,29 @@ namespace connector { \param[in] handle Handler to call on unthrottle notifications. */ + void onUnthrottle(); - PassiveConnector & peer(); + PassiveConnector & peer() const; protected: ActiveConnector(); + + private: + // called by the peer() to forward throttling notifications + void notifyThrottle(); + void notifyUnthrottle(); + + // called by ForwardingRoute to register a new route + void registerRoute(ForwardingRoute & route); + + Callback throttleCallback_; + Callback unthrottleCallback_; + + typedef std::vector NotifyRoutes; + NotifyRoutes notifyRoutes_; + + friend class senf::ppi::ForwardingRoute; + friend class PassiveConnector; }; /** \brief Input connector baseclass @@ -207,8 +239,7 @@ namespace connector { be added to the queue before it can be processed. */ class InputConnector - : public virtual Connector, - public SafeBool + : public virtual Connector { typedef std::deque Queue; public: @@ -224,28 +255,15 @@ namespace connector { request cannot be fulfilled, this is considered to be a logic error in the module implementation and an exception is raised. */ - bool boolean_test (); ///< Check packet availability - /**< Using any input connector in a boolean context will - check, whether an input request can be fulfilled. This - is always possible if the queue is non-empty. If the - input is active, it also returns when the connected - passive output is not throttled so new packets can be - requested. - - Calling the operator() member is an error if this test - returns \c false - - \returns \c true if operator() can be called, \c false - otherwise */ - OutputConnector & peer(); + OutputConnector & peer() const; - queue_iterator begin(); ///< Access queue begin (head) - queue_iterator end(); ///< Access queue past-the-end (tail) - Packet peek(); ///< Return head element from the queue + queue_iterator begin() const; ///< Access queue begin (head) + queue_iterator end() const; ///< Access queue past-the-end (tail) + Packet peek() const; ///< Return head element from the queue - size_type queueSize(); ///< Return number of elements in the queue - bool empty(); ///< Return queueSize() == 0 + size_type queueSize() const; ///< Return number of elements in the queue + bool empty() const; ///< Return queueSize() == 0 protected: InputConnector(); @@ -274,7 +292,7 @@ namespace connector { public: void operator()(Packet p); ///< Send out a packet - InputConnector & peer(); + InputConnector & peer() const; protected: OutputConnector(); @@ -285,21 +303,23 @@ namespace connector { /** \brief Combination of PassiveConnector and InputConnector - In addition to the native and the forwarded throttling state, the PassiveInput manages a - queue throttling state. This state is automatically managed by a queueing discipline. The - standard queueing discipline is ThresholdQueueing, which throttles the connection whenever - the queue length reaches the high threshold and unthrottles the connection when the queue - reaches the low threshold. The default queueing discipline is + The PassiveInput automatically controls the connectors throttling state using a queueing + discipline. The standard queueing discipline is ThresholdQueueing, which throttles the + connection whenever the queue length reaches the high threshold and unthrottles the + connection when the queue reaches the low threshold. The default queueing discipline is ThresholdQueueing(1,0) which will throttle the input whenever the queue is non-empty. */ class PassiveInput - : public PassiveConnector, public InputConnector + : public PassiveConnector, public InputConnector, + public SafeBool { public: PassiveInput(); - ActiveOutput & peer(); + ActiveOutput & peer() const; + + bool boolean_test() const; template void qdisc(QDisc const & disc); ///< Change the queueing discipline @@ -311,18 +331,21 @@ namespace connector { private: void v_enqueueEvent(); void v_dequeueEvent(); + void v_unthrottleEvent(); boost::scoped_ptr qdisc_; - QueueingDiscipline::State qstate_; }; /** \brief Combination of PassiveConnector and OutputConnector */ class PassiveOutput - : public PassiveConnector, public OutputConnector + : public PassiveConnector, public OutputConnector, + public SafeBool { public: - ActiveInput & peer(); + ActiveInput & peer() const; + + bool boolean_test() const; void connect(ActiveInput & target); @@ -332,10 +355,13 @@ namespace connector { /** \brief Combination of ActiveConnector and InputConnector */ class ActiveInput - : public ActiveConnector, public InputConnector + : public ActiveConnector, public InputConnector, + public SafeBool { public: - PassiveOutput & peer(); + PassiveOutput & peer() const; + + bool boolean_test() const; void request(); ///< request more packets without dequeuing any packet @@ -346,10 +372,13 @@ namespace connector { /** \brief Combination of ActiveConnector and OutputConnector */ class ActiveOutput - : public ActiveConnector, public OutputConnector + : public ActiveConnector, public OutputConnector, + public SafeBool { public: - PassiveInput & peer(); + PassiveInput & peer() const; + + bool boolean_test() const; void connect(PassiveInput & target); };