7 #ifndef H_POTTU_FILEREADER
8 #define H_POTTU_FILEREADER
22 #include <type_traits>
47 : _ctxh(
ContextBase::getActive().createHandle(
"FileReader") ),
48 _blocksAtOnce(blocksAtOnce)
50 if( blocksAtOnce < 1 )
51 throw std::runtime_error(
"FileReader(): blocksAtOnce must be at least 1!" );
62 void addFile(
const std::string &filename ) {
63 auto f = std::make_unique<TDRFile>( filename );
66 _totalsize += f->getFilesize();
67 _ctxh->logInfo( fmt::format(
"File {} added", f->getPath() ) );
68 _filesWaiting.push_back( std::move(f) );
78 virtual void process( std::vector<dataitem_t> &dp ) {
79 static std::vector<uint8_t> buf;
80 auto proci = _ctxh->processInstance();
83 for( uint32_t i = 0; i < _blocksAtOnce; ++i ) {
86 _ctxh->logInfo( fmt::format(
"No file open. Trying to open next one if any." ) );
87 if( _filesWaiting.size() ) {
88 _currentFile = std::move( _filesWaiting.front() );
89 _filesWaiting.pop_front();
91 _ctxh->logInfo( fmt::format(
"All files processed." ) );
95 _ctxh->logInfo( fmt::format(
"Starting to process file {}.", _currentFile->getPath() ) );
97 buf.resize( _currentFile->getBlocksize() );
102 size_t len = _currentFile->readBlock(buf.data());
117 auto firstNew = dp.size();
118 dp.resize( dp.size() + len/8 );
124 std::memcpy( dp.data()+firstNew, buf.data()+24, len );
126 _totalBytesRead += _currentFile->getBlocksize();
129 if( _currentFile->isWordSwapRequired() ) {
130 for(
auto it = dp.begin()+firstNew; it != dp.end(); ++it )
131 *it = it->asSwapped();
135 if( _currentFile->isEof() ) {
136 _ctxh->logInfo( fmt::format(
"File {} processed", _currentFile->getPath() ) );
137 _totalBytesReadFromFile += _currentFile->getGZFile().getPosition();
138 _currentFile->close();
139 _filesProcessed.push_back( std::move(_currentFile) );
145 if( _reportProgress ) {
146 auto t = std::chrono::steady_clock::now();
147 if( t-_lastProgressReport > _maxReportInterval ) {
148 double dt = (std::chrono::duration<double>( t-_lastProgressReport ) ).count();
150 double estLeftMinutes = (1.0-progress)/( (progress-_lastProgress)/dt ) / 60.0;
151 _ctxh->logInfo( fmt::format(
"Progress: {:>3}/{:<3}, {:6.3f} %, est. {:6.2f} min",
152 _filesProcessed.size()+1,
153 _filesProcessed.size()+1+_filesWaiting.size(),
156 _lastProgressReport = t;
157 _lastProgress = progress;
167 virtual bool allDone() const noexcept {
return !_currentFile && _filesWaiting.empty(); }
191 return _totalBytesReadFromFile + (_currentFile ? _currentFile->getGZFile().getPosition() : 0 );
210 _reportProgress = state;
224 uint32_t _blocksAtOnce;
226 std::unique_ptr<TDRFile> _currentFile;
227 std::deque<std::unique_ptr<TDRFile>> _filesWaiting;
228 std::deque<std::unique_ptr<TDRFile>> _filesProcessed;
231 uint64_t _totalsize{0};
232 uint64_t _blocksRead{0};
233 uint64_t _totalBytesRead{0};
234 uint64_t _totalBytesReadFromFile{0};
236 bool _reportProgress{
true};
237 std::chrono::steady_clock::duration _maxReportInterval{std::chrono::seconds(2)};
238 std::chrono::steady_clock::time_point _lastProgressReport{std::chrono::steady_clock::now()};
239 double _lastProgress{0};
Handle to context used by specific class.
Definition: ContextBase.hpp:188
Abstract baseclass for all datasources.
Definition: DataSource.hpp:23
Datasource which reads TDR data from a disk.
Definition: FileReader.hpp:37
void addFile(const std::string &filename)
Adds a new TDR file into the queue.
Definition: FileReader.hpp:62
uint64_t getTotalBytesRead() const
Returns number of total bytes read this far by this object.
Definition: FileReader.hpp:182
uint64_t getTotalBytesReadFromFile() const
Returns number of total bytes read from all files without unpacking.
Definition: FileReader.hpp:190
void setProgressReport(bool state=true) noexcept
Sets the state of automatic progress reporting.
Definition: FileReader.hpp:209
virtual bool allDone() const noexcept
Tells if all the files are processed.
Definition: FileReader.hpp:167
bool getProgressReport() const noexcept
Returns the state of automatic progress reporting.
Definition: FileReader.hpp:217
uint64_t getBlocksRead() const
Returns number of blocks read.
Definition: FileReader.hpp:173
virtual void process(std::vector< dataitem_t > &dp)
Reads data from file to a container.
Definition: FileReader.hpp:78
FileReader(uint32_t blocksAtOnce=1)
Construct the Filereader object.
Definition: FileReader.hpp:46
double getProgress() const noexcept
Returns the fraction of read unpacked bytes to the total size of the all files queued.
Definition: FileReader.hpp:200
Defines the raw low level dataitems used in tdr datastreams and files.
Definition: mainpage.dox:6
Default context for printing and collecting statistics.
Definition: ContextBase.hpp:41