SourceXtractorPlusPlus  0.15
Please provide a description of the project.
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1 
17 /*
18  * MultithreadedMeasurement.cpp
19  *
20  * Created on: May 23, 2018
21  * Author: mschefer
22  */
23 
24 #include <chrono>
25 #include <ElementsKernel/Logging.h>
26 #include <csignal>
27 
30 
31 using namespace SourceXtractor;
32 
34 
35 
37  if (m_output_thread->joinable()) {
39  }
40 }
41 
43  m_output_thread = Euclid::make_unique<std::thread>(outputThreadStatic, this);
44 }
45 
47  m_input_done = true;
50  logger.debug() << "All worker threads done!";
51 }
52 
53 void
55  // Force computation of SourceID here, where the order is still deterministic
56  for (auto& source : *source_group) {
57  source.getProperty<SourceID>();
58  }
59 
60  // Put the new SourceGroup into the input queue
61  auto order_number = m_group_counter;
62  m_thread_pool->submit([this, order_number, source_group]() {
63  // Trigger measurements
64  for (auto& source : *source_group) {
65  m_source_to_row(source);
66  }
67  // Pass to the output thread
68  {
70  m_output_queue.emplace_back(order_number, source_group);
71  }
73  });
75 }
76 
78  logger.debug() << "Starting output thread";
79  try {
80  measurement->outputThreadLoop();
81  }
82  catch (const Elements::Exception& e) {
83  logger.fatal() << "Output thread got an exception!";
84  logger.fatal() << e.what();
85  if (!measurement->m_abort_raised.exchange(true)) {
86  logger.fatal() << "Aborting the execution";
87  ::raise(SIGTERM);
88  }
89  }
90  logger.debug() << "Stopping output thread";
91 }
92 
94  while (m_thread_pool->activeThreads() > 0) {
96 
97  // Wait for something in the output queue
98  if (m_output_queue.empty()) {
100  }
101 
102  // Process the output queue
103  while (!m_output_queue.empty()) {
104  notifyObservers(m_output_queue.front().second);
105  m_output_queue.pop_front();
106  }
107 
108  if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
109  m_output_queue.empty()) {
110  break;
111  }
112  }
113 }
static Logging getLogger(const std::string &name="")
void submit(Task task)
size_t running() const
size_t queued() const
size_t activeThreads() const
std::unique_ptr< std::thread > m_output_thread
void handleMessage(const std::shared_ptr< SourceGroupInterface > &source_group) override
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_output_queue
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
static void outputThreadStatic(MultithreadedMeasurement *measurement)
void notifyObservers(const std::shared_ptr< SourceGroupInterface > &message) const
Definition: Observable.h:71
T join(T... args)
T joinable(T... args)
constexpr double e
static auto logger
Definition: WCS.cpp:44