Coverage for foxplot / series.py: 93%

83 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-15 10:53 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3# 

4# SPDX-License-Identifier: Apache-2.0 

5 

6"""Series data unpacked from input dictionaries.""" 

7 

8import logging 

9from os.path import commonprefix 

10from typing import Dict, Literal, Optional, Union 

11 

12import numpy as np 

13from numpy.typing import NDArray 

14 

15from .exceptions import FoxplotError 

16from .labeled_series import LabeledSeries 

17 

# Number of seconds in one unit of time, keyed by the single-letter unit
# codes accepted by ``Series.deriv``. Months and years use fixed lengths of
# 30 and 365 days respectively.
UNIT_TO_SECONDS: Dict[str, float] = {
    "s": 1.0,  # second
    "M": 60.0,  # minute
    "H": 60.0 * 60.0,  # hour
    "d": 24.0 * 3600.0,  # day
    "m": 30.0 * 24.0 * 3600.0,  # month, counted as 30 days
    "y": 365.0 * 24.0 * 3600.0,  # year, counted as 365 days
}

26 

27 

28def _operator_label(op: str, label: str, other_label: str) -> str: 

29 prefix = commonprefix([label, other_label]) 

30 n = len(prefix) 

31 return f"{prefix}({label[n:]} {op} {other_label[n:]})" 

32 

33 

class Series(LabeledSeries):
    """Front class for time-series that users interact with."""

    # Time values matching ``_values``, or None when no times are attached.
    _times: Optional[NDArray[np.float64]]
    # Values of the series.
    _values: NDArray[np.float64]

    def __init__(
        self,
        label: str,
        values: NDArray[np.float64],
        times: Optional[NDArray[np.float64]],
    ):
        """Initialize a new series.

        Args:
            label: Label of the series in the input data.
            values: Values as a NumPy array.
            times: Corresponding time values as a NumPy array, or None if
                the series has no time values.
        """
        super().__init__(label)
        self._times = times
        self._values = values

    def __add__(self, other: Union[int, float, "Series"]) -> "Series":
        """Elementwise sum with another series or a scalar.

        Args:
            other: Other series, or a scalar added to every value.

        Returns:
            New series holding the sum, with the time values of this series.
        """
        if isinstance(other, (int, float, np.number)):
            # Scalar operand, handled like in __mul__ and __truediv__
            return Series(
                label=_operator_label("+", self._label, str(other)),
                values=self._values + other,
                times=self._times,
            )
        return Series(
            label=_operator_label("+", self._label, other._label),
            values=self._values + other._values,
            times=self._times,
        )

    def __len__(self) -> int:
        """Length of the indexed series."""
        return self._values.shape[0]

    def __mul__(self, other: Union[int, float, "Series"]) -> "Series":
        """Elementwise product with another series or a scalar.

        Args:
            other: Other series, or a scalar multiplying every value.

        Returns:
            New series holding the product, with the time values of this
            series.
        """
        if isinstance(other, Series):
            return Series(
                label=_operator_label("*", self._label, other._label),
                values=self._values * other._values,
                times=self._times,
            )
        return Series(
            label=_operator_label("*", self._label, str(other)),
            values=self._values * other,
            times=self._times,
        )

    def __neg__(self) -> "Series":
        """Unary minus applied to the series."""
        return Series(
            label=f"-{self._label}",
            values=-self._values,
            times=self._times,
        )

    def __repr__(self) -> str:
        """String representation of the series."""
        return f"Time series with values: {self._values}"

    def __truediv__(self, other: Union[int, float, "Series"]) -> "Series":
        """Elementwise ratio with another series or a scalar.

        Args:
            other: Other series, or a scalar dividing every value.

        Returns:
            New series holding the ratio, with the time values of this
            series.
        """
        if isinstance(other, Series):
            return Series(
                label=_operator_label("/", self._label, other._label),
                values=self._values / other._values,
                times=self._times,
            )
        return Series(
            label=_operator_label("/", self._label, str(other)),
            values=self._values / other,
            times=self._times,
        )

    def abs(self) -> "Series":
        """Return the series of absolute values of this series.

        Returns:
            Series of elementwise absolute values, with the time values of
            this series.
        """
        return Series(
            label=f"abs({self._label})",
            values=np.abs(self._values),
            times=self._times,
        )

    def deriv(
        self,
        unit: Literal["s", "M", "H", "d", "m", "y"],
        cutoff: float = 0.0,
    ) -> "Series":
        """Time-derivative with optional low-pass filtering.

        Args:
            unit: Time unit of the derivative and of the cutoff period.
                Use ``"s"`` for seconds, ``"M"`` for minute, ``"H"`` for
                hour, ``"d"`` for day, ``"m"`` for month, ``"y"`` for year.
            cutoff: Cutoff period of low-pass filtering, in the given unit.

        Returns:
            Finite-difference derivative of the time series. If the series
            has unit [U], its time-derivative will be in [U] / [T] where
            [T] is the time unit specified by ``unit``. Steps whose timestep
            is not strictly positive yield NaN, and so does a series with a
            single sample.

        Raises:
            FoxplotError: If the series has no time values.
        """
        if self._times is None:
            raise FoxplotError(f"Unset time values for series '{self._label}'")
        nb_steps = len(self._times)
        filtered_output = None
        outputs = []
        cutoff_period_s = UNIT_TO_SECONDS[unit] * cutoff
        for i in range(nb_steps - 1):
            dt = self._times[i + 1] - self._times[i]
            if dt <= 0.0:
                # A zero timestep would divide by zero below and propagate
                # inf/NaN through the filter state, so it is rejected along
                # with negative timesteps.
                logging.warning(
                    "Invalid timestep dt=%f at time=%f",
                    dt,
                    self._times[i],
                )
                outputs.append(np.nan)
                continue
            finite_diff = (self._values[i + 1] - self._values[i]) / dt
            if cutoff_period_s < 2 * dt or filtered_output is None:
                # No filtering when the cutoff period is shorter than two
                # timesteps; this branch also initializes the filter state.
                filtered_output = finite_diff
                outputs.append(finite_diff)
            else:
                gamma = 1.0 - np.exp(-dt / cutoff_period_s)
                filtered_output += gamma * (finite_diff - filtered_output)
                outputs.append(filtered_output)
        if outputs:
            # Repeat the last derivative so that the output is as long as
            # the series
            outputs.append(outputs[-1])
        else:
            # Fewer than two samples: no finite difference can be computed
            outputs = [np.nan] * nb_steps
        assert len(outputs) == len(self._values) == len(self._times)
        label = f"deriv({self._label}, unit={unit}"
        if cutoff > 1e-10:
            label += f", cutoff={cutoff} {unit}"
        label += ")"
        return Series(
            label=label,
            # Finite differences are per unit of the time values; scaling by
            # the number of seconds in ``unit`` assumes times are in seconds.
            values=np.array(outputs) * UNIT_TO_SECONDS[unit],
            times=self._times,
        )

    def low_pass_filter(self, cutoff_period: float) -> "Series":
        """Apply low-pass filter to a time series.

        Args:
            cutoff_period: Cutoff period of the low-pass filter, in the same
                unit as the time values of the series.

        Returns:
            Low-pass filtered time series. Steps where the cutoff period is
            shorter than two timesteps yield NaN.

        Raises:
            FoxplotError: If the series has no time values.
        """
        if self._times is None:
            raise FoxplotError(f"Unset time values for series '{self._label}'")
        label = f"low_pass_filter({self._label}, {cutoff_period=})"
        nb_steps = len(self._times)
        if nb_steps == 0 and len(self._values) == 0:
            # Empty series: there is no initial value to start the filter
            return Series(
                label=label,
                values=np.array([], dtype=np.float64),
                times=self._times,
            )
        output = self._values[0]
        outputs = [output]
        for i in range(nb_steps - 1):
            dt = self._times[i + 1] - self._times[i]
            if cutoff_period < 2 * dt:
                logging.warning(
                    "Nyquist-Shannon sampling theorem: "
                    "at time=%f, dt=%f but cutoff_period=%f",
                    self._times[i],
                    dt,
                    cutoff_period,
                )
                outputs.append(np.nan)
                continue
            forgetting_factor = np.exp(-dt / cutoff_period)
            output += (1.0 - forgetting_factor) * (self._values[i] - output)
            outputs.append(output)
        assert len(outputs) == len(self._values) == len(self._times)
        return Series(
            label=label,
            values=np.array(outputs),
            times=self._times,
        )

    def std(self, window_size: int) -> "Series":
        """Return the rolling standard deviation of the series.

        Args:
            window_size: Size of the rolling window in which to compute
                standard deviations.

        Returns:
            Series of windowed standard deviations along the series.
        """
        # NOTE(review): sliding_window_view yields one window per valid
        # position, so the values here have ``window_size - 1`` fewer entries
        # than the series while ``times`` is passed through unchanged.
        # Confirm that callers handle this length difference.
        return Series(
            label=f"std({self._label}, {window_size})",
            values=np.std(
                np.lib.stride_tricks.sliding_window_view(
                    self._values, window_size
                ),
                axis=-1,
            ),
            times=self._times,
        )