Pottu
FileReader.hpp
Go to the documentation of this file.
1 
7 #ifndef H_POTTU_FILEREADER
8 #define H_POTTU_FILEREADER
9 
10 
#include "TDRFile.hpp"
//#include "DataPacketRawItems.hpp"
#include "dataitem.hpp"
#include "ContextBase.hpp"
#include "DataSource.hpp"


#include <fmt/base.h>
#include <fmt/format.h>

#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <deque>
#include <limits>
#include <memory>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <vector>
26 
27 
28 namespace pottu {
29 
37  class FileReader : public DataSource {
38  public:
39 
46  FileReader( uint32_t blocksAtOnce = 1 )
47  : _ctxh( ContextBase::getActive().createHandle("FileReader") ),
48  _blocksAtOnce(blocksAtOnce)
49  {
50  if( blocksAtOnce < 1 )
51  throw std::runtime_error( "FileReader(): blocksAtOnce must be at least 1!" );
52  }
53 
62  void addFile( const std::string &filename ) {
63  auto f = std::make_unique<TDRFile>( filename );
64  f->open();
65  f->close();
66  _totalsize += f->getFilesize();
67  _ctxh->logInfo( fmt::format( "File {} added", f->getPath() ) );
68  _filesWaiting.push_back( std::move(f) );
69  }
70 
71 
78  virtual void process( std::vector<dataitem_t> &dp ) {
79  static std::vector<uint8_t> buf;
80  auto proci = _ctxh->processInstance();
81  dp.clear();
82 
83  for( uint32_t i = 0; i < _blocksAtOnce; ++i ) {
84 
85  if( !_currentFile ) {
86  _ctxh->logInfo( fmt::format( "No file open. Trying to open next one if any." ) );
87  if( _filesWaiting.size() ) {
88  _currentFile = std::move( _filesWaiting.front() );
89  _filesWaiting.pop_front();
90  } else {
91  _ctxh->logInfo( fmt::format( "All files processed." ) );
92  return;
93  }
94  _currentFile->open();
95  _ctxh->logInfo( fmt::format( "Starting to process file {}.", _currentFile->getPath() ) );
96  }
97  buf.resize( _currentFile->getBlocksize() );
98 
99  // Reading next block. len is length of usefull data
100  // excluding the header. buf length is however always
101  // blocksize and contains the header.
102  size_t len = _currentFile->readBlock(buf.data());
103  //fmt::print( "sequence: {}, len: {}\n", _currentFile->getLastHeader().sequence, len );
104  if( len % 8 != 0 ) {
105  /*
106  _ctxh->logWarning( fmt::format( "Header size not multiple of 8. file: {}, sequence: {}, len: {}",
107  _currentFile->getPath(), _currentFile->getLastHeader().sequence, len ) );
108  */
109  len = (len / 8)*8;
110  }
111 
112  if( len ) {
113  // Making room for new data at the end of vector
114  // data. Then copying data there with memcpy (cannot
115  // use insert since the alignment of blockbuffer is
116  // wrong).
117  auto firstNew = dp.size();
118  dp.resize( dp.size() + len/8 );
119 
120  /*
121  fmt::print( "{:12} {:12} {:12}\n",
122  dp.size(), &*dp.end()-(dp.data()+firstNew), len/8.0 );
123  */
124  std::memcpy( dp.data()+firstNew, buf.data()+24, len );
125  ++_blocksRead;
126  _totalBytesRead += _currentFile->getBlocksize();
127 
128  // Swapping dataitems if needed
129  if( _currentFile->isWordSwapRequired() ) {
130  for( auto it = dp.begin()+firstNew; it != dp.end(); ++it )
131  *it = it->asSwapped();
132  }
133  }
134 
135  if( _currentFile->isEof() ) {
136  _ctxh->logInfo( fmt::format( "File {} processed", _currentFile->getPath() ) );
137  _totalBytesReadFromFile += _currentFile->getGZFile().getPosition();
138  _currentFile->close();
139  _filesProcessed.push_back( std::move(_currentFile) );
140  }
141 
142  }
143 
144 
145  if( _reportProgress ) {
146  auto t = std::chrono::steady_clock::now();
147  if( t-_lastProgressReport > _maxReportInterval ) {
148  double dt = (std::chrono::duration<double>( t-_lastProgressReport ) ).count();
149  double progress = getProgress();
150  double estLeftMinutes = (1.0-progress)/( (progress-_lastProgress)/dt ) / 60.0;
151  _ctxh->logInfo( fmt::format( "Progress: {:>3}/{:<3}, {:6.3f} %, est. {:6.2f} min",
152  _filesProcessed.size()+1,
153  _filesProcessed.size()+1+_filesWaiting.size(),
154  progress*100,
155  estLeftMinutes ) );
156  _lastProgressReport = t;
157  _lastProgress = progress;
158  }
159  }
160 
161  }
162 
167  virtual bool allDone() const noexcept { return !_currentFile && _filesWaiting.empty(); }
168 
173  uint64_t getBlocksRead() const { return _blocksRead; }
174 
182  uint64_t getTotalBytesRead() const { return _totalBytesRead; }
183 
190  uint64_t getTotalBytesReadFromFile() const {
191  return _totalBytesReadFromFile + (_currentFile ? _currentFile->getGZFile().getPosition() : 0 );
192  }
193 
200  double getProgress() const noexcept {
201  return getTotalBytesReadFromFile()/double(_totalsize);
202  }
203 
209  void setProgressReport( bool state=true ) noexcept {
210  _reportProgress = state;
211  }
212 
217  bool getProgressReport() const noexcept { return _reportProgress; }
218 
219 
220  private:
221 
222  ContextHandle *_ctxh;
223 
224  uint32_t _blocksAtOnce;
225 
226  std::unique_ptr<TDRFile> _currentFile;
227  std::deque<std::unique_ptr<TDRFile>> _filesWaiting;
228  std::deque<std::unique_ptr<TDRFile>> _filesProcessed;
229 
230 
231  uint64_t _totalsize{0};
232  uint64_t _blocksRead{0};
233  uint64_t _totalBytesRead{0};
234  uint64_t _totalBytesReadFromFile{0};
235 
236  bool _reportProgress{true};
237  std::chrono::steady_clock::duration _maxReportInterval{std::chrono::seconds(2)};
238  std::chrono::steady_clock::time_point _lastProgressReport{std::chrono::steady_clock::now()};
239  double _lastProgress{0};
240  };
241 
242 
243 }
244 
245 
246 #endif
Handle to the context used by a specific class.
Definition: ContextBase.hpp:188
Abstract base class for all data sources.
Definition: DataSource.hpp:23
Data source which reads TDR data from disk.
Definition: FileReader.hpp:37
void addFile(const std::string &filename)
Adds a new TDR file into the queue.
Definition: FileReader.hpp:62
uint64_t getTotalBytesRead() const
Returns number of total bytes read this far by this object.
Definition: FileReader.hpp:182
uint64_t getTotalBytesReadFromFile() const
Returns number of total bytes read from all files without unpacking.
Definition: FileReader.hpp:190
void setProgressReport(bool state=true) noexcept
Sets the state of automatic progress reporting.
Definition: FileReader.hpp:209
virtual bool allDone() const noexcept
Tells whether all the files have been processed.
Definition: FileReader.hpp:167
bool getProgressReport() const noexcept
Returns the state of automatic progress reporting.
Definition: FileReader.hpp:217
uint64_t getBlocksRead() const
Returns number of blocks read.
Definition: FileReader.hpp:173
virtual void process(std::vector< dataitem_t > &dp)
Reads data from file to a container.
Definition: FileReader.hpp:78
FileReader(uint32_t blocksAtOnce=1)
Construct the FileReader object.
Definition: FileReader.hpp:46
double getProgress() const noexcept
Returns the ratio of bytes read from the files (without unpacking) to the total size of all queued files.
Definition: FileReader.hpp:200
Defines the raw low level dataitems used in tdr datastreams and files.
Definition: mainpage.dox:6
Default context for printing and collecting statistics.
Definition: ContextBase.hpp:41