Coverage for middle_layer/propose/domain_layer/entities/observation_specification.py: 87.30%
252 statements
« prev ^ index » next coverage.py v7.10.5, created at 2026-03-09 06:13 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2026-03-09 06:13 +0000
1# Copyright 2024 Associated Universities, Inc.
2#
3# This file is part of Telescope Time Allocation Tools (TTAT).
4#
5# TTAT is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# any later version.
9#
10# TTAT is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with TTAT. If not, see <https://www.gnu.org/licenses/>.
17#
18# pyright: reportImportCycles=false
19from abc import ABC, abstractmethod
20from dataclasses import dataclass, field
21from typing import TYPE_CHECKING, ClassVar, override
23import astropy.units as u
24from astropy.coordinates import SkyCoord
25from astropy.units import Quantity
26from astropy.units.core import Unit
27from sqlalchemy import Enum, ForeignKey
28from sqlalchemy.orm import Mapped, mapped_column, relationship
30from allocate.domain_layer.entities.cadence import Cadence
31from allocate.domain_layer.entities.temporal_reference import TemporalReference
32from common import ZERO_SECONDS
33from common.domain_layer import JSON_OBJECT, ValidUnits
34from common.domain_layer.entities.base import Base
35from propose.domain_layer.entities.reference_target import ReferenceTarget
36from propose.domain_layer.entities.scan import (
37 Scan,
38 ScanIntent,
39 ScanList,
40 Subscan,
41 SubscanIntent,
42 hardcoded_scan_intents,
43)
44from propose.domain_layer.entities.source import Source
45from propose.domain_layer.services.slew_time_calculator_services import (
46 SlewTimeCalculator,
47 calculate_slew_time_gbt,
48 calculate_slew_time_vla,
49 choose_slew_time_calculator,
50)
51from solicit.domain_layer.entities.capability import Facility
53if TYPE_CHECKING:
54 from propose.domain_layer.entities.proposal import AllocationRequest
# astropy physical type registered under the name "time".
# NOTE(review): this binds the module-level name `time`, which would shadow the
# stdlib `time` module if it were ever imported in this file.
time = u.get_physical_type("time")
class Schedulable(ABC):
    """Attribute contract shared by schedulable targets.

    Declares the attribute names and types that concrete schedulable classes
    (for example ``Calibrator``) provide. It defines no methods of its own.

    NOTE(review): this class is not decorated with ``@dataclass``, so the
    ``field(...)`` default below is stored as a plain class attribute rather
    than being turned into a per-instance default here. Dataclass subclasses
    have to redeclare the fields themselves.
    """

    name: str
    field_source: SkyCoord
    time_on_source: float
    intent: ScanIntent
    # TODO: facility_configuration updates needed
    observation_configuration: dict[str, str] = field(default_factory=dict)
    slew_time_calculator: SlewTimeCalculator
@dataclass
class Calibrator(Schedulable):
    """Target used for Calibration"""

    name: str
    field_source: SkyCoord
    time_on_source: float
    intent: ScanIntent
    slew_time_calculator: SlewTimeCalculator
    observation_configuration: dict[str, str] = field(default_factory=dict)

    def __json__(self) -> JSON_OBJECT:
        """Serialize this calibrator to a JSON-compatible dict.

        Right ascension and declination are rendered as decimal-degree
        strings; the intent is represented by its ``name``.

        :return: dict with keys ``name``, ``rightAscension``, ``declination``,
            ``timeOnSource``, ``observationConfiguration`` and ``intent``
        """
        degree = Unit("degree")
        return {
            "name": self.name,
            "rightAscension": str(self.field_source.ra.to_string(decimal=True, unit=degree)),
            "declination": str(self.field_source.dec.to_string(decimal=True, unit=degree)),
            "timeOnSource": self.time_on_source,
            "observationConfiguration": self.observation_configuration,
            "intent": self.intent.name,
        }


# The serializer used to be defined as a module-level function taking ``self``,
# which left Calibrator instances without a ``__json__`` method. It now lives on
# the class; this alias keeps the old module-level name callable.
__json__ = Calibrator.__json__
# The calibration strategy and obs spec generators are placeholders that will be filled in during future stories.
class CalibrationStrategy(ABC):
    """Abstract interface for producing the calibrators of an observation."""

    @abstractmethod
    def get_calibrators(self, scan_intent_calibrate_ampli: ScanIntent) -> list:
        """Return the list of calibrators for this strategy.

        :param scan_intent_calibrate_ampli: scan intent to attach to the calibrators
        :return: list of calibrators; this abstract base implementation returns ``None``
        """
class VLAContinuumCalibrationStrategy(CalibrationStrategy):
    """Placeholder strategy returning one hard-coded VLA calibrator."""

    def get_calibrators(self, scan_intent_calibrate_ampli: ScanIntent) -> list:
        """Build the single hard-coded VLA calibrator.

        :param scan_intent_calibrate_ampli: intent assigned to the calibrator
        :return: one-element list holding the calibrator
        """
        position = SkyCoord(ra=33 * Unit("degree"), dec=40.5 * Unit("degree"))
        calibrator = Calibrator(
            name="our first calibrator",
            field_source=position,
            time_on_source=1.2,
            intent=scan_intent_calibrate_ampli,
            slew_time_calculator=choose_slew_time_calculator(Facility("VLA")),
            observation_configuration={"vla_configuration": "B"},
        )
        return [calibrator]
class GBTSpectralLineCalibrationStrategy(CalibrationStrategy):
    """Placeholder strategy returning one hard-coded GBT calibrator."""

    def get_calibrators(self, scan_intent_calibrate_ampli: ScanIntent) -> list:
        """Build the single hard-coded GBT calibrator.

        The calibrator's ``observation_configuration`` is left at its default.

        :param scan_intent_calibrate_ampli: intent assigned to the calibrator
        :return: one-element list holding the calibrator
        """
        position = SkyCoord(ra=48 * Unit("degree"), dec=38.2 * Unit("degree"))
        calibrator = Calibrator(
            name="our first gbt calibrator",
            field_source=position,
            time_on_source=1.7,
            intent=scan_intent_calibrate_ampli,
            slew_time_calculator=choose_slew_time_calculator(Facility("GBT")),
        )
        return [calibrator]
class ObservationSpecification(Base):
    """ORM entity stored in the ``observation_specifications`` table.

    An observation specification belongs to one ``AllocationRequest`` and owns
    an ordered ``ScanList`` of scans together with a time range, a temporal
    reference, a "requested filler" flag and a ``Cadence``.
    """

    __tablename__ = "observation_specifications"
    observation_specification_id: Mapped[int] = mapped_column(primary_key=True)
    allocation_request_id: Mapped[int] = mapped_column(
        ForeignKey("allocation_requests.allocation_request_id", ondelete="CASCADE"),
    )
    allocation_request: Mapped["AllocationRequest"] = relationship(back_populates="observation_specifications")
    # Scans are ordered by scan_id and held in the project's ScanList collection class.
    scans: Mapped[ScanList] = relationship(
        Scan,
        back_populates="observation_specification",
        cascade="all, delete",
        order_by=lambda: Scan.scan_id,
        collection_class=ScanList,
        passive_deletes=True,
    )
    is_proposer_modified: Mapped[bool]
    time_range_start: Mapped[float] = mapped_column(default=0.0)
    time_range_end: Mapped[float] = mapped_column(default=24.0)
    temporal_reference: Mapped[TemporalReference] = mapped_column(
        Enum(TemporalReference, native_enum=False),
        ForeignKey("temporal_references.name"),
        nullable=False,
    )  # field(default=TemporalReference.LST)
    is_requested_filler: Mapped[bool]
    cadence_id: Mapped[int] = mapped_column(ForeignKey("cadences.cadence_id"))
    cadence = relationship(Cadence, cascade="all, delete", uselist=False, foreign_keys=[cadence_id])

    def __init__(
        self,
        allocation_request: "AllocationRequest",
        scans: list[Scan],
        is_proposer_modified: bool = False,
        time_range_start: float = 0.0,
        time_range_end: float = 24.0,
        temporal_reference: TemporalReference = TemporalReference.LST,
        is_requested_filler: bool = False,
        cadence: Cadence | None = None,
    ):
        """Create an observation specification.

        :param allocation_request: owning allocation request
        :param scans: scans that make up this specification
        :param is_proposer_modified: stored as given
        :param time_range_start: stored as given (defaults to 0.0)
        :param time_range_end: stored as given (defaults to 24.0)
        :param temporal_reference: stored as given (defaults to LST)
        :param is_requested_filler: stored as given
        :param cadence: cadence to attach; when falsy, one is obtained from
            ``allocation_request.make_cadence()``
        """
        # NOTE(review): make_cadence() is called on allocation_request before the
        # truthiness guard below, so a falsy allocation_request together with a
        # falsy cadence would fail here — confirm callers never do that.
        effective_cadence = cadence or allocation_request.make_cadence()
        super().__init__(
            allocation_request=allocation_request,
            scans=scans,
            is_proposer_modified=is_proposer_modified,
            time_range_start=time_range_start,
            time_range_end=time_range_end,
            temporal_reference=temporal_reference,
            is_requested_filler=is_requested_filler,
            cadence=effective_cadence,
        )
        if allocation_request:
            self.allocation_request_id = allocation_request.allocation_request_id

    def clone(self, allocation_request: "AllocationRequest|None" = None) -> "ObservationSpecification":
        """Return a copy of this specification with cloned scans and cadence.

        :param allocation_request: request the copy should belong to; when
            falsy, the copy stays with this specification's own request
        :return: the new ObservationSpecification
        """
        owner = allocation_request or self.allocation_request
        cloned_scans = [scan.clone(owner) for scan in self.scans]
        duplicate = ObservationSpecification(
            allocation_request=owner,
            scans=cloned_scans,
            is_proposer_modified=self.is_proposer_modified,
            time_range_start=self.time_range_start,
            time_range_end=self.time_range_end,
            temporal_reference=self.temporal_reference,
            is_requested_filler=self.is_requested_filler,
            cadence=self.cadence.clone(),
        )
        # The cloned cadence is pointed at the same request as the copy itself.
        duplicate.cadence.allocation_request = owner
        return duplicate

    def __eq__(self, other):
        """Compare by allocation request, flags, cadence, time range and temporal reference.

        Scans and the primary key are not part of the comparison.
        """
        if not isinstance(other, ObservationSpecification):
            return False

        def compared_values(spec: "ObservationSpecification") -> tuple:
            return (
                spec.allocation_request,
                spec.is_proposer_modified,
                spec.cadence,
                spec.time_range_start,
                spec.time_range_end,
                spec.temporal_reference,
                spec.is_requested_filler,
            )

        return compared_values(self) == compared_values(other)

    @property
    def facility(self) -> Facility | None:
        """Facility of the owning allocation request, or ``None`` when there is no request."""
        if not self.allocation_request:
            return None
        return self.allocation_request.facility

    @override
    def __json__(self) -> JSON_OBJECT:
        """Serialize this specification, its scans and its integration-time summaries.

        :return: JSON-compatible dict; time quantities are emitted via their ``.value``
        """
        # TODO: need to show time intervals for each scan.
        # for this we will need the ordering from scan lists.

        def with_integration_time(target, integration_time) -> JSON_OBJECT:
            # A target's own JSON, extended with its integration time.
            target_json = target.__json__()
            target_json["integrationTime"] = integration_time.value
            return target_json

        science_target_integration_times_json: list[JSON_OBJECT] = [
            with_integration_time(science_target, integration_time)
            for science_target, integration_time in self.scans.science_target_integration_times.items()
        ]
        observing_target_integration_times_json: list[JSON_OBJECT] = [
            with_integration_time(observing_target, integration_time)
            for observing_target, integration_time in self.scans.observing_target_integration_times.items()
        ]
        return {
            "observationSpecificationId": self.observation_specification_id,
            "isProposerModified": self.is_proposer_modified,
            "scans": [scan.__json__() for scan in self.scans],
            "allocationRequestId": self.allocation_request_id,
            "scienceTargetIntegrationTimes": science_target_integration_times_json,
            "totalScienceTargetIntegrationTime": self.scans.total_science_target_integration_time.value,
            "observingTargetIntegrationTimes": observing_target_integration_times_json,
            "totalObservingTargetIntegrationTime": self.scans.total_observing_target_integration_time.value,
            "totalDurationOfObservationSpecification": self.scans.total_duration.value,
            "totalOverhead": self.scans.total_overhead.value,
            "isRequestedFiller": self.is_requested_filler,
        }
@dataclass
class PeakFocusObservingInstruction(ABC):
    """Abstract base for observing instructions that are generated against a
    target supplied at call time (see the description of STT-628).

    NB: Only children that add new fields will need to also use the @dataclass decorator
    """

    off_source_intent: SubscanIntent  # Get the canonical/persisted one from the db
    # Class-level list of intents; concrete subclasses assign it.
    scan_intents: ClassVar[list[ScanIntent]]

    @abstractmethod
    def generate_scans(
        self,
        initial_setup_time: Quantity,
        observing_target: SkyCoord,
        target_name: str,
    ) -> list[Scan]:
        """Generate the Scan list for this instruction.

        :param initial_setup_time: setup time for the first generated subscan
        :param observing_target: sky position the scans point at
        :param target_name: name given to the generated source
        :return: List of Scans
        """
        raise NotImplementedError
@dataclass
class ObservingInstruction(ABC):
    """Abstract base for observing instructions that carry their own target
    name and coordinates (see the description of STT-628).

    NB: Only children that add new fields will need to also use the @dataclass decorator
    """

    target_name: str
    ra: Quantity
    dec: Quantity
    on_source_intent: SubscanIntent  # Get the canonical/persisted one from the db
    scan_intents: list[ScanIntent]

    @abstractmethod
    def generate_scans(
        self,
        initial_setup_time: Quantity,
        facility: Facility | None = None,
        acquisition_time_per_scan: Quantity | None = None,
    ) -> list[Scan]:
        """Generate Scan list for this OI

        :param initial_setup_time: Slew time from previous target to this OI's
        :param facility: optional facility the scans are generated for
        :param acquisition_time_per_scan: optional acquisition time per scan;
            whether it is used is up to the concrete subclass
        :return: List of Scans
        """
        raise NotImplementedError
@dataclass
class GBTOptionalCalibratorOI(ObservingInstruction):
    """General-ish GBT ObservingInstruction class to be used for
    Polarization, FluxDensity, and TestSource Calibrator OI's.
    These OI's are only included in a CalibrationPlan if their corresponding CRP answers are True,
    hence the name "OptionalCalibratorOI".
    See the description of STT-441 for reference.

    NOTE: Let the CalibrationStrategy service handle the hard-coded target (RA, DEC, name)
    to allow for more flexibility in this class.
    """

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        facility: Facility | None = None,
        acquisition_time_per_scan: Quantity | None = None,
    ) -> list[Scan]:
        """Generate one Scan made of two 90 s subscans on this OI's target.

        Only the first subscan carries ``initial_setup_time``; the second uses
        ``ZERO_SECONDS``. ``facility`` and ``acquisition_time_per_scan`` are
        accepted for interface compatibility and are not used here.

        :param initial_setup_time: setup time applied to the first subscan
        :return: single-element list holding the generated Scan
        """
        subscan_total = 2
        subscans: list[Subscan] = [
            Subscan(
                index,
                self.on_source_intent,
                90 * u.s,
                ZERO_SECONDS if index != 0 else initial_setup_time,
                reference_target=ReferenceTarget(
                    source=Source(
                        name=self.target_name,
                        lat=self.dec.to_value(u.deg),
                        long=self.ra.to_value(u.deg),
                    ),
                ),
            )
            for index in range(subscan_total)
        ]
        return [Scan(0, self.scan_intents, subscans, type(self).__name__)]
@dataclass
class VLACalibratorOI(ObservingInstruction):
    """General VLA ObservingInstruction class to be used for Calibrator OI's
    See the description of STT-616 for reference.

    NOTE: Let the CalibrationStrategy service handle the hard-coded target (RA, DEC, name)
    to allow for more flexibility in this class.
    """

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        facility: Facility | None = None,
        acquisition_time_per_scan: Quantity | None = None,
    ) -> list[Scan]:
        """Generate one Scan with a single subscan on this calibrator.

        The subscan's acquisition time is 180 s, or 300 s when the first scan
        intent is ``CALIBRATE_POL_LEAKAGE``. ``facility`` and
        ``acquisition_time_per_scan`` are accepted for interface compatibility
        and are not used here.

        :param initial_setup_time: setup time applied to the subscan
        :return: single-element list holding the generated Scan
        """
        acquisition_time = 180 * u.s

        # The previous check required an EMPTY intent list before indexing its
        # first element, so it either raised IndexError or was always False and
        # the 300 s polarization-leakage time was never applied.
        if self.scan_intents and self.scan_intents[0].name == "CALIBRATE_POL_LEAKAGE":
            acquisition_time = 300 * u.s
        return [
            Scan(
                0,
                self.scan_intents,
                [
                    Subscan(
                        0,
                        self.on_source_intent,
                        acquisition_time,
                        initial_setup_time,
                        reference_target=ReferenceTarget(
                            source=Source(
                                name=self.target_name,
                                lat=self.dec.to_value(u.deg),
                                long=self.ra.to_value(u.deg),
                            ),
                        ),
                    ),
                ],
                type(self).__name__,
            )
        ]
@dataclass
class GBTScienceOI(ObservingInstruction):
    """Observing Instruction for a GBT ScienceTarget
    See the description of STT-441 for reference.
    """

    scan_intents: ClassVar[list[ScanIntent]] = [hardcoded_scan_intents["OBSERVE_TARGET"]]
    total_acquisition_time: Quantity

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        facility: Facility | None = None,
        acquisition_time_per_scan: Quantity | None = None,
    ) -> list[Scan]:
        """Generate one Scan that splits the total acquisition time over two subscans.

        Each subscan gets half of ``total_acquisition_time``, but never less
        than 1 s. Only the first subscan carries ``initial_setup_time``.
        ``facility`` and ``acquisition_time_per_scan`` are not used here.

        :param initial_setup_time: setup time applied to the first subscan
        :return: single-element list holding the generated Scan
        """
        subscan_total = 2
        subscans: list[Subscan] = [
            Subscan(
                index,
                self.on_source_intent,
                max(1 * u.s, self.total_acquisition_time / subscan_total),
                ZERO_SECONDS if index != 0 else initial_setup_time,
                reference_target=ReferenceTarget(
                    source=Source(
                        name=self.target_name,
                        lat=self.dec.to_value(u.deg),
                        long=self.ra.to_value(u.deg),
                    ),
                ),
            )
            for index in range(subscan_total)
        ]
        return [Scan(0, self.scan_intents, subscans, type(self).__name__)]
@dataclass
class VLAScienceTargetOI(ObservingInstruction):
    """Observing Instruction for a VLA ScienceTarget
    See the description of STT-616 for reference.
    """

    scan_intents: ClassVar[list[ScanIntent]] = [hardcoded_scan_intents["OBSERVE_TARGET"]]
    total_acquisition_time: Quantity

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        facility: Facility | None = None,
        acquisition_time_per_scan: Quantity | None = None,
    ) -> list[Scan]:
        """Generate one Scan with a single subscan on this science target.

        The subscan's acquisition time is exactly ``acquisition_time_per_scan``
        as passed in (including ``None`` when the caller omits it);
        ``total_acquisition_time`` is not consulted here. ``facility`` is not used.

        :param initial_setup_time: setup time applied to the subscan
        :param acquisition_time_per_scan: acquisition time given to the subscan
        :return: single-element list holding the generated Scan
        """
        only_subscan = Subscan(
            0,
            self.on_source_intent,
            acquisition_time_per_scan,
            initial_setup_time,
            reference_target=ReferenceTarget(
                source=Source(
                    name=self.target_name,
                    lat=self.dec.to_value(u.deg),
                    long=self.ra.to_value(u.deg),
                ),
            ),
        )
        only_scan = Scan(0, self.scan_intents, [only_subscan], type(self).__name__)
        return [only_scan]
class PeakOI(PeakFocusObservingInstruction):
    """See the description of STT-441 for reference"""

    scan_intents: ClassVar[list[ScanIntent]] = [hardcoded_scan_intents["CALIBRATE_POINTING"]]

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        observing_target: SkyCoord,
        target_name: str,
    ) -> list[Scan]:
        """Generate one Scan of four 30 s subscans on the given target.

        Only the first subscan carries ``initial_setup_time``; the rest use
        ``ZERO_SECONDS``.

        :param initial_setup_time: setup time applied to the first subscan
        :param observing_target: sky position used for every subscan's source
        :param target_name: name given to every subscan's source
        :return: single-element list holding the generated Scan
        """
        subscans: list[Subscan] = [
            Subscan(
                index,
                self.off_source_intent,
                30 * u.s,
                ZERO_SECONDS if index != 0 else initial_setup_time,
                reference_target=ReferenceTarget(
                    source=Source(
                        name=target_name,
                        lat=observing_target.dec.to_value(u.deg),
                        long=observing_target.ra.to_value(u.deg),
                    ),
                ),
            )
            for index in range(4)
        ]

        return [Scan(0, self.scan_intents, subscans, type(self).__name__)]
class FocusOI(PeakFocusObservingInstruction):
    """See the description of STT-441 for reference"""

    scan_intents: ClassVar[list[ScanIntent]] = [hardcoded_scan_intents["CALIBRATE_FOCUS"]]

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        observing_target: SkyCoord,
        target_name: str,
    ) -> list[Scan]:
        """Generate one Scan with a single 60 s subscan on the given target.

        :param initial_setup_time: setup time applied to the subscan
        :param observing_target: sky position used for the subscan's source
        :param target_name: name given to the subscan's source
        :return: single-element list holding the generated Scan
        """
        focus_subscan = Subscan(
            0,
            self.off_source_intent,
            60 * u.s,
            initial_setup_time,
            reference_target=ReferenceTarget(
                source=Source(
                    name=target_name,
                    lat=observing_target.dec.to_value(u.deg),
                    long=observing_target.ra.to_value(u.deg),
                ),
            ),
        )
        focus_scan = Scan(0, self.scan_intents, [focus_subscan], type(self).__name__)
        return [focus_scan]
@dataclass
class CalibrationPlan(ABC):
    """Abstract calibration plan: a set of observing instructions that a
    concrete plan turns into an ordered list of Scans.

    This abstract class will be fleshed out when the GBT Calibration Plan is.
    """

    # Instructions the plan is built from; ordering is decided by the subclass.
    oi_set: list[ObservingInstruction | PeakFocusObservingInstruction]

    @abstractmethod
    def generate_scan_list(self, facility: Facility) -> list[Scan]:
        """Produce the Scans for this plan.

        :param facility: facility the scans are generated for
        :return: list of Scans
        """
        raise NotImplementedError
class VLACalibrationPlan(CalibrationPlan):
    """Calibration plan that orders VLA observing instructions into a scan list."""

    def generate_scan_list(self, facility: Facility) -> list[Scan]:
        """Generate the ordered Scan list for the OIs in ``oi_set``.

        Order produced:
          1. the flux calibrator (exactly one is expected),
          2. the polarization-angle and polarization-leakage calibrators, each
             only when exactly one such OI is present,
          3. the amplitude/phase calibrator, then for every science target one
             or more science scans, each followed by the amplitude/phase
             calibrator again,
          4. the bandpass calibrator (exactly one is expected).

        Finally every scan and subscan gets its ``position_in_list`` set to its
        index in the resulting list.

        :param facility: facility passed on to the OIs' ``generate_scans``
        :return: list of Scans in observing order
        :raises IndexError: when a required OI (flux, amplitude/phase, bandpass
            or at least one science target) is missing from ``oi_set``
        """
        initial_position = SkyCoord(ra=0 * Unit("degree"), dec=0 * Unit("degree"))  # Initial condition of the array
        # Should be neither or both of the following
        pol_angle_ois = [oi for oi in self.oi_set if oi.scan_intents[0].name == "CALIBRATE_POL_ANGLE"]
        pol_leakage_ois = [oi for oi in self.oi_set if oi.scan_intents[0].name == "CALIBRATE_POL_LEAKAGE"]
        scan_list: list[Scan] = list()
        flux_oi = [
            oi for oi in self.oi_set if oi.scan_intents[0].name == "CALIBRATE_FLUX"
        ].pop()  # Has to be exactly 1 of these
        scan_list.extend(
            flux_oi.generate_scans(
                calculate_slew_time_vla(initial_position, SkyCoord(ra=flux_oi.ra, dec=flux_oi.dec)) * u.s,
                facility=facility,
            )
        )

        if len(pol_angle_ois) == 1:
            pa_oi = pol_angle_ois[0]
            prev_subscan = scan_list[-1].subscans[-1]
            prev_point = prev_subscan.sky_coord
            scan_list.extend(
                pa_oi.generate_scans(calculate_slew_time_vla(prev_point, SkyCoord(ra=pa_oi.ra, dec=pa_oi.dec)) * u.s)
            )
        if len(pol_leakage_ois) == 1:
            pl_oi = pol_leakage_ois[0]
            prev_subscan = scan_list[-1].subscans[-1]
            prev_point = prev_subscan.sky_coord
            scan_list.extend(
                pl_oi.generate_scans(calculate_slew_time_vla(prev_point, SkyCoord(ra=pl_oi.ra, dec=pl_oi.dec)) * u.s)
            )

        st_ois: list[VLAScienceTargetOI] = [oi for oi in self.oi_set if isinstance(oi, VLAScienceTargetOI)]
        pr_oi = [
            oi
            for oi in self.oi_set
            if len(oi.scan_intents) == 2
            and hardcoded_scan_intents["CALIBRATE_AMPLI"] in oi.scan_intents
            and hardcoded_scan_intents["CALIBRATE_PHASE"] in oi.scan_intents
        ].pop()  # Has to be exactly 1 of these
        pr_oi_point = SkyCoord(ra=pr_oi.ra, dec=pr_oi.dec)

        # Limit the max number of Scans per ScienceTarget
        # so that users don't have to scroll forever through their ObsSpecs,
        # and they take a reasonable time to generate and load
        MAX_SCANS_PER_SCIENCE_TARGET = 3

        # total time >= 90 s is split into scans of 1/3 total time each.
        # total time < 1 s will result in a 1 s scan
        at_per_scan_per_st = []
        num_iterations_per_target = []

        for st_oi in st_ois:
            at_per_scan_per_st.append(
                max(1 * u.s, st_oi.total_acquisition_time)
                if st_oi.total_acquisition_time < 90 * u.s
                else (1 / MAX_SCANS_PER_SCIENCE_TARGET * st_oi.total_acquisition_time)
            )
            num_iterations_per_target.append(1)

        # A PR OI is required before the first ST OI, then after subsequent ST OIs
        scan_list.extend(
            pr_oi.generate_scans(
                calculate_slew_time_vla(SkyCoord(ra=st_ois[0].ra, dec=st_ois[0].dec), pr_oi_point) * u.s,
                facility=facility,
            )
        )
        for i, st_oi in enumerate(st_ois):
            # Always emit at least one science scan, then keep going while the
            # scans emitted so far still fit within the total acquisition time.
            while (
                num_iterations_per_target[i] == 1
                or num_iterations_per_target[i] * at_per_scan_per_st[i] <= st_oi.total_acquisition_time
            ):
                prev_subscan = scan_list[-1].subscans[-1]
                prev_point = prev_subscan.sky_coord
                st_oi_point = SkyCoord(ra=st_oi.ra, dec=st_oi.dec)
                scan_list.extend(
                    st_oi.generate_scans(
                        calculate_slew_time_vla(prev_point, st_oi_point) * u.s,
                        # Passed by keyword: positionally these two arguments were
                        # swapped relative to generate_scans' parameter order, so the
                        # acquisition time was bound to `facility` and vice versa.
                        facility=facility,
                        acquisition_time_per_scan=at_per_scan_per_st[i],
                    )
                )
                scan_list.extend(
                    pr_oi.generate_scans(
                        calculate_slew_time_vla(st_oi_point, pr_oi_point) * u.s,
                        facility=facility,
                    )
                )
                num_iterations_per_target[i] += 1

        bp_oi = [
            oi for oi in self.oi_set if oi.scan_intents[0] == hardcoded_scan_intents["CALIBRATE_BANDPASS"]
        ].pop()  # Has to be exactly 1 of these
        prev_subscan = scan_list[-1].subscans[-1]
        prev_point = prev_subscan.sky_coord
        scan_list.extend(
            bp_oi.generate_scans(
                calculate_slew_time_vla(prev_point, SkyCoord(ra=bp_oi.ra, dec=bp_oi.dec)) * u.s,
                facility=facility,
            )
        )
        # Renumber everything so positions reflect the final ordering.
        for i, scan in enumerate(scan_list):
            scan.position_in_list = i
            for k, subscan in enumerate(scan.subscans):
                subscan.position_in_list = k
        return scan_list
class GBTCalibrationPlan(CalibrationPlan):
    """Calibration plan that orders GBT observing instructions into a scan list."""

    def generate_oi_list(
        self,
    ) -> list[ObservingInstruction | PeakFocusObservingInstruction]:
        """order the OIs by:
        1) Flux Density Calibrator if it exists
        2) Test Source Calibrator if it exists
        3) Polarization Calibrator if it exists
        4) Peak Calibration OI
        5) Focus Calibration OI
        6) Science OIs by increasing RA

        :return: the OIs of ``oi_set`` in the order above
        :raises TypeError: when an OI's class name matches none of the known kinds
        """
        flux_cal = None
        test_cal = None
        pol_cal = None
        peak_cal = None
        focus_cal = None
        science_cals = list()

        for oi in self.oi_set:
            # I tried to do this check with isinstance(), but it was always false.
            class_repr = str(oi.__class__)
            if "OptionalCalibratorOI" in class_repr:
                scan_intent_names = [intent.name for intent in oi.scan_intents]
                # Independent checks: one OI can fill more than one slot.
                if "CALIBRATE_FLUX" in scan_intent_names:
                    flux_cal = oi
                if "OBSERVE_TARGET" in scan_intent_names:
                    test_cal = oi
                if "CALIBRATE_POL_LEAKAGE" in scan_intent_names:
                    pol_cal = oi
            elif "PeakOI" in class_repr:
                peak_cal = oi
            elif "FocusOI" in class_repr:
                focus_cal = oi
            elif "ScienceOI" in class_repr:
                science_cals.append(oi)
            else:
                raise TypeError(f"unknown OI type: {oi.__class__}")

        # sort the science OIs by RA:
        science_cals.sort(key=lambda soi: soi.ra)

        ordered_list = [
            calibrator
            for calibrator in (flux_cal, test_cal, pol_cal, peak_cal, focus_cal)
            if calibrator is not None
        ]
        ordered_list.extend(science_cals)

        return ordered_list

    def generate_scan_list(self, facility: Facility) -> list[Scan]:
        """
        Call `generate_scans()` on the OI's in order

        Peak and Focus OIs reuse the position and name of the previous subscan
        with zero setup time; every other OI is slewed to from the previous
        position. Finally every scan and subscan gets its ``position_in_list``
        set to its index in the resulting list.

        :param facility: facility passed on to the non-Peak/Focus OIs
        :return: A list of Scans corresponding to the OIs in the CalibrationPlan
        """
        scan_list: list[Scan] = list()
        # We need a "0th" subscan for when a Peak or Focus OI is first in the list
        prev_subscan = Subscan(0, None, 0 * u.s, 0 * u.s)
        prev_point = SkyCoord(ra=0 * u.degree, dec=0 * u.degree)
        for i, oi in enumerate(self.generate_oi_list()):
            if isinstance(oi, PeakFocusObservingInstruction):  # isinstance respects inheritance
                if i > 0:
                    prev_subscan = scan_list[-1].subscans[-1]
                    # SkyCoord exposes `ra`/`dec`; the previous `right_ascension` /
                    # `declination` lookups are not SkyCoord attributes and would
                    # raise AttributeError on this path.
                    prev_point = SkyCoord(ra=prev_point.ra, dec=prev_point.dec)
                # Setup time is 0s because Peak & Focus OI's always use the previous Subscan's target
                scan_list.extend(
                    oi.generate_scans(
                        ZERO_SECONDS,
                        prev_point,
                        prev_subscan.name,
                    )
                )
            else:
                next_point = SkyCoord(oi.ra, oi.dec)
                slew_time = calculate_slew_time_gbt(prev_point, next_point)
                scan_list.extend(oi.generate_scans(slew_time * ValidUnits.Time.value, facility))
                prev_point = next_point
        # Renumber everything so positions reflect the final ordering.
        for i, scan in enumerate(scan_list):
            scan.position_in_list = i
            for k, subscan in enumerate(scan.subscans):
                subscan.position_in_list = k
        return scan_list