IBSimu 1.0.4
|
00001 00005 /* Copyright (c) 2005-2009 Taneli Kalvas. All rights reserved. 00006 * 00007 * You can redistribute this software and/or modify it under the terms 00008 * of the GNU General Public License as published by the Free Software 00009 * Foundation; either version 2 of the License, or (at your option) 00010 * any later version. 00011 * 00012 * This library is distributed in the hope that it will be useful, but 00013 * WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00015 * General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with this library (file "COPYING" included in the package); 00019 * if not, write to the Free Software Foundation, Inc., 51 Franklin 00020 * Street, Fifth Floor, Boston, MA 02110-1301 USA 00021 * 00022 * If you have questions about your rights to use or distribute this 00023 * software, please contact Berkeley Lab's Technology Transfer 00024 * Department at TTD@lbl.gov. Other questions, comments and bug 00025 * reports should be sent directly to the author via email at 00026 * taneli.kalvas@jyu.fi. 00027 * 00028 * NOTICE. This software was developed under partial funding from the 00029 * U.S. Department of Energy. As such, the U.S. Government has been 00030 * granted for itself and others acting on its behalf a paid-up, 00031 * nonexclusive, irrevocable, worldwide license in the Software to 00032 * reproduce, prepare derivative works, and perform publicly and 00033 * display publicly. Beginning five (5) years after the date 00034 * permission to assert copyright is obtained from the U.S. Department 00035 * of Energy, and subject to any subsequent five (5) year renewals, 00036 * the U.S. Government is granted for itself and others acting on its 00037 * behalf a paid-up, nonexclusive, irrevocable, worldwide license in 00038 * the Software to reproduce, prepare derivative works, distribute 00039 * copies to the public, perform publicly and display publicly, and to 00040 * permit others to do so. 00041 */ 00042 00043 #ifndef SCHEDULER_HPP 00044 #define SCHEDULER_HPP 1 00045 00046 00047 #include <pthread.h> 00048 #include <iostream> 00049 #include <vector> 00050 #include <deque> 00051 //#include <sys/time.h> 00052 00053 00054 //pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER; 00055 00056 00083 template <class Solv, class Prob, class Err> 00084 class Scheduler { 00085 00086 class Consumer { 00087 00088 /* 00089 enum consumer_status_e { 00090 CONSUMER_CREATED = 0, 00091 CONSUMER_RUNNING, 00092 CONSUMER_FINISHED 00093 }; 00094 */ 00095 00096 //pthread_mutex_t _mutex; //!< \brief Mutex for active check 00097 pthread_t _thread; 00098 Solv *_solver; 00099 Scheduler *_scheduler; 00100 //struct timeval _t0; 00101 //std::vector<struct timeval> _t; 00102 00103 void *consumer_main( void ) { 00104 Prob *p; 00105 //struct timeval t; 00106 00107 //pthread_mutex_lock( &_mutex ); 00108 //_status = CONSUMER_RUNNING; 00109 //pthread_mutex_unlock( &_mutex ); 00110 00111 while( (p = _scheduler->get_next_problem()) ) { 00112 try { 00113 //gettimeofday( &t, NULL ); 00114 //_t.push_back( t ); 00115 (*_solver)( p, *_scheduler ); 00116 //gettimeofday( &t, NULL ); 00117 //_t.push_back( t ); 00118 } catch( Err e ) { 00119 //std::cout << "on_error\n"; 00120 // Handle error and stop solving 00121 _scheduler->on_error( e, p ); 00122 break; 00123 }; 00124 _scheduler->put_solved_problem( p ); 00125 } 00126 00127 //std::cout << "Exiting consumer\n"; 00128 //pthread_mutex_lock( &_mutex ); 00129 //_status = CONSUMER_FINISHED; 00130 //pthread_mutex_unlock( &_mutex ); 00131 return( NULL ); 00132 } 00133 00134 public: 00135 00136 static void *consumer_entry( void *data ) { 00137 Consumer *consumer = (Consumer *)data; 00138 return( consumer->consumer_main() ); 00139 } 00140 00141 Consumer( Solv *solver, Scheduler *scheduler ) : _solver(solver), _scheduler(scheduler) { 00142 00143 //pthread_mutex_init( &_mutex, NULL ); 00144 //std::cout << "Start\n"; 00145 //gettimeofday( &_t0, NULL ); 00146 } 00147 00148 ~Consumer() { 00149 //pthread_mutex_lock( &cout_mutex ); 00150 //std::cout << "End\n"; 00151 //for( size_t a = 0; a < _t.size(); a++ ) { 00152 //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 00153 //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n"; 00154 //a++; 00155 //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 00156 //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n\n\n"; 00157 //} 00158 //pthread_mutex_unlock( &cout_mutex ); 00159 } 00160 00161 void run( void ) { 00162 pthread_create( &_thread, NULL, consumer_entry, (void *)this ); 00163 } 00164 00165 void join( void ) { 00166 //pthread_mutex_lock( &_mutex ); 00167 //if( _status == CONSUMER_FINISHED ) { 00168 //pthread_mutex_unlock( &_mutex ); 00169 //return; 00170 //} else if( _status == CONSUMER_CREATED ) { 00171 // 00172 //} 00173 //pthread_mutex_unlock( &_mutex ); 00174 pthread_join( _thread, NULL ); 00175 } 00176 00177 }; 00178 00179 00180 pthread_mutex_t _mutex; 00181 pthread_cond_t _scheduler_cond; 00182 pthread_cond_t _producer_cond; 00183 pthread_cond_t _consumer_cond; 00184 00185 size_t _problems_in_c; 00186 size_t _problems_out_c; 00187 size_t _problems_err_c; 00188 std::deque<Prob*> _problems_in; 00189 std::deque<Prob*> _problems_out; 00190 00191 pthread_t _scheduler_thread; 00192 std::vector<Consumer *> _consumers; 00193 00194 bool _running; 00195 bool _error; 00196 bool _done; 00197 bool _finish; 00198 std::vector<Err> _err; 00199 std::vector<Prob *> _prob; 00200 00201 00207 void on_error( Err &e, Prob *p ) { 00208 pthread_mutex_lock( &_mutex ); 00209 _err.push_back( e ); 00210 _prob.push_back( p ); 00211 _problems_err_c++; 00212 _error = true; 00213 pthread_cond_broadcast( &_scheduler_cond ); 00214 pthread_mutex_unlock( &_mutex ); 00215 } 00216 00217 00218 Prob *get_next_problem( void ) { 00219 Prob *ret; 00220 pthread_mutex_lock( &_mutex ); 00221 00222 if( _done || _error ) { 00223 pthread_mutex_unlock( &_mutex ); 00224 return( NULL ); 00225 } 00226 00227 if( _problems_in.empty() ) { 00228 // Signal producer that problems are spent 00229 pthread_cond_signal( &_scheduler_cond ); 00230 while( _problems_in.empty() ) { 00231 // Wait for new problems 00232 pthread_cond_wait( &_consumer_cond, &_mutex ); 00233 if( _done || _error ) { 00234 pthread_mutex_unlock( &_mutex ); 00235 return( NULL ); 00236 } 00237 } 00238 } 00239 00240 // Return next problem 00241 ret = _problems_in.front(); 00242 _problems_in.pop_front(); 00243 pthread_mutex_unlock( &_mutex ); 00244 return( ret ); 00245 } 00246 00247 00248 void put_solved_problem( Prob *p ) { 00249 pthread_mutex_lock( &_mutex ); 00250 _problems_out_c++; 00251 //std::cout << "put_solved_problem(): " << _problems_out_c << "\n"; 00252 _problems_out.push_back( p ); 00253 pthread_mutex_unlock( &_mutex ); 00254 } 00255 00256 00257 void *scheduler_main( void ) { 00258 00259 // Moved from 00260 for( size_t a = 0; a < _consumers.size(); a++ ) 00261 _consumers[a]->run(); 00262 00263 pthread_mutex_lock( &_mutex ); 00264 00265 while( 1 ) { 00266 // Wait until all consumers are done with all problems or error occurs 00267 while( !(_problems_in.empty() || _done || _error) ) { 00268 //std::cout << "scheduler_main(): scheduler_cond wait 1\n"; 00269 pthread_cond_wait( &_scheduler_cond, &_mutex ); 00270 } 00271 00272 if( (_finish && _problems_in_c == _problems_out_c+_problems_err_c) || 00273 _done || _error ) 00274 break; 00275 00276 // Problems temporarily done 00277 pthread_cond_wait( &_scheduler_cond, &_mutex ); 00278 //std::cout << "scheduler_main(): prob_in = " << _problems_in_c 00279 //<< " prob_out = " << _problems_out_c << "\n"; 00280 //std::cout << "scheduler_main(): scheduler_cond wait 2\n"; 00281 00282 // Signal consumers to wake up 00283 pthread_cond_broadcast( &_consumer_cond ); 00284 } 00285 00286 // Broadcast done 00287 _done = true; 00288 pthread_cond_broadcast( &_consumer_cond ); 00289 pthread_mutex_unlock( &_mutex ); 00290 00291 // Join all consumers 00292 //std::cout << "scheduler_main(): Scheduler waiting in join\n"; 00293 for( size_t a = 0; a < _consumers.size(); a++ ) 00294 _consumers[a]->join(); 00295 00296 pthread_cond_broadcast( &_producer_cond ); 00297 //std::cout << "scheduler_main(): Exiting scheduler\n"; 00298 _running = false; 00299 return( NULL ); 00300 } 00301 00302 00303 00304 00305 static void *scheduler_entry( void *data ) { 00306 Scheduler *scheduler = (Scheduler *)data; 00307 return( scheduler->scheduler_main() ); 00308 } 00309 00310 00311 public: 00312 00313 00319 Scheduler( std::vector<Solv *> s ) 00320 : _problems_in_c(0), _problems_out_c(0), _problems_err_c(0), _running(false) { 00321 00322 pthread_mutex_init( &_mutex, NULL ); 00323 pthread_cond_init( &_scheduler_cond, NULL ); 00324 pthread_cond_init( &_consumer_cond, NULL ); 00325 pthread_cond_init( &_producer_cond, NULL ); 00326 00327 // Create consumer threads 00328 for( size_t a = 0; a < s.size(); a++ ) 00329 _consumers.push_back( new Consumer( s[a], this ) ); 00330 } 00331 00332 00335 ~Scheduler() { 00336 finish(); 00337 pthread_join( _scheduler_thread, NULL ); 00338 00339 pthread_mutex_destroy( &_mutex ); 00340 pthread_cond_destroy( &_scheduler_cond ); 00341 pthread_cond_destroy( &_consumer_cond ); 00342 pthread_cond_destroy( &_producer_cond ); 00343 00344 // Delete consumer threads 00345 for( size_t a = 0; a < _consumers.size(); a++ ) 00346 delete _consumers[a]; 00347 } 00348 00349 00355 template <class Cont> 00356 size_t get_solved_problems( Cont &c ) { 00357 pthread_mutex_lock( &_mutex ); 00358 size_t r = _problems_out.size(); 00359 while( !_problems_out.empty() ) { 00360 c.push_back( _problems_out.front() ); 00361 _problems_out.pop_front(); 00362 } 00363 pthread_mutex_unlock( &_mutex ); 00364 return( r ); 00365 } 00366 00367 00370 bool is_error( void ) { 00371 // No mutex needed for one bit read 00372 return( _error ); 00373 } 00374 00375 00378 bool is_running( void ) { 00379 // No mutex needed for one bit read 00380 return( _running ); 00381 } 00382 00383 00390 template <class Cont1, class Cont2> 00391 size_t get_errors( Cont1 &e, Cont2 &p ) { 00392 pthread_mutex_lock( &_mutex ); 00393 size_t r = _err.size(); 00394 for( size_t a = 0; a < _err.size(); a++ ) { 00395 e.push_back( _err[a] ); 00396 p.push_back( _prob[a] ); 00397 } 00398 _err.clear(); 00399 _prob.clear(); 00400 pthread_mutex_unlock( &_mutex ); 00401 return( r ); 00402 } 00403 00410 void run( void ) { 00411 00412 if( _running ) 00413 return; 00414 _running = true; 00415 _error = false; 00416 _done = false; 00417 _finish = false; 00418 _err.clear(); 00419 _prob.clear(); 00420 pthread_create( &_scheduler_thread, NULL, scheduler_entry, (void *)this ); 00421 } 00422 00423 00426 void add_problem( Prob *p ) { 00427 00428 pthread_mutex_lock( &_mutex ); 00429 _problems_in_c++; 00430 _problems_in.push_back( p ); 00431 pthread_cond_broadcast( &_scheduler_cond ); 00432 pthread_mutex_unlock( &_mutex ); 00433 } 00434 00435 00438 void add_problems( std::vector<Prob *> p ) { 00439 00440 pthread_mutex_lock( &_mutex ); 00441 _problems_in_c += p.size(); 00442 _problems_in.insert( _problems_in.end(), p.begin(), p.end() ); 00443 pthread_cond_broadcast( &_scheduler_cond ); 00444 pthread_mutex_unlock( &_mutex ); 00445 } 00446 00447 00453 bool finish( void ) { 00454 if( _finish ) 00455 return( true ); 00456 if( !_running ) 00457 return( false ); 00458 00459 pthread_mutex_lock( &_mutex ); 00460 _finish = true; 00461 //std::cout << "finish(): scheduler_cond broadcast\n"; 00462 pthread_cond_broadcast( &_scheduler_cond ); 00463 00464 //std::cout << "finish(): producer_cond wait\n"; 00465 pthread_cond_wait( &_producer_cond, &_mutex ); 00466 pthread_mutex_unlock( &_mutex ); 00467 00468 if( _error ) 00469 return( false ); 00470 return( true ); 00471 } 00472 00473 00474 friend class Consumer; 00475 }; 00476 00477 00478 00479 #endif 00480 00481 00482 00483 00484 00485 00486 00487 00488 00489 00490 00491 00492 00493 00494 00495 00496 00497 00498