9bfc88d323b894e73d9f45ab572ce74cf2c1ada5
[senf.git] / senf / PPI / QueueingSocketSink.hh
1 // $Id$
2 //
3 // Copyright (C) 2010
4 // Fraunhofer Institute for Open Communication Systems (FOKUS)
5 // Competence Center NETwork research (NET), St. Augustin, GERMANY
6 //     Thorsten Horstmann <tho@berlios.de>
7 //
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
12 //
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU General Public License
19 // along with this program; if not, write to the
20 // Free Software Foundation, Inc.,
21 // 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
22
23 /** \file
24     \brief QueueingSocketSink public header */
25
26 #ifndef HH_SENF_PPI_QueueingSocketSink_
27 #define HH_SENF_PPI_QueueingSocketSink_ 1
28
29 // Custom includes
30 #include <queue>
31 #include "SocketSink.hh"
32 #include <senf/Utils/Console/ScopedDirectory.hh>
33
34 //#include "QueueingSocketSink.mpp"
35 ///////////////////////////////hh.p////////////////////////////////////////
36
37 namespace senf {
38 namespace ppi {
39
40     class QueueingAlgorithm
41         : private boost::noncopyable
42     {
43         console::ScopedDirectory<QueueingAlgorithm> dir_;
44
45     public:
46         typedef QueueingAlgorithm * ptr;
47
48         virtual ~QueueingAlgorithm() {};
49
50         console::DirectoryNode & consoleDir();
51         Packet dequeue();
52         bool enqueue(Packet const & packet);
53         unsigned size();
54         void clear();
55
56     protected:
57         QueueingAlgorithm();
58
59         virtual Packet v_dequeue() = 0;
60         virtual bool v_enqueue(Packet const & packet) = 0;
61         virtual unsigned v_size() const = 0;
62         virtual void v_clear() = 0;
63     };
64
65
66     namespace detail {
67         struct QueueingAlgorithmRegistry_EntryBase
68         {
69             virtual QueueingAlgorithm::ptr create() const = 0;
70         };
71
72         template <class QAlgorithm>
73         struct QueueingAlgorithmRegistry_Entry : QueueingAlgorithmRegistry_EntryBase
74         {
75             virtual QueueingAlgorithm::ptr create() const;
76         };
77     }
78
79     class QueueingAlgorithmRegistry
80         : public senf::singleton<QueueingAlgorithmRegistry>
81     {
82         typedef boost::ptr_map<std::string, detail::QueueingAlgorithmRegistry_EntryBase> QAlgoMap;
83         QAlgoMap qAlgoMap_;
84
85         QueueingAlgorithmRegistry() {};
86     public:
87         using senf::singleton<QueueingAlgorithmRegistry>::instance;
88         friend class senf::singleton<QueueingAlgorithmRegistry>;
89
90         struct Exception : public senf::Exception {
91             Exception(std::string const & descr) : senf::Exception(descr) {}
92         };
93
94         template <class QAlgorithm>
95         struct RegistrationProxy {
96             RegistrationProxy(std::string const & key);
97         };
98
99         template <class QAlgorithm>
100         void registerQAlgorithm(std::string key);
101
102         QueueingAlgorithm::ptr createQAlgorithm(std::string const & key) const;
103         void dump(std::ostream & os) const;
104     };
105
106
107 #   define SENF_PPI_REGISTER_QALGORITHM( key, QAlgorithm )                          \
108         namespace {                                                                 \
109             senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy<QAlgorithm>     \
110                 BOOST_PP_CAT(qAlgorithmRegistration_, __LINE__)( key);              \
111         }
112
113
114     class FIFOQueueingAlgorithm : public QueueingAlgorithm
115     {
116         std::queue<Packet> queue_;
117         unsigned max_size_;
118
119         FIFOQueueingAlgorithm();
120
121         virtual Packet v_dequeue();
122         virtual bool v_enqueue(Packet const & packet);
123         virtual unsigned v_size() const;
124         virtual void v_clear();
125
126     public:
127         static QueueingAlgorithm::ptr create();
128     };
129
130
131 namespace module {
132
133     /** \brief QueueingSocketSink
134
135         \ingroup io_modules
136      */
137     template <class Writer=ConnectedDgramWriter>
138     class PassiveQueueingSocketSink : public Module
139     {
140         SENF_PPI_MODULE(PassiveQueueingSocketSink);
141
142     public:
143         typedef typename Writer::Handle Handle; ///< Handle type requested by writer
144         typedef typename Writer::PacketType PacketType;
145
146         connector::PassiveInput<PacketType> input; ///< Input connector from which data is received
147         console::ScopedDirectory<PassiveQueueingSocketSink<Writer> > dir;
148
149         explicit PassiveQueueingSocketSink(Handle const & handle, QueueingAlgorithm::ptr qAlgorithm);
150
151         Writer & writer();              ///< Access the Writer
152         Handle & handle();              ///< Access handle
153         void handle(Handle const & handle);
154                                         ///< Set handle
155                                         /**< Assigning an empty or in-valid() handle will disable
156                                              the module until a new valid handle is assigned. */
157         QueueingAlgorithm & qAlgorithm();
158         void qAlgorithm(QueueingAlgorithm::ptr qAlgorithm);
159
160     private:
161         void write();
162         void writable();
163         void checkThrottle();
164         void setQAlgorithm(std::string const & key);
165
166         Handle handle_;
167         Writer writer_;
168         boost::scoped_ptr<QueueingAlgorithm> qAlgo_;
169         IOEvent event_;
170     };
171
172 }}}
173
174 ///////////////////////////////hh.e////////////////////////////////////////
175 #include "QueueingSocketSink.cci"
176 #include "QueueingSocketSink.ct"
177 #include "QueueingSocketSink.cti"
178 #endif
179
180 \f
181 // Local Variables:
182 // mode: c++
183 // fill-column: 100
184 // c-file-style: "senf"
185 // indent-tabs-mode: nil
186 // ispell-local-dictionary: "american"
187 // compile-command: "scons -u test"
188 // comment-column: 40
189 // End: