Skip to content

Add time interpolation to mpc data #3559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions pyomo/contrib/mpc/data/interpolation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2025
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________

from bisect import bisect_right


def _get_time_index_vec(time_set, time_data):
"""Get the position index of time_data above and below the times in
time_set. This can be used to find positions of points to interpolate
between.
Parameters
----------
time_set: iterable
Time points to locate
time_data: iterable
Sorted time points to locate time_set in
Returns
-------
numpy.array
Position index of the first time in time_data greater than the
corresponding points time_set. If a time is less than all the times
in time_data return 1. If a time is greater than all times time_data
set return the last index of time_data.
"""
pos = [None] * len(time_set)
for i, t in enumerate(time_set):
pos[i] = bisect_right(time_data, t)
if pos[i] == 0:
pos[i] = 1
elif pos[i] == len(time_data):
pos[i] = len(time_data) - 1
return pos


def _get_interp_expr_vec(time_set, time_data, data, indexes=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indexes isn't described in the docstring

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is indexes used? If not, can it be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used. Updated doc string.

"""Return an array of floats interpolated at the time points in time_set
from data defined at time_data.
Parameters
----------
time_set: iterable
Time points to locate
time_data: iterable
Sorted time points to locate time_set in
data: iterable
Data corresponding to times in time_data, must have the same
length as time data.
indexes: numpy.array
Numpy array of position indexes of the time points to interpolate in the
time data. The format is the same as returned by ``_get_time_index_vec()``.
If this is None, ``_get_time_index_vec()`` is called. The reason to pass
this is to avoid multiple position searches when interpolating multiple
outputs with the same time points.
Returns
-------
list
If data are Pyomo components, this will return Pyomo expressions.
If data are floats, this will return floats.
"""
if indexes is None:
indexes = _get_time_index_vec(time_set, time_data)
expr = [None] * len(time_set)
for i, (h, t) in enumerate(zip(indexes, time_set)):
l = h - 1
expr[i] = data[l] + (data[h] - data[l]) / (time_data[h] - time_data[l]) * (
t - time_data[l]
)
return expr
49 changes: 48 additions & 1 deletion pyomo/contrib/mpc/data/series_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
from pyomo.contrib.mpc.data.get_cuid import get_indexed_cuid
from pyomo.contrib.mpc.data.dynamic_data_base import _is_iterable, _DynamicDataBase
from pyomo.contrib.mpc.data.scalar_data import ScalarData

from pyomo.contrib.mpc.data.interpolation import (
_get_time_index_vec,
_get_interp_expr_vec,
)

TimeSeriesTuple = namedtuple("TimeSeriesTuple", ["data", "time"])

Expand Down Expand Up @@ -152,6 +155,50 @@ def get_data_at_time(self, time=None, tolerance=0.0):
indices = indices[0]
return self.get_data_at_time_indices(indices)

def get_interpolated_data(self, time=None, tolerance=0.0):
"""
Returns the data associated with the provided time point or points by
linear interpolation.
Parameters
----------
time: Float or iterable
The time point or points corresponding to returned data.
tolerance: float
Tolerance used when checking if time points are inside the data
range.
Returns
-------
TimeSeriesData or ~scalar_data.ScalarData
TimeSeriesData containing only the specified time points
or dict mapping CUIDs to values at the specified scalar time
point.
"""
if time is None:
# If time is not specified, assume we want the entire time
# set. Skip all the overhead, don't create a new object, and
# return self.
return self
is_iterable = _is_iterable(time)
if not is_iterable:
time = [time]
for t in time:
if t > self._time[-1] + tolerance or t < self._time[0] - tolerance:
raise RuntimeError("Requesting interpolation outside data range.")
idxs = _get_time_index_vec(time, self._time)
data = {}
for cuid in self._data:
v = _get_interp_expr_vec(time, self._time, self._data[cuid], idxs)
data[cuid] = v
if is_iterable:
return TimeSeriesData(data, list(time))
else:
for cuid in self._data:
data[cuid] = data[cuid][0]
return ScalarData(data)

def to_serializable(self):
"""
Convert to json-serializable object.
Expand Down
57 changes: 57 additions & 0 deletions pyomo/contrib/mpc/data/tests/test_series_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,63 @@ def test_get_data_at_time_with_tolerance(self):
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_data_at_time(-0.01, tolerance=1e-3)

def test_get_data_interpolate(self):
m = self._make_model()
data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}
data = TimeSeriesData(data_dict, m.time)
new_data = data.get_interpolated_data(0.05)
self.assertEqual(ScalarData({m.var[:, "A"]: 1.5, m.var[:, "B"]: 3}), new_data)

t1 = 0.05
new_data = data.get_interpolated_data([t1])
self.assertEqual(
TimeSeriesData({m.var[:, "A"]: [1.5], m.var[:, "B"]: [3]}, [t1]), new_data
)

new_t = [0.05, 0.15]
new_data = data.get_interpolated_data(new_t)
self.assertEqual(
TimeSeriesData({m.var[:, "A"]: [1.5, 2.5], m.var[:, "B"]: [3, 5]}, new_t),
new_data,
)

def test_get_data_interpolate_range_check(self):
m = self._make_model()
data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}
data = TimeSeriesData(data_dict, m.time)
msg = "Requesting interpolation outside data range."
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_interpolated_data(0.2 + 1e-6)
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_interpolated_data(0.0 - 1e-6)
new_data = data.get_interpolated_data(0.2 + 1e-6, tolerance=1e-5)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"]), 3, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"]), 6, 4)

t1 = 0.2 + 1e-6
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_interpolated_data([t1])
new_data = data.get_interpolated_data([t1], tolerance=1e-5)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[0], 3, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[0], 6, 4)

new_t = [0.0 - 1e-6, 0.2 + 1e-6]
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_interpolated_data(new_t)
new_data = data.get_interpolated_data(new_t, tolerance=1e-5)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[0], 1, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[0], 2, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[1], 3, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[1], 6, 4)

# check that the exact endpoints don't raise an exception with 0 tol
new_t = [0.0, 0.2]
new_data = data.get_interpolated_data(new_t, tolerance=0.0)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[0], 1, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[0], 2, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[1], 3, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[1], 6, 4)

def test_to_serializable(self):
m = self._make_model()
data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}
Expand Down
Loading