PPI: optimized internal connector packet handling (queuing only if necessary, pass...
[senf.git] / senf / PPI / Connectors.cc
1 // $Id$
2 //
3 // Copyright (C) 2007
4 // Fraunhofer Institute for Open Communication Systems (FOKUS)
5 //
6 // The contents of this file are subject to the Fraunhofer FOKUS Public License
7 // Version 1.0 (the "License"); you may not use this file except in compliance
8 // with the License. You may obtain a copy of the License at 
9 // http://senf.berlios.de/license.html
10 //
11 // The Fraunhofer FOKUS Public License Version 1.0 is based on, 
12 // but modifies the Mozilla Public License Version 1.1.
13 // See the full license text for the amendments.
14 //
15 // Software distributed under the License is distributed on an "AS IS" basis, 
16 // WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
17 // for the specific language governing rights and limitations under the License.
18 //
19 // The Original Code is Fraunhofer FOKUS code.
20 //
21 // The Initial Developer of the Original Code is Fraunhofer-Gesellschaft e.V. 
22 // (registered association), Hansastraße 27 c, 80686 Munich, Germany.
23 // All Rights Reserved.
24 //
25 // Contributor(s):
26 //   Stefan Bund <g0dil@berlios.de>
27
28 /** \file
29     \brief Connectors non-inline non-template implementation */
30
31 #include "Connectors.hh"
32 #include "Connectors.ih"
33
34 // Custom includes
35 #include "ModuleManager.hh"
36 #include <senf/Utils/Console/ParsedCommand.hh>
37
38 //#include "Connectors.mpp"
39 #define prefix_
40 //-/////////////////////////////////////////////////////////////////////////////////////////////////
41
42 //-/////////////////////////////////////////////////////////////////////////////////////////////////
43 // senf::ppi::connector::Connector
44
45 prefix_ senf::ppi::connector::Connector::~Connector()
46 {
47     if (connected()) {
48         Connector & peer (*peer_);
49         peer_->peer_ = 0;
50         if (! peer.initializationScheduled())
51             peer.enqueueInitializable();
52         peer.v_disconnected();
53     }
54 }
55
56 prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
57 {
58     // The connector is not registered -> route() or noroute() statement missing
59     SENF_ASSERT( module_,
60                  "senf::ppi::connector::Connector::connect(): (source) "
61                  "Missing route() or noroute()" );
62     // The connector is already connected
63     SENF_ASSERT( ! peer_,
64                  "senf::ppi::connector::Connector::connect(): (source) "
65                  "duplicate connection" );
66     // The target connector is not registered -> route() or noroute() statement missing
67     SENF_ASSERT( target.module_,
68                  "senf::ppi::connector::Connector::connect(): (target) "
69                  "Missing route() or noroute()" );
70     // The target connector is already connected
71     SENF_ASSERT( ! target.peer_,
72                  "senf::ppi::connector::Connector::connect(): (target) "
73                  "duplicate connection" );
74
75     if (! (packetTypeID() == typeid(void) ||
76            target.packetTypeID() == typeid(void) ||
77            packetTypeID() == target.packetTypeID()) )
78         throw IncompatibleConnectorsException()
79             << ": " << prettyName(packetTypeID())
80             << " [in module " << prettyName(typeid(*module_))  << "] "
81             << ", " << prettyName(target.packetTypeID())
82             << " [in module " << prettyName(typeid(*target.module_)) << "]";
83
84     peer_ = & target;
85     target.peer_ = this;
86
87     if (! initializationScheduled())
88         enqueueInitializable();
89     if (! peer().initializationScheduled())
90         peer().enqueueInitializable();
91
92     v_connected();
93     peer_->v_connected();
94
95 }
96
97 senf::ppi::connector::Connector::TraceState senf::ppi::connector::Connector::traceState_ (
98     senf::ppi::connector::Connector::NO_TRACING);
99
100 prefix_ void senf::ppi::connector::Connector::trace(Packet const & p, char const * label)
101 {
102     if (traceState_ == NO_TRACING)
103         return;
104     SENF_LOG_BLOCK(({
105                 std::string type (prettyName(p.typeId().id()));
106                 log << "PPI packet trace: " << label << " 0x" << std::hex << p.id() << " "
107                     << type.substr(21, type.size()-22) << " on " << & module() << " "
108                     << prettyName(typeid(module())) << " connector 0x" << this << "\n";
109                 if (traceState_ == TRACE_CONTENTS)
110                     p.dump(log);
111             }));
112 }
113
114 prefix_ void senf::ppi::connector::Connector::throttleTrace(char const * label,
115                                                             char const * type)
116 {
117     if (traceState_ == NO_TRACING)
118         return;
119     SENF_LOG_BLOCK(({
120                 log << "PPI throttling trace: " << label << " " << type << " on " << & module()
121                     << " " << prettyName(typeid(module())) << " connector 0x" << this << "\n";
122             }));
123 }
124
125 namespace senf { namespace ppi { namespace connector {
126
127     SENF_CONSOLE_REGISTER_ENUM_MEMBER(
128         Connector, TraceState, (NO_TRACING)(TRACE_IDS)(TRACE_CONTENTS) );
129
130 }}}
131
132 namespace {
133
134     struct ConsoleRegister
135     {
136         ConsoleRegister();
137     };
138
139     ConsoleRegister::ConsoleRegister()
140     {
141 #ifndef SENF_PPI_NOTRACE
142         senf::ppi::ModuleManager::instance().consoleDir()
143             .add("tracing", senf::console::factory::Command(
144                      SENF_FNP(senf::ppi::connector::Connector::TraceState,
145                               senf::ppi::connector::Connector::tracing, ()))
146                  .doc("Log every packet sent or received by any module.\n"
147                       "There are three different tracing levels:\n"
148                       "\n"
149                       "    NO_TRACING      don't output any tracing information\n"
150                       "    TRACE_IDS       trace packet id's but do not show packet contents\n"
151                       "    TRACE_CONTENTS  trace complete packet contents\n"
152                       "\n"
153                       "A log message is generated whenever the packet traverses a connector. The\n"
154                       "TRACE_IDS log message has the following format:\n"
155                       "\n"
156                       "    PPI packet trace: <direction> <packet-id> <packet-type>\n"
157                       "                      on <module-id> <module-type> connector <connector-id>\n"
158                       "    PPI throttling trace: <direction> <throttle-msg>\n"
159                       "                      on <module-id> <module-type> connector <connector-id>\n"
160                       "\n"
161                       "The fields are:\n"
162                       "\n"
163                       "    direction       'IN' for packets/throttle notifications entering the module,\n"
164                       "                    'OUT' for packets/throttle notifications leaving it\n"
165                       "    packet-id       Numeric unique packet id. This value is unique for packets\n"
166                       "                    alive at the same time, packets at different times may (and\n"
167                       "                    will) share id's\n"
168                       "    packet-type     The type of the packet header\n"
169                       "    module-id       Unique module id\n"
170                       "    module-type     Type of the module the packet is sent to/from\n"
171                       "    connector-id    Unique connector id\n"
172                       "    throttle-msg    Type of throttling event\n")
173                 );
174
175         senf::ppi::ModuleManager::instance().consoleDir()
176             .add("tracing", senf::console::factory::Command(
177                      SENF_FNP(void, senf::ppi::connector::Connector::tracing,
178                               (senf::ppi::connector::Connector::TraceState)))
179                  .arg("state", "new tracing state")
180                 );
181 #endif
182     }
183
184     ConsoleRegister consoleRegister;
185
186 }
187
188 prefix_ void senf::ppi::connector::Connector::disconnect()
189 {
190     // Cannot disconnected a non-connected connector
191     SENF_ASSERT( peer_,
192                  "senf::ppi::connector::Connector::disconnect(): Not connected" );
193
194     Connector & peer (*peer_);
195     peer_ = 0;
196     peer.peer_ = 0;
197
198     if (! initializationScheduled())
199         enqueueInitializable();
200     if (! peer.initializationScheduled())
201         peer.enqueueInitializable();
202
203     v_disconnected();
204     peer.v_disconnected();
205 }
206
207 prefix_ std::type_info const & senf::ppi::connector::Connector::packetTypeID()
208 {
209     return typeid(void);
210 }
211
212 prefix_ void senf::ppi::connector::Connector::unregisterConnector()
213 {
214     if (module_)
215         module_->unregisterConnector(*this);
216 }
217
218 prefix_ void senf::ppi::connector::Connector::setModule(module::Module & module)
219 {
220     module_ = &module;
221 }
222
223 prefix_ void senf::ppi::connector::Connector::v_disconnected()
224 {}
225
226 prefix_ void senf::ppi::connector::Connector::v_connected()
227 {}
228
229 //-/////////////////////////////////////////////////////////////////////////////////////////////////
230 // senf::ppi::connector::PassiveConnector
231
232 prefix_ senf::ppi::connector::PassiveConnector::~PassiveConnector()
233 {
234     // Must be here and NOT in base so it is called before destructing the routes_ member
235     unregisterConnector();
236 }
237
238 prefix_ void senf::ppi::connector::PassiveConnector::v_disconnected()
239 {
240     Connector::v_disconnected();
241     peer_ = 0;
242 }
243
244 prefix_ void senf::ppi::connector::PassiveConnector::v_connected()
245 {
246     Connector::v_connected();
247     peer_ = & dynamic_cast<ActiveConnector&>(Connector::peer());
248 }
249
250 //-/////////////////////////////////////////////////////////////////////////////////////////////////
251 // private members
252
253 prefix_ void senf::ppi::connector::PassiveConnector::v_init()
254 {
255     Routes::const_iterator i (routes_.begin());
256     Routes::const_iterator const i_end (routes_.end());
257     for (; i != i_end; ++i)
258         if ((*i)->throttled())
259             break;
260     if (i == i_end)
261         remoteThrottled_ = false;
262     if (throttled())
263         emitThrottle();
264     else
265         emitUnthrottle();
266 }
267
268 prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
269 {
270     routes_.push_back(&route);
271 }
272
273 prefix_ void senf::ppi::connector::PassiveConnector::unregisterRoute(ForwardingRoute & route)
274 {
275     Routes::iterator i (std::find(routes_.begin(), routes_.end(), &route));
276     if (i != routes_.end())
277         routes_.erase(i);
278 }
279
280 prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
281 {}
282
283 prefix_ void senf::ppi::connector::PassiveConnector::notifyUnthrottle()
284 {
285     if (std::find_if(routes_.begin(), routes_.end(),
286                      boost::bind(&ForwardingRoute::throttled, _1)) == routes_.end()) {
287         remoteThrottled_ = false;
288         if (!nativeThrottled_)
289             emitUnthrottle();
290     } else
291         SENF_PPI_THROTTLE_TRACE("OUT", "not forwarding unthrottle event");
292 }
293
294 //-/////////////////////////////////////////////////////////////////////////////////////////////////
295 // senf::ppi::connector::ActiveConnector
296
297 prefix_ senf::ppi::connector::ActiveConnector::~ActiveConnector()
298 {
299     // Must be here and NOT in base so it is called before destructing the routes_ member
300     unregisterConnector();
301 }
302
303 prefix_ void senf::ppi::connector::ActiveConnector::v_disconnected()
304 {
305     Connector::v_disconnected();
306     peer_ = 0;
307 }
308
309 prefix_ void senf::ppi::connector::ActiveConnector::v_connected()
310 {
311     Connector::v_connected();
312     peer_ = & dynamic_cast<PassiveConnector&>(Connector::peer());
313 }
314
315 //-/////////////////////////////////////////////////////////////////////////////////////////////////
316 // private members
317
318 prefix_ void senf::ppi::connector::ActiveConnector::v_init()
319 {
320     if (! connected())
321         notifyThrottle();
322 }
323
324 prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle()
325 {
326     SENF_PPI_THROTTLE_TRACE("IN ", "throttle");
327     if (! throttled_) {
328         throttled_ = true;
329         if (throttleCallback_)
330             throttleCallback_();
331         NotifyRoutes::const_iterator i (notifyRoutes_.begin());
332         NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
333         for (; i != i_end; ++i)
334             (*i)->notifyThrottle();
335     }
336 }
337
338 prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle()
339 {
340     SENF_PPI_THROTTLE_TRACE("IN ", "unthrottle");
341     if (throttled_) {
342         throttled_ = false;
343         if (unthrottleCallback_)
344             unthrottleCallback_();
345         NotifyRoutes::const_iterator i (notifyRoutes_.begin());
346         NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
347         for (; i != i_end; ++i)
348             (*i)->notifyUnthrottle();
349     }
350 }
351
352 prefix_ void senf::ppi::connector::ActiveConnector::registerRoute(ForwardingRoute & route)
353 {
354     notifyRoutes_.push_back(&route);
355 }
356
357 prefix_ void senf::ppi::connector::ActiveConnector::unregisterRoute(ForwardingRoute & route)
358 {
359     NotifyRoutes::iterator i (std::find(notifyRoutes_.begin(), notifyRoutes_.end(), &route));
360     if (i != notifyRoutes_.end())
361         notifyRoutes_.erase(i);
362 }
363
364 //-/////////////////////////////////////////////////////////////////////////////////////////////////
365 // senf::ppi::connector::InputConnector
366
367 prefix_ senf::Packet const & senf::ppi::connector::InputConnector::operator()()
368 {
369     static Packet nullPacket;
370
371     if (empty())
372         v_requestEvent();
373     if (fastPacket_) {
374         Packet const * p = fastPacket_;
375         fastPacket_ = NULL;
376         v_dequeueEvent();
377         SENF_PPI_TRACE(*p, "IN ");
378         return *p;
379     }
380     if (! queue_.empty()) {
381         slowPacket_ = queue_.back();
382         queue_.pop_back();
383         v_dequeueEvent();
384         SENF_PPI_TRACE(slowPacket_, "IN ");
385         return slowPacket_;
386     } else {
387         SENF_PPI_TRACE(nullPacket, "IN ");
388         return nullPacket;
389     }
390 }
391
392 prefix_ void senf::ppi::connector::InputConnector::v_disconnected()
393 {
394     Connector::v_disconnected();
395     peer_ = 0;
396 }
397
398 prefix_ void senf::ppi::connector::InputConnector::v_connected()
399 {
400     Connector::v_connected();
401     peer_ = & dynamic_cast<OutputConnector&>(Connector::peer());
402 }
403
404 //-/////////////////////////////////////////////////////////////////////////////////////////////////
405 // private members
406
407 prefix_ void senf::ppi::connector::InputConnector::v_requestEvent()
408 {}
409
410 prefix_ void senf::ppi::connector::InputConnector::v_enqueueEvent()
411 {}
412
413 prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent()
414 {}
415
416 //-/////////////////////////////////////////////////////////////////////////////////////////////////
417 // senf::ppi::connector::OutputConnector
418
419 prefix_ void senf::ppi::connector::OutputConnector::v_disconnected()
420 {
421     Connector::v_disconnected();
422     peer_ = 0;
423 }
424
425 prefix_ void senf::ppi::connector::OutputConnector::v_connected()
426 {
427     Connector::v_connected();
428     peer_ = & dynamic_cast<InputConnector&>(Connector::peer());
429 }
430
431 //-/////////////////////////////////////////////////////////////////////////////////////////////////
432 // senf::ppi::connector::GenericActiveInput
433
434 //-/////////////////////////////////////////////////////////////////////////////////////////////////
435 // private members
436
437 prefix_ void senf::ppi::connector::GenericActiveInput::v_requestEvent()
438 {
439     request();
440 }
441
442 //-/////////////////////////////////////////////////////////////////////////////////////////////////
443 // senf::ppi::connector::GenericPassiveInput
444
445 prefix_ void senf::ppi::connector::GenericPassiveInput::v_disconnected()
446 {
447     PassiveConnector::v_disconnected();
448     InputConnector::v_disconnected();
449     peer_ = 0;
450 }
451
452 prefix_ void senf::ppi::connector::GenericPassiveInput::v_connected()
453 {
454     PassiveConnector::v_connected();
455     InputConnector::v_connected();
456     peer_ = & dynamic_cast<GenericActiveOutput&>(Connector::peer());
457 }
458
459 //-/////////////////////////////////////////////////////////////////////////////////////////////////
460 // private members
461
462 prefix_ void senf::ppi::connector::GenericPassiveInput::v_enqueueEvent()
463 {
464     emit();
465     if (qdisc_)
466         qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
467 }
468
469 prefix_ void senf::ppi::connector::GenericPassiveInput::v_dequeueEvent()
470 {
471     if (qdisc_)
472         qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
473 }
474
475 prefix_ void senf::ppi::connector::GenericPassiveInput::qdisc(QueueingDiscipline::None_t)
476 {
477     qdisc_.reset( 0);
478 }
479
480 prefix_ void senf::ppi::connector::GenericPassiveInput::v_unthrottleEvent()
481 {
482     size_type n (queueSize());
483     while (n) {
484         emit();
485         size_type nn (queueSize());
486         if (n == nn)
487             break;
488         n = nn;
489     }
490 }
491
492 //-/////////////////////////////////////////////////////////////////////////////////////////////////
493 // senf::ppi::connector::GenericPassiveOutput
494
495 prefix_ void senf::ppi::connector::GenericPassiveOutput::v_disconnected()
496 {
497     PassiveConnector::v_disconnected();
498     OutputConnector::v_disconnected();
499     peer_ = 0;
500 }
501
502 prefix_ void senf::ppi::connector::GenericPassiveOutput::v_connected()
503 {
504     PassiveConnector::v_connected();
505     OutputConnector::v_connected();
506     peer_ = & dynamic_cast<GenericActiveInput&>(Connector::peer());
507 }
508
509 //-/////////////////////////////////////////////////////////////////////////////////////////////////
510 // senf::ppi::connector::GenericActiveInput
511
512 prefix_ void senf::ppi::connector::GenericActiveInput::v_disconnected()
513 {
514     ActiveConnector::v_disconnected();
515     InputConnector::v_disconnected();
516     peer_ = 0;
517 }
518
519 prefix_ void senf::ppi::connector::GenericActiveInput::v_connected()
520 {
521     ActiveConnector::v_connected();
522     InputConnector::v_connected();
523     peer_ = & dynamic_cast<GenericPassiveOutput&>(Connector::peer());
524 }
525
526 //-/////////////////////////////////////////////////////////////////////////////////////////////////
527 // senf::ppi::connector::GenericActiveOutput
528
529 prefix_ void senf::ppi::connector::GenericActiveOutput::v_disconnected()
530 {
531     ActiveConnector::v_disconnected();
532     OutputConnector::v_disconnected();
533     peer_ = 0;
534 }
535
536 prefix_ void senf::ppi::connector::GenericActiveOutput::v_connected()
537 {
538     ActiveConnector::v_connected();
539     OutputConnector::v_connected();
540     peer_ = & dynamic_cast<GenericPassiveInput&>(Connector::peer());
541 }
542
543
544 //-/////////////////////////////////////////////////////////////////////////////////////////////////
545 #undef prefix_
546 //#include "Connectors.mpp"
547
548 \f
549 // Local Variables:
550 // mode: c++
551 // fill-column: 100
552 // comment-column: 40
553 // c-file-style: "senf"
554 // indent-tabs-mode: nil
555 // ispell-local-dictionary: "american"
556 // compile-command: "scons -u test"
557 // End: