Coverage for middle_layer/propose/application_layer/orm_repositories/observation_specification.py: 90.79%

76 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2026-04-13 06:13 +0000

1# Copyright 2024 Associated Universities, Inc. 

2# 

3# This file is part of Telescope Time Allocation Tools (TTAT). 

4# 

5# TTAT is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# any later version. 

9# 

10# TTAT is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License 

16# along with TTAT. If not, see <https://www.gnu.org/licenses/>. 

17# 

18from datetime import datetime, timezone 

19 

20from sqlalchemy.orm import Session 

21 

22from allocate.domain_layer.entities.allocation_disposition import AllocationDisposition 

23from allocate.domain_layer.entities.allocation_version import AllocationVersion 

24from allocate.domain_layer.entities.observation_specification_disposition import ObservationSpecificationDisposition 

25from common.application_layer.orm_repositories import add_entity, get_object_by_id, list_entities 

26from propose.domain_layer.entities.observation_specification import ObservationSpecification, Scan 

27from propose.domain_layer.entities.proposal import AllocationRequest, ProposalCopy 

28from propose.domain_layer.repositories.proposal import ObservationSpecificationRepository 

29 

30 

31class ObservationSpecificationORMRepository(ObservationSpecificationRepository): 

32 def __init__(self, session: Session): 

33 self.session = session 

34 

35 def by_id(self, obspec_id: int) -> ObservationSpecification: 

36 return get_object_by_id( 

37 self.session, obspec_id, ObservationSpecification, ObservationSpecification.observation_specification_id 

38 ) 

39 

40 def scan_by_id(self, scan_id: int) -> Scan: 

41 return get_object_by_id(self.session, scan_id, Scan, Scan.scan_id) 

42 

43 def list_all(self) -> list[ObservationSpecification]: 

44 return list_entities( 

45 self.session, ObservationSpecification, ObservationSpecification.observation_specification_id 

46 ) 

47 

48 def list_by_allocation_request_id(self, ar_id: int) -> list[ObservationSpecification]: 

49 return list( 

50 self.session.query(ObservationSpecification) 

51 .filter(ObservationSpecification.allocation_request_id == ar_id) 

52 .order_by(ObservationSpecification.observation_specification_id) 

53 .all() 

54 ) 

55 

56 def list_by_allocation_version_id(self, av_id: int) -> list[ObservationSpecification]: 

57 allocation_requests = ( 

58 self.session.query(AllocationRequest) 

59 .join(AllocationDisposition) 

60 .filter(AllocationDisposition.allocation_version_id == av_id) 

61 .all() 

62 ) 

63 ob_specs = [] 

64 for ar in allocation_requests: 

65 for obspec in ar.observation_specifications: 

66 ob_specs.append(obspec) 

67 return ob_specs 

68 

69 def list_by_proposal_disposition_group_id(self, pdg_id: int) -> list[ObservationSpecification]: 

70 allocation_requests = ( 

71 self.session.query(AllocationRequest) 

72 .join(AllocationDisposition) 

73 .join( 

74 AllocationVersion, 

75 AllocationVersion.allocation_version_id == AllocationDisposition.allocation_version_id, 

76 ) 

77 .filter(AllocationVersion.proposal_disposition_group_id == pdg_id) 

78 .order_by(AllocationRequest.allocation_request_id) 

79 .all() 

80 ) 

81 ob_specs = [] 

82 for ar in allocation_requests: 

83 for obspec in ar.observation_specifications: 

84 ob_specs.append(obspec) 

85 return ob_specs 

86 

87 def add(self, observation_specification: ObservationSpecification) -> int: 

88 add_entity(self.session, observation_specification) 

89 scans = observation_specification.scans 

90 for i, _ in enumerate(scans): 

91 scans[i].observation_specification_id = observation_specification.observation_specification_id 

92 add_entity(self.session, scans[i]) 

93 subscans = scans[i].subscans 

94 for j, _ in enumerate(subscans): 

95 subscans[j].scan_id = scans[i].scan_id 

96 add_entity(self.session, subscans[j]) 

97 

98 ar = get_object_by_id( 

99 self.session, 

100 observation_specification.allocation_request_id, 

101 AllocationRequest, 

102 AllocationRequest.allocation_request_id, 

103 ) 

104 proposal_copy = get_object_by_id(self.session, ar.proposal_copy_id, ProposalCopy, ProposalCopy.proposal_copy_id) 

105 proposal_copy.modified_timestamp = datetime.now(timezone.utc) 

106 self.session.flush() 

107 return observation_specification.observation_specification_id 

108 

109 def add_scan(self, scan: Scan) -> int: 

110 add_entity(self.session, scan) 

111 for j, _ in enumerate(scan.subscans): 

112 scan.subscans[j].scan_id = scan.scan_id 

113 add_entity(self.session, scan.subscans[j]) 

114 return scan.scan_id 

115 

116 def update(self, observation_specification: ObservationSpecification) -> None: 

117 o = self.by_id(observation_specification.observation_specification_id) 

118 o.scans = observation_specification.scans 

119 

120 ar = get_object_by_id( 

121 self.session, 

122 observation_specification.allocation_request_id, 

123 AllocationRequest, 

124 AllocationRequest.allocation_request_id, 

125 ) 

126 proposal_copy = get_object_by_id(self.session, ar.proposal_copy_id, ProposalCopy, ProposalCopy.proposal_copy_id) 

127 proposal_copy.modified_timestamp = datetime.now(timezone.utc) 

128 self.session.flush() 

129 

130 def update_scan(self, scan: Scan) -> None: 

131 retrieved_scan = self.scan_by_id(scan.scan_id) 

132 retrieved_scan.position_in_list = scan.position_in_list 

133 retrieved_scan.scan_intents = scan.scan_intents 

134 retrieved_scan.subscans = scan.subscans 

135 self.session.flush() 

136 

137 def delete(self, observation_specification: ObservationSpecification) -> None: 

138 for s in observation_specification.scans: 

139 self.session.delete(s) 

140 self.session.delete(observation_specification) 

141 self.session.flush() 

142 

143 def delete_scan(self, scan: Scan) -> None: 

144 self.session.delete(scan) 

145 self.session.flush()