Coverage for middle_layer/testdata/application_layer/services/allocation_request_generator.py: 92.11%
279 statements
« prev ^ index » next coverage.py v7.10.5, created at 2026-03-09 06:13 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2026-03-09 06:13 +0000
1# Copyright 2023 Associated Universities, Inc.
2#
3# This file is part of Telescope Time Allocation Tools (TTAT).
4#
5# TTAT is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# any later version.
9#
10# TTAT is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with TTAT. If not, see <https://www.gnu.org/licenses/>.
17import json
18from copy import deepcopy
19from datetime import datetime, time, timedelta
20from random import choice, getrandbits, randint, random
22import astropy.units as u
24from allocate.domain_layer.entities.cadence import Cadence
25from common import get_middle_layer
26from common.application_layer.orm_repositories.orm_repository import ORMRepository
27from common.test_helpers import random_future_date, random_time
28from propose.domain_layer.entities.allocation_request_fixed_date import AllocationRequestFixedDate
29from propose.domain_layer.entities.calibration_parameter import CalibrationParameter
30from propose.domain_layer.entities.field_source import FieldSource
31from propose.domain_layer.entities.hardware_configuration import HardwareConfiguration
32from propose.domain_layer.entities.performance_parameter import PerformanceParameter
33from propose.domain_layer.entities.proposal import AllocationRequest, CapabilityRequest, Proposal, RequestType
34from propose.domain_layer.entities.reference_target import ReferenceTarget
35from propose.domain_layer.entities.scan import ScanIntent, SubscanIntent, hardcoded_scan_intents
36from propose.domain_layer.entities.science_target import ScienceTarget
37from propose.domain_layer.entities.source import (
38 CalibrationStrategy,
39 PointingPattern,
40 SchedulingStrategy,
41 Source,
42)
43from propose.domain_layer.entities.spectral_specification import SpectralSpecification
44from propose.domain_layer.services.observation_specification_generator_service import (
45 generate_observation_specifications,
46)
47from propose.domain_layer.services.reference_target_generator_service import generate_reference_targets
48from propose.domain_layer.services.science_target_list_generator_service import generate_science_target_list
49from solicit.domain_layer.entities.array_configuration import ArrayConfigurationConfiguration
50from solicit.domain_layer.entities.backend import BackendConfiguration
51from solicit.domain_layer.entities.external_joint_parameter import ExternalJointParameter
52from solicit.domain_layer.entities.frontend import FrontendConfiguration
53from solicit.domain_layer.entities.solicitation_facility_capability import SolicitationFacilityCapability
def generate_allocation_requests(
    repo: ORMRepository,
    proposal: Proposal,
    test_type: str,
    make_obspecs: bool,
    do_submit: bool,
    all_scan_intents: dict[str, ScanIntent],
    all_subscan_intents: dict[str, SubscanIntent],
    calibrator_list: dict,
    sfcs: list[SolicitationFacilityCapability],
):
    """Populate a proposal with allocation requests using the generator matching ``test_type``.

    ``"Deterministic"`` selects the deterministic generator; every other value is
    routed to the test/demo generator, which also receives ``test_type`` itself.

    :param repo: Repository forwarded to the selected generator
    :param proposal: Proposal that receives the allocation requests
    :param test_type: Name of the data flavour to generate
    :param make_obspecs: Forwarded unchanged to the selected generator
    :param do_submit: Forwarded unchanged to the selected generator
    :param all_scan_intents: ScanIntents keyed by name, forwarded unchanged
    :param all_subscan_intents: SubscanIntents keyed by name, forwarded unchanged
    :param calibrator_list: Calibrator entries, forwarded unchanged
    :param sfcs: SolicitationFacilityCapabilities, forwarded unchanged
    """
    # Both generators take the same trailing arguments in the same order.
    shared_args = (make_obspecs, do_submit, all_scan_intents, all_subscan_intents, calibrator_list, sfcs)

    if test_type == "Deterministic":
        generate_deterministic_allocation_requests(repo, proposal, *shared_args)
        return

    generate_test_demo_allocation_requests(repo, proposal, test_type, *shared_args)
85def generate_deterministic_allocation_requests(
86 repo: ORMRepository,
87 proposal: Proposal,
88 make_obspecs: bool,
89 do_submit: bool,
90 all_scan_intents: dict[str, ScanIntent],
91 all_subscan_intents: dict[str, SubscanIntent],
92 calibrator_list: dict,
93 sfcs: list[SolicitationFacilityCapability],
94):
95 # Generate ARs, CRs, and obspecs
96 proposal_num = int(proposal.author_copy.title[19])
97 for i in range(0, 2): # make exactly 2 ARs per proposal
98 # choose an sfc
99 sfc = sfcs[0] # TODO: exercise all capabilities/facilities?
100 ar = AllocationRequest(
101 allocation_request_name=f"Deterministic Allocation Request {i}",
102 facility=sfc.facility,
103 proposal_copy=proposal.author_copy,
104 )
105 # No need to manually append this AR - the constructor handles this
107 # let's choose a request type
108 if proposal_num % 4 == 0 and i == 0:
109 ar.request_type = RequestType.TRIGGERED
110 elif proposal_num % 4 == 1:
111 ar.request_type = RequestType.DYNAMIC
112 elif proposal_num % 4 == 2:
113 ar.request_type = RequestType.FIXED_DATE
114 else:
115 ar.request_type = RequestType.MONITOR
117 match ar.request_type:
118 case RequestType.FIXED_DATE:
119 ar.fixed_dates = [
120 AllocationRequestFixedDate(fixed_date=date)
121 for date in [datetime.now() + timedelta(days=30), datetime.now() + timedelta(days=40)]
122 ]
124 case RequestType.TRIGGERED:
125 ar.trigger_event = "ligo detection"
126 ar.trigger_expiry_date = datetime.now() + timedelta(days=30)
127 # Seed five alternative sources (not added to ar.sources)
128 for idx in range(5):
129 src = Source(
130 allocation_request=ar,
131 name=f"Alt-{i}-{idx}",
132 lat=randint(-90, 90) * 1.0, # Just something simple with a single decimal place
133 long=randint(-180, 180) * 1.0,
134 )
135 ar.alternative_sources.append(src)
137 case RequestType.MONITOR:
138 future_dates = sorted(
139 [
140 datetime.now() + timedelta(days=30),
141 datetime.now() + timedelta(days=40),
142 datetime.now() + timedelta(days=50),
143 ]
144 )
146 ar.monitoring_earliest_start_date = future_dates[0]
147 ar.monitoring_latest_start_date = future_dates[1]
148 ar.monitoring_latest_end_date = future_dates[2]
149 ar.monitoring_date_tolerance = randint(0, 4)
151 # TODO: Cadences are unstable on this entity for the time being
152 # cadence_metacount = randint(1, 3)
153 # ar.monitoring_cadence = Cadence(repeat_counts=[randint(1,10) for _ in range(cadence_metacount)],
154 # deltas=[randint(1,24) for _ in range(cadence_metacount)],
155 # tolerances=[randint(1,3) for _ in range(cadence_metacount)])
156 # ar.monitoring_cadence_tolerance = randint(1, 10) # ???
157 # ar.monitoring_repeat_count = randint(1, 10)
159 # choose some random times
160 ar.earliest_start_time = time(hour=8, minute=9, second=10)
161 ar.latest_start_time = time(hour=11, minute=12, second=13)
162 ar.latest_end_time = time(hour=14, minute=15, second=16)
164 ar.night_time_only = False
165 ar.commensal = False
166 if do_submit:
167 ar_copy = ar.clone()
168 proposal.observatory_copy.allocation_requests.append(ar_copy)
169 else:
170 ar_copy = None
172 for j in range(0, 4):
173 generate_deterministic_capability_request(
174 session=repo.session,
175 ar=ar,
176 sfc=sfc,
177 ar_copy=ar_copy,
178 capability_request_name=f"Deterministic Capability Request {j}",
179 calibrator_list=calibrator_list,
180 )
182 if make_obspecs:
183 generate_science_target_list(ar)
184 generate_reference_targets(ar)
186 if do_submit and ar:
187 ar.science_targets.extend([st.clone(ar) for st in ar.science_targets])
188 ar.reference_targets.extend([rt.clone() for rt in ar.reference_targets])
190 obspecs = generate_observation_specifications(ar)
191 for os in obspecs:
192 os.is_requested_filler = False # 0% chance of filler
193 os.is_proposer_modified = False # 0% chance of proposer modified
195 if do_submit and ar_copy:
196 obspecs_copy = [obspec.clone(ar_copy) for obspec in obspecs]
197 for orig, copy in zip(obspecs, obspecs_copy):
198 copy.is_requested_filler = orig.is_requested_filler
199 copy.is_proposer_modified = orig.is_proposer_modified
201 else:
202 obspecs_copy = None
204 for observation_specifications in (obspecs, obspecs_copy):
205 if observation_specifications:
206 for i, _ in enumerate(observation_specifications):
207 scans = observation_specifications[i].scans
208 for j, _ in enumerate(scans):
209 scans[j].scan_intents = [
210 all_scan_intents[scan_intent.name] for scan_intent in scans[j].scan_intents
211 ]
212 subscans = scans[j].subscans
213 for k, _ in enumerate(subscans):
214 subscans[k].subscan_intent = all_subscan_intents[subscans[k].subscan_intent.name]
215 ar.observation_specifications = obspecs
216 if do_submit:
217 ar_copy.observation_specifications = obspecs_copy
def generate_test_demo_allocation_requests(
    repo: ORMRepository,
    proposal: Proposal,
    test_type: str,
    make_obspecs: bool,
    do_submit: bool,
    all_scan_intents: dict[str, ScanIntent],
    all_subscan_intents: dict[str, SubscanIntent],
    calibrator_list: dict,
    sfcs: list[SolicitationFacilityCapability],
):
    """Attach one to three randomly configured AllocationRequests to a proposal.

    :param repo: Repository whose ``session`` is handed to the capability request generator
    :param proposal: Proposal to populate; ARs are created against ``proposal.author_copy``
    :param test_type: Label used as the prefix of generated AR and CR names
    :param make_obspecs: When true, also generate science/reference targets and observation specifications
    :param do_submit: When true, clone each AR (with its CRs, targets and obspecs) and attach the
        clone to ``proposal.observatory_copy``
    :param all_scan_intents: ScanIntents keyed by name; generated scans are re-pointed at these instances
    :param all_subscan_intents: SubscanIntents keyed by name; generated subscans are re-pointed at these instances
    :param calibrator_list: Calibrator entries forwarded to the capability request generator
    :param sfcs: SolicitationFacilityCapabilities to pick from at random
    """
    # Generate ARs, CRs, and obspecs
    ar_count = randint(1, 3)
    for ar_index in range(ar_count):
        # pick the facility capability this AR is built around
        sfc = choice(sfcs)
        ar = AllocationRequest(
            allocation_request_name=f"{test_type} Allocation Request {ar_index}",
            facility=sfc.facility,
            proposal_copy=proposal.author_copy,
        )
        # The AllocationRequest constructor registers the AR; no manual append needed

        # pick a request type and fill in the fields specific to it
        ar.request_type = choice(list(RequestType))
        request_type = ar.request_type

        if request_type == RequestType.FIXED_DATE:
            fixed_dates = [random_future_date() for _ in range(choice([1, 2]))]
            ar.fixed_dates = [AllocationRequestFixedDate(fixed_date=fixed_date) for fixed_date in fixed_dates]

        elif request_type == RequestType.TRIGGERED:
            ar.trigger_event = "ligo detection"
            ar.trigger_expiry_date = random_future_date()
            # Seed five alternative sources (not added to ar.sources)
            for alt_index in range(5):
                alternative = Source(
                    allocation_request=ar,
                    name=f"Alt-{ar_index}-{alt_index}",
                    lat=randint(-90, 90) * 1.0,  # whole-degree value stored as a float
                    long=randint(-180, 180) * 1.0,
                )
                ar.alternative_sources.append(alternative)

        elif request_type == RequestType.MONITOR:
            earliest_start, latest_start, latest_end = sorted([random_future_date() for _ in range(3)])

            ar.monitoring_earliest_start_date = earliest_start
            ar.monitoring_latest_start_date = latest_start
            ar.monitoring_latest_end_date = latest_end
            ar.monitoring_date_tolerance = randint(0, 4)

            # TODO: Cadences are unstable on this entity for the time being, so
            # monitoring_cadence, monitoring_cadence_tolerance and
            # monitoring_repeat_count are intentionally left unset here.

        # random times of day
        ar.earliest_start_time = random_time()
        ar.latest_start_time = random_time()
        ar.latest_end_time = random_time()

        ar.night_time_only = choice([True, False])
        ar.commensal = choice([True, False])

        ar_copy = None
        if do_submit:
            ar_copy = ar.clone()
            ar_copy.proposal_copy = proposal.observatory_copy

        cr_count = randint(2, 10)
        for cr_index in range(cr_count):
            generate_capability_request(
                session=repo.session,
                ar=ar,
                sfc=sfc,
                ar_copy=ar_copy,
                capability_request_name=f"{test_type} Capability Request {cr_index}",
                calibrator_list=calibrator_list,
            )

        if not make_obspecs:
            continue

        generate_science_target_list(ar)
        generate_reference_targets(ar)

        # mirror the generated targets onto the observatory copy
        if do_submit and ar_copy:
            ar_copy.science_targets.extend([target.clone(ar_copy) for target in ar.science_targets])
            ar_copy.reference_targets.extend([target.clone() for target in ar.reference_targets])

        obspecs = generate_observation_specifications(ar)
        for obspec in obspecs:
            obspec.is_requested_filler = choice([True, False, False, False, False])  # 20% chance of filler
            obspec.is_proposer_modified = choice([True, False, False, False, False])  # 20% chance of proposer modified

        obspecs_copy = []
        if do_submit and ar_copy:
            obspecs_copy = [obspec.clone(ar_copy) for obspec in obspecs]

        # Swap the generated intents for the shared instances looked up by name.
        for spec_list in (obspecs, obspecs_copy):
            for obspec in spec_list or ():
                for scan in obspec.scans:
                    scan.scan_intents = [all_scan_intents[intent.name] for intent in scan.scan_intents]
                    for subscan in scan.subscans:
                        subscan.subscan_intent = all_subscan_intents[subscan.subscan_intent.name]

        ar.observation_specifications = obspecs
        if do_submit and ar_copy:
            ar_copy.observation_specifications = obspecs_copy
# values used for calculating performance parameters
# The three tuples are parallel: index i of each refers to the same entry.
vla_config_values = ("A", "B", "C", "D")  # presumably VLA array configuration names - TODO confirm
b_max_values = (36.4, 11.1, 3.4, 1.03)  # presumably maximum baseline per configuration, in km - TODO confirm
b_min_values = (0.68, 0.21, 0.035, 0.035)  # presumably minimum baseline per configuration, in km - TODO confirm
# Divided by a frequency in Hz and by a b_min value to derive largest_angular_scale.
c = 2.998e5  # presumably the speed of light in km/s - TODO confirm
def create_field_source(calibrator: dict, random_index: int, allocation_request: AllocationRequest):
    """
    Build a FieldSource (wrapping a new Source) from one calibrator entry.

    Args:
        calibrator: Calibrator entry; read keys are "name", "coord" (with "Declination",
            "Right Ascension" and "coordinate system") and "peak continuum flux density"
        random_index: Position to read from the "peak continuum flux density" array, or
            None to use a flux density of 0
        allocation_request: AllocationRequest the new Source is attached to

    Returns:
        A new FieldSource instance
    """
    source = Source(
        name=calibrator["name"],
        lat=calibrator["coord"]["Declination"],
        long=calibrator["coord"]["Right Ascension"],
        coordinate_system=calibrator["coord"]["coordinate system"],
        allocation_request=allocation_request,
    )

    if random_index is None:
        flux_density = 0
    else:
        flux_density = calibrator["peak continuum flux density"][random_index]

    return FieldSource(source=source, peak_continuum_flux_density=flux_density)
def create_spectral_specification(calibrator: dict, random_index: int):
    """
    Build a SpectralSpecification from one calibrator entry.

    Args:
        calibrator: Calibrator entry; read keys are "name" and "center frequency"
        random_index: Position to read from the "center frequency" array, or None to
            use a center frequency of 1

    Returns:
        A new SpectralSpecification instance
    """
    if random_index is None:
        center_frequency = 1
    else:
        center_frequency = calibrator["center frequency"][random_index]

    # bandwidth and spectral_resolution are placeholders, overwritten at the facility check
    return SpectralSpecification(
        spectral_specification_name=calibrator["name"],
        center_frequency=center_frequency,
        bandwidth=0,
        spectral_resolution=0,
    )
def create_performance_parameter(fs):
    """
    Build a PerformanceParameter with fixed angular values and a derived rms sensitivity.

    Args:
        fs: field source whose peak_continuum_flux_density drives rms_sensitivity; may be falsy

    Returns:
        A new PerformanceParameter instance
    """
    # 10% of the peak continuum flux density when one is available, else a fixed fallback
    if fs and fs.peak_continuum_flux_density:
        rms_sensitivity = 0.1 * fs.peak_continuum_flux_density
    else:
        rms_sensitivity = 0.022

    # elevation_min, elevation_max, weather_type and dynamic_range are not set here
    return PerformanceParameter(
        angular_resolution=0.139,
        largest_angular_scale=0,
        rms_sensitivity=rms_sensitivity,
    )
def create_calibration_parameter():
    """
    Build a CalibrationParameter with every calibration flag switched off.

    Returns:
        A new CalibrationParameter instance
    """
    disabled_flags = {
        "flux_density_calibration": False,
        "test_source": False,
        "polarization_calibration": False,
    }
    return CalibrationParameter(**disabled_flags)
def generate_capability_request(
    session,
    ar: AllocationRequest,
    sfc: SolicitationFacilityCapability,
    ar_copy: AllocationRequest = None,
    capability_request_name: str = "Test Capability Request",
    field_sources: list[FieldSource] | None = None,
    spectral_specs: list[SpectralSpecification] | None = None,
    performance_param: PerformanceParameter | None = None,
    calibration_param: CalibrationParameter | None = None,
    calibrator_list: dict = None,
    external_joint_param: ExternalJointParameter | None = None,
) -> CapabilityRequest:
    """Build a CapabilityRequest for ``ar`` from randomly chosen calibrator entries and add it to the session.

    Note that this service will use fallback values if center frequency is missing from calibrator entry.

    :param session: The DB session the new CR (and its copy, if any) is added to
    :param ar: The AR to generate a CR for
    :param sfc: The SolicitationFacilityCapability to make the CR for
    :param ar_copy: If provided, the CR is cloned and the clone is attached to this AR
        (it means that the proposal is to be submitted and this will go on observatory copy)
    :param capability_request_name: the capability request name
    :param field_sources: FieldSources to use; generated from the calibrators when empty or None
    :param spectral_specs: SpectralSpecs to use; generated from the calibrators when empty or None
    :param performance_param: PerformanceParameter for this CR; generated when not provided
    :param calibration_param: CalibrationParameter for this CR; generated when not provided
    :param calibrator_list: A dictionary of calibrators to use for configuration
    :param external_joint_param: Accepted for interface compatibility; not used by this function
    :return: The new CapabilityRequest
    :raises ValueError: If the facility name is not one of the supported facilities, or if
        calibrators are needed (field sources or spectral specs missing) but none were supplied
    """
    calibrators = []
    if calibrator_list:
        # Materialize the pool once instead of rebuilding it for every draw.
        calibrator_pool = list(calibrator_list.values())
        calibrators = [choice(calibrator_pool) for _ in range(randint(1, 3))]
    elif not (field_sources and spectral_specs):
        raise ValueError("calibrator_list must not be empty when field_sources or spectral_specs are not provided")

    facility = sfc.facility

    # set CRP values based on calibrator if not provided
    if not field_sources:
        # create field source for each calibrator
        field_sources = []
        for calibrator in calibrators:
            center_frequencies = calibrator["center frequency"]
            frequency_index = randint(0, len(center_frequencies) - 1) if center_frequencies else None
            field_sources.append(create_field_source(calibrator, frequency_index, ar))
    if not spectral_specs:
        # create spectral spec for each calibrator
        # NOTE(review): this index is drawn independently of the one used for the field
        # source of the same calibrator - confirm that is intended.
        spectral_specs = []
        for calibrator in calibrators:
            center_frequencies = calibrator["center frequency"]
            frequency_index = randint(0, len(center_frequencies) - 1) if center_frequencies else None
            spectral_specs.append(create_spectral_specification(calibrator, frequency_index))

    # Always bind pp/cp: caller-supplied parameters are used as-is, otherwise defaults are
    # generated. Binding them only in the "not supplied" case left them undefined below.
    pp = performance_param or create_performance_parameter(field_sources[0])
    cp = calibration_param or create_calibration_parameter()

    # Attach Follow-up Cadence for TRIGGERED ARs (10 days ± 3 days; repeatCounts=1)
    if ar.request_type == RequestType.TRIGGERED:
        pp.followup_cadence = Cadence(
            repeat_counts=[1],
            deltas=[10 * u.day],
            tolerances=[3 * u.day],
            allocation_request=ar,
        )

    # VLA-specific performance parameters will be used for both facilities until we define them for GBT
    random_vla_pp_index = randint(0, 3)  # corresponds to VLA configuration

    # pp.angular_resolution = 1.2 * (c / f_hz) / b_max_values[random_vla_pp_index] (did not produce the correct value...)

    # set facility specific values
    for fs, ss in zip(field_sources, spectral_specs):
        if facility.facility_name == "GBT":
            ss.bandwidth = 2000
            ss.spectral_resolution = 10
            fs.peak_line_flux_density = 0.1 * fs.peak_continuum_flux_density
            fs.line_width = 50
        elif facility.facility_name == "VLA":
            ss.bandwidth = 10
            ss.spectral_resolution = 300
            f_hz = 1e9 * ss.center_frequency if ss.center_frequency else 1  # assume freq is given in GHz
            pp.largest_angular_scale = (c / f_hz) / b_min_values[random_vla_pp_index]
        elif facility.facility_name in ["ALMA", "JWST", "IXPE", "Swift", "HST", "XMM-Newton", "Fermi", "Chandra"]:
            # Nothing special needs to be done here
            pass
        else:
            raise ValueError(
                "Facility must be one of ALMA, JWST, IXPE, Swift, HST, XMM-Newton, Fermi, Chandra, GBT or VLA"
            )

    cr = CapabilityRequest(
        capability_request_name=capability_request_name, solicitation_facility_capability=sfc, allocation_request=ar
    )
    cr.field_sources = field_sources
    cr.spectral_specifications = spectral_specs
    cr.calibration_parameter = cp
    cr.performance_parameter = pp

    session.add(cr)

    # make a copy of the capability request if we have a copy of the allocation request to match
    if ar_copy:
        cr_copy = cr.clone()
        cr_copy.allocation_request = ar_copy
        session.add(cr_copy)

    return cr
def generate_deterministic_capability_request(
    session,
    ar: AllocationRequest,
    sfc: SolicitationFacilityCapability,
    ar_copy: AllocationRequest = None,
    capability_request_name: str = "Test Capability Request",
    field_sources: list[FieldSource] | None = None,
    spectral_specs: list[SpectralSpecification] | None = None,
    performance_param: PerformanceParameter | None = None,
    calibration_param: CalibrationParameter | None = None,
    external_joint_param: ExternalJointParameter | None = None,
    calibrator_list: dict = None,
) -> CapabilityRequest:
    """Build a CapabilityRequest for ``ar`` from the first two calibrator entries and add it to the session.

    Unlike ``generate_capability_request`` no random choices are made: the first two
    calibrators are used, array values are read at index 0 and the first VLA
    configuration index is used.

    :param session: The DB session the new CR (and its copy, if any) is added to
    :param ar: The AR to generate a CR for
    :param sfc: The SolicitationFacilityCapability to make the CR for
    :param ar_copy: If provided, the CR is cloned and the clone is attached to this AR
        (it means that the proposal is to be submitted and this will go on observatory copy)
    :param capability_request_name: the capability request name
    :param field_sources: FieldSources to use; generated from the calibrators when empty or None
    :param spectral_specs: SpectralSpecs to use; generated from the calibrators when empty or None
    :param performance_param: PerformanceParameter for this CR; generated when not provided
    :param calibration_param: CalibrationParameter for this CR; generated when not provided
    :param external_joint_param: Accepted for interface compatibility; not used by this function
    :param calibrator_list: A dictionary of calibrators to use for configuration
    :return: The new CapabilityRequest
    :raises ValueError: If the facility is not ALMA, VLA or GBT, or if calibrators are needed
        (field sources or spectral specs missing) but fewer than two were supplied
    """
    # Deterministic data always uses the first two calibrator entries.
    calibrators = list(calibrator_list.values())[:2] if calibrator_list else []
    if len(calibrators) < 2 and not (field_sources and spectral_specs):
        raise ValueError(
            "calibrator_list must contain at least two calibrators when field_sources or spectral_specs "
            "are not provided"
        )

    facility = sfc.facility

    # set CRP values based on calibrator if not provided
    if not field_sources:
        # create field source for each calibrator
        field_sources = [create_field_source(calibrator, 0, ar) for calibrator in calibrators]
    if not spectral_specs:
        # create spectral spec for each calibrator
        spectral_specs = [create_spectral_specification(calibrator, 0) for calibrator in calibrators]

    # Always bind pp/cp: caller-supplied parameters are used as-is, otherwise defaults are
    # generated. Binding them only in the "not supplied" case left them undefined below.
    pp = performance_param or create_performance_parameter(field_sources[0])
    cp = calibration_param or create_calibration_parameter()

    # Attach Follow-up Cadence for TRIGGERED ARs (10 days ± 3 days; repeatCounts=1)
    if ar.request_type == RequestType.TRIGGERED:
        pp.followup_cadence = Cadence(
            repeat_counts=[1],
            deltas=[10 * u.day],
            tolerances=[3 * u.day],
            allocation_request=ar,
        )

    # VLA-specific performance parameters will be used for both facilities until we define them for GBT
    vla_pp_index = 0  # corresponds to VLA configuration

    # pp.angular_resolution = 1.2 * (c / f_hz) / b_max_values[vla_pp_index] (did not produce the correct value...)

    # set facility specific values
    for fs, ss in zip(field_sources, spectral_specs):
        if facility.facility_name == "GBT":
            ss.bandwidth = 2000
            ss.spectral_resolution = 10
            fs.peak_line_flux_density = 0.1 * fs.peak_continuum_flux_density
            fs.line_width = 50
        elif facility.facility_name == "VLA":
            ss.bandwidth = 10
            ss.spectral_resolution = 300
            f_hz = 1e9 * ss.center_frequency if ss.center_frequency else 1  # assume freq is given in GHz
            pp.largest_angular_scale = (c / f_hz) / b_min_values[vla_pp_index]
        elif facility.facility_name == "ALMA":
            # Nothing special needs to be done here because it is external
            pass
        else:
            raise ValueError("Facility must be ALMA, VLA or GBT")

    cr = CapabilityRequest(
        capability_request_name=capability_request_name, solicitation_facility_capability=sfc, allocation_request=ar
    )
    cr.field_sources = field_sources
    cr.spectral_specifications = spectral_specs
    cr.calibration_parameter = cp
    cr.performance_parameter = pp

    session.add(cr)

    # make a copy of the capability request if we have a copy of the allocation request to match
    if ar_copy:
        cr_copy = cr.clone()
        cr_copy.allocation_request = ar_copy
        session.add(cr_copy)

    return cr
def parse_config(filename: str = None, config: dict = None, **kwargs):
    """Parses a JSON file or config dict and overwrites the configuration with any values specified in kwargs.

    The JSON in the file or config dict must be formatted as defined by the __json__ method of the entity to be
    generated downstream, containing at minimum the values required by the associated generator method to create
    that entity.

    :param filename: The name of the file to load from middle_layer/testdata/config
    :param config: The input dictionary to modify if not loading a JSON file; it is updated in place
    :param kwargs: keyword-value pairs to overwrite top-level values in the loaded config, e.g.
        capabilityRequestName="Foo"
    :return: A dictionary representation of the entity to be generated. When neither a filename nor a
        config is given, the result is a new dict holding just the kwargs, or None if there are no kwargs
    :raises ValueError: If both filename and config are specified
    """
    if filename:
        if config:
            raise ValueError("Must specify only filename or config, not both")
        config_path = (get_middle_layer() / f"testdata/config/{filename}").resolve()
        # JSON files are read as UTF-8 regardless of the platform's default encoding.
        with open(config_path, encoding="utf-8") as f:
            config = json.load(f)

    if kwargs:
        # With no file and no input dict there is nothing to overwrite, so start from an
        # empty config instead of failing on item assignment to None.
        if config is None:
            config = {}
        config.update(kwargs)
    return config
def get_crp_config_with_cps_name(name: str, crps: list[dict]) -> dict:
    """Return the first CRP config whose capability parameter specification name equals ``name``.

    :param name: The capability parameter specification name to look for
    :param crps: CRP config dictionaries to search, in order
    :return: The first matching config, or an empty dict when none matches
    """
    matching_configs = (crp for crp in crps if get_cps_name_from_crp_config(crp) == name)
    return next(matching_configs, {})
def get_cps_name_from_crp_config(crp_config: dict) -> str:
    """Read the capability parameter specification name out of a nested CRP config.

    :param crp_config: CRP config dictionary holding the nested specification entries
    :return: The value stored under "capabilityParameterSpecificationName"
    :raises KeyError: If any of the nested keys is missing
    """
    solicitation_spec = crp_config["solicitationCapabilityParameterSpecification"]
    parameter_spec = solicitation_spec["capabilityParameterSpecification"]
    return parameter_spec["capabilityParameterSpecificationName"]