Coverage for middle_layer/propose/domain_layer/entities/field_source.py: 85.88%
85 statements
« prev ^ index » next coverage.py v7.10.5, created at 2026-03-09 06:13 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2026-03-09 06:13 +0000
1# Copyright 2024 Associated Universities, Inc.
2#
3# This file is part of Telescope Time Allocation Tools (TTAT).
4#
5# TTAT is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# any later version.
9#
10# TTAT is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with TTAT. If not, see <https://www.gnu.org/licenses/>.
17#
18from __future__ import annotations
20from typing import TYPE_CHECKING, override
22import astropy.units as u
23from astropy.coordinates import SkyCoord
24from sqlalchemy import Enum as SAEnum
25from sqlalchemy import ForeignKey
26from sqlalchemy.orm import Mapped, mapped_column, relationship
28from common.domain_layer import JSON
29from common.domain_layer.entities.base import Base
30from propose.domain_layer.entities import HasSkyCoord
32if TYPE_CHECKING:
33 from propose.domain_layer.entities.proposal import CapabilityRequest
34 from propose.domain_layer.entities.source import Source
36from solicit.domain_layer.entities.parameter_configuration import (
37 CoordinateSystem,
38 DopplerType,
39 FieldOfViewShape,
40 VelocityReferenceFrame,
41)
44class FieldSource(Base, HasSkyCoord):
45 __tablename__ = "field_sources"
47 field_source_id: Mapped[int] = mapped_column(primary_key=True)
48 capability_request_id: Mapped[int] = mapped_column(
49 ForeignKey("capability_requests.capability_request_id", ondelete="CASCADE"),
50 nullable=False,
51 )
52 capability_request: Mapped["CapabilityRequest"] = relationship(
53 "CapabilityRequest",
54 foreign_keys=[capability_request_id],
55 back_populates="field_sources",
56 )
58 source_id: Mapped[int] = mapped_column(ForeignKey("sources.source_id"))
59 source: Mapped["Source"] = relationship()
61 field_of_view_shape: Mapped[FieldOfViewShape] = mapped_column(
62 SAEnum(
63 FieldOfViewShape,
64 name="field_of_view_shape_type",
65 values_callable=lambda x: [e.value for e in x],
66 ),
67 default=FieldOfViewShape.POINT.name,
68 )
69 field_of_view_lat: Mapped[float]
70 field_of_view_long: Mapped[float]
71 radial_velocity: Mapped[float]
72 velocity_reference_frame: Mapped[VelocityReferenceFrame] = mapped_column(
73 SAEnum(
74 VelocityReferenceFrame,
75 name="velocity_reference_frame_type",
76 values_callable=lambda x: [e.value for e in x],
77 ),
78 default=VelocityReferenceFrame.LSRK.name,
79 )
80 doppler_type: Mapped[DopplerType] = mapped_column(
81 SAEnum(
82 DopplerType,
83 name="doppler_type",
84 values_callable=lambda x: [e.value for e in x],
85 ),
86 default=DopplerType.RADIO.name,
87 )
88 parallax: Mapped[float]
89 proper_motion_lat: Mapped[float]
90 proper_motion_long: Mapped[float]
91 peak_continuum_flux_density: Mapped[float]
92 peak_line_flux_density: Mapped[float]
93 line_width: Mapped[float]
95 def __init__(
96 self,
97 source: Source | None = None,
98 field_of_view_shape: FieldOfViewShape | str | None = None,
99 field_of_view_lat: float | None = None,
100 field_of_view_long: float | None = None,
101 radial_velocity: float | None = None,
102 velocity_reference_frame: VelocityReferenceFrame | str | None = None,
103 doppler_type: DopplerType | str | None = None,
104 parallax: float | None = None,
105 proper_motion_lat: float | None = None,
106 proper_motion_long: float | None = None,
107 peak_continuum_flux_density: float | None = None,
108 peak_line_flux_density: float | None = None,
109 line_width: float | None = None,
110 capability_request: CapabilityRequest | None = None,
111 ):
112 super().__init__(
113 source=source,
114 field_of_view_lat=field_of_view_lat,
115 field_of_view_long=field_of_view_long,
116 radial_velocity=radial_velocity,
117 parallax=parallax,
118 proper_motion_lat=proper_motion_lat,
119 proper_motion_long=proper_motion_long,
120 peak_continuum_flux_density=peak_continuum_flux_density,
121 peak_line_flux_density=peak_line_flux_density,
122 line_width=line_width,
123 )
125 # Handle capability_request or capability_request_id
126 if capability_request is not None:
127 self.capability_request = capability_request
128 self.capability_request_id = capability_request.capability_request_id
130 if isinstance(field_of_view_shape, str):
131 try:
132 self.field_of_view_shape = FieldOfViewShape(field_of_view_shape)
133 except ValueError:
134 self.field_of_view_shape = FieldOfViewShape.POINT
135 else:
136 self.field_of_view_shape = field_of_view_shape or FieldOfViewShape.POINT
138 if isinstance(velocity_reference_frame, str):
139 try:
140 self.velocity_reference_frame = VelocityReferenceFrame(velocity_reference_frame)
141 except ValueError:
142 self.velocity_reference_frame = VelocityReferenceFrame.LSRK
143 else:
144 self.velocity_reference_frame = velocity_reference_frame or VelocityReferenceFrame.LSRK
146 if isinstance(doppler_type, str):
147 try:
148 self.doppler_type = DopplerType(doppler_type)
149 except ValueError:
150 self.doppler_type = DopplerType.RADIO
151 else:
152 self.doppler_type = doppler_type or DopplerType.RADIO
154 def __json__(self) -> JSON:
155 return super().__json__() | {
156 "source": self.source.__json__() if self.source else None,
157 }
159 def update_from_json(self, json: JSON):
160 super().update_from_json(json)
161 # Check if this field source should have a source; create/update accordingly
162 if "source" in json and json["source"] is not None:
163 from propose.domain_layer.entities.source import Source
165 if self.source:
166 self.source.update_from_json(json["source"])
167 else:
168 src = Source()
169 src.update_from_json(json["source"])
170 self.source = src
172 @property
173 def coordinate_system_value(self):
174 return getattr(self.coordinate_system, "value", self.coordinate_system)
176 @property
177 def field_of_view_shape_value(self):
178 return getattr(self.field_of_view_shape, "value", self.field_of_view_shape)
180 @property
181 def velocity_reference_frame_value(self):
182 return getattr(self.velocity_reference_frame, "value", self.velocity_reference_frame)
184 @property
185 def doppler_type_value(self):
186 return getattr(self.doppler_type, "value", self.doppler_type)
188 @property
189 @override
190 def sky_coord(self) -> SkyCoord:
191 return self.source.sky_coord
193 def clone(self, capability_request: CapabilityRequest | None = None) -> "FieldSource":
194 return FieldSource(
195 source=self.source,
196 field_of_view_shape=self.field_of_view_shape,
197 field_of_view_lat=self.field_of_view_lat,
198 field_of_view_long=self.field_of_view_long,
199 radial_velocity=self.radial_velocity,
200 velocity_reference_frame=self.velocity_reference_frame,
201 doppler_type=self.doppler_type,
202 parallax=self.parallax,
203 proper_motion_lat=self.proper_motion_lat,
204 proper_motion_long=self.proper_motion_long,
205 peak_continuum_flux_density=self.peak_continuum_flux_density,
206 peak_line_flux_density=self.peak_line_flux_density,
207 line_width=self.line_width,
208 capability_request=capability_request or self.capability_request,
209 )