Coverage for physioblocks / library / functions / piecewise.py: 99%
74 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-09 16:40 +0100
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-09 16:40 +0100
1# SPDX-FileCopyrightText: Copyright INRIA
2#
3# SPDX-License-Identifier: LGPL-3.0-only
4#
5# Copyright INRIA
6#
7# This file is part of PhysioBlocks, a library mostly developed by the
8# [Ananke project-team](https://team.inria.fr/ananke) at INRIA.
9#
10# Authors:
11# - Colin Drieu
12# - Dominique Chapelle
13# - François Kimmig
14# - Philippe Moireau
15#
16# PhysioBlocks is free software: you can redistribute it and/or modify it under the
17# terms of the GNU Lesser General Public License as published by the Free Software
18# Foundation, version 3 of the License.
19#
20# PhysioBlocks is distributed in the hope that it will be useful, but WITHOUT ANY
21# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
22# PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
23#
24# You should have received a copy of the GNU Lesser General Public License along with
25# PhysioBlocks. If not, see <https://www.gnu.org/licenses/>.
27"""Declare functions to define piecewise functions in the configuration"""
29from __future__ import annotations
31from dataclasses import dataclass
32from typing import Any
34import numpy as np
35from numpy.typing import NDArray
37from physioblocks.registers.type_register import register_type
38from physioblocks.simulation import AbstractFunction
# Name under which the piecewise linear function type is registered
PIECEWISE_LINEAR_NAME = "piecewise_linear"


@register_type(PIECEWISE_LINEAR_NAME)
@dataclass
class PiecewiseLinear(AbstractFunction):
    """
    Piecewise linear function of time, defined by a set of points and
    evaluated by linear interpolation between them.
    """

    points_abscissas: NDArray[np.float64]
    """Abscissas of the points defining the function"""

    points_ordinates: NDArray[np.float64]
    """Ordinates of the points defining the function"""

    left_value: np.float64 | None = None
    """Value returned when the evaluation point is before the provided
    abscissas. When it is ``None``, the first ordinate is used."""

    right_value: np.float64 | None = None
    """Value returned when the evaluation point is after the provided
    abscissas. When it is ``None``, the last ordinate is used."""

    def eval(self, time: np.float64) -> Any:
        """
        Interpolate the function linearly at the given time.

        The evaluation is delegated to ``np.interp``; ``left_value`` and
        ``right_value`` are forwarded as its out-of-range values.

        :param time: the evaluated time
        :type time: np.float64

        :return: the function value at the given time
        :rtype: np.float64
        """
        return np.interp(
            time,
            self.points_abscissas,
            self.points_ordinates,
            left=self.left_value,
            right=self.right_value,
        )
# Name under which the periodic piecewise linear function type is registered
PIECEWISE_LINEAR_PERIODIC_NAME = "piecewise_linear_periodic"


@register_type(PIECEWISE_LINEAR_PERIODIC_NAME)
@dataclass
class PiecewiseLinearPeriodic(AbstractFunction):
    """
    Periodic piecewise linear function of time, defined by a set of points
    and a period.
    """

    period: np.float64
    """Period of the function"""

    points_abscissas: NDArray[np.float64]
    """Abscissas of the points defining the function"""

    points_ordinates: NDArray[np.float64]
    """Ordinates of the points defining the function"""

    def eval(self, time: np.float64) -> Any:
        """
        Interpolate the periodic function linearly at the given time.

        Periodicity is handled by the ``period`` argument of ``np.interp``.

        :param time: the evaluated time
        :type time: np.float64

        :return: the function value at the given time
        :rtype: np.float64
        """
        abscissas, ordinates = self.points_abscissas, self.points_ordinates
        return np.interp(time, abscissas, ordinates, period=self.period)
# Rescale Two Phases function id
RESCALE_TWO_PHASES_FUNCTION_NAME = "rescale_two_phases_function"


@register_type(RESCALE_TWO_PHASES_FUNCTION_NAME)
class RescaleTwoPhasesFunction(AbstractFunction):
    """
    Rescale each part of the input function (a linear interpolation) depending on
    the proportion of variation of phase 0 when the period differs from the
    reference function.

    Every interval of the reference function belongs to phase 0 or to phase 1.
    All the intervals of a phase are stretched by the same factor, chosen so
    that phase 0 receives the proportion ``alpha`` of the period variation and
    phase 1 the remaining ``1 - alpha``. The ordinates are left unchanged.

    Besides the documented attributes, the constructor sets ``beta`` (ratio of
    the rescaled period to the reference period), ``function_abcissas`` and
    ``function_ordinates`` (the rescaled points, as numpy arrays).
    """

    rescaled_period: float
    """The actual function period"""

    reference_function: list[tuple[float, float]]
    """The reference function. Coordinates format is ``(abscissa, ordinate)``"""

    alpha: float
    """Proportion of the variation of phase 0"""

    phases: list[int]
    """For each intervals point of the reference, determine if it belong to
    phase 0 or 1."""

    def __init__(
        self,
        rescaled_period: float,
        reference_function: list[tuple[float, float]],
        alpha: float,
        phases: list[int],
    ) -> None:
        """
        Build the rescaled function from the reference function.

        :param rescaled_period: the period of the rescaled function
        :type rescaled_period: float

        :param reference_function: the reference points, as
          ``(abscissa, ordinate)`` coordinates with strictly increasing
          abscissas
        :type reference_function: list[tuple[float, float]]

        :param alpha: proportion of the period variation applied to phase 0,
          in ]0, 1[
        :type alpha: float

        :param phases: phase (0 or 1) of each interval of the reference
          function
        :type phases: list[int]

        :raise ValueError: when the number of phases does not match the number
          of intervals, when a phase is neither 0 nor 1, when alpha is not in
          ]0, 1[, when the abscissas are not strictly increasing, when one of
          the two phases is not used by any interval, or when a resulting
          scale factor is not strictly positive.
        """
        if len(phases) != len(reference_function) - 1:
            raise ValueError(
                "A phase should be defined for each interval defined in the "
                "reference function."
            )

        if any(elem not in (0, 1) for elem in phases):
            raise ValueError(
                f"There are only two phases allowed: 0 or 1, got {phases}."
            )

        if alpha >= 1.0 or alpha <= 0.0:
            raise ValueError(
                "The proportion of the variation of phase 0 should be in ]0, 1[, "
                f"got {alpha}"
            )

        # check if the reference function is sorted
        reference_abscissas = [coord[0] for coord in reference_function]
        consecutive_abscissas = list(
            zip(reference_abscissas, reference_abscissas[1:])
        )
        if not all(current > previous for previous, current in consecutive_abscissas):
            raise ValueError(
                "Reference function abscissas should be sorted, got "
                f"{reference_abscissas}"
            )

        # Each phase needs a non-zero duration: it is the divisor of the
        # matching scale factor below.
        if 0 not in phases or 1 not in phases:
            raise ValueError(
                "Both phases 0 and 1 should be used by at least one interval "
                f"of the reference function, got {phases}."
            )

        self.reference_function = reference_function
        self.phases = phases
        self.alpha = alpha

        reference_period = reference_abscissas[-1] - reference_abscissas[0]
        self.beta = rescaled_period / reference_period

        interval_lengths = [
            current - previous for previous, current in consecutive_abscissas
        ]
        duration_phase_0_ref = sum(
            length for length, phase in zip(interval_lengths, phases) if phase == 0
        )
        duration_phase_1_ref = reference_period - duration_phase_0_ref

        # Phase 0 absorbs alpha * (rescaled period - reference period) and
        # phase 1 absorbs the rest, spread evenly over the phase duration.
        scale_factor_phase_0 = (
            1.0
            + self.alpha * (self.beta - 1.0) * reference_period / duration_phase_0_ref
        )
        scale_factor_phase_1 = (
            1.0
            + (1.0 - self.alpha)
            * (self.beta - 1.0)
            * reference_period
            / duration_phase_1_ref
        )

        if scale_factor_phase_0 <= 0 or scale_factor_phase_1 <= 0:
            raise ValueError(
                "Scale factors should not be negatives. Got "
                f"({scale_factor_phase_0}, {scale_factor_phase_1}) for "
                "phase 0 and 1 respectivly. You can try changing alpha."
            )

        # Rebuild the abscissas by accumulating the stretched interval lengths
        # from the first reference abscissa.
        abscissas = [reference_abscissas[0]]
        for length, phase in zip(interval_lengths, phases):
            scale_factor = scale_factor_phase_0 if phase == 0 else scale_factor_phase_1
            abscissas.append(abscissas[-1] + length * scale_factor)

        ordinates = [value[1] for value in self.reference_function]

        # The period is the span of the rescaled abscissas, not the last
        # abscissa: both only coincide when the reference starts at 0.
        self.rescaled_period = abscissas[-1] - abscissas[0]
        self.function_abcissas = np.array(abscissas)
        self.function_ordinates = np.array(ordinates)

    def eval(self, time: float) -> Any:
        """
        Evaluate the rescaled function by periodic linear interpolation.

        :param time: the evaluated time
        :type time: float

        :return: the function value
        :rtype: np.float64
        """
        return np.interp(
            time,
            self.function_abcissas,
            self.function_ordinates,
            period=self.rescaled_period,
        )