Coverage for middle_layer/allocate/application_layer/orm_repositories/observation_specification_disposition.py: 100.00%

41 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2026-03-09 06:13 +0000

1# Copyright 2024 Associated Universities, Inc. 

2# 

3# This file is part of Telescope Time Allocation Tools (TTAT). 

4# 

5# TTAT is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# any later version. 

9# 

10# TTAT is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License 

16# along with TTAT. If not, see <https://www.gnu.org/licenses/>. 

17 

18"""ORMRepository for ObservationSpecificationDisposition""" 

19 

20from sqlalchemy.orm.session import Session 

21 

22from allocate.domain_layer.entities.observation_specification_disposition import ( 

23 ObservationSpecificationDisposition, 

24 ProprietaryPeriodType, 

25) 

26from allocate.domain_layer.repositories.observation_specification_disposition import ( 

27 ObservationSpecificationDispositionRepository, 

28) 

29from common.application_layer.orm_repositories import add_entity, get_object_by_id, list_entities 

30 

31 

class ObservationSpecificationDispositionORMRepository(ObservationSpecificationDispositionRepository):
    """Repository for ObservationSpecificationDisposition entities backed by a SQLAlchemy session."""

    # Attribute names that update() copies from the caller's entity onto the
    # stored one, in the order in which they are assigned.
    _FIELDS_COPIED_ON_UPDATE: tuple[str, ...] = (
        "allocation_disposition_id",
        "allocation_request_id",
        # TODO: See STT-1763 and STT-1764 as they address scans in OSDs
        # (scans are not touched by update() for now)
        # facility_configuration updates are managed via adding/removing facility configurations
        "scheduling_priority_name",
        # there should be a "constraints" field copied here
        "proprietary_period_type",
        "proprietary_period_duration",
        "proprietary_period_date",
        "overhead",
        "allocated_time_per_repeat",
        "allocated_earliest_start_time",
        "allocated_latest_end_time",
        "allocated_earliest_date",
        "allocated_latest_date",
        "allocated_cadence",
        "requested_earliest_start_time",
        "requested_latest_end_time",
        "requested_cadence",
        "is_requested_filler",
        "is_proposer_modified",
        "scheduling_priority_locked",
        "requires_manual_generation",
        "temporal_reference",
    )

    def __init__(self, session: Session) -> None:
        """Keep a reference to the session that every operation goes through."""
        self.session: Session = session

    def add(self, observation_specification_disposition: ObservationSpecificationDisposition) -> int:
        """Hand the entity to ``add_entity`` and return its id.

        NOTE(review): presumably ``add_entity`` adds and flushes so that the id
        is populated by the time it is read here -- confirm against the helper.
        """
        entity = observation_specification_disposition
        add_entity(self.session, entity)
        return entity.observation_specification_disposition_id

    def by_id(self, observation_specification_disposition_id: int) -> ObservationSpecificationDisposition:
        """Fetch one entity by id through ``get_object_by_id``.

        NOTE(review): what happens for an unknown id is decided by the helper,
        which is not visible from this module.
        """
        entity_type = ObservationSpecificationDisposition
        id_column = ObservationSpecificationDisposition.observation_specification_disposition_id
        return get_object_by_id(
            self.session,
            observation_specification_disposition_id,
            entity_type,
            id_column,
        )

    def list_all(self) -> list[ObservationSpecificationDisposition]:
        """Return every entity through ``list_entities``.

        The id column is passed along as well; presumably the helper uses it
        for ordering -- confirm against the helper.
        """
        entity_type = ObservationSpecificationDisposition
        id_column = ObservationSpecificationDisposition.observation_specification_disposition_id
        return list_entities(self.session, entity_type, id_column)

    def update(self, observation_specification_disposition: ObservationSpecificationDisposition) -> None:
        """Copy the updatable attributes onto the stored entity and flush.

        The stored entity is looked up with ``by_id`` using the id of the
        entity passed in; each attribute named in ``_FIELDS_COPIED_ON_UPDATE``
        is then read from the argument and assigned to the stored entity, in
        order, before the session is flushed.
        """
        incoming = observation_specification_disposition
        stored = self.by_id(incoming.observation_specification_disposition_id)

        for field_name in self._FIELDS_COPIED_ON_UPDATE:
            setattr(stored, field_name, getattr(incoming, field_name))

        self.session.flush()

    def delete(self, observation_specification_disposition: ObservationSpecificationDisposition) -> None:
        """Mark the entity for deletion in the session and flush immediately."""
        session = self.session
        session.delete(observation_specification_disposition)
        session.flush()