Coverage for middle_layer/allocate/domain_layer/services/pressure_cooker_service.py: 97.14%
105 statements
« prev ^ index » next coverage.py v7.10.5, created at 2026-03-09 06:13 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2026-03-09 06:13 +0000
1# Copyright 2024 Associated Universities, Inc.
2#
3# This file is part of Telescope Time Allocation Tools (TTAT).
4#
5# TTAT is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# any later version.
9#
10# TTAT is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with TTAT. If not, see <https://www.gnu.org/licenses/>.
17import operator
18from collections import defaultdict
19from collections.abc import Iterable
20from datetime import datetime, timedelta
21from itertools import chain, compress, tee
22from typing import Callable, TypeVar, override
24import astropy.units as u
25import pendulum
27from allocate.domain_layer.entities.allocation_version import AllocationVersion
28from allocate.domain_layer.entities.available_time_model import AvailableTimeModel
29from allocate.domain_layer.entities.available_time_model_version import AvailableTimeModelVersion
30from allocate.domain_layer.entities.observation_specification_disposition import ObservationSpecificationDisposition
31from allocate.domain_layer.entities.time_bins import TimeBins
32from allocate.domain_layer.entities.time_block import TimeBlock
33from allocate.domain_layer.entities.time_model import TimeModel
34from allocate.domain_layer.entities.time_reservation import TimeReservation
35from common.domain_layer import JSON, JSON_OBJECT
def pressure_cooker(
    available_time_model_version: AvailableTimeModelVersion,
    allocation_version: AllocationVersion | None = None,
    apply_time_reservations: bool = True,
) -> list[JSON_OBJECT]:
    """
    Assembles (and currently fakes most of) the data needed to graph pressure
    plots for specified AllocationVersion and/or AvailableTimeModelVersion. An ATMV is required while an AV can be
    optionally supplied.

    The ATMs of the version are visited in order, followed by a synthetic SpillageTimeModel. Each OSD of the
    AllocationVersion is counted against the first time model whose ``is_applicable`` accepts it; whatever no ATM
    accepts ends up in the spillage model. The spillage model is only reported when it actually received pressure.

    :param available_time_model_version: AvailableTimeModelVersion to use in calculating pressure
    :param allocation_version: Optional AllocationVersion to use in calculating pressure
    :param apply_time_reservations: Defaults to True, apply time reservations to the pressure plot data. If False,
                                    do not apply time reservations to the pressure plot data.
                                    (Currently ignored, see the TODO below.)
    :return: pressure plot data as a list of dictionaries, one per reported time model
    """
    # TODO: use of apply_time_reservations in timebin calculation is not currently implemented.

    # every reported time model gets one (possibly empty) TimeBins per scheduling priority
    priorities = ("A", "B", "C", "D", "N", "NP")

    atms = available_time_model_version.available_time_models
    spillage = SpillageTimeModel(atms)
    # the spillage model goes last, so it only sees the OSDs that every real ATM rejected
    atms_plus_spillover = atms + [spillage]

    # create the time reservation time bins now, because they are needed unconditionally
    time_reservation_bins = available_time_model_version.reserved_time_bins()  # gives negative numbers

    # generate a fake set of reserved bins for the spillage time model
    time_reservation_bins[spillage] = AvailableTimeModelVersion.make_standard_reserved_bins()

    osd_pressure_by_priority: dict[TimeModel, dict[str, TimeBins]] = {atm: {} for atm in atms_plus_spillover}

    remaining_osds: list[ObservationSpecificationDisposition] = (
        list(
            chain.from_iterable(
                ad.observation_specification_dispositions for ad in allocation_version.allocation_dispositions
            )
        )
        if allocation_version
        else []
    )

    result: list[JSON_OBJECT] = []
    for atm in atms_plus_spillover:
        available_time = atm.to_timebins()  # gives positive numbers for available time

        # for each ATM we first need to filter out the time reservations and OSDs that could fit within the date
        # range and whatever other criteria for filtering we are using, like the VLA configuration, based on
        # the facility strategies

        # start every priority with an empty TimeBins instance
        osd_pressure_by_priority[atm] = {priority: TimeBins() for priority in priorities}
        osd_ids: list[int] = []

        if allocation_version:
            # separate the OSDs we can use from the remainder; the remainder is offered to the next time model
            rejected, accepted = partition(atm.is_applicable, remaining_osds)
            remaining_osds = list(rejected)

            # iterate OSDs to fill Scheduling Priorities
            for osd in accepted:
                osd_pressure_by_priority[atm][osd.scheduling_priority.name] += osd.to_timebins()
                osd_ids.append(osd.observation_specification_disposition_id)

        # if this is the spillage time model and nothing spilled into it, leave it out of the output
        if atm.is_time_spillage() and sum(tb.total_hours() for tb in osd_pressure_by_priority[atm].values()) == 0:
            continue

        result.append(
            create_pressure_json(
                allocation_version,
                available_time_model_version,
                atm,
                available_time,
                osd_pressure_by_priority,
                time_reservation_bins,
                osd_ids,
            )
        )

    return result
def create_pressure_json(
    allocation_version: AllocationVersion | None,
    atm_version: AvailableTimeModelVersion,
    atm: TimeModel,
    available_time: TimeBins,
    osd_pressure_by_priority: dict[TimeModel, dict[str, TimeBins]],
    time_reservation_bins: dict[TimeModel, dict[str, TimeBins]],
    osd_ids_p: list[int],
) -> JSON_OBJECT:
    """
    Build the pressure plot dictionary for a single time model.

    :param allocation_version: the AV the pressure was computed for, or None if there is none
    :param atm_version: the ATM version the time model belongs to
    :param atm: the time model being reported; also the key into the two per-model dictionaries
    :param available_time: the available time of the time model, indexed by hour bin
    :param osd_pressure_by_priority: per time model, the OSD pressure for each scheduling priority
    :param time_reservation_bins: per time model, the reserved time for each reservation label
    :param osd_ids_p: the IDs of the OSDs that were counted against this time model
    :return: a dictionary with identifying fields, 24 hourly "timeBins" entries and the "osdIds"
    """
    # soothe the type checker
    osd_ids: list[JSON] = list(osd_ids_p)

    # models without an ID attribute report 0
    payload: JSON_OBJECT = {
        "availableTimeModelId": getattr(atm, "available_time_model_id", 0),
        "allocationVersionId": allocation_version.allocation_version_id if allocation_version is not None else None,
        "availableTimeModelVersionId": atm_version.available_time_model_version_id,
        "availableTimeModelName": atm.name,
        "availableTimeModelStartDate": atm.start_date.isoformat(),
        "availableTimeModelEndDate": atm.end_date.isoformat(),
    }

    hourly = []
    for hour in range(24):
        entry = {"value": hour, "availableTime": available_time[hour]}
        entry["schedulingPriorities"] = {
            priority: bins[hour] for priority, bins in osd_pressure_by_priority[atm].items()
        }
        # correct the displayed pressure to be positive; labels without any reserved time are left out
        entry["reservedTimes"] = [
            {"label": label, "value": -bins[hour]}
            for label, bins in time_reservation_bins[atm].items()
            if bins.total_hours() != 0
        ]
        hourly.append(entry)

    payload["timeBins"] = hourly
    payload["osdIds"] = osd_ids
    return payload
def create_calendar_json(
    *,
    allocation_version: AllocationVersion | None,
    available_time_model_version: AvailableTimeModelVersion | None,
    start_date: datetime,
    days: int,
) -> dict[str, dict[int, dict[str, list[int]]]]:
    """
    Create the data for a calendar plot.

    Example:
        {
            "2025-03-01": {
                0...23: {"osdIds": [], "timeReservationIds": []},
            },
        }

    Only days and hours touched by at least one time reservation appear in the result, and only days between
    ``start_date`` and ``start_date + days`` (both inclusive) are reported. A reservation occupies every hour it
    overlaps, even partially: 3:01 to 4:01 marks both hour 3 and hour 4.

    :param allocation_version: the AV to base on (currently unused, OSDs are not implemented yet)
    :param available_time_model_version: the ATM version to base on
    :param start_date: the start date (in case we don't want to use the one in the ATM?)
    :param days: the number of days (in case we don't want to use the end date in the ATM?)
    :return: a JSON of dates -> hour of day -> IDs of the items occupying that hour
    """
    import math  # function-scope import: only needed here, to round the stop hour up

    # we want the result to be a dictionary of day -> hour -> IDs
    result: defaultdict[str, dict[int, dict[str, list[int]]]] = defaultdict(
        lambda: defaultdict(lambda: dict(osdIds=[], timeReservationIds=[]))
    )
    end_date = start_date + pendulum.duration(days=days)

    def hours_since_midnight(time_of_day) -> float:
        # rounding absorbs floating-point noise from the unit conversion, so that an exact 4:00 that arrives
        # as 4.000000000000001 is not rounded up into hour 5 (or 3.9999999999 truncated into hour 3)
        return round(float(time_of_day.to_value(u.h)), 6)

    def in_window(day) -> bool:
        return start_date <= day <= end_date

    def mark(day, hours, reservation_id) -> None:
        # index inside the loop so that an empty hour range does not create an empty entry for the day
        for hour in hours:
            result[day.strftime("%Y-%m-%d")][hour]["timeReservationIds"].append(reservation_id)

    # handle the allocation version
    # TODO: Implement this for OSDs. We currently don't have a good notion of fixed-date OSDs, so we cannot
    #       implement now

    # handle time reservations
    if available_time_model_version:
        tr: TimeReservation
        for tr in available_time_model_version.reservations:
            # if we go from 3:01 to 4:01 then we need to mark time used in 3:00 and 4:00, so we are rounding
            # down for the first hour and up for the (exclusive) last hour
            first_hour = math.floor(hours_since_midnight(tr.start_time))
            stop_hour = math.ceil(hours_since_midnight(tr.stop_time))
            same_day = tr.start_time < tr.stop_time
            wraps_midnight = tr.start_time > tr.stop_time

            # NOTE(review): assumes repeats() yields the day on which each occurrence starts -- confirm
            for date in tr.repeats():
                if same_day:
                    if in_window(date):
                        mark(date, range(first_hour, stop_hour), tr.time_reservation_id)
                elif wraps_midnight:
                    # in the case we wrap midnight, we actually need two parts:
                    # one from start time to midnight, on the day itself (if that day is in the window)
                    if in_window(date):
                        mark(date, range(first_hour, 24), tr.time_reservation_id)
                    # and another one, from midnight to the end time
                    # Note: this one has to be placed on the next day, which may or may not be in the window;
                    # an occurrence starting the day before the window still contributes its morning part
                    tomorrow = date + timedelta(days=1)
                    if in_window(tomorrow):
                        mark(tomorrow, range(0, stop_hour), tr.time_reservation_id)

    return result
class SpillageTimeModel(TimeModel):
    """
    A stand-in for an AvailableTimeModel that accepts anything offered to it and is not meant to be persisted.
    It exists solely to give a home to pressure that does not fit within any other ATM; that leftover pressure is
    the "spillage" from running a pressure cooker.
    """

    def __init__(self, atms: list[AvailableTimeModel]):
        """
        Span a fictional time window over the given ATMs, from the earliest start date to the latest end date.
        Without any ATMs the window runs from now until one day from now.

        :param atms: the ATMs relevant for calculating the start and end dates of this time spillage object.
        """
        fallback_start = datetime.now()
        fallback_end = datetime.now() + timedelta(days=1)

        # there is no persisted ID for the spillage model
        self.available_time_model_id: int | None = None
        self._start_date: datetime = min((model.start_date for model in atms), default=fallback_start)
        self._end_date: datetime = max((model.end_date for model in atms), default=fallback_end)

    @property
    @override
    def start_date(self):
        """The earliest start date among the ATMs this model was built from."""
        return self._start_date

    @property
    @override
    def end_date(self):
        """The latest end date among the ATMs this model was built from."""
        return self._end_date

    @override
    def is_time_spillage(self) -> bool:
        """Identify this model as the spillage model."""
        return True

    @override
    def is_applicable(self, time_block: TimeBlock):
        """
        Accept every time block unconditionally.

        :param time_block: the time block on offer (we don't actually read this value)
        :return: True, always
        """
        return True

    @property
    @override
    def name(self) -> str:
        """The fixed display name of the spillage model."""
        return "Unallocated"

    @override
    def to_timebins(self) -> TimeBins:
        """
        Build the time bins of this model from its window.

        :return: the result of TimeBins.from_start_duration for a 24 hour span, using the whole days in the window
        """
        whole_days = (self.end_date - self.start_date).days
        return TimeBins.from_start_duration(0 * u.h, 24 * u.h, 24 * u.h, whole_days)
280# Stolen from more-itertools: https://more-itertools.readthedocs.io/en/stable/_modules/more_itertools/recipes.html#partition
281G = TypeVar("G")
284def partition(pred: Callable[[G], bool], iterable: Iterable[G]) -> tuple[Iterable[G], Iterable[G]]:
285 """
286 Returns a 2-tuple of iterables derived from the input iterable.
287 The first yields the items that have ``pred(item) == False``.
288 The second yields the items that have ``pred(item) == True``.
290 >>> is_odd = lambda x: x % 2 != 0
291 >>> iterable = range(10)
292 >>> even_items, odd_items = partition(is_odd, iterable)
293 >>> list(even_items), list(odd_items)
294 ([0, 2, 4, 6, 8], [1, 3, 5, 7, 9])
296 If *pred* is None, :func:`bool` is used.
298 >>> iterable = [0, 1, False, True, '', ' ']
299 >>> false_items, true_items = partition(None, iterable)
300 >>> list(false_items), list(true_items)
301 ([0, False, ''], [1, True, ' '])
302 """
303 if pred is None:
304 pred = bool
306 t1, t2, p = tee(iterable, 3)
307 p1, p2 = tee(map(pred, p))
308 return (compress(t1, map(operator.not_, p1)), compress(t2, p2))