24 #include <blackboard/blackboard.h>
25 #include <blackboard/interface_listener.h>
26 #include <blackboard/interface_observer.h>
27 #include <blackboard/internal/notifier.h>
28 #include <core/threading/mutex.h>
29 #include <core/threading/mutex_locker.h>
30 #include <core/utils/lock_hashmap.h>
31 #include <core/utils/lock_hashset.h>
32 #include <interface/interface.h>
33 #include <logging/liblogger.h>
54 bbil_writer_events_ = 0;
55 bbil_writer_mutex_ =
new Mutex();
57 bbil_reader_events_ = 0;
58 bbil_reader_mutex_ =
new Mutex();
60 bbil_data_events_ = 0;
61 bbil_data_mutex_ =
new Mutex();
63 bbil_messages_events_ = 0;
64 bbil_messages_mutex_ =
new Mutex();
67 bbio_mutex_ =
new Mutex();
73 delete bbil_writer_mutex_;
74 delete bbil_reader_mutex_;
75 delete bbil_data_mutex_;
76 delete bbil_messages_mutex_;
104 BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin();
106 for (i = queue.begin(); i != queue.end(); ++i) {
110 proc_listener_maybe_queue(i->op,
122 proc_listener_maybe_queue(i->op,
125 bbil_messages_mutex_,
126 bbil_messages_events_,
128 bbil_messages_queue_,
134 proc_listener_maybe_queue(i->op,
146 proc_listener_maybe_queue(i->op,
160 listener->bbil_release_queue(flag);
164 BlackBoardNotifier::proc_listener_maybe_queue(
bool op,
168 unsigned int & events,
177 "listener %s for %s events (queued)",
178 op ?
"Registering" :
"Unregistering",
182 queue_listener(op, interface, listener, queue);
185 add_listener(interface, listener, map);
187 remove_listener(interface, listener, map);
202 BlackBoardInterfaceListener::InterfaceMap::const_iterator i;
203 for (i = maps.
data.begin(); i != maps.
data.end(); ++i) {
204 proc_listener_maybe_queue(
false,
215 proc_listener_maybe_queue(
false,
218 bbil_messages_mutex_,
219 bbil_messages_events_,
221 bbil_messages_queue_,
225 for (i = maps.
reader.begin(); i != maps.
reader.end(); ++i) {
226 proc_listener_maybe_queue(
false,
236 for (i = maps.
writer.begin(); i != maps.
writer.end(); ++i) {
237 proc_listener_maybe_queue(
false,
247 listener->bbil_release_maps();
256 BlackBoardNotifier::add_listener(
Interface * interface,
260 std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->
uid());
262 BBilMap::value_type v = std::make_pair(interface->
uid(), listener);
263 BBilMap::iterator f = std::find(ret.first, ret.second, v);
265 if (f == ret.second) {
266 ilmap.insert(std::make_pair(interface->
uid(), listener));
271 BlackBoardNotifier::remove_listener(Interface * interface,
272 BlackBoardInterfaceListener *listener,
275 std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
276 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
277 if (j->second == listener) {
285 BlackBoardNotifier::is_in_queue(
bool op,
288 BlackBoardInterfaceListener *bbil)
290 BBilQueue::iterator q;
291 for (q = queue.begin(); q != queue.end(); ++q) {
292 if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) {
300 BlackBoardNotifier::queue_listener(
bool op,
301 Interface * interface,
302 BlackBoardInterfaceListener *listener,
305 BBilQueueEntry qe = {op, interface->uid(), interface, listener};
316 if (bbio_events_ > 0) {
317 bbio_queue_.push_back(std::make_pair(1, observer));
332 for (i = its->begin(); i != its->end(); ++i) {
333 bbiomap[i->first].push_back(make_pair(observer, i->second));
343 BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
345 BBioMapIterator i, tmp;
348 while (i != iomap.end()) {
349 BBioListIterator j = i->second.begin();
350 while (j != i->second.end()) {
351 if (j->first == observer) {
352 j = i->second.erase(j);
357 if (i->second.empty()) {
376 if (bbio_events_ > 0) {
377 BBioQueueEntry e = std::make_pair((
unsigned int)0, observer);
378 BBioQueue::iterator re;
379 while ((re = find_if(bbio_queue_.begin(),
381 bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
382 != bbio_queue_.end()) {
384 if (re->second == observer) {
385 bbio_queue_.erase(re);
388 bbio_queue_.push_back(std::make_pair(0, observer));
391 remove_observer(bbio_created_, observer);
392 remove_observer(bbio_destroyed_, observer);
405 bbio_mutex_->unlock();
407 BBioMapIterator lhmi;
408 BBioListIterator i, l;
409 for (lhmi = bbio_created_.begin(); lhmi != bbio_created_.end(); ++lhmi) {
410 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
413 BBioList &list = lhmi->second;
414 for (i = list.begin(); i != list.end(); ++i) {
416 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
417 if (fnmatch(pi->c_str(),
id, 0) == 0) {
427 process_bbio_queue();
428 bbio_mutex_->unlock();
440 bbio_mutex_->unlock();
442 BBioMapIterator lhmi;
443 BBioListIterator i, l;
444 for (lhmi = bbio_destroyed_.begin(); lhmi != bbio_destroyed_.end(); ++lhmi) {
445 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
448 BBioList &list = (*lhmi).second;
449 for (i = list.begin(); i != list.end(); ++i) {
451 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
452 if (fnmatch(pi->c_str(),
id, 0) == 0) {
462 process_bbio_queue();
463 bbio_mutex_->unlock();
467 BlackBoardNotifier::process_bbio_queue()
469 if (!bbio_queue_.empty()) {
470 if (bbio_events_ > 0) {
473 while (!bbio_queue_.empty()) {
474 BBioQueueEntry &e = bbio_queue_.front();
476 add_observer(e.second, e.second->bbio_get_observed_create(), bbio_created_);
477 add_observer(e.second, e.second->bbio_get_observed_destroy(), bbio_destroyed_);
479 remove_observer(bbio_created_, e.second);
480 remove_observer(bbio_destroyed_, e.second);
482 bbio_queue_.pop_front();
496 unsigned int event_instance_serial)
throw()
498 bbil_writer_mutex_->lock();
499 bbil_writer_events_ += 1;
500 bbil_writer_mutex_->unlock();
502 const char * uid = interface->uid();
503 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
504 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
506 if (!is_in_queue(
false, bbil_writer_queue_, uid, bbil)) {
508 if (bbil_iface != NULL) {
512 "BBIL[%s] registered for writer events "
513 "(open) for '%s' but has no such interface",
520 bbil_writer_mutex_->lock();
521 bbil_writer_events_ -= 1;
522 process_writer_queue();
523 bbil_writer_mutex_->unlock();
533 unsigned int event_instance_serial)
throw()
535 bbil_writer_mutex_->lock();
536 bbil_writer_events_ += 1;
537 bbil_writer_mutex_->unlock();
539 const char * uid = interface->uid();
540 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
541 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
543 if (!is_in_queue(
false, bbil_data_queue_, uid, bbil)) {
545 if (bbil_iface != NULL) {
549 "BBIL[%s] registered for writer events "
550 "(close) for '%s' but has no such interface",
557 bbil_writer_mutex_->lock();
558 bbil_writer_events_ -= 1;
559 process_writer_queue();
560 bbil_writer_mutex_->unlock();
564 BlackBoardNotifier::process_writer_queue()
566 if (!bbil_writer_queue_.empty()) {
567 if (bbil_writer_events_ > 0) {
570 while (!bbil_writer_queue_.empty()) {
571 BBilQueueEntry &e = bbil_writer_queue_.front();
573 add_listener(e.interface, e.listener, bbil_writer_);
575 remove_listener(e.interface, e.listener, bbil_writer_);
577 bbil_writer_queue_.pop_front();
590 unsigned int event_instance_serial)
throw()
592 bbil_reader_mutex_->lock();
593 bbil_reader_events_ += 1;
594 bbil_reader_mutex_->unlock();
596 const char * uid = interface->uid();
597 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
598 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
600 if (!is_in_queue(
false, bbil_reader_queue_, uid, bbil)) {
602 if (bbil_iface != NULL) {
606 "BBIL[%s] registered for reader events "
607 "(open) for '%s' but has no such interface",
614 bbil_reader_mutex_->lock();
615 bbil_reader_events_ -= 1;
616 process_reader_queue();
617 bbil_reader_mutex_->unlock();
627 unsigned int event_instance_serial)
throw()
629 bbil_reader_mutex_->lock();
630 bbil_reader_events_ += 1;
631 bbil_reader_mutex_->unlock();
633 const char * uid = interface->uid();
634 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
635 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
637 if (!is_in_queue(
false, bbil_data_queue_, uid, bbil)) {
639 if (bbil_iface != NULL) {
643 "BBIL[%s] registered for reader events "
644 "(close) for '%s' but has no such interface",
651 bbil_reader_mutex_->lock();
652 bbil_reader_events_ -= 1;
653 process_reader_queue();
654 bbil_reader_mutex_->unlock();
658 BlackBoardNotifier::process_reader_queue()
660 if (!bbil_reader_queue_.empty()) {
661 if (bbil_reader_events_ > 0) {
664 while (!bbil_reader_queue_.empty()) {
665 BBilQueueEntry &e = bbil_reader_queue_.front();
667 add_listener(e.interface, e.listener, bbil_reader_);
669 remove_listener(e.interface, e.listener, bbil_reader_);
671 bbil_reader_queue_.pop_front();
689 bbil_data_mutex_->
lock();
690 bbil_data_events_ += 1;
691 bbil_data_mutex_->
unlock();
693 const char * uid = interface->
uid();
694 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_data_.equal_range(uid);
695 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
697 if (!is_in_queue(
false, bbil_data_queue_, uid, bbil)) {
699 if (bbil_iface != NULL) {
703 "BBIL[%s] registered for data change events "
704 "for '%s' but has no such interface",
711 bbil_data_mutex_->
lock();
712 bbil_data_events_ -= 1;
713 if (!bbil_data_queue_.empty()) {
714 if (bbil_data_events_ == 0) {
715 while (!bbil_data_queue_.empty()) {
716 BBilQueueEntry &e = bbil_data_queue_.front();
718 add_listener(e.interface, e.listener, bbil_data_);
720 remove_listener(e.interface, e.listener, bbil_data_);
722 bbil_data_queue_.pop_front();
726 bbil_data_mutex_->
unlock();
742 bbil_messages_mutex_->
lock();
743 bbil_messages_events_ += 1;
744 bbil_messages_mutex_->
unlock();
748 const char * uid = interface->
uid();
749 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_messages_.equal_range(uid);
750 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
752 if (!is_in_queue(
false, bbil_messages_queue_, uid, bbil)) {
754 if (bbil_iface != NULL) {
762 "BBIL[%s] registered for message events "
763 "for '%s' but has no such interface",
770 bbil_messages_mutex_->
lock();
771 bbil_messages_events_ -= 1;
772 if (!bbil_messages_queue_.empty()) {
773 if (bbil_messages_events_ == 0) {
774 while (!bbil_messages_queue_.empty()) {
775 BBilQueueEntry &e = bbil_messages_queue_.front();
777 add_listener(e.interface, e.listener, bbil_messages_);
779 remove_listener(e.interface, e.listener, bbil_messages_);
781 bbil_messages_queue_.pop_front();
785 bbil_messages_mutex_->
unlock();
BlackBoard interface listener.
Interface * bbil_writer_interface(const char *iuid)
Get interface instance for given UID.
@ WRITER
Writer event entry.
@ READER
Reader event entry.
@ MESSAGES
Message received event entry.
@ DATA
Data changed event entry.
Interface * bbil_reader_interface(const char *iuid)
Get interface instance for given UID.
virtual void bb_interface_writer_removed(Interface *interface, unsigned int instance_serial)
A writing instance has been closed for a watched interface.
virtual void bb_interface_reader_removed(Interface *interface, unsigned int instance_serial)
A reading instance has been closed for a watched interface.
Interface * bbil_data_interface(const char *iuid)
Get interface instance for given UID.
virtual void bb_interface_reader_added(Interface *interface, unsigned int instance_serial)
A reading instance has been opened for a watched interface.
virtual bool bb_interface_message_received(Interface *interface, Message *message)
BlackBoard message received notification.
const char * bbil_name() const
Get BBIL name.
std::list< QueueEntry > InterfaceQueue
Queue of additions/removal of interfaces.
virtual void bb_interface_writer_added(Interface *interface, unsigned int instance_serial)
A writing instance has been opened for a watched interface.
Interface * bbil_message_interface(const char *iuid)
Get interface instance for given UID.
virtual void bb_interface_data_changed(Interface *interface)
BlackBoard data changed notification.
BlackBoard interface observer.
ObservedInterfaceLockMap * bbio_get_observed_create()
Get interface creation type watch list.
ObservedInterfaceLockMap::iterator ObservedInterfaceLockMapIterator
Type for iterator of lockable interface type hash sets.
ObservedInterfaceLockMap * bbio_get_observed_destroy()
Get interface destriction type watch list.
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
virtual void bb_interface_destroyed(const char *type, const char *id)
BlackBoard interface destroyed notification.
BlackBoardNotifier()
Constructor.
void notify_of_reader_added(const Interface *interface, unsigned int event_instance_serial)
Notify that reader has been added.
void notify_of_data_change(const Interface *interface)
Notify of data change.
void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
void notify_of_writer_removed(const Interface *interface, unsigned int event_instance_serial)
Notify that writer has been removed.
void notify_of_writer_added(const Interface *interface, unsigned int event_instance_serial)
Notify that writer has been added.
void notify_of_interface_destroyed(const char *type, const char *id)
Notify that an interface has been destroyed.
virtual ~BlackBoardNotifier()
Destructor.
void notify_of_interface_created(const char *type, const char *id)
Notify that an interface has been created.
void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
void register_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Register BB event listener.
void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
void notify_of_reader_removed(const Interface *interface, unsigned int event_instance_serial)
Notify that reader has been removed.
void update_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Update BB event listener.
bool notify_of_message_received(const Interface *interface, Message *message)
Notify of message received Notify all subscribers of the given interface of an incoming message This ...
ListenerRegisterFlag
Flags to constrain listener registration/updates.
@ BBIL_FLAG_READER
consider reader events
@ BBIL_FLAG_DATA
consider data events
@ BBIL_FLAG_WRITER
consider writer events
@ BBIL_FLAG_MESSAGES
consider message received events
Base class for all Fawkes BlackBoard interfaces.
const char * uid() const
Get unique identifier of interface.
static void log_warn(const char *component, const char *format,...)
Log warning message.
void lock() const
Lock list.
void unlock() const
Unlock list.
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Mutex mutual exclusion lock.
void lock()
Lock this mutex.
void unlock()
Unlock the mutex.
Fawkes library namespace.
Structure to hold maps for active subscriptions.
InterfaceMap writer
Writer event subscriptions.
InterfaceMap messages
Message received event subscriptions.
InterfaceMap data
Data event subscriptions.
InterfaceMap reader
Reader event subscriptions.