Scheduler: Implement new file descriptor event API
[senf.git] / Console / Server.cc
1 // $Id$
2 //
3 // Copyright (C) 2008 
4 // Fraunhofer Institute for Open Communication Systems (FOKUS)
5 // Competence Center NETwork research (NET), St. Augustin, GERMANY
6 //     Stefan Bund <g0dil@berlios.de>
7 //
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
12 //
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU General Public License
19 // along with this program; if not, write to the
20 // Free Software Foundation, Inc.,
21 // 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
22
23 /** \file
24     \brief Server non-inline non-template implementation */
25
26 #include "Server.hh"
27 #include "Server.ih"
28
29 // Custom includes
30 #include <iostream>
31 #include <boost/algorithm/string/trim.hpp>
32 #include <boost/iostreams/device/file_descriptor.hpp>
33 #include <boost/iostreams/stream.hpp>
34 #include <boost/bind.hpp>
35 #include "../Utils/senfassert.hh"
36 #include "../Utils/membind.hh"
37 #include "../Utils/Logger/SenfLog.hh"
38 #include "Readline.hh"
39
40 //#include "Server.mpp"
41 #define prefix_
42 ///////////////////////////////cc.p////////////////////////////////////////
43
44 ///////////////////////////////////////////////////////////////////////////
45 // senf::console::detail::NonBlockingSocketSink
46
47 prefix_ std::streamsize senf::console::detail::NonblockingSocketSink::write(const char * s,
48                                                                             std::streamsize n)
49 {
50     try {
51         if (client_.handle().writeable()) {
52             std::string data (s, n);
53             client_.translate(data);
54             client_.handle().write( data );
55         }
56     }
57     catch (SystemException & ex) {
58         ;
59     }
60     return n;
61 }
62
63 ///////////////////////////////////////////////////////////////////////////
64 // senf::console::Server
65
66 prefix_ senf::console::Server &
67 senf::console::Server::start(senf::INet4SocketAddress const & address)
68 {
69     senf::TCPv4ServerSocketHandle handle (address);
70     Server & server (senf::console::Server::start(handle));
71     SENF_LOG((Server::SENFLogArea)(log::NOTICE)( 
72                  "Console server started at " << address ));
73     return server;
74 }
75
76 prefix_ senf::console::Server &
77 senf::console::Server::start(senf::INet6SocketAddress const & address)
78 {
79     senf::TCPv6ServerSocketHandle handle (address);
80     Server & server (senf::console::Server::start(handle));
81     SENF_LOG((Server::SENFLogArea)(log::NOTICE)( 
82                  "Console server started at " << address ));
83     return server;
84 }
85
86 prefix_ senf::console::Server & senf::console::Server::start(ServerHandle handle)
87 {
88     // Uah .... ensure the scheduler is created before the instance pointer so it get's destructed
89     // AFTER it.
90     (void) senf::Scheduler::instance();
91     boost::intrusive_ptr<Server> p (new Server(handle));
92     detail::ServerManager::add(boost::intrusive_ptr<Server>(p));
93     return *p;
94 }
95
96 prefix_ senf::console::Server::Server(ServerHandle handle)
97     : handle_ (handle), 
98       event_ ("console::Server", senf::membind(&Server::newClient, this),
99               handle_, scheduler::FdEvent::EV_READ),
100       root_ (senf::console::root().thisptr()), mode_ (Automatic)
101 {}
102
103 prefix_ void senf::console::Server::newClient(int event)
104 {
105     ServerHandle::ClientSocketHandle client (handle_.accept());
106     boost::intrusive_ptr<Client> p (new Client(*this, client));
107     clients_.insert( p );
108     SENF_LOG(( "Registered new client " << p.get() ));
109 }
110
111 prefix_ void senf::console::Server::removeClient(Client & client)
112 {
113     SENF_LOG(( "Disposing client " << & client ));
114     // THIS DELETES THE CLIENT INSTANCE !!
115     clients_.erase(boost::intrusive_ptr<Client>(&client));
116 }
117
118 ///////////////////////////////////////////////////////////////////////////
119 // senf::console::detail::DumbClientReader
120
121 prefix_ senf::console::detail::DumbClientReader::DumbClientReader(Client & client)
122     : ClientReader(client), promptLen_ (0), promptActive_ (false)
123 {
124     showPrompt();
125     ReadHelper<ClientHandle>::dispatch( handle(), 16384u, ReadUntil("\n"),
126                                         senf::membind(&DumbClientReader::clientData, this) );
127 }
128
129 prefix_ void
130 senf::console::detail::DumbClientReader::clientData(senf::ReadHelper<ClientHandle>::ptr helper)
131 {
132     if (helper->error() || handle().eof()) {
133         // THIS COMMITS SUICIDE. THE INSTANCE IS GONE AFTER stopClient RETURNS
134         stopClient();
135         return;
136     }
137     
138     promptLen_ = 0;
139     promptActive_ = false;
140
141     std::string data (tail_ + helper->data());
142     tail_ = helper->tail();
143     boost::trim(data);                  // Gets rid of superfluous  \r or \n characters
144     handleInput(data);
145
146     showPrompt();
147     ReadHelper<ClientHandle>::dispatch( handle(), 16384u, ReadUntil("\n"),
148                                         senf::membind(&DumbClientReader::clientData, this) );
149
150 }
151
152 prefix_ void senf::console::detail::DumbClientReader::showPrompt()
153 {
154     std::string prompt (promptString());
155
156     stream() << std::flush;
157     handle().write(prompt);
158     promptLen_ = prompt.size();
159     promptActive_ = true;
160 }
161
162 prefix_ void senf::console::detail::DumbClientReader::v_disablePrompt()
163 {
164     if (promptActive_ && promptLen_ > 0) {
165         stream() << '\r' << std::string(' ', promptLen_) << '\r';
166         promptLen_ = 0;
167     }
168 }
169
170 prefix_ void senf::console::detail::DumbClientReader::v_enablePrompt()
171 {
172     if (promptActive_ && ! promptLen_)
173         showPrompt();
174 }
175
176 prefix_ void senf::console::detail::DumbClientReader::v_translate(std::string & data)
177 {}
178
179 ///////////////////////////////////////////////////////////////////////////
180 // senf::console::detail::NoninteractiveClientReader
181
182 prefix_
183 senf::console::detail::NoninteractiveClientReader::NoninteractiveClientReader(Client & client)
184     : ClientReader (client), 
185       readevent_ ("NoninteractiveClientReader", 
186                   senf::membind(&NoninteractiveClientReader::newData, this),
187                   handle(), senf::Scheduler::EV_READ)
188 {}
189
190 prefix_ void senf::console::detail::NoninteractiveClientReader::v_disablePrompt()
191 {}
192
193 prefix_ void senf::console::detail::NoninteractiveClientReader::v_enablePrompt()
194 {}
195
196 prefix_ void senf::console::detail::NoninteractiveClientReader::v_translate(std::string & data)
197 {}
198
199 prefix_ void
200 senf::console::detail::NoninteractiveClientReader::newData(int event)
201 {
202     if (event != senf::Scheduler::EV_READ || handle().eof()) {
203         if (! buffer_.empty())
204             handleInput(buffer_);
205         stopClient();
206         return;
207     }
208
209     std::string::size_type n (buffer_.size());
210     buffer_.resize(n + handle().available());
211     buffer_.erase(handle().read(boost::make_iterator_range(buffer_.begin()+n, buffer_.end())),
212                   buffer_.end());
213     buffer_.erase(0, handleInput(buffer_, true));
214     stream() << std::flush;
215 }
216
217 ///////////////////////////////////////////////////////////////////////////
218 // senf::console::Client
219
220 prefix_ senf::console::Client::Client(Server & server, ClientHandle handle)
221     : out_t(boost::ref(*this)), senf::log::IOStreamTarget(out_t::member), server_ (server),
222       handle_ (handle), 
223       readevent_ ("senf::console::Client", boost::bind(&Client::setNoninteractive,this), 
224                   handle, Scheduler::EV_READ, false),
225       timer_ ("senf::console::Client interactive timeout", 
226               boost::bind(&Client::setInteractive, this),
227               Scheduler::instance().eventTime() + ClockService::milliseconds(INTERACTIVE_TIMEOUT),
228               false),
229       name_ (server.name()), reader_ (), mode_ (server.mode())
230 {
231     handle_.facet<senf::TCPSocketProtocol>().nodelay();
232     executor_.chroot(root());
233     switch (mode_) {
234     case Server::Interactive :
235         setInteractive();
236         break;
237     case Server::Noninteractive :
238         setNoninteractive();
239         break;
240     case Server::Automatic :
241         readevent_.enable();
242         timer_.enable();
243         break;
244     }
245 }
246
247 prefix_ void senf::console::Client::setInteractive()
248 {
249     readevent_.disable();
250     timer_.disable();
251     mode_ = Server::Interactive;
252     reader_.reset(new detail::SafeReadlineClientReader (*this));
253     executor_.autocd(true).autocomplete(true);
254 }
255
256 prefix_ void senf::console::Client::setNoninteractive()
257 {
258     readevent_.disable();
259     timer_.disable();
260     mode_ = Server::Noninteractive;
261     reader_.reset(new detail::NoninteractiveClientReader(*this));
262 }
263
264 prefix_ void senf::console::Client::translate(std::string & data)
265 {
266     reader_->translate(data);
267 }
268
269 prefix_ std::string::size_type senf::console::Client::handleInput(std::string data,
270                                                                   bool incremental)
271 {
272     if (data.empty() && ! incremental)
273         data = lastCommand_;
274     else
275         lastCommand_ = data;
276
277     bool state (true);
278     std::string::size_type n (data.size());
279
280     try {
281         if (incremental)
282             n = parser_.parseIncremental(data, boost::bind<void>( boost::ref(executor_),
283                                                                   boost::ref(stream()),
284                                                                   _1 ));
285         else
286             state = parser_.parse(data, boost::bind<void>( boost::ref(executor_),
287                                                            boost::ref(stream()),
288                                                            _1 ));
289         if (! state )
290             stream() << "syntax error" << std::endl;
291     }
292     catch (Executor::ExitException &) {
293         // This generates an EOF condition on the Handle. This EOF condition is expected
294         // to be handled gracefully by the ClientReader. We cannot call stop() here, since we
295         // are called from the client reader callback and that will continue executing even if we
296         // call stop here ...
297         handle_.facet<senf::TCPSocketProtocol>().shutdown(senf::TCPSocketProtocol::ShutRD);
298     }
299     catch (std::exception & ex) {
300         stream() << ex.what() << std::endl;
301     }
302     catch (...) {
303         stream() << "unidentified error (unknown exception thrown)" << std::endl;
304     }
305     return n;
306 }
307
308 prefix_ void senf::console::Client::v_write(senf::log::time_type timestamp,
309                                             std::string const & stream,
310                                             std::string const & area, unsigned level,
311                                             std::string const & message)
312 {
313     reader_->disablePrompt();
314     IOStreamTarget::v_write(timestamp, stream, area, level, message);
315     out_t::member << std::flush;
316     reader_->enablePrompt();
317 }
318
319 prefix_ std::ostream & senf::console::operator<<(std::ostream & os, Client const & client)
320 {
321     typedef ClientSocketHandle< MakeSocketPolicy<
322         INet4AddressingPolicy,ConnectedCommunicationPolicy>::policy > V4Socket;
323     typedef ClientSocketHandle< MakeSocketPolicy<
324         INet6AddressingPolicy,ConnectedCommunicationPolicy>::policy > V6Socket;
325
326     try {
327         if (check_socket_cast<V4Socket>(client.handle()))
328             os << dynamic_socket_cast<V4Socket>(client.handle()).peer();
329         else if (check_socket_cast<V6Socket>(client.handle()))
330             os << dynamic_socket_cast<V6Socket>(client.handle()).peer();
331         else
332             os << static_cast<void const *>(&client);
333     }
334     catch (SystemException &) {
335         os << "0.0.0.0:0";
336     }
337         
338     return os;
339 }
340
341 prefix_ std::ostream & senf::console::operator<<(std::ostream & os, Client * client)
342 {
343     return os << *client;
344 }
345
346 ///////////////////////////////cc.e////////////////////////////////////////
347 #undef prefix_
348 //#include "Server.mpp"
349
350 \f
351 // Local Variables:
352 // mode: c++
353 // fill-column: 100
354 // comment-column: 40
355 // c-file-style: "senf"
356 // indent-tabs-mode: nil
357 // ispell-local-dictionary: "american"
358 // compile-command: "scons -u test"
359 // End: