diff --git a/pyomo/contrib/mpc/data/interpolation.py b/pyomo/contrib/mpc/data/interpolation.py new file mode 100644 index 00000000000..491013d7ae2 --- /dev/null +++ b/pyomo/contrib/mpc/data/interpolation.py @@ -0,0 +1,79 @@ +# ___________________________________________________________________________ +# +# Pyomo: Python Optimization Modeling Objects +# Copyright (c) 2008-2025 +# National Technology and Engineering Solutions of Sandia, LLC +# Under the terms of Contract DE-NA0003525 with National Technology and +# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain +# rights in this software. +# This software is distributed under the 3-clause BSD License. +# ___________________________________________________________________________ + +from bisect import bisect_right + + +def _get_time_index_vec(time_set, time_data): + """Get the position index of time_data above and below the times in + time_set. This can be used to find positions of points to interpolate + between. + + Parameters + ---------- + time_set: iterable + Time points to locate + time_data: iterable + Sorted time points to locate time_set in + + Returns + ------- + numpy.array + Position index of the first time in time_data greater than the + corresponding points time_set. If a time is less than all the times + in time_data return 1. If a time is greater than all times time_data + set return the last index of time_data. + """ + pos = [None] * len(time_set) + for i, t in enumerate(time_set): + pos[i] = bisect_right(time_data, t) + if pos[i] == 0: + pos[i] = 1 + elif pos[i] == len(time_data): + pos[i] = len(time_data) - 1 + return pos + + +def _get_interp_expr_vec(time_set, time_data, data, indexes=None): + """Return an array of floats interpolated at the time points in time_set + from data defined at time_data. + + Parameters + ---------- + time_set: iterable + Time points to locate + time_data: iterable + Sorted time points to locate time_set in + data: iterable + Data corresponding to times in time_data, must have the same + length as time data. + indexes: numpy.array + Numpy array of position indexes of the time points to interpolate in the + time data. The format is the same as returned by ``_get_time_index_vec()``. + If this is None, ``_get_time_index_vec()`` is called. The reason to pass + this is to avoid multiple position searches when interpolating multiple + outputs with the same time points. + + Returns + ------- + list + If data are Pyomo components, this will return Pyomo expressions. + If data are floats, this will return floats. + """ + if indexes is None: + indexes = _get_time_index_vec(time_set, time_data) + expr = [None] * len(time_set) + for i, (h, t) in enumerate(zip(indexes, time_set)): + l = h - 1 + expr[i] = data[l] + (data[h] - data[l]) / (time_data[h] - time_data[l]) * ( + t - time_data[l] + ) + return expr diff --git a/pyomo/contrib/mpc/data/series_data.py b/pyomo/contrib/mpc/data/series_data.py index c29c981879d..a922ebb5716 100644 --- a/pyomo/contrib/mpc/data/series_data.py +++ b/pyomo/contrib/mpc/data/series_data.py @@ -15,7 +15,10 @@ from pyomo.contrib.mpc.data.get_cuid import get_indexed_cuid from pyomo.contrib.mpc.data.dynamic_data_base import _is_iterable, _DynamicDataBase from pyomo.contrib.mpc.data.scalar_data import ScalarData - +from pyomo.contrib.mpc.data.interpolation import ( + _get_time_index_vec, + _get_interp_expr_vec, +) TimeSeriesTuple = namedtuple("TimeSeriesTuple", ["data", "time"]) @@ -152,6 +155,50 @@ def get_data_at_time(self, time=None, tolerance=0.0): indices = indices[0] return self.get_data_at_time_indices(indices) + def get_interpolated_data(self, time=None, tolerance=0.0): + """ + Returns the data associated with the provided time point or points by + linear interpolation. + + Parameters + ---------- + time: Float or iterable + The time point or points corresponding to returned data. + tolerance: float + Tolerance used when checking if time points are inside the data + range. + + Returns + ------- + TimeSeriesData or ~scalar_data.ScalarData + TimeSeriesData containing only the specified time points + or dict mapping CUIDs to values at the specified scalar time + point. + + """ + if time is None: + # If time is not specified, assume we want the entire time + # set. Skip all the overhead, don't create a new object, and + # return self. + return self + is_iterable = _is_iterable(time) + if not is_iterable: + time = [time] + for t in time: + if t > self._time[-1] + tolerance or t < self._time[0] - tolerance: + raise RuntimeError("Requesting interpolation outside data range.") + idxs = _get_time_index_vec(time, self._time) + data = {} + for cuid in self._data: + v = _get_interp_expr_vec(time, self._time, self._data[cuid], idxs) + data[cuid] = v + if is_iterable: + return TimeSeriesData(data, list(time)) + else: + for cuid in self._data: + data[cuid] = data[cuid][0] + return ScalarData(data) + def to_serializable(self): """ Convert to json-serializable object. diff --git a/pyomo/contrib/mpc/data/tests/test_series_data.py b/pyomo/contrib/mpc/data/tests/test_series_data.py index a25a18103df..d5c267d3ef3 100644 --- a/pyomo/contrib/mpc/data/tests/test_series_data.py +++ b/pyomo/contrib/mpc/data/tests/test_series_data.py @@ -126,6 +126,63 @@ def test_get_data_at_time_with_tolerance(self): with self.assertRaisesRegex(RuntimeError, msg): new_data = data.get_data_at_time(-0.01, tolerance=1e-3) + def test_get_data_interpolate(self): + m = self._make_model() + data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]} + data = TimeSeriesData(data_dict, m.time) + new_data = data.get_interpolated_data(0.05) + self.assertEqual(ScalarData({m.var[:, "A"]: 1.5, m.var[:, "B"]: 3}), new_data) + + t1 = 0.05 + new_data = data.get_interpolated_data([t1]) + self.assertEqual( + TimeSeriesData({m.var[:, "A"]: [1.5], m.var[:, "B"]: [3]}, [t1]), new_data + ) + + new_t = [0.05, 0.15] + new_data = data.get_interpolated_data(new_t) + self.assertEqual( + TimeSeriesData({m.var[:, "A"]: [1.5, 2.5], m.var[:, "B"]: [3, 5]}, new_t), + new_data, + ) + + def test_get_data_interpolate_range_check(self): + m = self._make_model() + data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]} + data = TimeSeriesData(data_dict, m.time) + msg = "Requesting interpolation outside data range." + with self.assertRaisesRegex(RuntimeError, msg): + new_data = data.get_interpolated_data(0.2 + 1e-6) + with self.assertRaisesRegex(RuntimeError, msg): + new_data = data.get_interpolated_data(0.0 - 1e-6) + new_data = data.get_interpolated_data(0.2 + 1e-6, tolerance=1e-5) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"]), 3, 4) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"]), 6, 4) + + t1 = 0.2 + 1e-6 + with self.assertRaisesRegex(RuntimeError, msg): + new_data = data.get_interpolated_data([t1]) + new_data = data.get_interpolated_data([t1], tolerance=1e-5) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[0], 3, 4) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[0], 6, 4) + + new_t = [0.0 - 1e-6, 0.2 + 1e-6] + with self.assertRaisesRegex(RuntimeError, msg): + new_data = data.get_interpolated_data(new_t) + new_data = data.get_interpolated_data(new_t, tolerance=1e-5) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[0], 1, 4) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[0], 2, 4) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[1], 3, 4) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[1], 6, 4) + + # check that the exact endpoints don't raise an exception with 0 tol + new_t = [0.0, 0.2] + new_data = data.get_interpolated_data(new_t, tolerance=0.0) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[0], 1, 4) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[0], 2, 4) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[1], 3, 4) + self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[1], 6, 4) + def test_to_serializable(self): m = self._make_model() data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}