Coverage for middle_layer/common/application_layer/rest_api/jwtauth/jwtauth.py: 79.12%

91 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2026-04-13 06:13 +0000

1# Copyright 2024 Associated Universities, Inc. 

2# 

3# This file is part of Telescope Time Allocation Tools (TTAT). 

4# 

5# TTAT is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# any later version. 

9# 

10# TTAT is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License 

16# along with TTAT. If not, see <https://www.gnu.org/licenses/>. 

17# 

18import datetime 

19import logging 

20 

21import jwt 

22from pyramid.interfaces import ISecurityPolicy 

23from pyramid.request import Request 

24from pyramid.security import Allowed, Denied 

25from zope.interface import implementer 

26 

27from auth import auth 

28from common.domain_layer.entities.user import User 

29 

30 

31@implementer(ISecurityPolicy) 

32class JWTAuth: 

33 """ 

34 This is a pyramid authorization policy to handle JWT's 

35 inspired heavily by https://github.com/wichert/pyramid_jwt/ 

36 """ 

37 

38 def __init__( 

39 self, 

40 private_key, 

41 public_key=None, 

42 algorithm="HS512", 

43 leeway=0, 

44 expiration=None, 

45 http_header="Authorization", 

46 auth_type="JWT", 

47 callback=None, 

48 audience="tta", 

49 ): 

50 self.private_key = private_key 

51 self.public_key = public_key if public_key is not None else private_key 

52 self.algorithm = algorithm 

53 self.leeway = leeway 

54 self.http_header = http_header 

55 self.auth_type = auth_type 

56 if not isinstance(expiration, datetime.timedelta): 

57 expiration = datetime.timedelta(days=30) 

58 self.expiration = expiration 

59 self.audience = audience 

60 self.callback = callback 

61 

62 def create_token(self, user: User, **claims) -> str: 

63 """ 

64 creates a JWT from a User object with optional additional claims 

65 :param user: the User object to create the JWT for 

66 :param claims: additional claims to add to the JWT - will overwrite any existing ones from the User obj 

67 :return: a JWT string 

68 """ 

69 payload = user.__json__() 

70 payload.update(claims) 

71 payload["sub"] = user.user_id 

72 payload["iat"] = iat = datetime.datetime.now(datetime.UTC) 

73 payload["exp"] = iat + self.expiration 

74 payload["aud"] = self.audience 

75 token = jwt.encode(payload, self.private_key, algorithm=self.algorithm) 

76 if not isinstance(token, str): # Python3 unicode madness 

77 token = token.decode("ascii") 

78 return token 

79 

80 def get_claims(self, request: Request) -> dict: 

81 """ 

82 gets the jwt from the Authorization header and returns it's claims 

83 :param request: the pyramid request 

84 :return: a dict of the claims 

85 """ 

86 try: 

87 if request.authorization is None: 

88 return {} 

89 except ValueError: # Invalid Authorization header 

90 return {} 

91 auth_type, token = request.authorization 

92 if auth_type != self.auth_type: 

93 return {} 

94 if not token: 

95 return {} 

96 try: 

97 claims = jwt.decode( 

98 token, 

99 self.public_key, 

100 algorithms=[self.algorithm], 

101 leeway=self.leeway, 

102 audience=self.audience, 

103 ) 

104 return claims 

105 except jwt.InvalidTokenError as e: 

106 logging.warning(f"Invalid JWT token from {request.remote_addr}: {e}") 

107 return {} 

108 

109 def identity(self, request: Request) -> User | None: 

110 """ 

111 validates a JWT and returns a User by the JWT user id (subject) claim value 

112 :param request: the pyramid request 

113 :return: a User object if valid, otherwise None 

114 """ 

115 user_id = self.get_claims(request).get("sub", None) 

116 if user_id is None: 

117 return None 

118 try: 

119 user = auth.get_user_by_id(user_id) 

120 except ValueError as e: 

121 logging.warning(e) 

122 return None 

123 return user 

124 

125 def authenticated_userid(self, request: Request) -> int | None: 

126 """ 

127 returns the id of a Validated User 

128 :param request: the pyramid request 

129 :return: the users id, if validated 

130 """ 

131 user = self.identity(request) 

132 if user is None: 

133 return None 

134 return user.user_id 

135 

136 def permits(self, request: Request, context: object, permission: str) -> Allowed | Denied: 

137 """ 

138 this is the authorization function used for permission on any view or resource 

139 :param request: the Pyramid Request object 

140 :param context: the context of the calling view or resource 

141 :param permission: a string of the permission name for this context 

142 :return: a Pyramid Security Allowed or Denied object 

143 """ 

144 # in theory if you don't add a permission param to the view_config, this method doesn't get called 

145 # but figured having an explicit "always_allow" was nice so the intent is known on the view 

146 user = self.identity(request) 

147 if user is None: 

148 logging.warning("User is empty") 

149 return Denied("User is not signed in or invalid") 

150 # we have a valid user 

151 # there is concept "Access Control List" (ACL) in pyramid, but we are not using it right now 

152 # if we want to consider it in the future, we are using URL Dispatch matching, so here is where to start 

153 # https://docs.pylonsproject.org/projects/pyramid/en/2.0-branch/narr/urldispatch.html#using-security-with-urldispatch 

154 # for now, we are using just matching the values of "permission" e.g. 

155 # @view_config(route_name="solicitation_capability_upsert", renderer="json", permission="solicitation_update") 

156 # against the dict below to get what roles are allowed - then using the roles list in our user object to match 

157 all_roles = ["tta_user", "tta_member"] 

158 permissions = { 

159 "author_add": all_roles, 

160 "authors_list": all_roles, 

161 "author_by_id": all_roles, 

162 "author_update": all_roles, 

163 "author_delete": all_roles, 

164 "facilities_list": all_roles, 

165 "facility_by_id": all_roles, 

166 "facility_update": ["tta_member"], 

167 "message": all_roles, 

168 "message_add": ["tta_member"], 

169 "message_delete": ["tta_member"], 

170 "notification_groups_list": all_roles, 

171 "notification_group_by_id": all_roles, 

172 "notification_group_update": ["tta_member"], 

173 "notification_group_delete": ["tta_member"], 

174 "proposal_processes_list": all_roles, 

175 "proposal_process_by_id": all_roles, 

176 "proposal_process_update": ["tta_member"], 

177 "proposal_process_delete": ["tta_member"], 

178 "proposal_scientific_justification_delete": all_roles, 

179 "science_categories_list": all_roles, 

180 "science_category_by_id": all_roles, 

181 "science_category_update": ["tta_member"], 

182 "science_category_delete": ["tta_member"], 

183 "proposal_classes_list": all_roles, 

184 "proposal_class_by_id": all_roles, 

185 "proposal_class_update": ["tta_member"], 

186 "proposal_class_delete": ["tta_member"], 

187 "add_solicitation_from_config_file": ["tta_member"], 

188 "get_solicitation_config_file": ["tta_member"], 

189 "solicitations_list": all_roles, 

190 "solicitation_by_id": all_roles, 

191 "solicitation_update": ["tta_member"], 

192 "solicitation_delete": ["tta_member"], 

193 "generate_solicitation": ["tta_member"], 

194 "generate_proposals": ["tta_member"], 

195 "set_context": ["tta_member"], 

196 "vet_all_proposals": ["tta_member"], 

197 "complete_panel_configuration": ["tta_member"], 

198 "set_certify_conflicts": ["tta_member"], 

199 "finalize_test_isrs": ["tta_member"], 

200 "launch_consensus": ["tta_member"], 

201 "set_ppr_comments": ["tta_member"], 

202 "complete_ppr_comments": ["tta_member"], 

203 "finalize_test_pprs": ["tta_member"], 

204 "run_all_ppr_steps": ["tta_member"], 

205 "set_osr_comments": ["tta_member"], 

206 "set_osr_scientific_merit_metrics": ["tta_member"], 

207 "finalize_osr_reviews": ["tta_member"], 

208 "run_all_osr_process_steps": ["tta_member"], 

209 "allocate_publish": ["tta_member"], 

210 "cleanup": ["tta_member"], 

211 "capabilities_list": all_roles, 

212 "capability_by_id": all_roles, 

213 "capability_update": ["tta_member"], 

214 "capability_parameter_specifications_list": all_roles, 

215 "capability_parameter_specification_by_id": all_roles, 

216 "capability_parameter_specification_add": ["tta_member"], 

217 "capability_parameter_specification_update": ["tta_member"], 

218 "capability_parameter_specification_delete": ["tta_member"], 

219 "capability_request_delete": all_roles, 

220 "conflict_declaration_update": all_roles, 

221 "validation_parameters_list": all_roles, 

222 "validation_parameter_add": ["tta_member"], 

223 "validation_parameter_delete": ["tta_member"], 

224 "proposals_search": all_roles, 

225 "proposals_list": all_roles, 

226 "proposal_by_id": all_roles, 

227 "proposal_upsert": all_roles, 

228 "proposal_update": all_roles, 

229 "proposal_set_display_order": ["tta_member"], 

230 "proposal_vetting": ["tta_member"], 

231 "proposal_change_state": all_roles, 

232 "solicitation_capabilities_list": all_roles, 

233 "solicitation_capability_by_id": all_roles, 

234 "solicitation_capability_update": ["tta_member"], 

235 "solicitation_capability_delete": ["tta_member"], 

236 "solicitation_facility_capability_by_id": all_roles, 

237 "solicitation_facility_capability_update": ["tta_member"], 

238 "solicitation_facility_capability_delete": ["tta_member"], 

239 "parameter_configurations_list": all_roles, 

240 "parameter_configuration_update": ["tta_member"], 

241 "parameter_configuration_delete": ["tta_member"], 

242 "science_target_list": all_roles, 

243 "science_target_upsert": all_roles, 

244 "science_target_delete": all_roles, 

245 "reference_target_list": all_roles, 

246 "reference_target_upsert": all_roles, 

247 "reference_target_delete": all_roles, 

248 "osr_proposal_review_by_proposal_id": ["tta_member"], 

249 "osr_proposal_review_update_by_proposal_id": ["tta_member"], 

250 "osr_proposal_review_list_by_solicitation_id": all_roles, 

251 "science_review_panel_upsert": ["tta_member"], 

252 "science_review_panel_delete": ["tta_member"], 

253 "science_review_panel_update": ["tta_member"], 

254 "science_review_panel_list": all_roles, 

255 "panel_configuration_complete": ["tta_member"], 

256 "individual_science_review_assignment": all_roles, 

257 "individual_science_review_external_assignment": ["tta_member"], 

258 "individual_science_review_update": all_roles, 

259 "individual_science_review_finalize": all_roles, 

260 "individual_science_review_export": all_roles, 

261 "individual_science_review_import": all_roles, 

262 "individual_science_review_list": all_roles, 

263 "feasibility_review_group": all_roles, 

264 "feasibility_review": all_roles, 

265 "ppr_proposal_review_update": all_roles, 

266 "ppr_proposal_review_finalize": all_roles, 

267 "ppr_proposal_review_list": all_roles, 

268 "ppr_proposal_review_export": all_roles, 

269 "ppr_proposal_review_import": all_roles, 

270 "science_reviewer_update": all_roles, 

271 "science_reviewer_delete": ["tta_member"], 

272 "science_reviewer_list": all_roles, 

273 "feasibility_reviewer_list": all_roles, 

274 "proposal_review_list_by_solicitation_id": all_roles, 

275 "tac_member_list_by_solicitation_id": all_roles, 

276 "tac_member_update": ["tta_member"], 

277 "tac_member_delete": ["tta_member"], 

278 "allocation_version_list_by_group_id": all_roles, 

279 "allocation_version_create": ["tta_member"], 

280 "allocation_version_update": ["tta_member"], 

281 "allocation_version_publish": ["tta_member"], 

282 "source_conflict_check": ["tta_member"], 

283 "allocation_disposition_list": all_roles, 

284 "timebins": ["tta_member"], 

285 "available_time_model_version_list_by_solicitation_id_and_facility_id": all_roles, 

286 "available_time_model_version_by_id": ["tta_member"], 

287 "available_time_model_version_create": ["tta_member"], 

288 "available_time_model_version_update": ["tta_member"], 

289 "available_time_model_upsert": ["tta_member"], 

290 "available_time_model_delete": ["tta_member"], 

291 "available_time_model_list_by_available_time_model_version_id": ["tta_member"], 

292 "allocation_disposition_update": ["tta_member"], 

293 "obspec_disposition_update": ["tta_member"], 

294 "rspec_disposition_update": ["tta_member"], 

295 "proposal_disposition_list": all_roles, 

296 "proposal_disposition_update": all_roles, 

297 "proposal_disposition_group_create": ["tta_member"], 

298 "proposal_disposition_group_list": all_roles, 

299 "proposal_disposition_group_comment_export": ["tta_member"], 

300 "proposal_disposition_group_comment_import": ["tta_member"], 

301 "proposal_summary_list": ["tta_member"], 

302 "proposal_summary_export": ["tta_member"], 

303 "time_reservation_upsert": ["tta_member"], 

304 "time_reservation_delete": ["tta_member"], 

305 "time_reservation_import": ["tta_member"], 

306 "allocated_science_target_update": ["tta_member"], 

307 "calendar": ["tta_member"], 

308 "generate_disposition_letters": ["tta_member"], 

309 "send_disposition_letters": ["tta_member"], 

310 "template_management": ["tta_member"], 

311 "update_disposition_letter": ["tta_member"], 

312 "export_prototype_project": ["tta_member"], 

313 } 

314 

315 this_permission = permissions.get(permission, None) 

316 if this_permission is None or len(this_permission) < 1: 

317 logging.warning(f"No permission entry found for '{permission}'") 

318 return Denied(f"No permission entry found for '{permission}'") 

319 if not hasattr(user, "roles") or len(user.roles) < 1: 

320 logging.warning(f"User '{user.user_id}' has no roles") 

321 return Denied(f"User '{user.user_id}' has no roles") 

322 overlap_roles = [x for x in user.roles if x in this_permission] 

323 if len(overlap_roles) < 1: 

324 logging.warning(f"User does not have the right role") 

325 return Denied("User does not have the right role") 

326 return Allowed("User has the right role to proceed") 

327 

328 def remember(self, request, principal, **kw): 

329 logging.warning("JWT tokens need to be returned by an API. Using remember() " "has no effect.") 

330 return [] 

331 

332 def forget(self, request): 

333 logging.warning("JWT tokens are managed by API (users) manually. Using forget() " "has no effect.") 

334 return []