Fawkes API  Fawkes Development Version
server_client_thread.cpp
00001 
00002 /***************************************************************************
00003  *  server_client_thread.cpp - Thread handling Fawkes network client
00004  *
00005  *  Created: Fri Nov 17 17:23:24 2006
00006  *  Copyright  2006-2007  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <core/exceptions/system.h>
00025 
00026 #include <netcomm/fawkes/server_client_thread.h>
00027 #include <netcomm/fawkes/server_thread.h>
00028 #include <netcomm/fawkes/message_queue.h>
00029 #include <netcomm/fawkes/transceiver.h>
00030 #include <netcomm/socket/stream.h>
00031 #include <netcomm/utils/exceptions.h>
00032 #include <core/threading/mutex.h>
00033 #include <core/threading/wait_condition.h>
00034 
00035 #include <unistd.h>
00036 
00037 namespace fawkes {
00038 
00039 /** @class FawkesNetworkServerClientSendThread <netcomm/fawkes/server_client_thread.h>
00040  * Sending thread for a Fawkes client connected to the server.
00041  * This thread is spawned for each client connected to the server to handle the
00042  * server-side sending
00043  * @ingroup NetComm
00044  * @author Tim Niemueller
00045  */
00046 
00047 class FawkesNetworkServerClientSendThread
00048   : public Thread
00049 {
00050  public:
00051   /** Constructor.
00052    * @param s client stream socket
00053    * @param parent parent FawkesNetworkServerClientThread instance
00054    */
00055   FawkesNetworkServerClientSendThread(StreamSocket *s,
00056                                       FawkesNetworkServerClientThread *parent)
00057     : Thread("FawkesNetworkServerClientSendThread", Thread::OPMODE_WAITFORWAKEUP)
00058   {
00059     __s = s;
00060     __parent = parent;
00061     __outbound_mutex    = new Mutex();
00062     __outbound_msgqs[0] = new FawkesNetworkMessageQueue();
00063     __outbound_msgqs[1] = new FawkesNetworkMessageQueue();
00064     __outbound_active   = 0;
00065     __outbound_msgq     = __outbound_msgqs[0];
00066   }
00067 
00068   /** Destructor. */
00069   ~FawkesNetworkServerClientSendThread()
00070   {
00071     for (unsigned int i = 0; i < 2; ++i) {
00072       while ( ! __outbound_msgqs[i]->empty() ) {
00073         FawkesNetworkMessage *m = __outbound_msgqs[i]->front();
00074         m->unref();
00075         __outbound_msgqs[i]->pop();
00076       }
00077     }
00078     delete __outbound_msgqs[0];
00079     delete __outbound_msgqs[1];
00080     delete __outbound_mutex;
00081   }
00082 
00083   virtual void loop()
00084   {
00085     if ( ! __parent->alive() )  return;
00086 
00087     while ( __outbound_havemore ) {
00088       __outbound_mutex->lock();
00089       __outbound_havemore = false;
00090       FawkesNetworkMessageQueue *q = __outbound_msgq;
00091       __outbound_active = 1 - __outbound_active;
00092       __outbound_msgq = __outbound_msgqs[__outbound_active];
00093       __outbound_mutex->unlock();
00094 
00095       if ( ! q->empty() ) {
00096         try {
00097           FawkesNetworkTransceiver::send(__s, q);
00098         } catch (ConnectionDiedException &e) {
00099           __parent->connection_died();
00100           exit();
00101         }
00102       }
00103     }
00104   }
00105 
00106 
00107   /** Enqueue message to outbound queue.
00108    * This enqueues the given message to the outbound queue. The message will
00109    * be sent in the next loop iteration. This method takes ownership of the
00110    * transmitted message. If you want to use the message after enqueuing you
00111    * must reference it explicitly.
00112    * @param msg message to enqueue
00113    */
00114   void enqueue(FawkesNetworkMessage *msg)
00115   {
00116     __outbound_mutex->lock();
00117     __outbound_msgq->push(msg);
00118     __outbound_havemore = true;
00119     __outbound_mutex->unlock();
00120     wakeup();
00121   }
00122 
00123 
00124   /** Wait until all data has been sent. */
00125   void wait_for_all_sent()
00126   {
00127     loop_mutex->lock();
00128     loop_mutex->unlock();
00129   }
00130 
00131  /** Stub to see name in backtrace for easier debugging. @see Thread::run() */
00132  protected: virtual void run() { Thread::run(); }
00133 
00134  private:
00135   StreamSocket                    *__s;
00136   FawkesNetworkServerClientThread *__parent;
00137 
00138   Mutex                     *__outbound_mutex;
00139   unsigned int               __outbound_active;
00140   bool                       __outbound_havemore;
00141   FawkesNetworkMessageQueue *__outbound_msgq;
00142   FawkesNetworkMessageQueue *__outbound_msgqs[2];
00143 
00144 };
00145 
00146 
00147 /** @class FawkesNetworkServerClientThread netcomm/fawkes/server_client_thread.h
00148  * Fawkes Network Client Thread for server.
00149  * The FawkesNetworkServerThread spawns an instance of this class for every incoming
00150  * connection. It is then used to handle the client.
00151  * The thread will start another thread, an instance of
00152  * FawkesNetworkServerClientSendThread. This will be used to handle all outgoing
00153  * traffic.
00154  *
00155  * @ingroup NetComm
00156  * @author Tim Niemueller
00157  */
00158 
00159 /** Constructor.
00160  * @param s socket to client
00161  * @param parent parent network thread
00162  */
00163 FawkesNetworkServerClientThread::FawkesNetworkServerClientThread(StreamSocket *s,
00164                                                                  FawkesNetworkServerThread *parent)
00165   : Thread("FawkesNetworkServerClientThread")
00166 {
00167   _s = s;
00168   _parent = parent;
00169   _alive = true;
00170   _clid = 0;
00171   _inbound_queue = new FawkesNetworkMessageQueue();
00172 
00173   _send_slave = new FawkesNetworkServerClientSendThread(_s, this);
00174 
00175   set_prepfin_conc_loop(true);
00176 }
00177 
00178 
00179 /** Destructor. */
00180 FawkesNetworkServerClientThread::~FawkesNetworkServerClientThread()
00181 {
00182   _send_slave->cancel();
00183   _send_slave->join();
00184   delete _send_slave;
00185   delete _s;
00186   delete _inbound_queue;
00187 }
00188 
00189 
00190 /** Get client ID.
00191  * The client ID can be used to send replies.
00192  * @return client ID
00193  */
00194 unsigned int
00195 FawkesNetworkServerClientThread::clid() const
00196 {
00197   return _clid;
00198 }
00199 
00200 
00201 /** Set client ID.
00202  * @param client_id new client ID
00203  */
00204 void
00205 FawkesNetworkServerClientThread::set_clid(unsigned int client_id)
00206 {
00207   _clid = client_id;
00208 }
00209 
00210 
00211 /** Receive data.
00212  * Receives data from the network if there is any and then dispatches all
00213  * inbound messages via the parent FawkesNetworkThread::dispatch()
00214  */
00215 void
00216 FawkesNetworkServerClientThread::recv()
00217 {
00218   try {
00219     FawkesNetworkTransceiver::recv(_s, _inbound_queue);
00220 
00221     _inbound_queue->lock();
00222     while ( ! _inbound_queue->empty() ) {
00223       FawkesNetworkMessage *m = _inbound_queue->front();
00224       m->set_client_id(_clid);
00225       _parent->dispatch(m);
00226       m->unref();
00227       _inbound_queue->pop();
00228     }
00229     _parent->wakeup();
00230     _inbound_queue->unlock();
00231 
00232   } catch (ConnectionDiedException &e) {
00233     _alive = false;
00234     _s->close();
00235     _parent->wakeup();
00236   }
00237 }
00238 
00239 
00240 void
00241 FawkesNetworkServerClientThread::once()
00242 {
00243   _send_slave->start();
00244 }
00245 
00246 
00247 /** Thread loop.
00248  * The client thread loop polls on the socket for 10 ms (wait for events
00249  * on the socket like closed connection or data that can be read). If any
00250  * event occurs it is processed. If the connection died or any other
00251  * error occured the thread is cancelled and the parent FawkesNetworkThread
00252  * is woken up to carry out any action that is needed when a client dies.
00253  * If data is available for reading thedata is received and dispatched
00254  * via recv().
00255  * Afterwards the outbound message queue is processed and alle messages are
00256  * sent. This is also done if the operation could block (POLL_OUT is not
00257  * honored).
00258  */
00259 void
00260 FawkesNetworkServerClientThread::loop()
00261 {
00262   if ( ! _alive) {
00263     usleep(1000000);
00264     return;
00265   }
00266 
00267   short p = 0;
00268   try {
00269     p = _s->poll(); // block until we got a message
00270   } catch (InterruptedException &e) {
00271     // we just ignore this and try it again
00272     return;
00273   }
00274 
00275   if ( (p & Socket::POLL_ERR) ||
00276        (p & Socket::POLL_HUP) ||
00277        (p & Socket::POLL_RDHUP)) {
00278     _alive = false;
00279     _parent->wakeup();
00280   } else if ( p & Socket::POLL_IN ) {
00281     // Data can be read
00282     recv();
00283   }
00284 }
00285 
00286 /** Enqueue message to outbound queue.
00287  * This enqueues the given message to the outbound queue. The message will be send
00288  * in the next loop iteration.
00289  * @param msg message to enqueue
00290  */
00291 void
00292 FawkesNetworkServerClientThread::enqueue(FawkesNetworkMessage *msg)
00293 {
00294   _send_slave->enqueue(msg);
00295 }
00296 
00297 
00298 /** Check aliveness of connection.
00299  * @return true if connection is still alive, false otherwise.
00300  */
00301 bool
00302 FawkesNetworkServerClientThread::alive() const
00303 {
00304   return _alive;
00305 }
00306 
00307 
00308 /** Force sending of all pending outbound messages.
00309  * This is a blocking operation. The current poll will be interrupted by sending
00310  * a signal to this thread (and ignoring it) and then wait for the sending to
00311  * finish.
00312  */
00313 void
00314 FawkesNetworkServerClientThread::force_send()
00315 {
00316   _send_slave->wait_for_all_sent();
00317 }
00318 
00319 
00320 /** Connection died notification.
00321  * To be called only be the send slave thread.
00322  */
00323 void
00324 FawkesNetworkServerClientThread::connection_died()
00325 {
00326   _alive = false;
00327   _parent->wakeup();
00328 }
00329 
00330 } // end namespace fawkes