23 #include "mongodb_log_bb_thread.h"
25 #include <core/threading/mutex_locker.h>
26 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
30 #include <mongocxx/client.hpp>
31 #include <mongocxx/exception/operation_exception.hpp>
33 using namespace mongocxx;
66 std::vector<std::string> includes;
76 if (includes.empty()) {
77 includes.push_back(
"*");
80 std::vector<std::string>::iterator i;
81 std::vector<std::string>::iterator e;
82 for (i = includes.begin(); i != includes.end(); ++i) {
85 std::list<Interface *> current_interfaces =
88 std::list<Interface *>::iterator i;
89 for (i = current_interfaces.begin(); i != current_interfaces.end(); ++i) {
91 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
92 if (fnmatch(e->c_str(), (*i)->id(), 0) != FNM_NOMATCH) {
105 listeners_[(*i)->uid()] =
new InterfaceListener(
118 std::map<std::string, InterfaceListener *>::iterator i;
119 for (i = listeners_.begin(); i != listeners_.end(); ++i) {
120 client *mc = i->second->mongodb_client();
138 std::vector<std::string>::iterator e;
139 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
140 if (fnmatch(e->c_str(),
id, 0) != FNM_NOMATCH) {
141 logger->
log_debug(name(),
"Ignoring excluded interface '%s::%s'", type,
id);
147 Interface *
interface = blackboard->open_for_reading(type, id);
148 if (listeners_.find(interface->uid()) == listeners_.end()) {
149 logger->
log_debug(name(),
"Opening new %s", interface->uid());
150 client * mc = mongodb_connmgr->create_client();
152 listeners_[interface->uid()] =
new InterfaceListener(
153 blackboard, interface, mc, database_, collections_, agent_name, logger, now_);
155 logger->
log_warn(name(),
"Interface %s already opened", interface->uid());
156 blackboard->
close(interface);
159 logger->
log_warn(name(),
"Failed to open interface %s::%s, exception follows", type,
id);
174 MongoLogBlackboardThread::InterfaceListener::InterfaceListener(
BlackBoard * blackboard,
177 std::string & database,
179 const std::string & agent_name,
185 agent_name_(agent_name)
188 interface_ = interface;
194 std::string
id = interface->
id();
196 while ((pos =
id.find_first_of(
" -", pos)) != std::string::npos) {
197 id.replace(pos, 1,
"_");
200 collection_ = std::string(interface->
type()) +
"." + id;
201 if (collections_.find(collection_) != collections_.end()) {
202 throw Exception(
"Collection named %s already used, cannot log %s",
207 bbil_add_data_interface(interface);
208 blackboard_->register_listener(
this, BlackBoard::BBIL_FLAG_DATA);
212 MongoLogBlackboardThread::InterfaceListener::~InterfaceListener()
214 blackboard_->unregister_listener(
this);
218 MongoLogBlackboardThread::InterfaceListener::bb_interface_data_changed(
Interface *interface)
throw()
225 using namespace bsoncxx::builder;
226 basic::document document;
227 document.append(basic::kvp(
"timestamp",
static_cast<int64_t
>(now_->in_msec())));
231 bool is_array = (length > 1);
238 document.append(basic::kvp(key, [bools, length](basic::sub_array subarray) {
239 for (
size_t l = 0; l < length; ++l) {
240 subarray.append(bools[l]);
244 document.append(basic::kvp(key, i.
get_bool()));
251 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
252 for (
size_t l = 0; l < length; ++l) {
253 subarray.append(ints[l]);
257 document.append(basic::kvp(key, i.
get_int8()));
264 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
265 for (
size_t l = 0; l < length; ++l) {
266 subarray.append(ints[l]);
270 document.append(basic::kvp(key, i.
get_uint8()));
277 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
278 for (
size_t l = 0; l < length; ++l) {
279 subarray.append(ints[l]);
283 document.append(basic::kvp(key, i.
get_int16()));
290 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
291 for (
size_t l = 0; l < length; ++l) {
292 subarray.append(ints[l]);
296 document.append(basic::kvp(key, i.
get_uint16()));
303 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
304 for (
size_t l = 0; l < length; ++l) {
305 subarray.append(ints[l]);
309 document.append(basic::kvp(key, i.
get_int32()));
316 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
317 for (
size_t l = 0; l < length; ++l) {
318 subarray.append(
static_cast<int64_t
>(ints[l]));
322 document.append(basic::kvp(key,
static_cast<int64_t
>(i.
get_uint32())));
329 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
330 for (
size_t l = 0; l < length; ++l) {
331 subarray.append(ints[l]);
335 document.append(basic::kvp(key, i.
get_int64()));
342 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
343 for (
size_t l = 0; l < length; ++l) {
344 subarray.append(
static_cast<int64_t
>(ints[l]));
348 document.append(basic::kvp(key,
static_cast<int64_t
>(i.
get_uint64())));
355 document.append(basic::kvp(key, [floats, length](basic::sub_array subarray) {
356 for (
size_t l = 0; l < length; ++l) {
357 subarray.append(floats[l]);
361 document.append(basic::kvp(key, i.
get_float()));
368 document.append(basic::kvp(key, [doubles, length](basic::sub_array subarray) {
369 for (
size_t l = 0; l < length; ++l) {
370 subarray.append(doubles[l]);
374 document.append(basic::kvp(key, i.
get_double()));
383 document.append(basic::kvp(key, [bytes, length](basic::sub_array subarray) {
384 for (
size_t l = 0; l < length; ++l) {
385 subarray.append(bytes[l]);
389 document.append(basic::kvp(key, i.
get_byte()));
396 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
397 for (
size_t l = 0; l < length; ++l) {
398 subarray.append(ints[l]);
402 document.append(basic::kvp(key, i.
get_enum()));
408 document.append(basic::kvp(
"agent-name", agent_name_));
409 mongodb_->database(database_)[collection_].insert_one(document.view());
410 }
catch (operation_exception &e) {
412 bbil_name(),
"Failed to log to %s.%s: %s", database_.c_str(), collection_.c_str(), e.what());
413 }
catch (std::exception &e) {
414 logger_->log_warn(bbil_name(),
415 "Failed to log to %s.%s: %s (*)",
virtual void finalize()
Finalize the thread.
virtual ~MongoLogBlackboardThread()
Destructor.
virtual void init()
Initialize the thread.
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
virtual void loop()
Code to execute in the thread.
MongoLogBlackboardThread()
Constructor.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
BlackBoard interface listener.
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*")
Add interface creation type to watch list.
The BlackBoard abstract class.
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
Open multiple interfaces for reading.
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Configuration * config
This is the Configuration member used to access the configuration.
virtual std::string get_string_or_default(const char *path, const std::string &default_val)
Get value from configuration which is of type string, or the given default if the path does not exist...
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Interface field iterator.
float get_float(unsigned int index=0) const
Get value of current field as float.
int16_t get_int16(unsigned int index=0) const
Get value of current field as integer.
int8_t get_int8(unsigned int index=0) const
Get value of current field as integer.
int8_t * get_int8s() const
Get value of current field as integer array.
float * get_floats() const
Get value of current field as float array.
int32_t get_int32(unsigned int index=0) const
Get value of current field as integer.
uint8_t * get_bytes() const
Get value of current field as byte array.
size_t get_length() const
Get length of current field.
int32_t * get_int32s() const
Get value of current field as integer array.
int64_t get_int64(unsigned int index=0) const
Get value of current field as integer.
uint64_t get_uint64(unsigned int index=0) const
Get value of current field as unsigned integer.
int32_t get_enum(unsigned int index=0) const
Get value of current enum field as integer.
uint16_t get_uint16(unsigned int index=0) const
Get value of current field as unsigned integer.
double get_double(unsigned int index=0) const
Get value of current field as double.
int32_t * get_enums() const
Get value of current enum field as integer array.
int64_t * get_int64s() const
Get value of current field as integer array.
uint64_t * get_uint64s() const
Get value of current field as unsigned integer array.
uint32_t get_uint32(unsigned int index=0) const
Get value of current field as unsigned integer.
interface_fieldtype_t get_type() const
Get type of current field.
const char * get_name() const
Get name of current field.
uint8_t * get_uint8s() const
Get value of current field as unsigned integer array.
uint16_t * get_uint16s() const
Get value of current field as unsigned integer array.
const char * get_string() const
Get value of current field as string.
uint8_t get_byte(unsigned int index=0) const
Get value of current field as byte.
bool * get_bools() const
Get value of current field as bool array.
uint8_t get_uint8(unsigned int index=0) const
Get value of current field as unsigned integer.
bool get_bool(unsigned int index=0) const
Get value of current field as bool.
uint32_t * get_uint32s() const
Get value of current field as unsigned integer array.
double * get_doubles() const
Get value of current field as double array.
int16_t * get_int16s() const
Get value of current field as integer array.
Base class for all Fawkes BlackBoard interfaces.
const char * type() const
Get type of interface.
InterfaceFieldIterator fields_end()
Invalid iterator.
const char * id() const
Get identifier of interface.
InterfaceFieldIterator fields()
Get iterator over all fields of this interface instance.
const char * uid() const
Get unique identifier of interface.
void read()
Read from BlackBoard into local copy.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Thread aspect to access MongoDB.
MongoDBConnCreator * mongodb_connmgr
Connection manager to retrieve more client connections from if necessary.
virtual mongocxx::client * create_client(const std::string &config_name="")=0
Create a new MongoDB client.
virtual void delete_client(mongocxx::client *client)=0
Delete a client.
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Thread class encapsulation of pthreads.
const char * name() const
Get name of thread.
A class for handling time.
Fawkes library namespace.
IFT_INT8
8 bit integer field
IFT_UINT32
32 bit unsigned integer field
IFT_BYTE
byte field, alias for uint8
IFT_UINT64
64 bit unsigned integer field
IFT_UINT16
16 bit unsigned integer field
IFT_INT32
32 bit integer field
IFT_INT64
64 bit integer field
IFT_INT16
16 bit integer field
IFT_ENUM
field with interface specific enum type
IFT_UINT8
8 bit unsigned integer field