Socket: Ignore ECONNREFUSED on write to datagram socket
[senf.git] / Examples / RateStuffer / ratestuffer.cc
index 7c1d1c7..7969d59 100644 (file)
 #include "PPI/Module.hh"
 #include "PPI/IntervalTimer.hh"
 #include "PPI/Joins.hh"
+#include "PPI/ThrottleBarrier.hh"
 #include "PPI/PassiveQueue.hh"
+#include "PPI/Queueing.hh"
+#include "PPI/CloneSource.hh"
 #include "PPI/Setup.hh"
 
 //#include "ppitest.mpp"
@@ -80,36 +83,40 @@ void RateFilter::timeout()
 }
 
 // ////////////////////////////////////////////////////////////////////////
-// CopyPacketGenerator
 
-class CopyPacketGenerator
-    : public module::Module
+class RateStuffer
 {
-    SENF_PPI_MODULE(CopyPacketGenerator);
-public:
-
-    connector::PassiveOutput output;
-
-    CopyPacketGenerator(senf::Packet p);
+    module::ThrottleBarrier barrier;
+    module::PassiveQueue    queue;
+    module::CloneSource     generator;
+    module::PriorityJoin    join;
+    RateFilter              rateFilter;
 
-private:
-    void request();
-
-    senf::Packet packet;
+public:
+    connector::PassiveInput & input;
+    connector::ActiveOutput & output;
+
+    RateStuffer(senf::ClockService::clock_type interval, 
+                senf::Packet packet,
+                unsigned high = 1,
+                unsigned low  = 0)
+    :   barrier    (),
+        queue      (),
+        generator  ( packet ),
+        join       (),
+        rateFilter ( interval ),
+        input      ( barrier.input ),
+        output     ( rateFilter.output )
+    {
+        ppi::connect( barrier,    queue      );
+        ppi::connect( queue,      join       );
+        ppi::connect( generator,  join       );
+        ppi::connect( join,       rateFilter );
+
+        queue.qdisc(ppi::ThresholdQueueing(high,low));
+    }
 };
-
-CopyPacketGenerator::CopyPacketGenerator(senf::Packet p)
-    : packet(p) 
-{
-    noroute(output);
-    output.onRequest(&CopyPacketGenerator::request);
-}
-
-void CopyPacketGenerator::request()
-{
-    output(packet);
-}
-
+        
 // ////////////////////////////////////////////////////////////////////////
 // ////////////////////////////////////////////////////////////////////////
 
@@ -118,9 +125,14 @@ void CopyPacketGenerator::request()
 // 'O'        = active connector
 // '>' or '<' = input connector
 //
-// [ udpReader ] O--> [ queue ] -->O [      ]
-//                                   [ join ] -->O [ rateFilter] O--> [ udpWriter ]
-//                [ generator ] -->O [      ]
+//                   +----------------------------------------------------+
+//                   | stuffer                                            |
+//                   |                                                    |
+// [ udpReader ] O-->:---> [ queue ] -->O [      ]                        |
+//                   |                    [ join ] -->O [ rateFilter] O-->:O--> [ udpWriter ]
+//                   | [ generator ] -->O [      ]                        |
+//                   |                                                    |
+//                   +----------------------------------------------------+
 
 int main(int argc, char * argv[])
 {
@@ -130,18 +142,14 @@ int main(int argc, char * argv[])
     senf::ConnectedUDPv4ClientSocketHandle outputSocket(
         senf::INet4SocketAddress("localhost:44345"));
 
-    module::ActiveSocketReader<>  udpReader  (inputSocket);
-    module::PassiveQueue          queue;
-    CopyPacketGenerator           generator  (senf::DataPacket::create(std::string("<idle>\n")));
-    module::PriorityJoin          join;
-    RateFilter                    rateFilter (1000000000ul);
-    module::PassiveSocketWriter<> udpWriter  (outputSocket);
-
-    ppi::connect( udpReader,  queue      );
-    ppi::connect( queue,      join       );
-    ppi::connect( generator,  join       );
-    ppi::connect( join,       rateFilter );
-    ppi::connect( rateFilter, udpWriter  );
+    module::ActiveSocketReader<>  udpReader  ( inputSocket );
+    RateStuffer                   stuffer    ( 1000000000ul, 
+                                               senf::DataPacket::create(std::string("<idle>\n")),
+                                               2u, 1u );
+    module::PassiveSocketWriter<> udpWriter  ( outputSocket );
+
+    ppi::connect( udpReader, stuffer   );
+    ppi::connect( stuffer,   udpWriter );
 
     ppi::run();