Coverage for middle_layer/propose/domain_layer/entities/observation_specification.py: 87.30%

252 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2026-03-09 06:13 +0000

1# Copyright 2024 Associated Universities, Inc. 

2# 

3# This file is part of Telescope Time Allocation Tools (TTAT). 

4# 

5# TTAT is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# any later version. 

9# 

10# TTAT is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License 

16# along with TTAT. If not, see <https://www.gnu.org/licenses/>. 

17# 

18# pyright: reportImportCycles=false 

19from abc import ABC, abstractmethod 

20from dataclasses import dataclass, field 

21from typing import TYPE_CHECKING, ClassVar, override 

22 

23import astropy.units as u 

24from astropy.coordinates import SkyCoord 

25from astropy.units import Quantity 

26from astropy.units.core import Unit 

27from sqlalchemy import Enum, ForeignKey 

28from sqlalchemy.orm import Mapped, mapped_column, relationship 

29 

30from allocate.domain_layer.entities.cadence import Cadence 

31from allocate.domain_layer.entities.temporal_reference import TemporalReference 

32from common import ZERO_SECONDS 

33from common.domain_layer import JSON_OBJECT, ValidUnits 

34from common.domain_layer.entities.base import Base 

35from propose.domain_layer.entities.reference_target import ReferenceTarget 

36from propose.domain_layer.entities.scan import ( 

37 Scan, 

38 ScanIntent, 

39 ScanList, 

40 Subscan, 

41 SubscanIntent, 

42 hardcoded_scan_intents, 

43) 

44from propose.domain_layer.entities.source import Source 

45from propose.domain_layer.services.slew_time_calculator_services import ( 

46 SlewTimeCalculator, 

47 calculate_slew_time_gbt, 

48 calculate_slew_time_vla, 

49 choose_slew_time_calculator, 

50) 

51from solicit.domain_layer.entities.capability import Facility 

52 

53if TYPE_CHECKING: 

54 from propose.domain_layer.entities.proposal import AllocationRequest 

55 

# Module-level handle on astropy's "time" physical type, looked up once at import.
time = u.get_physical_type("time")

57 

58 

class Schedulable(ABC):
    """Attribute contract for objects that can be placed into a schedule.

    This class is *not* a dataclass: the annotations below only describe the
    attributes a concrete subclass is expected to provide. A dataclass
    subclass has to redeclare the fields it wants generated, because the
    dataclass machinery only inherits fields from bases that are themselves
    dataclasses.
    """

    name: str
    field_source: SkyCoord
    time_on_source: float
    intent: ScanIntent
    # TODO: facility_configuration updates needed
    # Declared without a default on purpose: ``dataclasses.field()`` is only
    # interpreted inside a dataclass. On this plain ABC it would leave a
    # ``Field`` object (not a dict) as the class attribute.
    observation_configuration: dict[str, str]
    slew_time_calculator: SlewTimeCalculator

67 

68 

@dataclass
class Calibrator(Schedulable):
    """Target used for Calibration"""

    name: str
    field_source: SkyCoord
    time_on_source: float
    intent: ScanIntent
    slew_time_calculator: SlewTimeCalculator
    observation_configuration: dict[str, str] = field(default_factory=dict)

    # This serializer reads only Calibrator fields through ``self``; it must be
    # a method of the class (not a module-level function) for
    # ``calibrator.__json__()`` to work.
    def __json__(self) -> JSON_OBJECT:
        """Return the JSON representation of this calibrator.

        Right ascension and declination are rendered as decimal-degree
        strings. ``slew_time_calculator`` is not part of the output.

        :return: dict with the keys ``name``, ``rightAscension``,
            ``declination``, ``timeOnSource``, ``observationConfiguration``
            and ``intent`` (the intent's name)
        """
        return {
            "name": self.name,
            "rightAscension": str(self.field_source.ra.to_string(decimal=True, unit=Unit("degree"))),
            "declination": str(self.field_source.dec.to_string(decimal=True, unit=Unit("degree"))),
            "timeOnSource": self.time_on_source,
            "observationConfiguration": self.observation_configuration,
            "intent": self.intent.name,
        }

90 

91 

92# the calibration strategy and obs spec generators are placeholders that will be filled in during future stories. 

class CalibrationStrategy(ABC):
    """Interface for objects that supply the calibrators for a scan intent."""

    @abstractmethod
    def get_calibrators(self, scan_intent_calibrate_ampli: ScanIntent) -> list:
        """Return the calibrators to observe with ``scan_intent_calibrate_ampli``.

        :param scan_intent_calibrate_ampli: intent to attach to the calibrators
        :return: list of calibrators (concrete strategies decide which ones)
        """

97 

98 

class VLAContinuumCalibrationStrategy(CalibrationStrategy):
    """Placeholder VLA continuum strategy returning one hard-coded calibrator."""

    def get_calibrators(self, scan_intent_calibrate_ampli: ScanIntent) -> list:
        """Return a single fixed calibrator tagged with the given intent.

        :param scan_intent_calibrate_ampli: intent stored on the calibrator
        :return: one-element list holding the hard-coded VLA calibrator
        """
        degree = Unit("degree")
        calibrator = Calibrator(
            name="our first calibrator",
            field_source=SkyCoord(ra=33 * degree, dec=40.5 * degree),
            time_on_source=1.2,
            intent=scan_intent_calibrate_ampli,
            slew_time_calculator=choose_slew_time_calculator(Facility("VLA")),
            observation_configuration={"vla_configuration": "B"},
        )
        return [calibrator]

111 

112 

class GBTSpectralLineCalibrationStrategy(CalibrationStrategy):
    """Placeholder GBT spectral-line strategy returning one hard-coded calibrator."""

    def get_calibrators(self, scan_intent_calibrate_ampli: ScanIntent) -> list:
        """Return a single fixed calibrator tagged with the given intent.

        The calibrator keeps the default (empty) observation configuration.

        :param scan_intent_calibrate_ampli: intent stored on the calibrator
        :return: one-element list holding the hard-coded GBT calibrator
        """
        degree = Unit("degree")
        calibrator = Calibrator(
            name="our first gbt calibrator",
            field_source=SkyCoord(ra=48 * degree, dec=38.2 * degree),
            time_on_source=1.7,
            intent=scan_intent_calibrate_ampli,
            slew_time_calculator=choose_slew_time_calculator(Facility("GBT")),
        )
        return [calibrator]

124 

125 

class ObservationSpecification(Base):
    """ORM entity persisted in the ``observation_specifications`` table.

    Ties a ``ScanList`` and a ``Cadence`` to one ``AllocationRequest``, together
    with a time range, a temporal reference and the proposer-modified /
    requested-filler flags.
    """

    __tablename__ = "observation_specifications"

    observation_specification_id: Mapped[int] = mapped_column(primary_key=True)
    allocation_request_id: Mapped[int] = mapped_column(
        ForeignKey("allocation_requests.allocation_request_id", ondelete="CASCADE"),
    )
    allocation_request: Mapped["AllocationRequest"] = relationship(back_populates="observation_specifications")
    # Scans are loaded ordered by scan_id into the custom ScanList collection.
    scans: Mapped[ScanList] = relationship(
        Scan,
        back_populates="observation_specification",
        cascade="all, delete",
        order_by=lambda: Scan.scan_id,
        collection_class=ScanList,
        passive_deletes=True,
    )
    is_proposer_modified: Mapped[bool]
    time_range_start: Mapped[float] = mapped_column(default=0.0)
    time_range_end: Mapped[float] = mapped_column(default=24.0)
    temporal_reference: Mapped[TemporalReference] = mapped_column(
        Enum(TemporalReference, native_enum=False),
        ForeignKey("temporal_references.name"),
        nullable=False,
    )  # field(default=TemporalReference.LST)
    is_requested_filler: Mapped[bool]
    cadence_id: Mapped[int] = mapped_column(ForeignKey("cadences.cadence_id"))
    cadence = relationship(Cadence, cascade="all, delete", uselist=False, foreign_keys=[cadence_id])

    def __init__(
        self,
        allocation_request: "AllocationRequest",
        scans: list[Scan],
        is_proposer_modified: bool = False,
        time_range_start: float = 0.0,
        time_range_end: float = 24.0,
        temporal_reference: TemporalReference = TemporalReference.LST,
        is_requested_filler: bool = False,
        cadence: Cadence | None = None,
    ):
        """Create a specification for ``allocation_request``.

        :param allocation_request: owning allocation request
        :param scans: scans that make up the specification
        :param is_proposer_modified: stored as-is
        :param time_range_start: stored as-is
        :param time_range_end: stored as-is
        :param temporal_reference: stored as-is
        :param is_requested_filler: stored as-is
        :param cadence: cadence to attach; when falsy, one is obtained from
            ``allocation_request.make_cadence()``
        """
        # NOTE(review): the fallback dereferences allocation_request before the
        # truthiness guard further down; a falsy request with no cadence would
        # fail here. Confirm callers never do that.
        effective_cadence = cadence or allocation_request.make_cadence()
        super().__init__(
            allocation_request=allocation_request,
            scans=scans,
            is_proposer_modified=is_proposer_modified,
            time_range_start=time_range_start,
            time_range_end=time_range_end,
            temporal_reference=temporal_reference,
            is_requested_filler=is_requested_filler,
            cadence=effective_cadence,
        )
        if allocation_request:
            self.allocation_request_id = allocation_request.allocation_request_id

    def clone(self, allocation_request: "AllocationRequest|None" = None) -> "ObservationSpecification":
        """Return a copy of this specification with cloned scans and cadence.

        :param allocation_request: request the copy should belong to; when
            falsy the copy stays attached to this specification's request
        :return: the new ``ObservationSpecification``
        """
        owner = allocation_request or self.allocation_request
        duplicate = ObservationSpecification(
            allocation_request=owner,
            scans=[scan.clone(owner) for scan in self.scans],
            is_proposer_modified=self.is_proposer_modified,
            time_range_start=self.time_range_start,
            time_range_end=self.time_range_end,
            temporal_reference=self.temporal_reference,
            is_requested_filler=self.is_requested_filler,
            cadence=self.cadence.clone(),
        )
        # The cloned cadence is pointed at the same request as the copy.
        duplicate.cadence.allocation_request = owner
        return duplicate

    def __eq__(self, other):
        """Compare by owning request, flags, cadence, time range and temporal reference.

        ``scans`` and ``observation_specification_id`` do not take part in the
        comparison.
        """
        # NOTE(review): no __hash__ is defined next to this __eq__, so Python
        # makes the class unhashable unless a base restores it. Confirm
        # instances are never used in sets or as dict keys.
        if not isinstance(other, ObservationSpecification):
            return False

        def compared_fields(spec):
            return (
                spec.allocation_request,
                spec.is_proposer_modified,
                spec.cadence,
                spec.time_range_start,
                spec.time_range_end,
                spec.temporal_reference,
                spec.is_requested_filler,
            )

        return compared_fields(self) == compared_fields(other)

    @property
    def facility(self) -> Facility | None:
        """Facility of the owning allocation request, or ``None`` without one."""
        request = self.allocation_request
        if not request:
            return None
        return request.facility

    @override
    def __json__(self) -> JSON_OBJECT:
        """Return the JSON representation, including per-target integration times.

        Each entry of ``scienceTargetIntegrationTimes`` and
        ``observingTargetIntegrationTimes`` is the target's own JSON with an
        added ``integrationTime`` key.
        """
        # TODO: need to show time intervals for each scan.
        # for this we will need the ordering from scan lists.

        def with_integration_times(times_by_target) -> list[JSON_OBJECT]:
            rows: list[JSON_OBJECT] = []
            for target, integration_time in times_by_target.items():
                row = target.__json__()
                row["integrationTime"] = integration_time.value
                rows.append(row)
            return rows

        scan_list = self.scans
        science_rows = with_integration_times(scan_list.science_target_integration_times)
        observing_rows = with_integration_times(scan_list.observing_target_integration_times)
        return {
            "observationSpecificationId": self.observation_specification_id,
            "isProposerModified": self.is_proposer_modified,
            "scans": [scan.__json__() for scan in scan_list],
            "allocationRequestId": self.allocation_request_id,
            "scienceTargetIntegrationTimes": science_rows,
            "totalScienceTargetIntegrationTime": scan_list.total_science_target_integration_time.value,
            "observingTargetIntegrationTimes": observing_rows,
            "totalObservingTargetIntegrationTime": scan_list.total_observing_target_integration_time.value,
            "totalDurationOfObservationSpecification": scan_list.total_duration.value,
            "totalOverhead": scan_list.total_overhead.value,
            "isRequestedFiller": self.is_requested_filler,
        }

248 

249 

@dataclass
class PeakFocusObservingInstruction(ABC):
    """Base for observing instructions that are generated for a caller-supplied target.

    See the description of STT-628 for reference.
    NB: Only children that add new fields will need to also use the @dataclass decorator
    """

    # Get the canonical/persisted one from the db
    off_source_intent: SubscanIntent
    # Class-level; each concrete subclass supplies its own list.
    scan_intents: ClassVar[list[ScanIntent]]

    @abstractmethod
    def generate_scans(
        self,
        initial_setup_time: Quantity,
        observing_target: SkyCoord,
        target_name: str,
    ) -> list[Scan]:
        """Generate the Scan list for this OI.

        :param initial_setup_time: setup time to charge to the generated scans
        :param observing_target: position the scans point at
        :param target_name: name recorded for that position
        :return: List of Scans
        """
        raise NotImplementedError

267 

268 

@dataclass
class ObservingInstruction(ABC):
    """Base for observing instructions that carry their own target.

    See the description of STT-628 for reference.
    NB: Only children that add new fields will need to also use the @dataclass decorator
    """

    target_name: str
    ra: Quantity
    dec: Quantity
    # Get the canonical/persisted one from the db
    on_source_intent: SubscanIntent
    scan_intents: list[ScanIntent]

    @abstractmethod
    def generate_scans(
        self,
        initial_setup_time: Quantity,
        facility: Facility | None = None,
        acquisition_time_per_scan: Quantity | None = None,
    ) -> list[Scan]:
        """Generate Scan list for this OI

        :param initial_setup_time: Slew time from previous target to this OI's
        :param facility: optional facility the scans are generated for
        :param acquisition_time_per_scan: optional per-scan acquisition time
        :return: List of Scans
        """
        raise NotImplementedError

294 

295 

@dataclass
class GBTOptionalCalibratorOI(ObservingInstruction):
    """General-ish GBT ObservingInstruction class to be used for
    Polarization, FluxDensity, and TestSource Calibrator OI's.
    These OI's are only included in a CalibrationPlan if their corresponding CRP answers are True,
    hence the name "OptionalCalibratorOI".
    See the description of STT-441 for reference.

    NOTE: Let the CalibrationStrategy service handle the hard-coded target (RA, DEC, name)
    to allow for more flexibility in this class.
    """

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        facility: Facility | None = None,
        acquisition_time_per_scan: Quantity | None = None,
    ) -> list[Scan]:
        """Return one Scan made of two 90 s subscans on this OI's target.

        Only the first subscan is charged ``initial_setup_time``; the second
        gets ``ZERO_SECONDS``. ``facility`` and ``acquisition_time_per_scan``
        are accepted for interface compatibility and not used here.

        :return: single-element list with the generated Scan
        """
        SUBSCAN_COUNT = 2
        subscans: list[Subscan] = [
            Subscan(
                position,
                self.on_source_intent,
                90 * u.s,
                ZERO_SECONDS if position else initial_setup_time,
                # A fresh ReferenceTarget/Source pair is built for every subscan.
                reference_target=ReferenceTarget(
                    source=Source(
                        name=self.target_name,
                        lat=self.dec.to_value(u.deg),
                        long=self.ra.to_value(u.deg),
                    ),
                ),
            )
            for position in range(SUBSCAN_COUNT)
        ]
        return [Scan(0, self.scan_intents, subscans, type(self).__name__)]

333 

334 

@dataclass
class VLACalibratorOI(ObservingInstruction):
    """General VLA ObservingInstruction class to be used for Calibrator OI's
    See the description of STT-616 for reference.

    NOTE: Let the CalibrationStrategy service handle the hard-coded target (RA, DEC, name)
    to allow for more flexibility in this class.
    """

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        facility: Facility | None = None,
        acquisition_time_per_scan: Quantity | None = None,
    ) -> list[Scan]:
        """Return one Scan with a single subscan on this OI's target.

        The subscan lasts 180 s, or 300 s when the first scan intent is
        ``CALIBRATE_POL_LEAKAGE``. ``facility`` and
        ``acquisition_time_per_scan`` are accepted for interface
        compatibility and not used here.

        :param initial_setup_time: setup time charged to the subscan
        :return: single-element list with the generated Scan
        """
        acquisition_time = 180 * u.s

        # Guard on a *non-empty* intent list before looking at its first entry.
        # Testing for an empty list here would index into nothing and make the
        # longer polarization-leakage duration unreachable.
        if self.scan_intents and self.scan_intents[0].name == "CALIBRATE_POL_LEAKAGE":
            acquisition_time = 300 * u.s
        return [
            Scan(
                0,
                self.scan_intents,
                [
                    Subscan(
                        0,
                        self.on_source_intent,
                        acquisition_time,
                        initial_setup_time,
                        reference_target=ReferenceTarget(
                            source=Source(
                                name=self.target_name,
                                lat=self.dec.to_value(u.deg),
                                long=self.ra.to_value(u.deg),
                            ),
                        ),
                    ),
                ],
                type(self).__name__,
            )
        ]

376 

377 

@dataclass
class GBTScienceOI(ObservingInstruction):
    """Observing Instruction for a GBT ScienceTarget
    See the description of STT-441 for reference.
    """

    # Fixed for every instance: science targets are always OBSERVE_TARGET.
    scan_intents: ClassVar[list[ScanIntent]] = [hardcoded_scan_intents["OBSERVE_TARGET"]]
    total_acquisition_time: Quantity

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        facility: Facility | None = None,
        acquisition_time_per_scan: Quantity | None = None,
    ) -> list[Scan]:
        """Return one Scan that splits the total acquisition time over two subscans.

        Each subscan gets half of ``total_acquisition_time``, but never less
        than 1 s. Only the first subscan is charged ``initial_setup_time``.
        ``facility`` and ``acquisition_time_per_scan`` are not used here.

        :return: single-element list with the generated Scan
        """
        SUBSCAN_COUNT = 2
        subscans: list[Subscan] = [
            Subscan(
                position,
                self.on_source_intent,
                max(1 * u.s, self.total_acquisition_time / SUBSCAN_COUNT),
                ZERO_SECONDS if position else initial_setup_time,
                reference_target=ReferenceTarget(
                    source=Source(
                        name=self.target_name,
                        lat=self.dec.to_value(u.deg),
                        long=self.ra.to_value(u.deg),
                    ),
                ),
            )
            for position in range(SUBSCAN_COUNT)
        ]
        return [Scan(0, self.scan_intents, subscans, type(self).__name__)]

412 

413 

@dataclass
class VLAScienceTargetOI(ObservingInstruction):
    """Observing Instruction for a VLA ScienceTarget
    See the description of STT-616 for reference.
    """

    # Fixed for every instance: science targets are always OBSERVE_TARGET.
    scan_intents: ClassVar[list[ScanIntent]] = [hardcoded_scan_intents["OBSERVE_TARGET"]]
    total_acquisition_time: Quantity

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        facility: Facility | None = None,
        acquisition_time_per_scan: Quantity | None = None,
    ) -> list[Scan]:
        """Return one Scan with a single subscan on this OI's target.

        ``acquisition_time_per_scan`` is handed to the subscan unchanged (also
        when it is ``None``); ``total_acquisition_time`` is not read here.
        ``facility`` is not used.

        :param initial_setup_time: setup time charged to the subscan
        :param acquisition_time_per_scan: acquisition time of the subscan
        :return: single-element list with the generated Scan
        """
        target = ReferenceTarget(
            source=Source(
                name=self.target_name,
                lat=self.dec.to_value(u.deg),
                long=self.ra.to_value(u.deg),
            ),
        )
        subscan = Subscan(
            0,
            self.on_source_intent,
            acquisition_time_per_scan,
            initial_setup_time,
            reference_target=target,
        )
        scan = Scan(0, self.scan_intents, [subscan], type(self).__name__)
        return [scan]

451 

452 

class PeakOI(PeakFocusObservingInstruction):
    """Pointing ("peak") calibration OI. See the description of STT-441 for reference"""

    scan_intents: ClassVar[list[ScanIntent]] = [hardcoded_scan_intents["CALIBRATE_POINTING"]]

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        observing_target: SkyCoord,
        target_name: str,
    ) -> list[Scan]:
        """Return one Scan of four 30 s subscans pointed at ``observing_target``.

        Only the first subscan is charged ``initial_setup_time``.

        :return: single-element list with the generated Scan
        """
        subscans: list[Subscan] = [
            Subscan(
                position,
                self.off_source_intent,
                30 * u.s,
                ZERO_SECONDS if position else initial_setup_time,
                reference_target=ReferenceTarget(
                    source=Source(
                        name=target_name,
                        lat=observing_target.dec.to_value(u.deg),
                        long=observing_target.ra.to_value(u.deg),
                    ),
                ),
            )
            for position in range(4)
        ]

        return [Scan(0, self.scan_intents, subscans, type(self).__name__)]

483 

484 

class FocusOI(PeakFocusObservingInstruction):
    """Focus calibration OI. See the description of STT-441 for reference"""

    scan_intents: ClassVar[list[ScanIntent]] = [hardcoded_scan_intents["CALIBRATE_FOCUS"]]

    def generate_scans(
        self,
        initial_setup_time: Quantity,
        observing_target: SkyCoord,
        target_name: str,
    ) -> list[Scan]:
        """Return one Scan with a single 60 s subscan pointed at ``observing_target``.

        :param initial_setup_time: setup time charged to the subscan
        :return: single-element list with the generated Scan
        """
        target = ReferenceTarget(
            source=Source(
                name=target_name,
                lat=observing_target.dec.to_value(u.deg),
                long=observing_target.ra.to_value(u.deg),
            ),
        )
        subscan = Subscan(
            0,
            self.off_source_intent,
            60 * u.s,
            initial_setup_time,
            reference_target=target,
        )
        scan = Scan(0, self.scan_intents, [subscan], type(self).__name__)
        return [scan]

518 

519 

@dataclass
class CalibrationPlan(ABC):
    """Holds a set of observing instructions and turns them into a scan list.

    This abstract class will be fleshed out when the GBT Calibration Plan is.
    """

    oi_set: list[ObservingInstruction | PeakFocusObservingInstruction]

    @abstractmethod
    def generate_scan_list(self, facility: Facility) -> list[Scan]:
        """Return the Scans generated from ``oi_set`` for ``facility``.

        :param facility: facility the scan list is generated for
        :return: list of Scans
        """
        raise NotImplementedError

529 

530 

class VLACalibrationPlan(CalibrationPlan):
    """Calibration plan that sequences the VLA observing instructions in ``oi_set``."""

    def generate_scan_list(self, facility: Facility) -> list[Scan]:
        """Build the ordered Scan list for a VLA observation.

        Order of the generated scans:

        1. the flux calibrator (slewing from RA 0 deg / Dec 0 deg),
        2. the polarization-angle calibrator, when exactly one is present,
        3. the polarization-leakage calibrator, when exactly one is present,
        4. the amplitude/phase calibrator,
        5. for every science target: science scan followed by an
           amplitude/phase scan, repeated as described below,
        6. the bandpass calibrator.

        Finally every scan and subscan gets its ``position_in_list`` renumbered.

        :param facility: passed to every ``generate_scans`` call
        :return: the generated Scans in observing order
        :raises IndexError: when the flux, amplitude/phase or bandpass OI is
            missing from ``oi_set``, or when there is no science target OI
        """
        initial_position = SkyCoord(ra=0 * Unit("degree"), dec=0 * Unit("degree"))  # Initial condition of the array
        # Should be neither or both of the following
        pol_angle_ois = [oi for oi in self.oi_set if oi.scan_intents[0].name == "CALIBRATE_POL_ANGLE"]
        pol_leakage_ois = [oi for oi in self.oi_set if oi.scan_intents[0].name == "CALIBRATE_POL_LEAKAGE"]
        scan_list: list[Scan] = []
        flux_oi = [
            oi for oi in self.oi_set if oi.scan_intents[0].name == "CALIBRATE_FLUX"
        ].pop()  # Has to be exactly 1 of these
        scan_list.extend(
            flux_oi.generate_scans(
                calculate_slew_time_vla(initial_position, SkyCoord(ra=flux_oi.ra, dec=flux_oi.dec)) * u.s,
                facility=facility,
            )
        )

        # Polarization calibrators: angle first, then leakage, each slewing from
        # wherever the previous subscan was pointing.
        for optional_ois in (pol_angle_ois, pol_leakage_ois):
            if len(optional_ois) == 1:
                pol_oi = optional_ois[0]
                prev_point = scan_list[-1].subscans[-1].sky_coord
                scan_list.extend(
                    pol_oi.generate_scans(
                        calculate_slew_time_vla(prev_point, SkyCoord(ra=pol_oi.ra, dec=pol_oi.dec)) * u.s,
                        facility=facility,
                    )
                )

        st_ois: list[VLAScienceTargetOI] = [oi for oi in self.oi_set if isinstance(oi, VLAScienceTargetOI)]
        pr_oi = [
            oi
            for oi in self.oi_set
            if len(oi.scan_intents) == 2
            and hardcoded_scan_intents["CALIBRATE_AMPLI"] in oi.scan_intents
            and hardcoded_scan_intents["CALIBRATE_PHASE"] in oi.scan_intents
        ].pop()  # Has to be exactly 1 of these
        pr_oi_point = SkyCoord(ra=pr_oi.ra, dec=pr_oi.dec)

        # Limit the max number of Scans per ScienceTarget
        # so that users don't have to scroll forever through their ObsSpecs,
        # and they take a reasonable time to generate and load
        MAX_SCANS_PER_SCIENCE_TARGET = 3

        # A PR OI is required before the first ST OI, then after subsequent ST OIs
        # NOTE(review): this slew is measured from the first science target, not
        # from the previous subscan's position - confirm that is intended.
        scan_list.extend(
            pr_oi.generate_scans(
                calculate_slew_time_vla(SkyCoord(ra=st_ois[0].ra, dec=st_ois[0].dec), pr_oi_point) * u.s,
                facility=facility,
            )
        )

        for st_oi in st_ois:
            total_time = st_oi.total_acquisition_time
            # Totals below 90 s become one scan of at least 1 s; anything else
            # is cut into scans of 1/3 of the total time each.
            if total_time < 90 * u.s:
                acquisition_time_per_scan = max(1 * u.s, total_time)
            else:
                acquisition_time_per_scan = 1 / MAX_SCANS_PER_SCIENCE_TARGET * total_time
            st_oi_point = SkyCoord(ra=st_oi.ra, dec=st_oi.dec)

            # The first pass always runs; further passes run while the time
            # scheduled so far still fits into the requested total.
            # NOTE(review): the last pass depends on 3 * (total / 3) <= total
            # holding in floating point.
            iteration = 1
            while iteration == 1 or iteration * acquisition_time_per_scan <= total_time:
                prev_point = scan_list[-1].subscans[-1].sky_coord
                # Keyword arguments on purpose: generate_scans() takes
                # ``facility`` before ``acquisition_time_per_scan``, so passing
                # these two positionally in the other order swaps them.
                scan_list.extend(
                    st_oi.generate_scans(
                        calculate_slew_time_vla(prev_point, st_oi_point) * u.s,
                        facility=facility,
                        acquisition_time_per_scan=acquisition_time_per_scan,
                    )
                )
                scan_list.extend(
                    pr_oi.generate_scans(
                        calculate_slew_time_vla(st_oi_point, pr_oi_point) * u.s,
                        facility=facility,
                    )
                )
                iteration += 1

        bp_oi = [
            oi for oi in self.oi_set if oi.scan_intents[0] == hardcoded_scan_intents["CALIBRATE_BANDPASS"]
        ].pop()  # Has to be exactly 1 of these
        prev_point = scan_list[-1].subscans[-1].sky_coord
        scan_list.extend(
            bp_oi.generate_scans(
                calculate_slew_time_vla(prev_point, SkyCoord(ra=bp_oi.ra, dec=bp_oi.dec)) * u.s,
                facility=facility,
            )
        )

        # Every OI numbers its scans from 0, so renumber across the whole plan.
        for i, scan in enumerate(scan_list):
            scan.position_in_list = i
            for k, subscan in enumerate(scan.subscans):
                subscan.position_in_list = k
        return scan_list

637 

638 

class GBTCalibrationPlan(CalibrationPlan):
    """Calibration plan that sequences the GBT observing instructions in ``oi_set``."""

    def generate_oi_list(
        self,
    ) -> list[ObservingInstruction | PeakFocusObservingInstruction]:
        """Order the OIs by:

        1) Flux Density Calibrator if it exists
        2) Test Source Calibrator if it exists
        3) Polarization Calibrator if it exists
        4) Peak Calibration OI
        5) Focus Calibration OI
        6) Science OIs by increasing RA

        When several OIs qualify for the same calibrator slot, the last one
        found in ``oi_set`` wins.

        :return: the OIs of ``oi_set`` in observing order
        :raises TypeError: for an OI whose class name matches none of the
            known kinds
        """
        flux_cal = None
        test_cal = None
        pol_cal = None
        peak_cal = None
        focus_cal = None
        science_cals = []

        for oi in self.oi_set:
            # I tried to do this check with isinstance(), but it was always false.
            class_name = str(oi.__class__)
            if "OptionalCalibratorOI" in class_name:
                # The three checks are independent: one OI may fill several slots.
                scan_intent_names = [intent.name for intent in oi.scan_intents]
                if "CALIBRATE_FLUX" in scan_intent_names:
                    flux_cal = oi
                if "OBSERVE_TARGET" in scan_intent_names:
                    test_cal = oi
                if "CALIBRATE_POL_LEAKAGE" in scan_intent_names:
                    pol_cal = oi
            elif "PeakOI" in class_name:
                peak_cal = oi
            elif "FocusOI" in class_name:
                focus_cal = oi
            elif "ScienceOI" in class_name:
                science_cals.append(oi)
            else:
                raise TypeError(f"unknown OI type: {oi.__class__}")

        # sort the science OIs by RA:
        science_cals.sort(key=lambda soi: soi.ra)

        ordered_list = [
            calibrator
            for calibrator in (flux_cal, test_cal, pol_cal, peak_cal, focus_cal)
            if calibrator is not None
        ]
        ordered_list.extend(science_cals)
        return ordered_list

    def generate_scan_list(self, facility: Facility) -> list[Scan]:
        """
        Call `generate_scans()` on the OI's in order

        Peak and Focus OIs are pointed at the position of the OI before them
        with zero setup time; every other OI is charged the GBT slew time from
        the previous position. Finally every scan and subscan gets its
        ``position_in_list`` renumbered.

        :param facility: passed to the ``generate_scans`` call of non-Peak/Focus OIs
        :return: A list of Scans corresponding to the OIs in the CalibrationPlan
        """
        scan_list: list[Scan] = []
        # We need a "0th" subscan for when a Peak or Focus OI is first in the list
        prev_subscan = Subscan(0, None, 0 * u.s, 0 * u.s)
        prev_point = SkyCoord(ra=0 * u.degree, dec=0 * u.degree)
        for i, oi in enumerate(self.generate_oi_list()):
            if isinstance(oi, PeakFocusObservingInstruction):  # isinstance respects inheritance
                if i > 0:
                    prev_subscan = scan_list[-1].subscans[-1]
                    # SkyCoord exposes its components as ``ra`` / ``dec``.
                    # ``prev_point`` already holds the last position pointed at,
                    # because Peak/Focus scans never move the telescope.
                    prev_point = SkyCoord(ra=prev_point.ra, dec=prev_point.dec)
                # Setup time is 0s because Peak & Focus OI's always use the previous Subscan's target
                scan_list.extend(
                    oi.generate_scans(
                        ZERO_SECONDS,
                        prev_point,
                        prev_subscan.name,
                    )
                )
            else:
                next_point = SkyCoord(oi.ra, oi.dec)
                slew_time = calculate_slew_time_gbt(prev_point, next_point)
                scan_list.extend(oi.generate_scans(slew_time * ValidUnits.Time.value, facility))
                prev_point = next_point

        # Every OI numbers its scans from 0, so renumber across the whole plan.
        for i, scan in enumerate(scan_list):
            scan.position_in_list = i
            for k, subscan in enumerate(scan.subscans):
                subscan.position_in_list = k
        return scan_list