Skip to content

[sc-241273] Add interpolation to Transpose & Sync #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: feature/sc-241110-add-step-column
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions custom-recipes/pi-system-transpose/recipe.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,34 @@
"type": "STRING",
"mandatory": true
},
{
"name": "type_of_interpolation",
"label": "Type of interpolation",
"description": "How values are computed between recorded data points",
"type": "SELECT",
"selectChoices": [
{"value": "last_value", "label": "Last value received"},
{"value": "interpolation", "label": "Interpolation"},
{"value": "auto", "label": "Mixed (based on step)"}
],
"defaultValue": "last_value"
},
{
"name": "step_column_name",
"label": "Step column",
"description": "Column containing the Step information",
"type": "COLUMN",
"allowedColumnTypes": [
"tinyint",
"smallint",
"int",
"bigint",
"string",
"boolean"
],
"columnRole": "input_dataset",
"visibilityCondition": "model.type_of_interpolation=='auto'"
},
{
"name": "show_advanced_parameters",
"label": "Show advanced parameters",
Expand Down
71 changes: 58 additions & 13 deletions custom-recipes/pi-system-transpose/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import dateutil.parser
from column_name import normalise_name
from osisoft_plugin_common import reorder_dataframe
from datetime import datetime


logger = SafeLogger("pi-system plugin", forbiden_keys=["token", "password"])
Expand Down Expand Up @@ -36,18 +37,27 @@ def parse_timestamp_and_value(line):
return date, value


def get_datetime_from_string(datetime):
def get_epoch_from_string(datetime_string):
    """Convert a UTC timestamp string ("%Y-%m-%dT%H:%M:%SZ") into seconds
    since the Unix epoch.

    :param datetime_string: timestamp such as "2020-06-15T12:30:45Z"
    :return: float number of epoch seconds, or None when the string does
             not match the expected format (caller treats None as
             "cannot interpolate").
    """
    try:
        parsed_time = datetime.strptime(datetime_string, "%Y-%m-%dT%H:%M:%SZ")
        # Subtract the epoch origin rather than calling timestamp(), so the
        # result is independent of the local timezone (input is naive UTC).
        seconds_since_epoch = (parsed_time - datetime(1970, 1, 1)).total_seconds()
    except Exception:
        return None
    return seconds_since_epoch


def get_datetime_from_string(datetime_string):
    """Validate that a string parses as an ISO-8601 datetime.

    :param datetime_string: candidate timestamp string
    :return: the input string unchanged when it is valid ISO-8601,
             otherwise None.
    """
    try:
        # Parse only to validate; the original string is what callers use.
        dateutil.parser.isoparse(datetime_string)
    except Exception:
        return None
    return datetime_string


def get_datetime_from_pandas(datetime):
def get_datetime_from_pandas(datetime_string):
try:
time_stamp = datetime.strftime('%Y-%m-%dT%H:%M:%SZ')
time_stamp = datetime_string.strftime('%Y-%m-%dT%H:%M:%SZ')
return time_stamp
except Exception:
pass
Expand All @@ -63,7 +73,7 @@ def get_datetime_from_row(row, datetime_column):
return formated_datetime


def get_latest_values_at_timestamp(file_handles, seek_timestamp):
def get_values_at_timestamp(file_handles, seek_timestamp, step_attributes):
attribute_index = 0
values = {}
for attribute_path in file_handles:
Expand All @@ -85,19 +95,42 @@ def get_latest_values_at_timestamp(file_handles, seek_timestamp):
next_timestamps_cache[attribute_index] = attribute_timestamp
next_values_cache[attribute_index] = attribute_value
next_cached_timestamp = next_timestamps_cache[attribute_index]
if step_attributes.get(attribute_path) is True:
calculated_value = interpolate(
current_timestamps_cache[attribute_index],
current_values_cache[attribute_index],
next_timestamps_cache[attribute_index],
next_values_cache[attribute_index],
seek_timestamp
)
else:
calculated_value = current_values_cache[attribute_index]
if should_add_timestamps_columns:
values.update({
"{}{}".format(attribute_path, OSIsoftConstants.TIMESTAMP_COLUMN_SUFFIX): current_timestamps_cache[attribute_index],
"{}{}".format(attribute_path, OSIsoftConstants.VALUE_COLUMN_SUFFIX): current_values_cache[attribute_index]
"{}{}".format(attribute_path, OSIsoftConstants.VALUE_COLUMN_SUFFIX): calculated_value
})
else:
values.update({
attribute_path: current_values_cache[attribute_index]
attribute_path: calculated_value
})
attribute_index = attribute_index + 1
return values


def interpolate(previous_timestamp, previous_value, next_timestamp, next_value, time_now):
    """Linearly interpolate an attribute value at *time_now* between two
    recorded samples.

    All three timestamps are "%Y-%m-%dT%H:%M:%SZ" strings.

    :return: interpolated float value; None when any timestamp fails to
             parse. When both samples coincide (no sample exists after the
             seek time) or *time_now* lands exactly on the previous sample,
             the previous value is returned as-is — same behaviour as the
             non-interpolated "last value" mode, and it also avoids a
             division by zero.
    """
    previous_epoch = get_epoch_from_string(previous_timestamp)
    next_epoch = get_epoch_from_string(next_timestamp)
    now_epoch = get_epoch_from_string(time_now)
    if previous_epoch is None or next_epoch is None or now_epoch is None:
        return None
    if previous_epoch == next_epoch or now_epoch == previous_epoch:
        return previous_value
    slope = (float(next_value) - float(previous_value)) / (float(next_epoch) - float(previous_epoch))
    return float(previous_value) + slope * (float(now_epoch) - float(previous_epoch))


def clean_cache(paths_to_file_handles):
logger.info("Polling done, cleaning the cache files")
# Close and delete all cache files
Expand Down Expand Up @@ -162,12 +195,17 @@ def get_column_name_specifications():
previous_server_url = ""
paths_to_file_handles = {}
file_counter = 0
step_attributes = {}

type_of_interpolation = config.get("type_of_interpolation", "last_value")
if type_of_interpolation == "auto":
step_column_name = config.get("step_column_name", "Step")

# Cache each attribute
logger.info("Caching all attributes in {}".format(temp_location.name))
for index, input_parameters_row in input_parameters_dataframe.iterrows():
datetime = get_datetime_from_row(input_parameters_row, datetime_column)
if not datetime:
row_datetime = get_datetime_from_row(input_parameters_row, datetime_column)
if not row_datetime:
continue
attribute_path = input_parameters_row.get(input_paths_column)
if should_make_column_names_db_compatible:
Expand All @@ -181,7 +219,14 @@ def get_column_name_specifications():
if attribute_path == reference_attribute_path:
time_reference_file = file_counter
file_counter = file_counter + 1
paths_to_file_handles[attribute_path].writelines("{}|{}\n".format(datetime, value))
paths_to_file_handles[attribute_path].writelines("{}|{}\n".format(row_datetime, value))

if type_of_interpolation == "auto":
is_step_attribute = input_parameters_row.get(step_column_name)
if is_step_attribute == "True" or is_step_attribute is True:
step_attributes[attribute_path] = True
elif type_of_interpolation == "interpolation":
step_attributes[attribute_path] = True

logger.info("Cached all {} attributes".format(file_counter))

Expand Down Expand Up @@ -210,15 +255,15 @@ def get_column_name_specifications():
next_timestamps_cache.pop(0)
next_values_cache.pop(0)

logger.info("Polling all attributes into final dataset")
# For each timestamp of synchronizer attribute, read the most up to date value of all other attributes
# Write all that, one column per attribute
first_dataframe = True
logger.info("Polling all attributes into final dataset")
with output_dataset.get_writer() as writer:
for line in reference_values_file:
unnested_items_rows = []
timestamp, value = parse_timestamp_and_value(line)
output_columns_dictionary = get_latest_values_at_timestamp(paths_to_file_handles, timestamp)
output_columns_dictionary = get_values_at_timestamp(paths_to_file_handles, timestamp, step_attributes)
output_columns_dictionary.update({
OSIsoftConstants.TIMESTAMP_COLUMN_NAME: timestamp,
reference_attribute_path: value
Expand Down