Fawkes API  Fawkes Development Version
log_thread.cpp
1 
2 /***************************************************************************
3  * log_thread.cpp - BB Logger Thread
4  *
5  * Created: Sun Nov 08 00:02:09 2009
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "log_thread.h"
24 
25 #include "file.h"
26 
27 #include <blackboard/blackboard.h>
28 #include <core/exceptions/system.h>
29 #include <interfaces/SwitchInterface.h>
30 #include <logging/logger.h>
31 
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <fcntl.h>
#include <memory>
#ifdef __FreeBSD__
#	include <sys/endian.h>
#elif defined(__MACH__) && defined(__APPLE__)
#	include <sys/_endian.h>
#else
#	include <endian.h>
#endif
#include <arpa/inet.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
48 
49 using namespace fawkes;
50 
51 /** @class BBLoggerThread "log_thread.h"
52  * BlackBoard logger thread.
53  * One instance of this thread handles logging of one specific interface.
54  * The plugin will spawn as many threads as there are interfaces to log. This
55  * allows for maximum concurrency of the writers and avoids a serialization
56  * bottleneck.
57  * The log thread can operate in buffering mode. If this mode is disabled, the
58  * data is written to the file within the blackboard data changed event, and
59  * thus the writing operation can slow down the overall system, but memory
60  * requirements are low. This is useful if a lot of data is written or if the
61  * storage device is slow. If the mode is enabled, during the event the BB data
62  * will be copied into another memory segment and the thread will be woken up.
63  * Once the thread is running it stores all of the BB data segments buffered
64  * up to then.
65  * The interface listener listens for events for a particular interface and
66  * then writes the changes to the file.
67  * @author Tim Niemueller
68  */
69 
70 /** Constructor.
71  * @param iface_uid interface UID which to log
72  * @param logdir directory to store log files, must exist
73  * @param buffering enable log buffering?
74  * @param flushing true to flush after each written chunk
75  * @param scenario ID of the log scenario
76  * @param start_time time to use as start time for the log
77  */
78 BBLoggerThread::BBLoggerThread(const char * iface_uid,
79  const char * logdir,
80  bool buffering,
81  bool flushing,
82  const char * scenario,
83  fawkes::Time *start_time)
84 : Thread("BBLoggerThread", Thread::OPMODE_WAITFORWAKEUP),
85  BlackBoardInterfaceListener("BBLoggerThread(%s)", iface_uid)
86 {
88  set_name("BBLoggerThread(%s)", iface_uid);
89 
90  buffering_ = buffering;
91  flushing_ = flushing;
92  uid_ = strdup(iface_uid);
93  logdir_ = strdup(logdir);
94  scenario_ = strdup(scenario);
95  start_ = new Time(start_time);
96  filename_ = NULL;
97  queue_mutex_ = new Mutex();
98  data_size_ = 0;
99  is_master_ = false;
100  enabled_ = true;
101 
102  now_ = NULL;
103 
104  // Parse UID
105  Interface::parse_uid(uid_, type_, id_);
106 
107  char date[21];
108  Time now;
109  struct tm *tmp = localtime(&(now.get_timeval()->tv_sec));
110  strftime(date, 21, "%F-%H-%M-%S", tmp);
111 
112  if (asprintf(
113  &filename_, "%s/%s-%s-%s-%s.log", LOGDIR, scenario_, type_.c_str(), id_.c_str(), date)
114  == -1) {
115  throw OutOfMemoryException("Cannot generate log name");
116  }
117 }
118 
119 /** Destructor. */
121 {
122  free(uid_);
123  free(logdir_);
124  free(scenario_);
125  free(filename_);
126  delete queue_mutex_;
127  delete start_;
128 }
129 
130 void
132 {
133  queues_[0].clear();
134  queues_[1].clear();
135  act_queue_ = 0;
136 
137  queue_mutex_ = new Mutex();
138  data_size_ = 0;
139 
140  now_ = NULL;
141  num_data_items_ = 0;
142  session_start_ = 0;
143 
144  // use open because fopen does not provide O_CREAT | O_EXCL
145  // open read/write because of usage of mmap
146  mode_t m = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
147  int fd = open(filename_, O_RDWR | O_CREAT | O_EXCL, m);
148  if (!fd) {
149  throw CouldNotOpenFileException(filename_, errno, "Failed to open log 1");
150  } else {
151  f_data_ = fdopen(fd, "w+");
152  if (!f_data_) {
153  throw CouldNotOpenFileException(filename_, errno, "Failed to open log 2");
154  }
155  }
156 
157  try {
158  iface_ = blackboard->open_for_reading(type_.c_str(), id_.c_str());
159  data_size_ = iface_->datasize();
160  } catch (Exception &e) {
161  fclose(f_data_);
162  throw;
163  }
164 
165  try {
166  write_header();
167  } catch (FileWriteException &e) {
168  blackboard->close(iface_);
169  fclose(f_data_);
170  throw;
171  }
172 
173  now_ = new Time(clock);
174 
175  if (is_master_) {
176  try {
177  switch_if_ = blackboard->open_for_writing<SwitchInterface>("BBLogger");
178  switch_if_->set_enabled(enabled_);
179  switch_if_->write();
180  bbil_add_message_interface(switch_if_);
181  } catch (Exception &e) {
182  fclose(f_data_);
183  throw;
184  }
185  }
186 
187  bbil_add_data_interface(iface_);
189 
191 
192  logger->log_info(
193  name(), "Logging %s to %s%s", iface_->uid(), filename_, is_master_ ? " as master" : "");
194 }
195 
196 void
198 {
200  if (is_master_) {
201  blackboard->close(switch_if_);
202  }
203  update_header();
204  fclose(f_data_);
205  for (unsigned int q = 0; q < 2; ++q) {
206  while (!queues_[q].empty()) {
207  void *t = queues_[q].front();
208  free(t);
209  queues_[q].pop();
210  }
211  }
212  delete now_;
213  now_ = NULL;
214 }
215 
216 /** Get filename.
217  * @return file name, valid after object instantiated, but before init() does not
218  * mean that the file has been or can actually be opened
219  */
220 const char *
222 {
223  return filename_;
224 }
225 
226 /** Enable or disable logging.
227  * @param enabled true to enable logging, false to disable
228  */
229 void
231 {
232  if (enabled && !enabled_) {
233  logger->log_info(name(), "Logging enabled");
234  session_start_ = num_data_items_;
235  } else if (!enabled && enabled_) {
236  logger->log_info(name(),
237  "Logging disabled (wrote %u entries), flushing",
238  (num_data_items_ - session_start_));
239  update_header();
240  fflush(f_data_);
241  }
242 
243  enabled_ = enabled;
244 }
245 
246 /** Set threadlist and master status.
247  * This copies the thread list and sets this thread as master thread.
248  * If you intend to use this method you must do so before the thread is
249  * initialized. You may only ever declare one thread as master.
250  * @param thread_list list of threads to notify on enable/disable events
251  */
252 void
254 {
255  is_master_ = true;
256  threads_ = thread_list;
257 }
258 
259 void
260 BBLoggerThread::write_header()
261 {
262  bblog_file_header header;
263  memset(&header, 0, sizeof(header));
264  header.file_magic = htonl(BBLOGGER_FILE_MAGIC);
265  header.file_version = htonl(BBLOGGER_FILE_VERSION);
266 #if BYTE_ORDER_ == BIG_ENDIAN_
267  header.endianess = BBLOG_BIG_ENDIAN;
268 #else
269  header.endianess = BBLOG_LITTLE_ENDIAN;
270 #endif
271  header.num_data_items = num_data_items_;
272  strncpy(header.scenario, (const char *)scenario_, BBLOG_SCENARIO_SIZE - 1);
273  strncpy(header.interface_type, iface_->type(), BBLOG_INTERFACE_TYPE_SIZE - 1);
274  strncpy(header.interface_id, iface_->id(), BBLOG_INTERFACE_ID_SIZE - 1);
275  memcpy(header.interface_hash, iface_->hash(), BBLOG_INTERFACE_HASH_SIZE);
276  header.data_size = iface_->datasize();
277  long start_time_sec, start_time_usec;
278  start_->get_timestamp(start_time_sec, start_time_usec);
279  header.start_time_sec = start_time_sec;
280  header.start_time_usec = start_time_usec;
281  if (fwrite(&header, sizeof(header), 1, f_data_) != 1) {
282  throw FileWriteException(filename_, "Failed to write header");
283  }
284  fflush(f_data_);
285 }
286 
287 /** Updates the num_data_items field in the header. */
288 void
289 BBLoggerThread::update_header()
290 {
291  // write updated num_data_items field
292 #if _POSIX_MAPPED_FILES
293  void *h = mmap(NULL, sizeof(bblog_file_header), PROT_WRITE, MAP_SHARED, fileno(f_data_), 0);
294  if (h == MAP_FAILED) {
295  logger->log_warn(name(),
296  "Failed to mmap log (%s), "
297  "not updating number of data items",
298  strerror(errno));
299  } else {
300  bblog_file_header *header = (bblog_file_header *)h;
301  header->num_data_items = num_data_items_;
302  munmap(h, sizeof(bblog_file_header));
303  }
304 #else
305  logger->log_warn(name(),
306  "Memory mapped files not available, "
307  "not updating number of data items on close");
308 #endif
309 }
310 
311 void
312 BBLoggerThread::write_chunk(const void *chunk)
313 {
314  bblog_entry_header ehead;
315  now_->stamp();
316  Time d = *now_ - *start_;
317  long rel_time_sec, rel_time_usec;
318  d.get_timestamp(rel_time_sec, rel_time_usec);
319  ehead.rel_time_sec = rel_time_sec;
320  ehead.rel_time_usec = rel_time_usec;
321  if ((fwrite(&ehead, sizeof(ehead), 1, f_data_) == 1)
322  && (fwrite(chunk, data_size_, 1, f_data_) == 1)) {
323  if (flushing_)
324  fflush(f_data_);
325  num_data_items_ += 1;
326  } else {
327  logger->log_warn(name(), "Failed to write chunk");
328  }
329 }
330 
331 void
333 {
334  unsigned int write_queue = act_queue_;
335  queue_mutex_->lock();
336  act_queue_ = 1 - act_queue_;
337  queue_mutex_->unlock();
338  LockQueue<void *> &queue = queues_[write_queue];
339  //logger->log_debug(name(), "Writing %zu entries", queue.size());
340  while (!queue.empty()) {
341  void *c = queue.front();
342  write_chunk(c);
343  free(c);
344  queue.pop();
345  }
346 }
347 
348 bool
350 {
353 
354  bool enabled = true;
355  if ((enm = dynamic_cast<SwitchInterface::EnableSwitchMessage *>(message)) != NULL) {
356  enabled = true;
357  } else if ((dism = dynamic_cast<SwitchInterface::DisableSwitchMessage *>(message)) != NULL) {
358  enabled = false;
359  } else {
360  logger->log_debug(name(),
361  "Unhandled message type: %s via %s",
362  message->type(),
363  interface->uid());
364  }
365 
366  for (ThreadList::iterator i = threads_.begin(); i != threads_.end(); ++i) {
367  BBLoggerThread *bblt = dynamic_cast<BBLoggerThread *>(*i);
368  bblt->set_enabled(enabled);
369  }
370 
371  switch_if_->set_enabled(enabled_);
372  switch_if_->write();
373 
374  return false;
375 }
376 
377 void
379 {
380  if (!enabled_)
381  return;
382 
383  try {
384  iface_->read();
385 
386  if (buffering_) {
387  void *c = malloc(iface_->datasize());
388  memcpy(c, iface_->datachunk(), iface_->datasize());
389  queue_mutex_->lock();
390  queues_[act_queue_].push_locked(c);
391  queue_mutex_->unlock();
392  wakeup();
393  } else {
394  queue_mutex_->lock();
395  write_chunk(iface_->datachunk());
396  queue_mutex_->unlock();
397  }
398 
399  } catch (Exception &e) {
400  logger->log_error(name(), "Exception when data changed");
401  logger->log_error(name(), e);
402  }
403 }
404 
405 void
407  unsigned int instance_serial) throw()
408 {
409  session_start_ = num_data_items_;
410 }
411 
412 void
414  unsigned int instance_serial) throw()
415 {
416  logger->log_info(name(),
417  "Writer removed (wrote %u entries), flushing",
418  (num_data_items_ - session_start_));
419  update_header();
420  fflush(f_data_);
421 }
BlackBoard logger thread.
Definition: log_thread.h:51
virtual void bb_interface_writer_added(fawkes::Interface *interface, unsigned int instance_serial)
A writing instance has been opened for a watched interface.
Definition: log_thread.cpp:406
virtual ~BBLoggerThread()
Destructor.
Definition: log_thread.cpp:120
virtual void init()
Initialize the thread.
Definition: log_thread.cpp:131
virtual void bb_interface_data_changed(fawkes::Interface *interface)
BlackBoard data changed notification.
Definition: log_thread.cpp:378
const char * get_filename() const
Get filename.
Definition: log_thread.cpp:221
virtual void bb_interface_writer_removed(fawkes::Interface *interface, unsigned int instance_serial)
A writing instance has been closed for a watched interface.
Definition: log_thread.cpp:413
virtual void finalize()
Finalize the thread.
Definition: log_thread.cpp:197
virtual bool bb_interface_message_received(fawkes::Interface *interface, fawkes::Message *message)
BlackBoard message received notification.
Definition: log_thread.cpp:349
void set_threadlist(fawkes::ThreadList &thread_list)
Set threadlist and master status.
Definition: log_thread.cpp:253
virtual void loop()
Code to execute in the thread.
Definition: log_thread.cpp:332
BBLoggerThread(const char *iface_uid, const char *logdir, bool buffering, bool flushing, const char *scenario, fawkes::Time *start_time)
Constructor.
Definition: log_thread.cpp:78
void set_enabled(bool enabled)
Enable or disable logging.
Definition: log_thread.cpp:230
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
BlackBoard interface listener.
void bbil_add_message_interface(Interface *interface)
Add an interface to the message received watch list.
void bbil_add_writer_interface(Interface *interface)
Add an interface to the writer addition/removal watch list.
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:212
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:185
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
File could not be opened.
Definition: system.h:53
Base class for exceptions in Fawkes.
Definition: exception.h:36
Could not write to file.
Definition: system.h:69
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:79
const char * type() const
Get type of interface.
Definition: interface.cpp:643
const unsigned char * hash() const
Get interface hash.
Definition: interface.cpp:298
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:494
const char * id() const
Get identifier of interface.
Definition: interface.cpp:652
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:677
unsigned int datasize() const
Get data size.
Definition: interface.cpp:531
void clear()
Clear the queue.
Definition: lock_queue.h:153
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Definition: message.h:45
virtual void log_info(const char *component, const char *format,...)
Log informational message.
Definition: multi.cpp:195
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Definition: multi.cpp:174
virtual void log_error(const char *component, const char *format,...)
Log error message.
Definition: multi.cpp:237
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
DisableSwitchMessage Fawkes BlackBoard Interface Message.
EnableSwitchMessage Fawkes BlackBoard Interface Message.
SwitchInterface Fawkes BlackBoard Interface.
void set_enabled(const bool new_enabled)
Set enabled value.
List of threads.
Definition: thread_list.h:56
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
void set_coalesce_wakeups(bool coalesce=true)
Set wakeup coalescing.
Definition: thread.cpp:729
A class for handling time.
Definition: time.h:93
void get_timestamp(long &sec, long &usec) const
Get time stamp.
Definition: time.h:137
Time & stamp()
Set this time to the current time.
Definition: time.cpp:704
const timeval * get_timeval() const
Obtain the timeval where the time is stored.
Definition: time.h:112
Fawkes library namespace.
BBLogger entry header.
Definition: file.h:76
uint32_t rel_time_usec
time since start time, microseconds
Definition: file.h:78
uint32_t rel_time_sec
time since start time, seconds
Definition: file.h:77
BBLogger file header definition.
Definition: file.h:53
char interface_type[BBLOG_INTERFACE_TYPE_SIZE]
Interface type.
Definition: file.h:64
char scenario[BBLOG_SCENARIO_SIZE]
Scenario as defined in config.
Definition: file.h:62
uint64_t start_time_sec
Start time, timestamp seconds.
Definition: file.h:68
uint32_t data_size
size of one interface data block
Definition: file.h:67
char interface_id[BBLOG_INTERFACE_ID_SIZE]
Interface ID.
Definition: file.h:65
uint64_t start_time_usec
Start time, timestamp microseconds.
Definition: file.h:69
uint32_t endianess
Endianess, 0 little endian, 1 big endian.
Definition: file.h:58
uint32_t file_version
File version, set to BBLOGGER_FILE_VERSION on write and verify on read (big endian)
Definition: file.h:56
uint32_t num_data_items
Number of data items in file, if set to zero reader must scan the file for this number.
Definition: file.h:60
unsigned char interface_hash[BBLOG_INTERFACE_HASH_SIZE]
Interface Hash.
Definition: file.h:66
uint32_t file_magic
Magic value to identify file, must be 0xFFBBFFBB (big endian)
Definition: file.h:54