Fawkes API  Fawkes Development Version
interface_proxy.cpp
00001 
00002 /***************************************************************************
00003  *  interface_proxy.cpp - BlackBoard interface proxy for RemoteBlackBoard
00004  *
00005  *  Created: Tue Mar 04 11:40:18 2008
00006  *  Copyright  2006-2008  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <blackboard/net/interface_proxy.h>
00025 #include <blackboard/internal/instance_factory.h>
00026 #include <blackboard/net/messages.h>
00027 #include <blackboard/internal/interface_mem_header.h>
00028 #include <blackboard/internal/notifier.h>
00029 
00030 #include <core/threading/refc_rwlock.h>
00031 #include <utils/logging/liblogger.h>
00032 #include <netcomm/fawkes/client.h>
00033 #include <netcomm/fawkes/message.h>
00034 
00035 #include <cstdlib>
00036 #include <cstring>
00037 #include <arpa/inet.h>
00038 
00039 namespace fawkes {
00040 
00041 /** @class BlackBoardInterfaceProxy <blackboard/net/interface_proxy.h>
00042  * Interface proxy for remote BlackBoard.
00043  * This proxy is used internally by RemoteBlackBoard to interact with an interface
00044  * on the one side and the remote BlackBoard on the other side.
00045  * @author Tim Niemueller
00046  */
00047 
00048 /** Constructor.
00049  * @param client Fawkes network client
00050  * @param msg must be a MSG_BB_OPEN_SUCCESS message describing the interface in question
00051  * @param notifier BlackBoard notifier to use to notify of interface events
00052  * @param interface interface instance of the correct type, will be initialized in
00053  * this ctor and can be used afterwards.
00054  * @param writer true to make this a writing instance, false otherwise
00055  */
00056 BlackBoardInterfaceProxy::BlackBoardInterfaceProxy(FawkesNetworkClient *client,
00057                                                    FawkesNetworkMessage *msg,
00058                                                    BlackBoardNotifier *notifier,
00059                                                    Interface *interface, bool writer)
00060 {
00061   __fnc = client;
00062   if ( msg->msgid() != MSG_BB_OPEN_SUCCESS ) {
00063     throw Exception("Expected open success message");
00064   }
00065 
00066   void *payload = msg->payload();
00067   bb_iopensucc_msg_t *osm = (bb_iopensucc_msg_t *)payload;
00068 
00069   __notifier        = notifier;
00070   __interface       = interface;
00071   __instance_serial = ntohl(osm->serial);
00072   __has_writer      = (osm->has_writer == 1);
00073   __num_readers     = ntohl(osm->num_readers);
00074   __data_size       = ntohl(osm->data_size);
00075   __clid            = msg->clid();
00076   __next_msg_id     = 1;
00077 
00078   if ( interface->datasize() != __data_size ) {
00079     // Boom, sizes do not match
00080     throw Exception("Network message does not carry chunk of expected size");
00081   }
00082 
00083   __rwlock     = new RefCountRWLock();
00084   __mem_chunk  = malloc(sizeof(interface_header_t) + __data_size);
00085   __data_chunk = (char *)__mem_chunk + sizeof(interface_header_t);
00086   memset(__mem_chunk, 0, sizeof(interface_header_t) + __data_size);
00087   memcpy(__data_chunk, (char *)payload + sizeof(bb_iopensucc_msg_t), __data_size);
00088 
00089   interface_header_t *ih = (interface_header_t *)__mem_chunk;
00090 
00091   strncpy(ih->type, interface->type(), __INTERFACE_TYPE_SIZE);
00092   strncpy(ih->id, interface->id(), __INTERFACE_ID_SIZE);
00093   memcpy(ih->hash, interface->hash(), __INTERFACE_HASH_SIZE);
00094   ih->flag_writer_active = (__has_writer ? 1 : 0);
00095   ih->num_readers = __num_readers;
00096   ih->refcount = 1;
00097 
00098   interface->set_instance_serial(__instance_serial);
00099   interface->set_memory(0, __mem_chunk, __data_chunk);
00100   interface->set_mediators(this, this);
00101   interface->set_readwrite(writer, __rwlock);
00102 }
00103 
00104 /** Destructor. */
00105 BlackBoardInterfaceProxy::~BlackBoardInterfaceProxy()
00106 {
00107   free(__mem_chunk);
00108 }
00109 
00110 
00111 /** Process MSG_BB_DATA_CHANGED message.
00112  * @param msg message to process.
00113  */
00114 void
00115 BlackBoardInterfaceProxy::process_data_changed(FawkesNetworkMessage *msg)
00116 {
00117   if ( msg->msgid() != MSG_BB_DATA_CHANGED ) {
00118     LibLogger::log_error("BlackBoardInterfaceProxy", "Expected data changed BB message, but "
00119                          "received message of type %u, ignoring.", msg->msgid());
00120     return;
00121   }
00122 
00123   void *payload = msg->payload();
00124   bb_idata_msg_t *dm = (bb_idata_msg_t *)payload;
00125   if ( ntohl(dm->serial) != __instance_serial ) {
00126     LibLogger::log_error("BlackBoardInterfaceProxy", "Serial mismatch, expected %u, "
00127                          "but got %u, ignoring.", __instance_serial, ntohl(dm->serial));
00128     return;
00129   }
00130 
00131   if ( ntohl(dm->data_size) != __data_size ) {
00132     LibLogger::log_error("BlackBoardInterfaceProxy", "Data size mismatch, expected %zu, "
00133                          "but got %zu, ignoring.", __data_size, ntohl(dm->data_size));
00134     return;
00135   }
00136 
00137   memcpy(__data_chunk, (char *)payload + sizeof(bb_idata_msg_t), __data_size);
00138 
00139   __notifier->notify_of_data_change(__interface);
00140 }
00141 
00142 
00143 /** Process MSG_BB_INTERFACE message.
00144  * @param msg message to process.
00145  */
00146 void
00147 BlackBoardInterfaceProxy::process_interface_message(FawkesNetworkMessage *msg)
00148 {
00149   if ( msg->msgid() != MSG_BB_INTERFACE_MESSAGE ) {
00150     LibLogger::log_error("BlackBoardInterfaceProxy", "Expected interface BB message, but "
00151                          "received message of type %u, ignoring.", msg->msgid());
00152     return;
00153   }
00154 
00155   void *payload = msg->payload();
00156   bb_imessage_msg_t *mm = (bb_imessage_msg_t *)payload;
00157   if ( ntohl(mm->serial) != __instance_serial ) {
00158     LibLogger::log_error("BlackBoardInterfaceProxy", "Serial mismatch (msg), expected %u, "
00159                          "but got %u, ignoring.", __instance_serial, ntohl(mm->serial));
00160     return;
00161   }
00162 
00163   if ( ! __interface->is_writer() ) {
00164     LibLogger::log_error("BlackBoardInterfaceProxy", "Received interface message, but this"
00165                          "is a reading instance (%s), ignoring.", __interface->uid());
00166     return;
00167   }
00168 
00169   try {
00170     Message *im = __interface->create_message(mm->msg_type);
00171     im->set_id(ntohl(mm->msgid));
00172     im->set_hops(ntohl(mm->hops) + 1);
00173 
00174     if (im->hops() > 1) {
00175       LibLogger::log_warn("BlackBoardInterfaceProxy", "Message IDs are not stable across more than one hop, "
00176                           "message of type %s for interface %s has %u hops",
00177                           im->type(), __interface->uid(), im->hops());
00178     }
00179 
00180     if ( ntohl(mm->data_size) != im->datasize() ) {
00181       LibLogger::log_error("BlackBoardInterfaceProxy", "Message data size mismatch, expected "
00182                            "%zu, but got %zu, ignoring.", im->datasize(), ntohl(mm->data_size));
00183       delete im;
00184       return;
00185     }
00186 
00187     im->set_from_chunk((char *)payload + sizeof(bb_imessage_msg_t));
00188 
00189     if ( __notifier->notify_of_message_received(__interface, im) ) {
00190       __interface->msgq_append(im);
00191     }
00192   } catch (Exception &e) {
00193     e.append("Failed to enqueue interface message for %s, ignoring", __interface->uid());
00194     LibLogger::log_error("BlackBoardInterfaceProxy", e);
00195   }
00196 }
00197 
00198 
00199 /** Reader has been added.
00200  * @param event_serial instance serial of the interface that caused the event
00201  */
00202 void
00203 BlackBoardInterfaceProxy::reader_added(unsigned int event_serial)
00204 {
00205   ++__num_readers;
00206   __notifier->notify_of_reader_added(__interface, event_serial);
00207 }
00208 
00209 /** Reader has been removed.
00210  * @param event_serial instance serial of the interface that caused the event
00211  */
00212 void
00213 BlackBoardInterfaceProxy::reader_removed(unsigned int event_serial)
00214 {
00215   if ( __num_readers > 0 ) {
00216     --__num_readers;
00217   }
00218   __notifier->notify_of_reader_removed(__interface, event_serial);
00219 }
00220 
00221 /** Writer has been added.
00222  * @param event_serial instance serial of the interface that caused the event
00223  */
00224 void
00225 BlackBoardInterfaceProxy::writer_added(unsigned int event_serial)
00226 {
00227   __has_writer = true;
00228   __notifier->notify_of_writer_added(__interface, event_serial);
00229 }
00230 
00231 /** Writer has been removed.
00232  * @param event_serial instance serial of the interface that caused the event
00233  */
00234 void
00235 BlackBoardInterfaceProxy::writer_removed(unsigned int event_serial)
00236 {
00237   __has_writer = false;
00238   __notifier->notify_of_writer_removed(__interface, event_serial);
00239 }
00240 
00241 
00242 /** Get instance serial of interface.
00243  * @return instance serial
00244  */
00245 unsigned int
00246 BlackBoardInterfaceProxy::serial() const
00247 {
00248   return __instance_serial;
00249 }
00250 
00251 
00252 /** Get client ID of assigned client.
00253  * @return client ID
00254  */
00255 unsigned int
00256 BlackBoardInterfaceProxy::clid() const
00257 {
00258   return __instance_serial;
00259 }
00260 
00261 /** Get instance serial of interface.
00262  * @return instance serial
00263  */
00264 Interface *
00265 BlackBoardInterfaceProxy::interface() const
00266 {
00267   return __interface;
00268 }
00269 
00270 
00271 /* InterfaceMediator */
00272 bool
00273 BlackBoardInterfaceProxy::exists_writer(const Interface *interface) const
00274 {
00275   return __has_writer;
00276 }
00277 
00278 unsigned int
00279 BlackBoardInterfaceProxy::num_readers(const Interface *interface) const
00280 {
00281   return __num_readers;
00282 }
00283 
00284 void
00285 BlackBoardInterfaceProxy::notify_of_data_change(const Interface *interface)
00286 {
00287   // need to send write message
00288   size_t payload_size = sizeof(bb_idata_msg_t) + interface->datasize();
00289   void *payload = malloc(payload_size);
00290   bb_idata_msg_t *dm = (bb_idata_msg_t *)payload;
00291   dm->serial = htonl(interface->serial());
00292   dm->data_size = htonl(interface->datasize());
00293   memcpy((char *)payload + sizeof(bb_idata_msg_t), interface->datachunk(),
00294          interface->datasize());
00295 
00296   FawkesNetworkMessage *omsg = new FawkesNetworkMessage(__clid, FAWKES_CID_BLACKBOARD,
00297                                                         MSG_BB_DATA_CHANGED,
00298                                                         payload, payload_size);
00299   __fnc->enqueue(omsg);
00300 }
00301 
00302 
00303 /* MessageMediator */
00304 void
00305 BlackBoardInterfaceProxy::transmit(Message *message)
00306 {
00307   // send out interface message
00308   size_t payload_size = sizeof(bb_imessage_msg_t) + message->datasize();
00309   void *payload = calloc(1, payload_size);
00310   bb_imessage_msg_t *dm = (bb_imessage_msg_t *)payload;
00311   dm->serial = htonl(__interface->serial());
00312   unsigned int msgid = next_msg_id();
00313   dm->msgid  = htonl(msgid);
00314   dm->hops   = htonl(message->hops());
00315   message->set_id(msgid);
00316   strncpy(dm->msg_type, message->type(), __INTERFACE_MESSAGE_TYPE_SIZE);
00317   dm->data_size = htonl(message->datasize());
00318   memcpy((char *)payload + sizeof(bb_imessage_msg_t), message->datachunk(),
00319          message->datasize());
00320 
00321   FawkesNetworkMessage *omsg = new FawkesNetworkMessage(__clid, FAWKES_CID_BLACKBOARD,
00322                                                         MSG_BB_INTERFACE_MESSAGE,
00323                                                         payload, payload_size);
00324   __fnc->enqueue(omsg);
00325 }
00326 
00327 } // end namespace fawkes