Commit 472c6b47 authored by Achim Morschhauser's avatar Achim Morschhauser
Browse files

Bugfix with larger data gaps for plasmon filter

- Add data points (NaN) when gap is longer than filter block
- This guarantees continuous timeseries in 1 Hz output
parent 4443b581
......@@ -219,6 +219,11 @@ int Filter_Obs::lowpass_PLASMON_1Hz(buffer_obs* buffer_in,
// Timestamp as string for logging
std::string timestamp;
// Integer quotient for assigning filter1 blocks
int N;
bool debug=false;
// =====================================================================
// Filter1 downsamples to 16 Hz with a cut-off frequency of 5 Hz
......@@ -285,17 +290,14 @@ int Filter_Obs::lowpass_PLASMON_1Hz(buffer_obs* buffer_in,
// Load initial number of points for filter1.
// =====================================================================
// wait for data to arrive
buffer_in->pop(data);
while ( data->get_sec() == 0 ) {
buffer_in->pop(data);
}
// Stall to get the center of minute
buffer_in->pop(data);
int sec = data->get_sec();
while ( ( data->get_msec() < 500e6 ) || ( data->get_sec() == sec ) ) {
buffer_in->pop(data);
std::string tmp_str;
data->string(&tmp_str,4);
std::cerr << tmp_str << std::endl;
}
std::cerr << "------------- START PLASMON -------------" << std::endl;
......@@ -312,136 +314,169 @@ int Filter_Obs::lowpass_PLASMON_1Hz(buffer_obs* buffer_in,
//
// Continuously run concatenated filters one and two.
// =================================================
//
//int counter = 0;
//std::string datastr;
//
// For debugging only
int counter = 0;
std::string datastr;
while (1) {
// Grab new data
buffer_in->pop(data);
if (buffer_in->pop(data) != 0) {
return(0);
}
tofilter1.back()->get_time(&last_timespec);
data->get_time(&this_timespec);
last_msec = last_timespec.tv_nsec;
this_msec = this_timespec.tv_nsec;
//counter++;
/*
data->string(&datastr,5);
std::cerr << "INCOMING: " << std::endl;
std::cerr << data->get_sec()
<< "."
<< data->get_msec() << " "
<< datastr << " "
<< counter
<< std::endl;
*/
///*
if (debug) {
counter++;
data->string(&datastr,5);
//std::cerr << "INCOMING: " << std::endl;
std::cerr << datastr << " "
<< counter
<< std::endl;
}
//*/
// One data point is produced from filter1 for each block of
// data, corresponding to its output filter frequency
// Check here if we hit the next block
if ( (this_msec % filter1_delta_t_msec) < (last_msec % filter1_delta_t_msec) ) {
//fprintf(stderr, "number of elements in filter1: %lu\n", tofilter1.size());
//fprintf(stderr, "number of blocks in filter1: %lu\n", filter1_block_count);
// Save number of data in this block
filter1_blocks_info.push_back(filter1_block_numdata);
filter1_block_numdata = 0;
// Wait until filter1 queue is filled
if (filter1_block_count == filter1_block_len) {
try {
filter1_win = &filter1_wins.at(tofilter1.size());
// Filter data
filter_data_temp->filter_fir_sym(&tofilter1, filter1_win);
} catch (const std::out_of_range &oor) {
// if we get here, it means we don't have enough data to filter. A block probably got shortened.
// In this case, set data value to NaN
filter_data_temp->set_values_MD();
std::cerr << "Set values to MD!"
<< std::endl;
filter_data_temp->median_time(&tofilter1);
timestamp=filter_data_temp->get_time
("%Y-%m-%d %H:%M:%S",true);
std::cerr << "[" << timestamp << "]: "
<< "no filter1 - "
<< tofilter1.size()
<< std::endl;
}
//std::string tmp_str;
//filter_data_temp->string(&tmp_str,5);
//std::cerr << "DEBUG ADD FILTER2: " << tmp_str << std::endl;
// Append this data to the queue for filter 2
tofilter2.push_back(new data_obs_vector(filter_data_temp));
/*
filter_data_temp->string(&datastr,5);
std::cerr << "FILTER 1 APPLIED: " << std::endl;
std::cerr << filter_data_temp->get_sec() << "."
<< filter_data_temp->get_msec() << " "
<< datastr << " "
<< counter << " "
<< filter2_count << std::endl;
*/
// Drop the oldest data points, as determined by the oldest block
for (int ix = 0; ix < filter1_blocks_info.front(); ix++) {
delete(tofilter1[ix]);
}
tofilter1.erase(tofilter1.begin(), tofilter1.begin() + filter1_blocks_info.front());
// Drop the oldest block information
filter1_blocks_info.erase(filter1_blocks_info.begin());
// Wait until filter2 queue is filled
if (tofilter2.size() == filter2_block_len) {
// Wait for downsampling and apply filter2
if (filter2_count == filter2_block_step) {
//data->filter_fir_sym(&tofilter2,&filter2_win);
filter_data_temp->filter_fir_sym(&tofilter2, &filter2_win);
/*
filter_data_temp->string(&datastr,5);
std::cerr << "FILTER 2 APPLIED: " << std::endl;
std::cerr << filter_data_temp->get_sec()
<< "." << filter_data_temp->get_msec() << " "
<< datastr << " "
<< " " << counter
<< " " << filter2_block_step
<< std::endl;
*/
//counter = 0;
buffer_out->put(filter_data_temp);
filter2_count = 1;
} else {
filter2_count++;
}
// Delete oldest data from filter2
delete(tofilter2[0]);
tofilter2.erase(tofilter2.begin());
}
} else { // No filter was applied, blocks of filter1 are being filled
// Increase number of blocks counter
filter1_block_count++;
}
// Check here if we hit the next block, or if blocks are skipped
int sec = ((this_timespec.tv_sec) - (last_timespec.tv_sec))*1e9;
int N = (sec+this_msec)/filter1_delta_t_msec - last_msec/filter1_delta_t_msec;
if (debug) {
std::cerr << "N: " << N << std::endl;
fprintf(stderr, "number of elements in filter1: %lu\n",
tofilter1.size());
fprintf(stderr, "number of blocks in filter1: %u\n",
filter1_block_count);
}
}
// Set to NaN if blocks were skipped (see below)
//int N_fill = fmin(filter1_block_len - filter1_block_count + 1, N);
for (int i = 1; i <= N; i++) {
if (debug && i!=1) {
fprintf(stderr, "number of elements in filter1: %lu\n",
tofilter1.size());
fprintf(stderr, "number of blocks in filter1: %u\n",
filter1_block_count);
}
// Set to NaN if blocks were skipped
// Make sure to first clear old block
if (i != 1) {
// Append this data to the queue for filter 1
tofilter1.push_back(new data_obs_vector(data));
tofilter1.back()->set_values_MD();
filter1_block_numdata++;
// New time at center of next block
tofilter1.back()->set_msec(0);
tofilter1.back()->add_time(0,
(long) last_msec - (last_msec % filter1_delta_t_msec)
+ (1.0 / 2.0 + (i-1)) * filter1_delta_t_msec);
// Debug output
timestamp = tofilter1.back()->get_time("%Y-%m-%d %H:%M:%S",
true);
std::cerr << "[" << timestamp << "."
<< tofilter1.back()->get_msec() << "]: "
<< "Block skipped and NaN added: ";
std::cerr << N << " / " << filter1_block_count << " / "
<< filter1_block_numdata << " / "
<< i
<< std::endl;
}
// Save number of data in this block
filter1_blocks_info.push_back(filter1_block_numdata);
filter1_block_numdata = 0;
// Wait until filter1 queue is filled
if (filter1_block_count == filter1_block_len) {
try {
filter1_win = &filter1_wins.at(tofilter1.size());
// Filter data
filter_data_temp->filter_fir_sym(&tofilter1, filter1_win);
} catch (const std::out_of_range &oor) {
// if we get here, it means we don't have enough data to filter. A block probably got shortened.
// In this case, set data value to NaN
filter_data_temp->set_values_MD();
std::cerr << "Set values to MD!" << std::endl;
filter_data_temp->median_time(&tofilter1);
timestamp = filter_data_temp->get_time("%Y-%m-%d %H:%M:%S",
true);
std::cerr << "[" << timestamp << "]: " << "no filter1 - "
<< tofilter1.size() << std::endl;
}
if (abs(tofilter1.size() - freq_in) > 1) {
timestamp = filter_data_temp->get_time("%Y-%m-%d %H:%M:%S",
true);
std::cerr << "[" << timestamp << "]: "
<< "filter1 applied with unusual size - "
<< tofilter1.size() << " - " << freq_in
<< std::endl;
}
// Append this data to the queue for filter 2
tofilter2.push_back(new data_obs_vector(filter_data_temp));
// Drop the oldest data points, as determined by the oldest block
for (int ix = 0; ix < filter1_blocks_info.front(); ix++) {
delete (tofilter1[ix]);
}
tofilter1.erase(tofilter1.begin(),
tofilter1.begin() + filter1_blocks_info.front());
// Drop the oldest block information
filter1_blocks_info.erase(filter1_blocks_info.begin());
// Wait until filter2 queue is filled
if (tofilter2.size() == filter2_block_len) {
// Wait for downsampling and apply filter2
if (filter2_count == filter2_block_step) {
filter_data_temp->filter_fir_sym(&tofilter2,
&filter2_win);
if (debug) counter = 0;
buffer_out->put(filter_data_temp);
filter2_count = 1;
} else {
filter2_count++;
}
// Delete oldest data from filter2
delete (tofilter2[0]);
tofilter2.erase(tofilter2.begin());
}
}
// No filter was applied, blocks of filter1 are being
// filled
if (filter1_block_count != filter1_block_len) {
// Increase number of blocks counter
filter1_block_count++;
}
filter1_block_numdata++;
tofilter1.push_back(new data_obs_vector(data));
}
}
filter1_block_numdata++;
tofilter1.push_back(new data_obs_vector(data));
}
return (0);
return (0);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment