Coverage for middle_layer/allocate/domain_layer/services/pressure_cooker_service.py: 97.14%

105 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2026-03-09 06:13 +0000

1# Copyright 2024 Associated Universities, Inc. 

2# 

3# This file is part of Telescope Time Allocation Tools (TTAT). 

4# 

5# TTAT is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# any later version. 

9# 

10# TTAT is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License 

16# along with TTAT. If not, see <https://www.gnu.org/licenses/>. 

17import operator 

18from collections import defaultdict 

19from collections.abc import Iterable 

20from datetime import datetime, timedelta 

21from itertools import chain, compress, tee 

22from typing import Callable, TypeVar, override 

23 

24import astropy.units as u 

25import pendulum 

26 

27from allocate.domain_layer.entities.allocation_version import AllocationVersion 

28from allocate.domain_layer.entities.available_time_model import AvailableTimeModel 

29from allocate.domain_layer.entities.available_time_model_version import AvailableTimeModelVersion 

30from allocate.domain_layer.entities.observation_specification_disposition import ObservationSpecificationDisposition 

31from allocate.domain_layer.entities.time_bins import TimeBins 

32from allocate.domain_layer.entities.time_block import TimeBlock 

33from allocate.domain_layer.entities.time_model import TimeModel 

34from allocate.domain_layer.entities.time_reservation import TimeReservation 

35from common.domain_layer import JSON, JSON_OBJECT 

36 

37 

38def pressure_cooker( 

39 available_time_model_version: AvailableTimeModelVersion, 

40 allocation_version: AllocationVersion | None = None, 

41 apply_time_reservations: bool = True, 

42) -> list[JSON_OBJECT]: 

43 """ 

44 Assembles (and currently fakes most of) the data needed to graph pressure 

45 plots for specified AllocationVersion and/or AvailableTimeModelVersion. An ATMV is required while an AV can be 

46 optionally supplied. 

47 

48 :param available_time_model_version: AvailableTimeModelVersion to use in calculating pressure 

49 :param allocation_version: Optional AllocationVersion to use in calculating pressure 

50 :param apply_time_reservations: Defaults to True, apply time reservations to the pressure plot data. If False, 

51 do not apply time reservations to the pressure plot data. 

52 :return: pressure plot data as a list of dictionaries 

53 """ 

54 result = [] 

55 

56 # TODO: use of apply_time_reservations in timebin calculation is not currently implemented. 

57 

58 atms = available_time_model_version.available_time_models 

59 spillage = SpillageTimeModel(atms) 

60 atms_plus_spillover = atms + [spillage] 

61 

62 # create the time reservation time bins now, because it will be needed unconditionally 

63 time_reservation_bins = available_time_model_version.reserved_time_bins() # gives negative numbers 

64 

65 # generate a fake one for the spillage time model (use `dict` here to prevent sharing) 

66 time_reservation_bins[spillage] = AvailableTimeModelVersion.make_standard_reserved_bins() 

67 

68 osd_pressure_by_priority: dict[TimeModel, dict[str, TimeBins]] = dict((atm, {}) for atm in atms_plus_spillover) 

69 

70 remaining_osds: list[ObservationSpecificationDisposition] = ( 

71 list(chain(*(ad.observation_specification_dispositions for ad in allocation_version.allocation_dispositions))) 

72 if allocation_version 

73 else [] 

74 ) 

75 osd_atms: dict[int, list[int]] = {} 

76 for osd in remaining_osds: 

77 osd_atms[osd.observation_specification_disposition_id] = [] 

78 

79 for atm in atms_plus_spillover: 

80 available_time = atm.to_timebins() # gives positive numbers for available time 

81 reserved_pressure = TimeBins() 

82 for bins in [value for value in time_reservation_bins[atm].values()]: 

83 reserved_pressure = reserved_pressure - bins # gives positive numbers for reserved time 

84 

85 # for each ATM we first need to filter out the time reservations and OSDs that could fit within the date range 

86 # and whatever other criteria for filtering we are using, like the VLA configuration, based on 

87 # the facility strategies 

88 

89 # initialize this priority to timebins instance 

90 osd_pressure_by_priority[atm] = dict((priority, TimeBins()) for priority in ["A", "B", "C", "D", "N", "NP"]) 

91 osd_ids: list[int] = [] 

92 

93 if allocation_version: 

94 # separate the OSDs we can use from the remainder 

95 remaining_osds, applicable = [list(x) for x in partition(atm.is_applicable, remaining_osds)] 

96 

97 # Iterate OSDs to fill Scheduling Priorities 

98 for osd in applicable: 

99 osd_pressure_by_priority[atm][osd.scheduling_priority.name] += osd.to_timebins() 

100 osd_atms[osd.observation_specification_disposition_id].append(atm.available_time_model_id) 

101 osd_ids.append(osd.observation_specification_disposition_id) 

102 

103 osd_pressure = TimeBins() 

104 for bins in [osd for osd in osd_pressure_by_priority[atm].values()]: 

105 osd_pressure = osd_pressure + bins 

106 

107 # if this ATM is the spillage ATM, now we can detect if it is empty and remove it from the output 

108 if atm.is_time_spillage() and sum((tb.total_hours() for tb in osd_pressure_by_priority[atm].values())) == 0: 

109 # break out of the loop here 

110 continue 

111 

112 result.append( 

113 create_pressure_json( 

114 allocation_version, 

115 available_time_model_version, 

116 atm, 

117 available_time, 

118 osd_pressure_by_priority, 

119 time_reservation_bins, 

120 osd_ids, 

121 ) 

122 ) 

123 

124 return result 

125 

126 

127def create_pressure_json( 

128 allocation_version: AllocationVersion | None, 

129 atm_version: AvailableTimeModelVersion, 

130 atm: TimeModel, 

131 available_time: TimeBins, 

132 osd_pressure_by_priority: dict[TimeModel, dict[str, TimeBins]], 

133 time_reservation_bins: dict[TimeModel, dict[str, TimeBins]], 

134 osd_ids_p: list[int], 

135) -> JSON_OBJECT: 

136 # soothe the type checker 

137 osd_ids: list[JSON] = list(osd_ids_p) 

138 return { 

139 "availableTimeModelId": atm.available_time_model_id if hasattr(atm, "available_time_model_id") else 0, 

140 "allocationVersionId": None if allocation_version is None else allocation_version.allocation_version_id, 

141 "availableTimeModelVersionId": atm_version.available_time_model_version_id, 

142 "availableTimeModelName": atm.name, 

143 "availableTimeModelStartDate": atm.start_date.isoformat(), 

144 "availableTimeModelEndDate": atm.end_date.isoformat(), 

145 "timeBins": [ 

146 { 

147 "value": i, 

148 "availableTime": available_time[i], 

149 "schedulingPriorities": dict( 

150 (priority, pressure[i]) for priority, pressure in osd_pressure_by_priority[atm].items() 

151 ), 

152 # correct the displayed pressure to be positive 

153 "reservedTimes": [ 

154 {"label": label, "value": -pressure[i]} 

155 for label, pressure in time_reservation_bins[atm].items() 

156 if pressure.total_hours() != 0 

157 ], 

158 } 

159 for i in range(0, 24) 

160 ], 

161 "osdIds": osd_ids, 

162 } 

163 

164 

165def create_calendar_json( 

166 *, 

167 allocation_version: AllocationVersion | None, 

168 available_time_model_version: AvailableTimeModelVersion | None, 

169 start_date: datetime, 

170 days: int, 

171) -> dict[str, dict[int, dict[str, list[int]]]]: 

172 """ 

173 Create the data for a calendar plot. 

174 

175 Example: 

176 { 

177 2025-03-01: { 

178 0...23: {osdIds: [], timeReservationIds:[]}, 

179 } 

180 

181 :param allocation_version: the AV to base on 

182 :param available_time_model_version: the ATM version to base on 

183 :param start_date: the start date (in case we don't want to use the one in the ATM?) 

184 :param days: the number of days (in case we don't want to use the end date in the ATM?) 

185 :return: a JSON of dates -> number of items on that date 

186 """ 

187 # we want the result to be a dictionary of day -> number of reservations 

188 result: defaultdict[str, dict[int, dict[str, list[int]]]] = defaultdict( 

189 lambda: defaultdict(lambda: dict(osdIds=[], timeReservationIds=[])) 

190 ) 

191 end_date = start_date + pendulum.duration(days=days) 

192 

193 # handle the allocation version 

194 if allocation_version: 

195 pass 

196 

197 # handle time reservations 

198 if available_time_model_version: 

199 for tr in available_time_model_version.reservations: 

200 tr: TimeReservation = tr 

201 for date in tr.repeats(): 

202 if start_date <= date <= end_date and tr.start_time < tr.stop_time: 

203 # if we go from 3:01 to 4:01 then we need to mark time used in 3:00 and 4:00, so we are rounding 

204 # down in the first case and up in the second case 

205 for hour in range(int(tr.start_time.to_value(u.h)), int(tr.stop_time.to_value(u.h))): 

206 result[date.strftime("%Y-%m-%d")][hour]["timeReservationIds"].append(tr.time_reservation_id) 

207 

208 # expand the range for midnight crossings 

209 elif ( 

210 start_date - timedelta(days=1) <= date <= end_date + timedelta(days=1) 

211 and tr.start_time > tr.stop_time 

212 ): 

213 # in the case we wrap midnight, we actually need two loops: 

214 # one from start time to midnight 

215 if date <= end_date: 

216 for hour in range(int(tr.start_time.to_value(u.h)), 24): 

217 result[date.strftime("%Y-%m-%d")][hour]["timeReservationIds"].append(tr.time_reservation_id) 

218 # and another one, from midnight to the end time 

219 # Note: this one has to be placed on the next day, which may or may not be in the range 

220 tomorrow = date + timedelta(days=1) 

221 if tomorrow <= end_date: 

222 for hour in range(0, int(tr.stop_time.to_value(u.h))): 

223 result[tomorrow.strftime("%Y-%m-%d")][hour]["timeReservationIds"].append( 

224 tr.time_reservation_id 

225 ) 

226 

227 # TODO: Implement this for OSDs. We currently don't have a good notion of fixed-date OSDs, so we cannot implement now 

228 return result 

229 

230 

231class SpillageTimeModel(TimeModel): 

232 """ 

233 Acts as an AvailableTimeModel, except it lacks restrictions on what can be added to it, and it cannot be persisted. 

234 Exists solely to provide a home for pressure that does not fit within other ATMs, thus, it is the "spillage" from 

235 running a pressure cooker. 

236 """ 

237 

238 def __init__(self, atms: list[AvailableTimeModel]): 

239 """ 

240 We use the ATMs as a whole to create a fictional time window, using the earliest start and latest end. 

241 :param atms: the ATMs relevant for calculating the start and end dates of this time spillage object. 

242 """ 

243 self.available_time_model_id: int | None = None 

244 self._start_date: datetime = min((atm.start_date for atm in atms), default=datetime.now()) 

245 self._end_date: datetime = max((atm.end_date for atm in atms), default=datetime.now() + timedelta(days=1)) 

246 

247 @property 

248 @override 

249 def start_date(self): 

250 return self._start_date 

251 

252 @property 

253 @override 

254 def end_date(self): 

255 return self._end_date 

256 

257 @override 

258 def is_time_spillage(self) -> bool: 

259 return True 

260 

261 @override 

262 def is_applicable(self, time_block: TimeBlock): 

263 """ 

264 Unconditionally returns True regardless of the OSD 

265 :param _: the OSD (we don't actually read this value) 

266 :return: True, always 

267 """ 

268 return True 

269 

270 @property 

271 @override 

272 def name(self) -> str: 

273 return "Unallocated" 

274 

275 @override 

276 def to_timebins(self) -> TimeBins: 

277 return TimeBins.from_start_duration(0 * u.h, 24 * u.h, 24 * u.h, ((self.end_date - self.start_date).days)) 

278 

279 

280# Stolen from more-itertools: https://more-itertools.readthedocs.io/en/stable/_modules/more_itertools/recipes.html#partition 

281G = TypeVar("G") 

282 

283 

284def partition(pred: Callable[[G], bool], iterable: Iterable[G]) -> tuple[Iterable[G], Iterable[G]]: 

285 """ 

286 Returns a 2-tuple of iterables derived from the input iterable. 

287 The first yields the items that have ``pred(item) == False``. 

288 The second yields the items that have ``pred(item) == True``. 

289 

290 >>> is_odd = lambda x: x % 2 != 0 

291 >>> iterable = range(10) 

292 >>> even_items, odd_items = partition(is_odd, iterable) 

293 >>> list(even_items), list(odd_items) 

294 ([0, 2, 4, 6, 8], [1, 3, 5, 7, 9]) 

295 

296 If *pred* is None, :func:`bool` is used. 

297 

298 >>> iterable = [0, 1, False, True, '', ' '] 

299 >>> false_items, true_items = partition(None, iterable) 

300 >>> list(false_items), list(true_items) 

301 ([0, False, ''], [1, True, ' ']) 

302 """ 

303 if pred is None: 

304 pred = bool 

305 

306 t1, t2, p = tee(iterable, 3) 

307 p1, p2 = tee(map(pred, p)) 

308 return (compress(t1, map(operator.not_, p1)), compress(t2, p2))