switch to new MPL based Fraunhofer FOKUS Public License
[senf.git] / senf / PPI / QueueingSocketSink.test.cc
1 // $Id$
2 //
3 // Copyright (C) 2010
4 // Fraunhofer Institute for Open Communication Systems (FOKUS)
5 //
6 // The contents of this file are subject to the Fraunhofer FOKUS Public License
7 // Version 1.0 (the "License"); you may not use this file except in compliance
8 // with the License. You may obtain a copy of the License at 
9 // http://senf.berlios.de/license.html
10 //
11 // The Fraunhofer FOKUS Public License Version 1.0 is based on, 
12 // but modifies the Mozilla Public License Version 1.1.
13 // See the full license text for the amendments.
14 //
15 // Software distributed under the License is distributed on an "AS IS" basis, 
16 // WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
17 // for the specific language governing rights and limitations under the License.
18 //
19 // The Original Code is Fraunhofer FOKUS code.
20 //
21 // The Initial Developer of the Original Code is Fraunhofer-Gesellschaft e.V. 
22 // (registered association), Hansastraße 27 c, 80686 Munich, Germany.
23 // All Rights Reserved.
24 //
25 // Contributor(s):
26 //   Thorsten Horstmann <tho@berlios.de>
27
28 /** \file
29     \brief QueueingSocketSink unit tests */
30
31 #include "QueueingSocketSink.hh"
32
33 // Custom includes
34 #include <senf/Socket/Protocols/INet/UDPSocketHandle.hh>
35 #include <senf/Socket/Protocols/INet/ConnectedUDPSocketHandle.hh>
36 #include "DebugModules.hh"
37 #include "SocketSink.hh"
38 #include "Setup.hh"
39
40 #include <senf/Utils/auto_unit_test.hh>
41 #include <boost/test/test_tools.hpp>
42
43 #define prefix_
44 //-/////////////////////////////////////////////////////////////////////////////////////////////////
45 namespace ppi = senf::ppi;
46 namespace module = ppi::module;
47 namespace debug = module::debug;
48 namespace scheduler = senf::scheduler;
49
50 namespace {
51     void runPPI(senf::ClockService::clock_type t)
52     {
53         scheduler::TimerEvent timeout(
54                 "test-timeout", &scheduler::terminate, scheduler::now() + t);
55         ppi::run();
56     }
57
58     int base_pid = 0;
59
60     unsigned port(unsigned i)
61     {
62         if (! base_pid)
63             base_pid = ::getpid();
64         return 23456u + (((base_pid^(base_pid>>8)^(base_pid>>16)^(base_pid>>24))&0xff)<<2) + i;
65     }
66
67     std::string localhost4str(unsigned i)
68     {
69         return (boost::format("localhost:%d") % port(i)).str();
70     }
71
72     struct TestingConnectedDgramWriter
73         : public ppi::ConnectedDgramWriter
74     {
75         bool throttled;
76
77         bool operator()(Handle handle, PacketType const & packet)
78         {
79             if (throttled)
80                 return false;
81             return ConnectedDgramWriter::operator()( handle, packet);
82         }
83
84         TestingConnectedDgramWriter(){
85             throttled = false;
86         }
87     };
88 }
89
90 SENF_AUTO_UNIT_TEST(passiveQueueingSocketSink)
91 {
92     senf::ConnectedUDPv4ClientSocketHandle os (senf::noinit);
93
94     senf::ConnectedUDPv4ClientSocketHandle outputSocket (
95             senf::INet4SocketAddress( localhost4str(0)));
96     module::PassiveQueueingSocketSink<TestingConnectedDgramWriter> udpSink (
97             os, ppi::FIFOQueueingAlgorithm::create());
98     
99     // test re-assignment of socket
100     udpSink.handle( outputSocket);
101     
102     udpSink.writer().throttled = false;
103     debug::ActiveSource source;
104     ppi::connect(source, udpSink);
105     senf::ppi::init();
106
107     std::string data ("TEST");
108     senf::Packet p (senf::DataPacket::create(data));
109
110     senf::UDPv4ClientSocketHandle inputSocket;
111     inputSocket.bind(senf::INet4SocketAddress(localhost4str(0)));
112
113     source.submit(p);
114
115     std::string input (inputSocket.read());
116     BOOST_CHECK_EQUAL( data, input );
117     BOOST_CHECK_EQUAL( udpSink.qAlgorithm().size(), 0);
118
119     udpSink.writer().throttled = true;
120
121     source.submit(p);
122     BOOST_CHECK_EQUAL( udpSink.qAlgorithm().size(), 1);
123
124     for (int n = 0; n < 100; n++) {
125         source.submit(p);
126     }
127     // queue default size is 64
128     BOOST_CHECK_EQUAL( udpSink.qAlgorithm().size(), 64);
129
130     udpSink.writer().throttled = false;
131
132     runPPI( senf::ClockService::milliseconds(250));
133
134     inputSocket.blocking(false);
135     while (true) {
136         input = inputSocket.read();
137         if (input.empty()) break;
138         BOOST_CHECK_EQUAL( data, input );
139     }
140
141     runPPI( senf::ClockService::milliseconds(250));
142     BOOST_CHECK_EQUAL( udpSink.qAlgorithm().size(), 0);
143 }
144
145
146 //-/////////////////////////////////////////////////////////////////////////////////////////////////
147 #undef prefix_
148
149 \f
150 // Local Variables:
151 // mode: c++
152 // fill-column: 100
153 // comment-column: 40
154 // c-file-style: "senf"
155 // indent-tabs-mode: nil
156 // ispell-local-dictionary: "american"
157 // compile-command: "scons -u test"
158 // End: