--- /dev/null
+// $Id$
+//
+// Copyright (C) 2008
+// Fraunhofer Institute for Open Communication Systems (FOKUS)
+// Competence Center NETwork research (NET), St. Augustin, GERMANY
+// Stefan Bund <g0dil@berlios.de>
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the
+// Free Software Foundation, Inc.,
+// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+/** \file
+ \brief MultiMCLoop non-inline non-template implementation */
+
+//#include "MultiMCLoop.hh"
+//#include "MultiMCLoop.ih"
+
+// Custom includes
+#include <senf/Scheduler/ClockService.hh>
+#include <senf/Scheduler/Scheduler.hh>
+#include <senf/Utils/membind.hh>
+#include <senf/Socket/Protocols/INet.hh>
+#include <boost/format.hpp>
+
+//#include "MultiMCLoop.mpp"
+#define prefix_
+///////////////////////////////cc.p////////////////////////////////////////
+
+typedef senf::UDPv4ClientSocketHandle UDPSocket;
+
+class MCReader
+{
+ std::string name;
+ UDPSocket socket;
+ senf::scheduler::FdEvent event;
+
+ void handler(int events);
+
+public:
+ MCReader(unsigned n, std::string const & name, UDPSocket::Address const & group);
+};
+
+prefix_ MCReader::MCReader(unsigned n, std::string const & name_,
+ UDPSocket::Address const & group)
+ : name (name_), socket (),
+ event (name, senf::membind(&MCReader::handler, this), socket,
+ senf::scheduler::FdEvent::EV_READ)
+{
+ socket.protocol().reuseaddr(true);
+ socket.bind(group);
+ socket.protocol().bindInterface("dummy0");
+ socket.protocol().mcAddMembership(group.address(), "dummy0");
+}
+
+prefix_ void MCReader::handler(int events)
+{
+ std::cout << "I " << name << ": " << socket.read() << "\n";
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+class MCWriter
+{
+ std::string name;
+ UDPSocket::Address group;
+ UDPSocket socket;
+ senf::ClockService::clock_type interval;
+ senf::scheduler::TimerEvent event;
+ unsigned count;
+
+ void handler();
+
+public:
+ MCWriter(std::string const & name, UDPSocket::Address const & group,
+ senf::ClockService::clock_type interval);
+};
+
+prefix_ MCWriter::MCWriter(std::string const & name_, UDPSocket::Address const & group_,
+ senf::ClockService::clock_type interval_)
+ : name (name_), group (group_), socket (), interval (interval_),
+ event (name, senf::membind(&MCWriter::handler, this), senf::ClockService::now() + interval),
+ count (0)
+{}
+
+prefix_ void MCWriter::handler()
+{
+ std::stringstream ss;
+ ss << name << "-" << ++count;
+ std::cout << "O " << name << ": " << ss.str() << "\n";
+ socket.writeto(group, ss.str());
+ event.timeout(senf::scheduler::eventTime() + interval);
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+class IfSetup
+{
+ std::string iface;
+
+public:
+ IfSetup(std::string const & iface_);
+ ~IfSetup();
+
+ void sys(std::string const & cmd);
+
+ struct SystemException : public senf::Exception
+ { SystemException() : senf::Exception("IfSetup::SystemException") {} };
+
+};
+
+prefix_ IfSetup::IfSetup(std::string const & iface_)
+ : iface (iface_)
+{
+ sys((boost::format("ifconfig %s up") % iface).str());
+ sys((boost::format("ifconfig %s 192.168.192.1") % iface).str());
+ sys((boost::format("ip route add 224.0.0.0/4 dev %s") % iface).str());
+}
+
+prefix_ IfSetup::~IfSetup()
+{
+ try {
+ sys((boost::format("ifconfig %s down") % iface).str());
+ }
+ catch (SystemException & ex) {
+ std::cerr << ex.what() << "\n";
+ }
+}
+
+prefix_ void IfSetup::sys(std::string const & cmd)
+{
+ int rv (system(cmd.c_str()));
+ if (rv != 0)
+ throw SystemException() << ": code " << rv << ": " << cmd;
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+void sigintHandler(siginfo_t const &)
+{
+ senf::scheduler::terminate();
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+int main(int argc, char * argv[])
+{
+ try {
+ IfSetup setup ("dummy0");
+
+ senf::scheduler::SignalEvent sigint (SIGINT, &sigintHandler);
+
+ UDPSocket::Address g1 ("225.1:43434");
+ UDPSocket::Address g2 ("225.2:43434");
+
+ MCReader r1g1 (1u, "r1g1", g1);
+ MCReader r2g1 (2u, "r2g1", g1);
+ MCReader r1g2 (3u, "r1g2", g2);
+ MCReader r2g2 (4u, "r2g2", g2);
+
+ MCWriter w1g1 ("w1g1", g1, senf::ClockService::milliseconds(600));
+ MCWriter w2g1 ("w2g1", g1, senf::ClockService::milliseconds(800));
+ MCWriter w1g2 ("w1g2", g2, senf::ClockService::milliseconds(700));
+ MCWriter w2g2 ("w2g2", g2, senf::ClockService::milliseconds(900));
+
+ senf::scheduler::process();
+ }
+ catch (std::exception const & ex) {
+ std::cerr << ex.what() << "\n";
+ return 1;
+ }
+};
+
+///////////////////////////////cc.e////////////////////////////////////////
+#undef prefix_
+//#include "MultiMCLoop.mpp"
+
+\f
+// Local Variables:
+// mode: c++
+// fill-column: 100
+// comment-column: 40
+// c-file-style: "senf"
+// indent-tabs-mode: nil
+// ispell-local-dictionary: "american"
+// compile-command: "scons -u multimcloop"
+// End:
--- /dev/null
+Import('env')
+import SENFSCons
+
+###########################################################################
+
+SENFSCons.Binary(env, 'multimcloop', [ 'MultiMCLoop.cc' ],
+ LIBS = [ 'Scheduler', 'Socket', 'Utils' ])
// Custom includes
#include "Route.hh"
#include "Module.hh"
+#include "ModuleManager.hh"
//#include "Connectors.mpp"
#define prefix_
peer_ = & target;
target.peer_ = this;
+
+ if (ModuleManager::instance().running())
+ v_init();
}
prefix_ std::type_info const & senf::ppi::connector::Connector::packetTypeID()
////////////////////////////////////////
// private members
+prefix_ void senf::ppi::connector::PassiveConnector::v_init()
+{
+ Routes::const_iterator i (routes_.begin());
+ Routes::const_iterator const i_end (routes_.end());
+ for (; i != i_end; ++i)
+ if ((*i)->throttled())
+ break;
+ if (i == i_end)
+ remoteThrottled_ = false;
+ if (throttled())
+ emitThrottle();
+ else
+ emitUnthrottle();
+}
+
+prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
+{}
+
prefix_ void senf::ppi::connector::PassiveConnector::notifyUnthrottle()
{
if (throttled() && !nativeThrottled_) {
////////////////////////////////////////
// private members
+prefix_ void senf::ppi::connector::ActiveConnector::v_init()
+{
+ if (! connected())
+ notifyThrottle();
+}
+
prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle()
{
- if (throttleCallback_)
- throttleCallback_();
- NotifyRoutes::const_iterator i (notifyRoutes_.begin());
- NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
- for (; i != i_end; ++i)
- (*i)->notifyThrottle();
+ if (! throttled_) {
+ throttled_ = true;
+ if (throttleCallback_)
+ throttleCallback_();
+ NotifyRoutes::const_iterator i (notifyRoutes_.begin());
+ NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
+ for (; i != i_end; ++i)
+ (*i)->notifyThrottle();
+ }
}
prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle()
{
- if (unthrottleCallback_)
- unthrottleCallback_();
- NotifyRoutes::const_iterator i (notifyRoutes_.begin());
- NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
- for (; i != i_end; ++i)
- (*i)->notifyUnthrottle();
+ if (throttled_) {
+ throttled_ = false;
+ if (unthrottleCallback_)
+ unthrottleCallback_();
+ NotifyRoutes::const_iterator i (notifyRoutes_.begin());
+ NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
+ for (; i != i_end; ++i)
+ (*i)->notifyUnthrottle();
+ }
}
prefix_ void senf::ppi::connector::ActiveConnector::registerRoute(ForwardingRoute & route)
prefix_ senf::ppi::connector::Connector::~Connector()
{
- if (peer_)
+ if (connected())
peer_->peer_ = 0;
}
+prefix_ bool senf::ppi::connector::Connector::connected()
+ const
+{
+ return peer_;
+}
+
////////////////////////////////////////
// private members
module_ = &module;
}
+prefix_ void senf::ppi::connector::Connector::init()
+{
+ v_init();
+}
+
///////////////////////////////////////////////////////////////////////////
// senf::ppi::connector::PassiveConnector
prefix_ void senf::ppi::connector::PassiveConnector::emitThrottle()
{
- peer().notifyThrottle();
+ if (connected())
+ peer().notifyThrottle();
}
prefix_ void senf::ppi::connector::PassiveConnector::emitUnthrottle()
{
- peer().notifyUnthrottle();
- v_unthrottleEvent();
+ if (connected()) {
+ peer().notifyUnthrottle();
+ v_unthrottleEvent();
+ }
}
prefix_ void senf::ppi::connector::PassiveConnector::notifyThrottle()
remoteThrottled_ = true;
}
-prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
-{}
-
prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
{
routes_.push_back(&route);
prefix_ bool senf::ppi::connector::ActiveConnector::throttled()
const
{
- return peer().throttled();
+ return ! connected() || peer().throttled();
}
////////////////////////////////////////
// protected members
prefix_ senf::ppi::connector::ActiveConnector::ActiveConnector()
- : throttleCallback_(), unthrottleCallback_(), notifyRoutes_()
+ : throttleCallback_(), unthrottleCallback_(), notifyRoutes_(), throttled_(false)
{}
///////////////////////////////////////////////////////////////////////////
prefix_ void senf::ppi::connector::OutputConnector::operator()(Packet p)
{
- peer().enqueue(p);
+ if (connected())
+ peer().enqueue(p);
}
prefix_ void senf::ppi::connector::OutputConnector::write(Packet p)
prefix_ bool senf::ppi::connector::GenericActiveInput::boolean_test()
const
{
- return ! empty() || ! peer().throttled();
+ return ! empty() || (connected() && ! peer().throttled());
}
prefix_ void senf::ppi::connector::GenericActiveInput::request()
{
- peer().emit();
+ if (connected())
+ peer().emit();
}
prefix_ senf::ppi::connector::GenericActiveInput::GenericActiveInput()
prefix_ bool senf::ppi::connector::GenericActiveOutput::boolean_test()
const
{
- return ! peer().throttled();
+ return connected() && ! peer().throttled();
}
prefix_ void senf::ppi::connector::GenericActiveOutput::connect(GenericPassiveInput & target)
\li it has an (optional) packet type
\e Active connectors are activated from within the module, \e passive connectors are
- signaled by the external framework. \e Input modules receive packets, \e output modules send
- packets.
+ signaled by the external framework. \e Input connectors receive packets, \e output
+ connectors send packets.
All passive connectors call some onRequest callback whenever I/O needs to be performed. All
- input modules possess a packet queue.
+ input connectors possess a packet queue.
We therefore have 4 connector types each of which is parameterized by the type of packet
traversing the connector:
private:
void onRequest() {
// 'input()' will return a senf::EthernetPacket packet handle
- try { output( input().find<IpPacket>() ); }
+ try { output( input().find<senf::IpPacket>() ); }
catch (senf::InvalidPacketChainException & ex) { ; }
}
};
Connector & peer() const; ///< Get peer connected to this connector
module::Module & module() const; ///< Get this connectors containing module
+ bool connected() const; ///< \c true, if connector connected, \c false otherwise
+
protected:
Connector();
virtual ~Connector();
void connect(Connector & target);
-
+
private:
virtual std::type_info const & packetTypeID();
void setModule(module::Module & module);
+ void init();
+ virtual void v_init() = 0;
Connector * peer_;
module::Module * module_;
void emit();
private:
+ virtual void v_init();
+
// Called by the routing to change the remote throttling state
void notifyThrottle(); ///< Forward a throttle notification to this connector
void notifyUnthrottle(); ///< Forward an unthrottle notification to this connector
ActiveConnector();
private:
+ virtual void v_init();
+
// called by the peer() to forward throttling notifications
void notifyThrottle();
void notifyUnthrottle();
typedef std::vector<ForwardingRoute*> NotifyRoutes;
NotifyRoutes notifyRoutes_;
+ bool throttled_;
+
friend class senf::ppi::ForwardingRoute;
friend class PassiveConnector;
};
{
public:
operator()(PacketType packet); ///< Send out a packet
- write(PacketType packet); ///< Alias for operator()
+ void write(PacketType packet); ///< Alias for operator()
};
/** \brief Connector passively providing packets
{
public:
operator()(PacketType packet); ///< Send out a packet
- write(PacketType packet); ///< Alias for operator()
+ void write(PacketType packet); ///< Alias for operator()
};
#endif
////////////////////////////////////////
// private members
-prefix_ void senf::ppi::module::debug::PassiveSource::init()
+prefix_ void senf::ppi::module::debug::PassiveSource::v_init()
{
if (empty())
output.throttle();
private:
void request();
- void init();
+ virtual void v_init();
Queue packets_;
};
// private members
prefix_ void senf::ppi::module::Module::init()
+{
+ ConnectorRegistry::iterator i (connectorRegistry_.begin());
+ ConnectorRegistry::iterator i_end (connectorRegistry_.end());
+ for (; i != i_end; ++i)
+ (*i)->init();
+ v_init();
+}
+
+prefix_ void senf::ppi::module::Module::v_init()
{}
prefix_ senf::ppi::EventManager & senf::ppi::module::Module::eventManager()
private:
#endif
- virtual void init(); ///< Called just before the network is run
+ void init();
+ virtual void v_init();
#ifndef DOXYGEN
public:
////////////////////////////////////////
// private members
-prefix_ void senf::ppi::module::PassiveQueue::init()
+prefix_ void senf::ppi::module::PassiveQueue::v_init()
{
- output.throttle();
+ if (!input)
+ output.throttle();
}
prefix_ void senf::ppi::module::PassiveQueue::onInput()
\see connector::GenericPassiveInput::qdisc() */
private:
- void init();
+ virtual void v_init();
void onInput();
void onOutput();
prefix_ void senf::INet6MulticastSocketProtocol::mcAddMembership(INet6Address const & mcAddr,
std::string const & iface)
+ const
{
if (mcAddr.inet4Mapped()) {
struct ip_mreqn mreqn;
}
namespace {
-void mc6SSMSourceRequest(int operation, int fd, senf::INet6Address const & group,
- senf::INet6Address const & source, int const & ifacei)
-{
- struct group_source_req req;
- ::memset(&req, 0, sizeof(req));
- req.gsr_interface = ifacei;
- req.gsr_group.ss_family = AF_INET6;
- std::copy(group.begin(), group.end(),
- reinterpret_cast<struct sockaddr_in6&>(req.gsr_group).sin6_addr.s6_addr);
- req.gsr_source.ss_family = AF_INET6;
- std::copy(source.begin(), source.end(),
- reinterpret_cast<struct sockaddr_in6&>(req.gsr_source).sin6_addr.s6_addr);
- if (::setsockopt(fd, SOL_IPV6, MCAST_JOIN_SOURCE_GROUP, &req, sizeof(req)) < 0)
- SENF_THROW_SYSTEM_EXCEPTION("::setsockopt()");
-}
-void mc6SSMSourceRequest(int operation, int fd, senf::INet6Address const & group,
- senf::INet6Address const & source, std::string const & iface)
-{
- int ifacei = if_nametoindex(iface.c_str());
- if (ifacei == 0)
- throw senf::SystemException("::if_nametoindex()", ENOENT SENF_EXC_DEBUGINFO);
- mc6SSMSourceRequest(operation, fd, group, source, ifacei);
+
+ void mc6SSMSourceRequest(int operation, int fd, senf::INet6Address const & group,
+ senf::INet6Address const & source, int ifacei)
+ {
+ struct group_source_req req;
+ ::memset(&req, 0, sizeof(req));
+ req.gsr_interface = ifacei;
+ req.gsr_group.ss_family = AF_INET6;
+ std::copy(group.begin(), group.end(),
+ reinterpret_cast<struct sockaddr_in6&>(req.gsr_group).sin6_addr.s6_addr);
+ req.gsr_source.ss_family = AF_INET6;
+ std::copy(source.begin(), source.end(),
+ reinterpret_cast<struct sockaddr_in6&>(req.gsr_source).sin6_addr.s6_addr);
+ if (::setsockopt(fd, SOL_IPV6, MCAST_JOIN_SOURCE_GROUP, &req, sizeof(req)) < 0)
+ SENF_THROW_SYSTEM_EXCEPTION("::setsockopt()");
+ }
+
+ void mc6SSMSourceRequest(int operation, int fd, senf::INet6Address const & group,
+ senf::INet6Address const & source, std::string const & iface)
+ {
+ int ifacei (0);
+ if (! iface.empty()) {
+ ifacei = if_nametoindex(iface.c_str());
+ if (ifacei == 0)
+ throw senf::SystemException("::if_nametoindex()", ENOENT SENF_EXC_DEBUGINFO);
+ }
+ mc6SSMSourceRequest(operation, fd, group, source, ifacei);
}
}
{
mc6SSMSourceRequest(MCAST_JOIN_SOURCE_GROUP, fd(), group, source, ifacei);
}
-prefix_ void senf::INet6MulticastSocketProtocol::mcJoinSSMSource(INet6Address const & group,
- INet6Address const & source)
- const
-{
- mc6SSMSourceRequest(MCAST_JOIN_SOURCE_GROUP, fd(), group, source, 0);
-}
prefix_ void senf::INet6MulticastSocketProtocol::mcLeaveSSMSource(INet6Address const & group,
INet6Address const & source,
bool mcLoop() const; ///< Return current multicast loopback state.
void mcLoop(bool value) const; ///< Set multicast loopback state
- /**< If set to false via \c mcLoop(value) multicast messages will not be looped back to local sockets. Default value is \c true (1). */
+ /**< If set to false via \c mcLoop(value) multicast messages
+ will not be looped back to local sockets. Default value
+ is \c true. */
void mcIface(std::string const & iface = std::string()) const;
///< Set multicast send interface of the socket
/**< \param[in] iface name of interface to send multicast
- data from */
+ data from
+
+ Under current linux versions this option is broken at
+ best. Don't use. */
};
/** \brief Multicast protocol facet for INet4 addressable multicast enabled sockets
groups received. The group is joined on the default
interface.
\param[in] mcAddr address of group to join */
- void mcAddMembership(INet4Address const & mcAddr, INet4Address const & localAddr)
- const;
+ void mcAddMembership(INet4Address const & mcAddr, INet4Address const & localAddr) const;
///< join multicast group on a specific interface
/**< This member will add \a mcAddr to the list of multicast
groups received. The group is joined on the interface
groups received. The group is joined on the default
interface.
\param[in] mcAddr address of group to join */
- void mcAddMembership(INet6Address const & mcAddr, std::string const & iface);
+ void mcAddMembership(INet6Address const & mcAddr, std::string const & iface) const;
///< join multicast group on a specific interface
/**< This member will add \a mcAddr to the list of multicast
groups received. The group is joined on the given
\param[in] group multicast group to join
\param[in] source SSM multicast source to join the
group on
- \param[in] iface interface to join the group on */
+ \param[in] iface interface to join the group on. If set
+ to the empty string, use the default interface. */
void mcJoinSSMSource(INet6Address const & group, INet6Address const & source,
- int ifacei) const;
+ int ifacei = 0) const;
///< join SSM multicast group
/**< This call will join the multicast group \a group for
traffic from \a source. A single group may be joined
\param[in] group multicast group to join
\param[in] source SSM multicast source to join the
group on
- \param[in] ifacei interface index to join the group on */
- void mcJoinSSMSource(INet6Address const & group, INet6Address const & source) const;
- ///< join SSM multicast group
- /**< This call will join the multicast group \a group for
- traffic from \a source. A single group may be joined
- multiple times on different sources.
- \param[in] group multicast group to join
- \param[in] source SSM multicast source to join the
+ \param[in] ifacei optional interface index to join the
group on */
void mcLeaveSSMSource(INet6Address const & group, INet6Address const & source,
std::string const & iface) const;
*/
class RawINetSocketProtocol
- : public virtual SocketProtocol
+ : public virtual INetSocketProtocol
{
public:
///\name Abstract Interface Implementation
// Custom includes
#include "../../../Socket/SocketProtocol.hh"
+#include "INetSocketProtocol.hh"
//#include "TCPSocketProtocol.mpp"
///////////////////////////////hh.p////////////////////////////////////////
which are available on any TCP socket.
*/
class TCPSocketProtocol
- : public virtual SocketProtocol
+ : public virtual INetSocketProtocol
{
public:
bool nodelay() const; ///< Check current \c SO_NODELAY status
// Custom includes
#include "../../../Socket/SocketProtocol.hh"
#include "INetAddressing.hh"
+#include "INetSocketProtocol.hh"
//#include "UDPSocketProtocol.mpp"
///////////////////////////////hh.p////////////////////////////////////////
which are available on any UDP socket.
*/
class UDPSocketProtocol
- : public virtual SocketProtocol
+ : public virtual INetSocketProtocol
{
public:
///\name Abstract Interface Implementation