Pottu
Pipeline.hpp
Go to the documentation of this file.
1 
7 #ifndef H_POTTU_PIPELINE
8 #define H_POTTU_PIPELINE
9 
10 #include "DataSource.hpp"
11 #include "StageRawToStampedRaw.hpp"
13 #include "StageDetectorEvent.hpp"
16 #include "ContextBase.hpp"
17 
18 #if POTTU_ENABLE_MULTITHREADING == 1
19 #include <taskflow/taskflow.hpp>
20 #include <taskflow/algorithm/pipeline.hpp>
21 #endif
22 
23 #if POTTU_ROOT_SUPPORT == 1
24 #include <TROOT.h>
25 #endif
26 
27 #include <vector>
28 #include <memory>
29 #include <variant>
30 #include <thread>
31 
32 //#include <iostream>
33 
34 
35 namespace pottu {
36 
66  class Pipeline {
67  public:
68 
69 
76  void setSource( DataSource *stage ) noexcept {
77  _source = stage;
78  _builtPipeline.reset();
79  }
80 
81 
88  void setTimeStamper( StageRawToStampedRaw *stage ) noexcept {
89  _stamper = stage;
90  _builtPipeline.reset();
91  }
92 
93 
101  _deteventgen = stage;
102  _builtPipeline.reset();
103  }
104 
105 
113  void add( StageDetectorEvent *stage ) noexcept {
114  _stagesRest.emplace_back( stage );
115  _builtPipeline.reset();
116  }
117 
118 
126  void add( StageDetectorEventToDetectorEvent *stage ) noexcept {
127  _stagesRest.emplace_back( stage );
128  _builtPipeline.reset();
129  }
130 
131 
137  void add( StageDetectorEventFinal *stage ) noexcept {
138  _stagesRest.emplace_back( stage );
139  _builtPipeline.reset();
140  }
141 
142 
143 
159  void run( std::size_t num_threads = 1 ) {
160  if( !_source || !_stamper || !_deteventgen )
161  throw std::runtime_error( "Pipeline: Source, stamper or detector event generator is not set!" );
162 
163  if( num_threads == 0 ) {
164  num_threads = std::thread::hardware_concurrency();
165  if( num_threads == 0 )
166  num_threads = 4;
167  }
168 
169 #if POTTU_ENABLE_MULTITHREADING != 1
170  if( num_threads > 1 )
171  throw std::runtime_error( "Asking more than one thread but no multithreading enabled."
172  "Add: target_compile_definitions( yoursort PRIVATE POTTU_ENABLE_MULTITHREADING=1 )"
173  "into the CMakeLists.txt file.");
174 #endif
175 
176  if( !_builtPipeline || _builtPipeline->lineBuffers.size() != num_threads ) {
177  if( num_threads == 1 )
178  _buildSingleThreadedPipeline();
179  else
180  _buildMultiThreadedPipeline( num_threads );
181  }
182 
183  if( num_threads == 1 ) {
184  _runSingleThreaded();
185  } else {
186 #if POTTU_ENABLE_MULTITHREADING == 1
187  ContextBase::getActive().logInfo( "!!! Running now multithreaded pipeline!" );
188 
189  //_builtPipeline->taskflow.dump(std::cout);
190 
191  _builtPipeline->executor.run(_builtPipeline->taskflow).wait();
192 #endif
193  }
194  }
195 
196 
197  void _buildSingleThreadedPipeline() {
198  _builtPipeline.reset();
199  _builtPipeline = std::move( std::make_unique<BuiltPipeline>() );
200  _builtPipeline->lineBuffers.resize(1);
201 
202  auto &detevents = _builtPipeline->lineBuffers[0].detevents;
203  // Counting the number of stages that has separate output
204  std::size_t sepOutN{0};
205  for( auto &vstage : _stagesRest ) {
206  if( vstage.index() == 1 || vstage.index() == 2 )
207  ++sepOutN;
208  }
209  detevents.resize( sepOutN+1 );
210 
212  fmt::format( "Singlethreaded pipeline created with {} stages and {} buffers.",
213  3+_stagesRest.size(), 3+detevents.size() ) );
214  }
215 
216 
217  void _buildMultiThreadedPipeline( std::size_t num_lines ) {
218 
219 #if POTTU_ROOT_SUPPORT == 1
220  ROOT::EnableThreadSafety();
221 #endif
222 
223 #if POTTU_ENABLE_MULTITHREADING == 1
224  _builtPipeline.reset();
225  _builtPipeline = std::move( std::make_unique<BuiltPipeline>() );
226  _builtPipeline->lineBuffers.reserve(num_lines);
227  _builtPipeline->lineBuffers.resize(1);
228 
229  // Creating the buffers for the rest of the stages for _the first line_.
230  // Counting the number of stages that has separate output
231  std::size_t sepOutN{0};
232  for( auto &vstage : _stagesRest ) {
233  if( vstage.index() == 1 || vstage.index() == 2 )
234  ++sepOutN;
235  }
236  _builtPipeline->lineBuffers[0].detevents.resize( sepOutN+1 );
237 
238  // Copying the above buffers for the rest of the lines
239  for( std::size_t line = 1; line < num_lines; ++line )
240  _builtPipeline->lineBuffers.push_back( _builtPipeline->lineBuffers[0] );
241 
242 
243  // Creating tf::pipe's.
244  _builtPipeline->pipes.emplace_back(
245  tf::PipeType::SERIAL,
246  [this](tf::Pipeflow &pf)
247  {
248  if( _source->allDone() )
249  pf.stop();
250  else
251  _source->process( _builtPipeline->lineBuffers[pf.line()].rawitems );
252  } );
253  _builtPipeline->pipes.emplace_back(
254  tf::PipeType::SERIAL,
255  [this](tf::Pipeflow &pf)
256  {
257  _stamper->process( _builtPipeline->lineBuffers[pf.line()].rawitems,
258  _builtPipeline->lineBuffers[pf.line()].rawitems_ts );
259  } );
260  _builtPipeline->pipes.emplace_back(
261  tf::PipeType::SERIAL,
262  [this](tf::Pipeflow &pf)
263  {
264  _deteventgen->process( _builtPipeline->lineBuffers[pf.line()].rawitems_ts,
265  _builtPipeline->lineBuffers[pf.line()].detevents[0] );
266  } );
267 
268  std::size_t bufc{0};
269  for( auto &vstage : _stagesRest ) {
270  switch( vstage.index() ) {
271  case 0: {
272  auto p = std::get<StageDetectorEvent *>( vstage );
273 
274  _builtPipeline->pipes.emplace_back(
275  tf::PipeType::SERIAL,
276  [this,p,bufc](tf::Pipeflow &pf)
277  {
278  p->process( _builtPipeline->lineBuffers[pf.line()].detevents[bufc] );
279  } );
280  break;
281  }
282  case 1: {
283  auto p = std::get<StageDetectorEventToDetectorEvent *>( vstage );
284 
285  _builtPipeline->pipes.emplace_back(
286  tf::PipeType::SERIAL,
287  [this,p,bufc](tf::Pipeflow &pf)
288  {
289  p->process( _builtPipeline->lineBuffers[pf.line()].detevents[bufc],
290  _builtPipeline->lineBuffers[pf.line()].detevents[bufc+1] );
291  } );
292  bufc++;
293  break;
294  }
295  case 2: {
296  auto p = std::get<StageDetectorEventFinal *>( vstage );
297 
298  _builtPipeline->pipes.emplace_back(
299  tf::PipeType::SERIAL,
300  [this,p,bufc](tf::Pipeflow &pf)
301  {
302  // Copying the input data to next buffer before consuming it
303  _builtPipeline->lineBuffers[pf.line()].detevents[bufc+1] = _builtPipeline->lineBuffers[pf.line()].detevents[bufc];
304  p->process( _builtPipeline->lineBuffers[pf.line()].detevents[bufc] );
305  // Zeroing the input data so that it cannot be used anymore
306  _builtPipeline->lineBuffers[pf.line()].detevents[bufc].clear();
307  } );
308  bufc++;
309  break;
310  }
311  }
312 
313  }
314  _builtPipeline->pl.reset( num_lines, _builtPipeline->pipes.begin(), _builtPipeline->pipes.end() );
315  _builtPipeline->taskflow.composed_of(_builtPipeline->pl);
317  fmt::format( "Multithreaded pipeline created with {} stages, {} lines and {} buffers/line.",
318  3+_stagesRest.size(), _builtPipeline->lineBuffers.size(),
319  3+_builtPipeline->lineBuffers[0].detevents.size() ) );
320 #endif
321  }
322 
323 
324  void _runSingleThreaded() {
325  auto &buffers = _builtPipeline->lineBuffers[0];
326  while( !_source->allDone() ) {
327  _source->process( buffers.rawitems );
328  _stamper->process( buffers.rawitems, buffers.rawitems_ts );
329  _deteventgen->process( buffers.rawitems_ts, buffers.detevents[0] );
330  std::size_t bufc{0};
331  for( auto &vstage : _stagesRest ) {
332  switch( vstage.index() ) {
333  case 0:
334  std::get<StageDetectorEvent *>( vstage )
335  ->process( buffers.detevents[bufc] );
336  break;
337  case 1:
338  std::get<StageDetectorEventToDetectorEvent *>( vstage )
339  ->process( buffers.detevents[bufc], buffers.detevents[bufc+1] );
340  bufc++;
341  break;
342  case 2:
343  // Copying the input data to next buffer before consuming it
344  buffers.detevents[bufc+1] = buffers.detevents[bufc];
345  std::get<StageDetectorEventFinal *>( vstage )
346  ->process( buffers.detevents[bufc] );
347  bufc++;
348  break;
349  }
350  }
351 
352  }
353  }
354 
355 
356 
357  struct BuiltPipeline {
358 
359  struct LineBuffers {
360  std::vector<dataitem_t> rawitems;
361  std::vector<dataitem_ts_t> rawitems_ts;
362  std::vector< std::vector<DetectorEvent> > detevents;
363  };
364 
365  std::vector<LineBuffers> lineBuffers;
366 #if POTTU_ENABLE_MULTITHREADING == 1
367  using pipe_type = tf::Pipe<std::function<void(tf::Pipeflow&)>>;
368  tf::Taskflow taskflow;
369  tf::Executor executor;
370  std::vector< pipe_type > pipes;
371  tf::ScalablePipeline<std::vector<pipe_type>::iterator> pl;
372 #endif
373  };
374 
375 
376 
377  DataSource *_source{nullptr};
378  StageRawToStampedRaw *_stamper{nullptr};
379  StageStampedRawToDetectorEvent *_deteventgen{nullptr};
380  std::vector<
381  std::variant< StageDetectorEvent *,
382  StageDetectorEventToDetectorEvent *,
383  StageDetectorEventFinal *>> _stagesRest;
384 
385  std::unique_ptr<BuiltPipeline> _builtPipeline;
386 
387  };
388 
389 
390 }
391 
392 #endif
Abstract baseclass for all datasources.
Definition: DataSource.hpp:23
virtual bool allDone() const noexcept=0
Asking if all is done.
virtual void process(std::vector< dataitem_t > &dp)=0
Fills new dataitems to the container.
Pipeline capsules the pottu-stage pipeline loop and allows running it multithreaded.
Definition: Pipeline.hpp:66
void run(std::size_t num_threads=1)
Runs the defined pipeline until the data source has no more data.
Definition: Pipeline.hpp:159
void setSource(DataSource *stage) noexcept
Sets the data source.
Definition: Pipeline.hpp:76
void setTimeStamper(StageRawToStampedRaw *stage) noexcept
Sets the time stamper.
Definition: Pipeline.hpp:88
void add(StageDetectorEvent *stage) noexcept
Adds a normal processing stage.
Definition: Pipeline.hpp:113
void add(StageDetectorEventFinal *stage) noexcept
Adds a processing stage which consumes the data.
Definition: Pipeline.hpp:137
void setDetectorEventGenerator(StageStampedRawToDetectorEvent *stage) noexcept
Sets the detector event generator.
Definition: Pipeline.hpp:100
void add(StageDetectorEventToDetectorEvent *stage) noexcept
Adds a normal processing stage which has an internal detector event buffer.
Definition: Pipeline.hpp:126
Abstract baseclass for all stages which consumes (drains) the detector events.
Definition: StageDetectorEventFinal.hpp:25
Abstract baseclass for all stages which transforms detector events to detector events.
Definition: StageDetectorEventToDetectorEvent.hpp:23
Abstract baseclass for all stages which uses or modifies detector events.
Definition: StageDetectorEvent.hpp:20
Abstract baseclass for all raw data processing stages.
Definition: StageRawToStampedRaw.hpp:21
virtual void process(const std::vector< dataitem_t > &input, std::vector< dataitem_ts_t > &output)=0
Consumes raw data items and fills output with raw timestamped data items.
Abstract baseclass for all stages which creates detector events.
Definition: StageStampedRawToDetectorEvent.hpp:19
virtual void process(const std::vector< dataitem_ts_t > &input, std::vector< DetectorEvent > &output)=0
Consumes stamped raw data items and outputs detector events.
Definition: mainpage.dox:6
static void logInfo(const std::string &message, uint64_t token=0)
Sends log message with level DEBUG from anonymous source.
Definition: ContextBase.hpp:132
static ContextBase & getActive() noexcept
Returns active Context.
Definition: ContextBase.hpp:80
Definition: Pipeline.hpp:357