Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * logreplay_thread.cpp - BB Log Replay Thread 00004 * 00005 * Created: Wed Feb 17 01:53:00 2010 00006 * Copyright 2010 Tim Niemueller [www.niemueller.de] 00007 * 2010 Masrur Doostdar <doostdar@kbsg.rwth-aachen.de> 00008 * 00009 ****************************************************************************/ 00010 00011 /* This program is free software; you can redistribute it and/or modify 00012 * it under the terms of the GNU General Public License as published by 00013 * the Free Software Foundation; either version 2 of the License, or 00014 * (at your option) any later version. 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL file in the doc directory. 00022 */ 00023 00024 #include "logreplay_thread.h" 00025 #include "file.h" 00026 00027 #include <blackboard/blackboard.h> 00028 #include <utils/logging/logger.h> 00029 #include <core/threading/wait_condition.h> 00030 #include <core/exceptions/system.h> 00031 #include <utils/misc/autofree.h> 00032 00033 #include <blackboard/internal/instance_factory.h> 00034 00035 00036 #include <memory> 00037 #include <cstring> 00038 #include <cstdlib> 00039 #include <cstdio> 00040 #include <cerrno> 00041 #include <fcntl.h> 00042 #ifdef __FreeBSD__ 00043 # include <sys/endian.h> 00044 #else 00045 # include <endian.h> 00046 #endif 00047 #include <arpa/inet.h> 00048 #include <sys/mman.h> 00049 00050 using namespace fawkes; 00051 00052 /** @class BBLogReplayThread "logreplay_thread.h" 00053 * BlackBoard log Replay thread. 00054 * Writes the data of the logfile into a blackboard interface, considering the 00055 * time-step differences between the data. 00056 * @author Masrur Doostdar 00057 * @author Tim Niemueller 00058 */ 00059 00060 /** Constructor. 00061 * @param logfile_name filename of the log to be replayed 00062 * @param logdir directory containing the logfile 00063 * @param scenario ID of the log scenario 00064 * @param grace_period time in seconds that desired offset and loop offset may 00065 * diverge to still write the new data 00066 * @param loop_replay specifies if the replay should be looped 00067 * @param non_blocking do not block the main loop if not enough time has elapsed 00068 * to replay new data but just wait for the next cycle. This is ignored in 00069 * continuous thread mode as it could cause busy waiting. 00070 * @param thread_name initial thread name 00071 * @param th_opmode thread operation mode 00072 */ 00073 BBLogReplayThread::BBLogReplayThread(const char *logfile_name, 00074 const char *logdir, 00075 const char *scenario, 00076 float grace_period, 00077 bool loop_replay, 00078 bool non_blocking, 00079 const char *thread_name, 00080 fawkes::Thread::OpMode th_opmode) 00081 : Thread(thread_name, th_opmode) 00082 { 00083 set_name("BBLogReplayThread(%s)", logfile_name); 00084 set_prepfin_conc_loop(true); 00085 00086 __logfile_name= strdup(logfile_name); 00087 __logdir = strdup(logdir); 00088 __scenario = strdup(scenario); // dont need this!? 00089 __filename = NULL; 00090 __cfg_grace_period = grace_period; 00091 __cfg_loop_replay = loop_replay; 00092 if (th_opmode == OPMODE_WAITFORWAKEUP) { 00093 __cfg_non_blocking = non_blocking; 00094 } else { 00095 // would cause busy waiting 00096 __cfg_non_blocking = false; 00097 } 00098 } 00099 00100 00101 /** Destructor. */ 00102 BBLogReplayThread::~BBLogReplayThread() 00103 { 00104 free(__logfile_name); 00105 free(__logdir); 00106 free(__scenario); 00107 } 00108 00109 00110 00111 00112 void 00113 BBLogReplayThread::init() 00114 { 00115 __logfile = NULL; 00116 __interface = NULL; 00117 __filename = NULL; 00118 00119 if (asprintf(&__filename, "%s/%s", __logdir, __logfile_name) == -1) { 00120 throw OutOfMemoryException("Cannot re-generate logfile-path"); 00121 } 00122 00123 try { 00124 __logfile = new BBLogFile(__filename, true); 00125 } catch (Exception &e) { 00126 finalize(); 00127 throw; 00128 } 00129 00130 if (! __logfile->has_next()) { 00131 finalize(); 00132 throw Exception("Log file %s does not have any entries", __filename); 00133 } 00134 00135 __interface = blackboard->open_for_writing(__logfile->interface_type(), 00136 __logfile->interface_id()); 00137 00138 try { 00139 __logfile->set_interface(__interface); 00140 } catch (Exception &e) { 00141 finalize(); 00142 throw; 00143 } 00144 00145 logger->log_info(name(), "Replaying from %s:", __filename); 00146 } 00147 00148 00149 void 00150 BBLogReplayThread::finalize() 00151 { 00152 delete __logfile; 00153 if (__filename) free(__filename); 00154 blackboard->close(__interface); 00155 } 00156 00157 00158 void 00159 BBLogReplayThread::once() 00160 { 00161 // Write first immediately, skip first offset 00162 __logfile->read_next(); 00163 __interface->write(); 00164 __last_offset = __logfile->entry_offset(); 00165 if (__logfile->has_next()) { 00166 __logfile->read_next(); 00167 __offsetdiff = __logfile->entry_offset() - __last_offset; 00168 __last_offset = __logfile->entry_offset(); 00169 } 00170 __last_loop.stamp(); 00171 } 00172 00173 void 00174 BBLogReplayThread::loop() 00175 { 00176 if (__logfile->has_next()) { 00177 00178 // check if there is time left to wait 00179 __now.stamp(); 00180 __loopdiff = __now - __last_loop; 00181 if ((__offsetdiff.in_sec() - __loopdiff.in_sec()) > __cfg_grace_period) { 00182 if (__cfg_non_blocking) { 00183 // need to keep waiting before posting, but in non-blocking mode 00184 // just wait for next loop 00185 return; 00186 } else { 00187 __waittime = __offsetdiff - __loopdiff; 00188 __waittime.wait(); 00189 } 00190 } 00191 00192 __interface->write(); 00193 __logfile->read_next(); 00194 00195 __last_loop.stamp(); 00196 __offsetdiff = __logfile->entry_offset() - __last_offset; 00197 __last_offset = __logfile->entry_offset(); 00198 00199 } else { 00200 if(__cfg_loop_replay){ 00201 logger->log_info(name(), "replay finished, looping"); 00202 __logfile->rewind(); 00203 } else { 00204 if (opmode() == OPMODE_CONTINUOUS) { 00205 // block 00206 logger->log_info(name(), "replay finished, sleeping"); 00207 WaitCondition waitcond; 00208 waitcond.wait(); 00209 } // else wait will just run once per loop 00210 } 00211 } 00212 }