7 #ifndef H_POTTU_PIPELINE
8 #define H_POTTU_PIPELINE
18 #if POTTU_ENABLE_MULTITHREADING == 1
19 #include <taskflow/taskflow.hpp>
20 #include <taskflow/algorithm/pipeline.hpp>
23 #if POTTU_ROOT_SUPPORT == 1
78 _builtPipeline.reset();
90 _builtPipeline.reset();
101 _deteventgen = stage;
102 _builtPipeline.reset();
114 _stagesRest.emplace_back( stage );
115 _builtPipeline.reset();
127 _stagesRest.emplace_back( stage );
128 _builtPipeline.reset();
138 _stagesRest.emplace_back( stage );
139 _builtPipeline.reset();
159 void run( std::size_t num_threads = 1 ) {
160 if( !_source || !_stamper || !_deteventgen )
161 throw std::runtime_error(
"Pipeline: Source, stamper or detector event generator is not set!" );
163 if( num_threads == 0 ) {
164 num_threads = std::thread::hardware_concurrency();
165 if( num_threads == 0 )
169 #if POTTU_ENABLE_MULTITHREADING != 1
170 if( num_threads > 1 )
171 throw std::runtime_error(
"Asking more than one thread but no multithreading enabled."
172 "Add: target_compile_definitions( yoursort PRIVATE POTTU_ENABLE_MULTITHREADING=1 )"
173 "into the CMakeLists.txt file.");
176 if( !_builtPipeline || _builtPipeline->lineBuffers.size() != num_threads ) {
177 if( num_threads == 1 )
178 _buildSingleThreadedPipeline();
180 _buildMultiThreadedPipeline( num_threads );
183 if( num_threads == 1 ) {
184 _runSingleThreaded();
186 #if POTTU_ENABLE_MULTITHREADING == 1
191 _builtPipeline->executor.run(_builtPipeline->taskflow).wait();
197 void _buildSingleThreadedPipeline() {
198 _builtPipeline.reset();
199 _builtPipeline = std::move( std::make_unique<BuiltPipeline>() );
200 _builtPipeline->lineBuffers.resize(1);
202 auto &detevents = _builtPipeline->lineBuffers[0].detevents;
204 std::size_t sepOutN{0};
205 for(
auto &vstage : _stagesRest ) {
206 if( vstage.index() == 1 || vstage.index() == 2 )
209 detevents.resize( sepOutN+1 );
212 fmt::format(
"Singlethreaded pipeline created with {} stages and {} buffers.",
213 3+_stagesRest.size(), 3+detevents.size() ) );
217 void _buildMultiThreadedPipeline( std::size_t num_lines ) {
219 #if POTTU_ROOT_SUPPORT == 1
220 ROOT::EnableThreadSafety();
223 #if POTTU_ENABLE_MULTITHREADING == 1
224 _builtPipeline.reset();
225 _builtPipeline = std::move( std::make_unique<BuiltPipeline>() );
226 _builtPipeline->lineBuffers.reserve(num_lines);
227 _builtPipeline->lineBuffers.resize(1);
231 std::size_t sepOutN{0};
232 for(
auto &vstage : _stagesRest ) {
233 if( vstage.index() == 1 || vstage.index() == 2 )
236 _builtPipeline->lineBuffers[0].detevents.resize( sepOutN+1 );
239 for( std::size_t line = 1; line < num_lines; ++line )
240 _builtPipeline->lineBuffers.push_back( _builtPipeline->lineBuffers[0] );
244 _builtPipeline->pipes.emplace_back(
245 tf::PipeType::SERIAL,
246 [
this](tf::Pipeflow &pf)
251 _source->
process( _builtPipeline->lineBuffers[pf.line()].rawitems );
253 _builtPipeline->pipes.emplace_back(
254 tf::PipeType::SERIAL,
255 [
this](tf::Pipeflow &pf)
257 _stamper->
process( _builtPipeline->lineBuffers[pf.line()].rawitems,
258 _builtPipeline->lineBuffers[pf.line()].rawitems_ts );
260 _builtPipeline->pipes.emplace_back(
261 tf::PipeType::SERIAL,
262 [
this](tf::Pipeflow &pf)
264 _deteventgen->
process( _builtPipeline->lineBuffers[pf.line()].rawitems_ts,
265 _builtPipeline->lineBuffers[pf.line()].detevents[0] );
269 for(
auto &vstage : _stagesRest ) {
270 switch( vstage.index() ) {
272 auto p = std::get<StageDetectorEvent *>( vstage );
274 _builtPipeline->pipes.emplace_back(
275 tf::PipeType::SERIAL,
276 [
this,p,bufc](tf::Pipeflow &pf)
278 p->process( _builtPipeline->lineBuffers[pf.line()].detevents[bufc] );
283 auto p = std::get<StageDetectorEventToDetectorEvent *>( vstage );
285 _builtPipeline->pipes.emplace_back(
286 tf::PipeType::SERIAL,
287 [
this,p,bufc](tf::Pipeflow &pf)
289 p->process( _builtPipeline->lineBuffers[pf.line()].detevents[bufc],
290 _builtPipeline->lineBuffers[pf.line()].detevents[bufc+1] );
296 auto p = std::get<StageDetectorEventFinal *>( vstage );
298 _builtPipeline->pipes.emplace_back(
299 tf::PipeType::SERIAL,
300 [
this,p,bufc](tf::Pipeflow &pf)
303 _builtPipeline->lineBuffers[pf.line()].detevents[bufc+1] = _builtPipeline->lineBuffers[pf.line()].detevents[bufc];
304 p->process( _builtPipeline->lineBuffers[pf.line()].detevents[bufc] );
306 _builtPipeline->lineBuffers[pf.line()].detevents[bufc].clear();
314 _builtPipeline->pl.reset( num_lines, _builtPipeline->pipes.begin(), _builtPipeline->pipes.end() );
315 _builtPipeline->taskflow.composed_of(_builtPipeline->pl);
317 fmt::format(
"Multithreaded pipeline created with {} stages, {} lines and {} buffers/line.",
318 3+_stagesRest.size(), _builtPipeline->lineBuffers.size(),
319 3+_builtPipeline->lineBuffers[0].detevents.size() ) );
324 void _runSingleThreaded() {
325 auto &buffers = _builtPipeline->lineBuffers[0];
327 _source->
process( buffers.rawitems );
328 _stamper->
process( buffers.rawitems, buffers.rawitems_ts );
329 _deteventgen->
process( buffers.rawitems_ts, buffers.detevents[0] );
331 for(
auto &vstage : _stagesRest ) {
332 switch( vstage.index() ) {
334 std::get<StageDetectorEvent *>( vstage )
335 ->process( buffers.detevents[bufc] );
338 std::get<StageDetectorEventToDetectorEvent *>( vstage )
339 ->process( buffers.detevents[bufc], buffers.detevents[bufc+1] );
344 buffers.detevents[bufc+1] = buffers.detevents[bufc];
345 std::get<StageDetectorEventFinal *>( vstage )
346 ->process( buffers.detevents[bufc] );
360 std::vector<dataitem_t> rawitems;
361 std::vector<dataitem_ts_t> rawitems_ts;
362 std::vector< std::vector<DetectorEvent> > detevents;
365 std::vector<LineBuffers> lineBuffers;
366 #if POTTU_ENABLE_MULTITHREADING == 1
367 using pipe_type = tf::Pipe<std::function<void(tf::Pipeflow&)>>;
368 tf::Taskflow taskflow;
369 tf::Executor executor;
370 std::vector< pipe_type > pipes;
371 tf::ScalablePipeline<std::vector<pipe_type>::iterator> pl;
379 StageStampedRawToDetectorEvent *_deteventgen{
nullptr};
381 std::variant< StageDetectorEvent *,
382 StageDetectorEventToDetectorEvent *,
383 StageDetectorEventFinal *>> _stagesRest;
385 std::unique_ptr<BuiltPipeline> _builtPipeline;
Abstract baseclass for all datasources.
Definition: DataSource.hpp:23
virtual bool allDone() const noexcept=0
Asking if all is done.
virtual void process(std::vector< dataitem_t > &dp)=0
Fills new dataitems to the container.
Pipeline capsules the pottu-stage pipeline loop and allows running it multithreaded.
Definition: Pipeline.hpp:66
void run(std::size_t num_threads=1)
Runs the defined pipeline until the data source has no more data.
Definition: Pipeline.hpp:159
void setSource(DataSource *stage) noexcept
Sets the data source.
Definition: Pipeline.hpp:76
void setTimeStamper(StageRawToStampedRaw *stage) noexcept
Sets the time stamper.
Definition: Pipeline.hpp:88
void add(StageDetectorEvent *stage) noexcept
Adds a normal processing stage.
Definition: Pipeline.hpp:113
void add(StageDetectorEventFinal *stage) noexcept
Adds a processing stage which consumes the data.
Definition: Pipeline.hpp:137
void setDetectorEventGenerator(StageStampedRawToDetectorEvent *stage) noexcept
Sets the detector event generator.
Definition: Pipeline.hpp:100
void add(StageDetectorEventToDetectorEvent *stage) noexcept
Adds a normal processing stage which has an internal detector event buffer.
Definition: Pipeline.hpp:126
Abstract baseclass for all stages which consumes (drains) the detector events.
Definition: StageDetectorEventFinal.hpp:25
Abstract baseclass for all stages which transforms detector events to detector events.
Definition: StageDetectorEventToDetectorEvent.hpp:23
Abstract baseclass for all stages which uses or modifies detector events.
Definition: StageDetectorEvent.hpp:20
Abstract baseclass for all raw data processing stages.
Definition: StageRawToStampedRaw.hpp:21
virtual void process(const std::vector< dataitem_t > &input, std::vector< dataitem_ts_t > &output)=0
Consumes raw data items and fills output with raw timestamped data items.
Abstract baseclass for all stages which creates detector events.
Definition: StageStampedRawToDetectorEvent.hpp:19
virtual void process(const std::vector< dataitem_ts_t > &input, std::vector< DetectorEvent > &output)=0
Consumes stamped raw data items and outputs detector events.
Definition: mainpage.dox:6
static void logInfo(const std::string &message, uint64_t token=0)
Sends log message with level DEBUG from anonymous source.
Definition: ContextBase.hpp:132
static ContextBase & getActive() noexcept
Returns active Context.
Definition: ContextBase.hpp:80
Definition: Pipeline.hpp:359
Definition: Pipeline.hpp:357