X-Git-Url: http://g0dil.de/git?a=blobdiff_plain;f=PPI%2FConnectors.cc;h=4c5a3ea3f7cd7e68263035c677371a9553b868ee;hb=bd9f9d3fd6fbcff0112a7bf48ab9284da9576b11;hp=d8e74dcaf77819bd478937b731c0897370aafaaa;hpb=81ffa1c459b96dd44472bcef37e1e373934ee138;p=senf.git diff --git a/PPI/Connectors.cc b/PPI/Connectors.cc index d8e74dc..4c5a3ea 100644 --- a/PPI/Connectors.cc +++ b/PPI/Connectors.cc @@ -1,8 +1,8 @@ // $Id$ // -// Copyright (C) 2007 -// Fraunhofer Institut fuer offene Kommunikationssysteme (FOKUS) -// Kompetenzzentrum fuer Satelitenkommunikation (SatCom) +// Copyright (C) 2007 +// Fraunhofer Institute for Open Communication Systems (FOKUS) +// Competence Center NETwork research (NET), St. Augustin, GERMANY // Stefan Bund // // This program is free software; you can redistribute it and/or modify @@ -24,32 +24,182 @@ \brief Connectors non-inline non-template implementation */ #include "Connectors.hh" -//#include "Connectors.ih" +#include "Connectors.ih" // Custom includes +#include "Route.hh" +#include "Module.hh" +#include "ModuleManager.hh" //#include "Connectors.mpp" #define prefix_ ///////////////////////////////cc.p//////////////////////////////////////// /////////////////////////////////////////////////////////////////////////// -// protected members - -prefix_ senf::ppi::connector::Connector::~Connector() -{} +// senf::ppi::connector::Connector prefix_ void senf::ppi::connector::Connector::connect(Connector & target) { + // The connector is not registered -> route() or noroute() statement missing + SENF_ASSERT( module_ && + "senf::ppi::connector::Connector::connect(): (source) " + "Missing route() or noroute()" ); + // The connector is already connected + SENF_ASSERT( ! peer_ && + "senf::ppi::connector::Connector::connect(): (source) " + "duplicate connection" ); + // The target connector is not registered -> route() or noroute() statement missing + SENF_ASSERT( target.module_ && + "senf::ppi::connector::Connector::connect(): (target) " + "Missing route() or noroute()" ); + // The target connector is already connected + SENF_ASSERT( ! target.peer_ && + "senf::ppi::connector::Connector::connect(): (target) " + "duplicate connection" ); + if (! (packetTypeID() == typeid(void) || + target.packetTypeID() == typeid(void) || + packetTypeID() == target.packetTypeID()) ) + throw IncompatibleConnectorsException() + << ": " << prettyName(packetTypeID()) + << " [in module " << prettyName(typeid(*module_)) << "] " + << ", " << prettyName(target.packetTypeID()) + << " [in module " << prettyName(typeid(*target.module_)) << "]"; + peer_ = & target; target.peer_ = this; + + if (! initializationScheduled()) + enqueueInitializable(); + if (! peer().initializationScheduled()) + peer().enqueueInitializable(); +} + +prefix_ void senf::ppi::connector::Connector::disconnect() +{ + // Cannot disconnected a non-connected connector + SENF_ASSERT( peer_ && + "senf::ppi::connector::Connector::disconnect(): Not connected" ); + Connector & peer (*peer_); + peer_ = 0; + peer.peer_ = 0; + + if (! initializationScheduled()) + enqueueInitializable(); + if (! peer.initializationScheduled()) + peer.enqueueInitializable(); +} + +prefix_ std::type_info const & senf::ppi::connector::Connector::packetTypeID() +{ + return typeid(void); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::PassiveConnector + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::connector::PassiveConnector::v_init() +{ + Routes::const_iterator i (routes_.begin()); + Routes::const_iterator const i_end (routes_.end()); + for (; i != i_end; ++i) + if ((*i)->throttled()) + break; + if (i == i_end) + remoteThrottled_ = false; + if (throttled()) + emitThrottle(); + else + emitUnthrottle(); +} + +prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent() +{} + +prefix_ void senf::ppi::connector::PassiveConnector::notifyUnthrottle() +{ + if (throttled() && !nativeThrottled_) { + Routes::const_iterator i (routes_.begin()); + Routes::const_iterator const i_end (routes_.end()); + for (; i != i_end; ++i) + if ((*i)->throttled()) + break; + if (i == i_end) { + remoteThrottled_ = false; + emitUnthrottle(); + } + } + else + remoteThrottled_ = false; +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::ActiveConnector + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::connector::ActiveConnector::v_init() +{ + if (! connected()) + notifyThrottle(); +} + +prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle() +{ + if (! throttled_) { + throttled_ = true; + if (throttleCallback_) + throttleCallback_(); + NotifyRoutes::const_iterator i (notifyRoutes_.begin()); + NotifyRoutes::const_iterator const i_end (notifyRoutes_.end()); + for (; i != i_end; ++i) + (*i)->notifyThrottle(); + } +} + +prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle() +{ + if (throttled_) { + throttled_ = false; + if (unthrottleCallback_) + unthrottleCallback_(); + NotifyRoutes::const_iterator i (notifyRoutes_.begin()); + NotifyRoutes::const_iterator const i_end (notifyRoutes_.end()); + for (; i != i_end; ++i) + (*i)->notifyUnthrottle(); + } +} + +prefix_ void senf::ppi::connector::ActiveConnector::registerRoute(ForwardingRoute & route) +{ + notifyRoutes_.push_back(&route); } /////////////////////////////////////////////////////////////////////////// // senf::ppi::connector::InputConnector +prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()() +{ + if (empty()) + v_requestEvent(); + Packet p; + if (! empty()) { + p = peek(); + queue_.pop_back(); + v_dequeueEvent(); + } + return p; +} + //////////////////////////////////////// // private members +prefix_ void senf::ppi::connector::InputConnector::v_requestEvent() +{} + prefix_ void senf::ppi::connector::InputConnector::v_enqueueEvent() {} @@ -57,22 +207,43 @@ prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent() {} /////////////////////////////////////////////////////////////////////////// -// senf::ppi::connector::PassiveInput +// senf::ppi::connector::GenericActiveInput + +//////////////////////////////////////// +// private members + +prefix_ void senf::ppi::connector::GenericActiveInput::v_requestEvent() +{ + request(); +} + +/////////////////////////////////////////////////////////////////////////// +// senf::ppi::connector::GenericPassiveInput //////////////////////////////////////// // private members -prefix_ void senf::ppi::connector::PassiveInput::v_enqueueEvent() +prefix_ void senf::ppi::connector::GenericPassiveInput::v_enqueueEvent() { - ///\fixme Emit notifications when qstate_ changes - qstate_ = qdisc_->update(*this, QueueingDiscipline::ENQUEUE); emit(); + qdisc_->update(*this, QueueingDiscipline::ENQUEUE); +} + +prefix_ void senf::ppi::connector::GenericPassiveInput::v_dequeueEvent() +{ + qdisc_->update(*this, QueueingDiscipline::DEQUEUE); } -prefix_ void senf::ppi::connector::PassiveInput::v_dequeueEvent() +prefix_ void senf::ppi::connector::GenericPassiveInput::v_unthrottleEvent() { - ///\fixme Emit notifications when qstate_ changes - qstate_ = qdisc_->update(*this, QueueingDiscipline::DEQUEUE); + size_type n (queueSize()); + while (n) { + emit(); + size_type nn (queueSize()); + if (n == nn) + break; + n = nn; + } } ///////////////////////////////cc.e////////////////////////////////////////