Coverage for middle_layer/common/application_layer/rest_api/jwtauth/jwtauth.py: 79.12%
91 statements
« prev ^ index » next coverage.py v7.10.5, created at 2026-04-13 06:13 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2026-04-13 06:13 +0000
1# Copyright 2024 Associated Universities, Inc.
2#
3# This file is part of Telescope Time Allocation Tools (TTAT).
4#
5# TTAT is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# any later version.
9#
10# TTAT is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with TTAT. If not, see <https://www.gnu.org/licenses/>.
17#
18import datetime
19import logging
21import jwt
22from pyramid.interfaces import ISecurityPolicy
23from pyramid.request import Request
24from pyramid.security import Allowed, Denied
25from zope.interface import implementer
27from auth import auth
28from common.domain_layer.entities.user import User
@implementer(ISecurityPolicy)
class JWTAuth:
    """
    Pyramid security policy that authenticates requests with JSON Web Tokens (JWT)
    and authorizes them against a static permission-name -> roles table.

    Inspired heavily by https://github.com/wichert/pyramid_jwt/
    """

    # Role groups referenced by the permission table below.
    _ALL_ROLES = ("tta_user", "tta_member")
    _MEMBER_ONLY = ("tta_member",)

    # Maps the ``permission=`` value of a ``@view_config`` to the roles that may use it, e.g.
    # @view_config(route_name="solicitation_capability_upsert", renderer="json", permission="solicitation_update")
    # Built once at class-definition time rather than on every ``permits`` call.
    # Pyramid also offers Access Control Lists (ACL); they are not used right now. If they are
    # wanted in the future, routes use URL Dispatch matching, so start here:
    # https://docs.pylonsproject.org/projects/pyramid/en/2.0-branch/narr/urldispatch.html#using-security-with-urldispatch
    _PERMISSIONS = {
        "author_add": _ALL_ROLES,
        "authors_list": _ALL_ROLES,
        "author_by_id": _ALL_ROLES,
        "author_update": _ALL_ROLES,
        "author_delete": _ALL_ROLES,
        "facilities_list": _ALL_ROLES,
        "facility_by_id": _ALL_ROLES,
        "facility_update": _MEMBER_ONLY,
        "message": _ALL_ROLES,
        "message_add": _MEMBER_ONLY,
        "message_delete": _MEMBER_ONLY,
        "notification_groups_list": _ALL_ROLES,
        "notification_group_by_id": _ALL_ROLES,
        "notification_group_update": _MEMBER_ONLY,
        "notification_group_delete": _MEMBER_ONLY,
        "proposal_processes_list": _ALL_ROLES,
        "proposal_process_by_id": _ALL_ROLES,
        "proposal_process_update": _MEMBER_ONLY,
        "proposal_process_delete": _MEMBER_ONLY,
        "proposal_scientific_justification_delete": _ALL_ROLES,
        "science_categories_list": _ALL_ROLES,
        "science_category_by_id": _ALL_ROLES,
        "science_category_update": _MEMBER_ONLY,
        "science_category_delete": _MEMBER_ONLY,
        "proposal_classes_list": _ALL_ROLES,
        "proposal_class_by_id": _ALL_ROLES,
        "proposal_class_update": _MEMBER_ONLY,
        "proposal_class_delete": _MEMBER_ONLY,
        "add_solicitation_from_config_file": _MEMBER_ONLY,
        "get_solicitation_config_file": _MEMBER_ONLY,
        "solicitations_list": _ALL_ROLES,
        "solicitation_by_id": _ALL_ROLES,
        "solicitation_update": _MEMBER_ONLY,
        "solicitation_delete": _MEMBER_ONLY,
        "generate_solicitation": _MEMBER_ONLY,
        "generate_proposals": _MEMBER_ONLY,
        "set_context": _MEMBER_ONLY,
        "vet_all_proposals": _MEMBER_ONLY,
        "complete_panel_configuration": _MEMBER_ONLY,
        "set_certify_conflicts": _MEMBER_ONLY,
        "finalize_test_isrs": _MEMBER_ONLY,
        "launch_consensus": _MEMBER_ONLY,
        "set_ppr_comments": _MEMBER_ONLY,
        "complete_ppr_comments": _MEMBER_ONLY,
        "finalize_test_pprs": _MEMBER_ONLY,
        "run_all_ppr_steps": _MEMBER_ONLY,
        "set_osr_comments": _MEMBER_ONLY,
        "set_osr_scientific_merit_metrics": _MEMBER_ONLY,
        "finalize_osr_reviews": _MEMBER_ONLY,
        "run_all_osr_process_steps": _MEMBER_ONLY,
        "allocate_publish": _MEMBER_ONLY,
        "cleanup": _MEMBER_ONLY,
        "capabilities_list": _ALL_ROLES,
        "capability_by_id": _ALL_ROLES,
        "capability_update": _MEMBER_ONLY,
        "capability_parameter_specifications_list": _ALL_ROLES,
        "capability_parameter_specification_by_id": _ALL_ROLES,
        "capability_parameter_specification_add": _MEMBER_ONLY,
        "capability_parameter_specification_update": _MEMBER_ONLY,
        "capability_parameter_specification_delete": _MEMBER_ONLY,
        "capability_request_delete": _ALL_ROLES,
        "conflict_declaration_update": _ALL_ROLES,
        "validation_parameters_list": _ALL_ROLES,
        "validation_parameter_add": _MEMBER_ONLY,
        "validation_parameter_delete": _MEMBER_ONLY,
        "proposals_search": _ALL_ROLES,
        "proposals_list": _ALL_ROLES,
        "proposal_by_id": _ALL_ROLES,
        "proposal_upsert": _ALL_ROLES,
        "proposal_update": _ALL_ROLES,
        "proposal_set_display_order": _MEMBER_ONLY,
        "proposal_vetting": _MEMBER_ONLY,
        "proposal_change_state": _ALL_ROLES,
        "solicitation_capabilities_list": _ALL_ROLES,
        "solicitation_capability_by_id": _ALL_ROLES,
        "solicitation_capability_update": _MEMBER_ONLY,
        "solicitation_capability_delete": _MEMBER_ONLY,
        "solicitation_facility_capability_by_id": _ALL_ROLES,
        "solicitation_facility_capability_update": _MEMBER_ONLY,
        "solicitation_facility_capability_delete": _MEMBER_ONLY,
        "parameter_configurations_list": _ALL_ROLES,
        "parameter_configuration_update": _MEMBER_ONLY,
        "parameter_configuration_delete": _MEMBER_ONLY,
        "science_target_list": _ALL_ROLES,
        "science_target_upsert": _ALL_ROLES,
        "science_target_delete": _ALL_ROLES,
        "reference_target_list": _ALL_ROLES,
        "reference_target_upsert": _ALL_ROLES,
        "reference_target_delete": _ALL_ROLES,
        "osr_proposal_review_by_proposal_id": _MEMBER_ONLY,
        "osr_proposal_review_update_by_proposal_id": _MEMBER_ONLY,
        "osr_proposal_review_list_by_solicitation_id": _ALL_ROLES,
        "science_review_panel_upsert": _MEMBER_ONLY,
        "science_review_panel_delete": _MEMBER_ONLY,
        "science_review_panel_update": _MEMBER_ONLY,
        "science_review_panel_list": _ALL_ROLES,
        "panel_configuration_complete": _MEMBER_ONLY,
        "individual_science_review_assignment": _ALL_ROLES,
        "individual_science_review_external_assignment": _MEMBER_ONLY,
        "individual_science_review_update": _ALL_ROLES,
        "individual_science_review_finalize": _ALL_ROLES,
        "individual_science_review_export": _ALL_ROLES,
        "individual_science_review_import": _ALL_ROLES,
        "individual_science_review_list": _ALL_ROLES,
        "feasibility_review_group": _ALL_ROLES,
        "feasibility_review": _ALL_ROLES,
        "ppr_proposal_review_update": _ALL_ROLES,
        "ppr_proposal_review_finalize": _ALL_ROLES,
        "ppr_proposal_review_list": _ALL_ROLES,
        "ppr_proposal_review_export": _ALL_ROLES,
        "ppr_proposal_review_import": _ALL_ROLES,
        "science_reviewer_update": _ALL_ROLES,
        "science_reviewer_delete": _MEMBER_ONLY,
        "science_reviewer_list": _ALL_ROLES,
        "feasibility_reviewer_list": _ALL_ROLES,
        "proposal_review_list_by_solicitation_id": _ALL_ROLES,
        "tac_member_list_by_solicitation_id": _ALL_ROLES,
        "tac_member_update": _MEMBER_ONLY,
        "tac_member_delete": _MEMBER_ONLY,
        "allocation_version_list_by_group_id": _ALL_ROLES,
        "allocation_version_create": _MEMBER_ONLY,
        "allocation_version_update": _MEMBER_ONLY,
        "allocation_version_publish": _MEMBER_ONLY,
        "source_conflict_check": _MEMBER_ONLY,
        "allocation_disposition_list": _ALL_ROLES,
        "timebins": _MEMBER_ONLY,
        "available_time_model_version_list_by_solicitation_id_and_facility_id": _ALL_ROLES,
        "available_time_model_version_by_id": _MEMBER_ONLY,
        "available_time_model_version_create": _MEMBER_ONLY,
        "available_time_model_version_update": _MEMBER_ONLY,
        "available_time_model_upsert": _MEMBER_ONLY,
        "available_time_model_delete": _MEMBER_ONLY,
        "available_time_model_list_by_available_time_model_version_id": _MEMBER_ONLY,
        "allocation_disposition_update": _MEMBER_ONLY,
        "obspec_disposition_update": _MEMBER_ONLY,
        "rspec_disposition_update": _MEMBER_ONLY,
        "proposal_disposition_list": _ALL_ROLES,
        "proposal_disposition_update": _ALL_ROLES,
        "proposal_disposition_group_create": _MEMBER_ONLY,
        "proposal_disposition_group_list": _ALL_ROLES,
        "proposal_disposition_group_comment_export": _MEMBER_ONLY,
        "proposal_disposition_group_comment_import": _MEMBER_ONLY,
        "proposal_summary_list": _MEMBER_ONLY,
        "proposal_summary_export": _MEMBER_ONLY,
        "time_reservation_upsert": _MEMBER_ONLY,
        "time_reservation_delete": _MEMBER_ONLY,
        "time_reservation_import": _MEMBER_ONLY,
        "allocated_science_target_update": _MEMBER_ONLY,
        "calendar": _MEMBER_ONLY,
        "generate_disposition_letters": _MEMBER_ONLY,
        "send_disposition_letters": _MEMBER_ONLY,
        "template_management": _MEMBER_ONLY,
        "update_disposition_letter": _MEMBER_ONLY,
        "export_prototype_project": _MEMBER_ONLY,
    }

    def __init__(
        self,
        private_key,
        public_key=None,
        algorithm="HS512",
        leeway=0,
        expiration=None,
        http_header="Authorization",
        auth_type="JWT",
        callback=None,
        audience="tta",
    ):
        """
        :param private_key: key used to sign tokens
        :param public_key: key used to verify tokens; falls back to ``private_key`` when None
        :param algorithm: JWT signing algorithm name passed to ``jwt.encode`` / ``jwt.decode``
        :param leeway: leeway passed to ``jwt.decode``
        :param expiration: token lifetime as a ``datetime.timedelta``; any other value
            (including None) falls back to 30 days
        :param http_header: stored on the instance; not read by the methods of this class
        :param auth_type: the Authorization scheme a request must use for its token to be considered
        :param callback: stored on the instance; not read by the methods of this class
        :param audience: value written to / required in the ``aud`` claim
        """
        self.private_key = private_key
        self.public_key = public_key if public_key is not None else private_key
        self.algorithm = algorithm
        self.leeway = leeway
        self.http_header = http_header
        self.auth_type = auth_type
        if not isinstance(expiration, datetime.timedelta):
            if expiration is not None:
                # Make the fallback visible: a misconfigured lifetime (e.g. an int of
                # seconds) would otherwise silently become a 30-day token.
                logging.warning(
                    f"Ignoring non-timedelta JWT expiration {expiration!r}; defaulting to 30 days"
                )
            expiration = datetime.timedelta(days=30)
        self.expiration = expiration
        self.audience = audience
        self.callback = callback

    def create_token(self, user: User, **claims) -> str:
        """
        creates a JWT from a User object with optional additional claims
        :param user: the User object to create the JWT for
        :param claims: additional claims to add to the JWT - will overwrite any existing ones from the User obj.
            The ``sub``, ``iat``, ``exp`` and ``aud`` claims are always set by this method afterwards,
            so they cannot be overridden through ``claims``
        :return: a JWT string
        """
        payload = user.__json__()
        payload.update(claims)
        # Reserved claims are assigned last so neither the user payload nor **claims can spoof them.
        # NOTE(review): PyJWT 2.10+ appears to reject a non-string ``sub`` on decode -
        # confirm the type of ``user.user_id`` against the installed PyJWT version.
        payload["sub"] = user.user_id
        payload["iat"] = iat = datetime.datetime.now(datetime.UTC)
        payload["exp"] = iat + self.expiration
        payload["aud"] = self.audience
        token = jwt.encode(payload, self.private_key, algorithm=self.algorithm)
        if not isinstance(token, str):  # some jwt versions return bytes rather than str
            token = token.decode("ascii")
        return token

    def get_claims(self, request: Request) -> dict:
        """
        gets the jwt from the Authorization header and returns its claims
        :param request: the pyramid request
        :return: a dict of the claims; empty when the header is missing, malformed,
            uses a different auth type, carries no token, or the token fails validation
        """
        # Read the header once; accessing the property re-parses it each time.
        try:
            authorization = request.authorization
        except ValueError:  # Invalid Authorization header
            return {}
        if authorization is None:
            return {}
        auth_type, token = authorization
        if auth_type != self.auth_type:
            return {}
        if not token:
            return {}
        try:
            return jwt.decode(
                token,
                self.public_key,
                algorithms=[self.algorithm],
                leeway=self.leeway,
                audience=self.audience,
            )
        except jwt.InvalidTokenError as e:
            logging.warning(f"Invalid JWT token from {request.remote_addr}: {e}")
            return {}

    def identity(self, request: Request) -> User | None:
        """
        validates a JWT and returns a User by the JWT user id (subject) claim value
        :param request: the pyramid request
        :return: a User object if valid, otherwise None
        """
        user_id = self.get_claims(request).get("sub", None)
        if user_id is None:
            return None
        try:
            return auth.get_user_by_id(user_id)
        except ValueError as e:
            logging.warning(e)
            return None

    def authenticated_userid(self, request: Request) -> int | None:
        """
        returns the id of a Validated User
        :param request: the pyramid request
        :return: the users id, if validated
        """
        user = self.identity(request)
        if user is None:
            return None
        return user.user_id

    def permits(self, request: Request, context: object, permission: str) -> Allowed | Denied:
        """
        this is the authorization function used for permission on any view or resource
        :param request: the Pyramid Request object
        :param context: the context of the calling view or resource
        :param permission: a string of the permission name for this context
        :return: a Pyramid Security Allowed or Denied object
        """
        # in theory if you don't add a permission param to the view_config, this method doesn't get called
        # but figured having an explicit "always_allow" was nice so the intent is known on the view
        user = self.identity(request)
        if user is None:
            logging.warning("User is empty")
            return Denied("User is not signed in or invalid")

        # we have a valid user: look up which roles the requested permission accepts
        allowed_roles = self._PERMISSIONS.get(permission)
        if not allowed_roles:
            logging.warning(f"No permission entry found for '{permission}'")
            return Denied(f"No permission entry found for '{permission}'")

        # A missing, None or empty ``roles`` attribute all mean "no roles" and are denied
        # instead of raising while computing a length.
        user_roles = getattr(user, "roles", None)
        if not user_roles:
            logging.warning(f"User '{user.user_id}' has no roles")
            return Denied(f"User '{user.user_id}' has no roles")

        if not any(role in allowed_roles for role in user_roles):
            logging.warning("User does not have the right role")
            return Denied("User does not have the right role")
        return Allowed("User has the right role to proceed")

    def remember(self, request, principal, **kw):
        """
        no-op required by the security policy interface; only logs a warning
        :return: an empty list
        """
        logging.warning("JWT tokens need to be returned by an API. Using remember() has no effect.")
        return []

    def forget(self, request):
        """
        no-op required by the security policy interface; only logs a warning
        :return: an empty list
        """
        logging.warning("JWT tokens are managed by API (users) manually. Using forget() has no effect.")
        return []