Coverage for middle_layer/common/application_layer/rest_api/jwtauth/__init__.py: 89.19%

37 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2026-05-11 06:14 +0000

1# Copyright 2024 Associated Universities, Inc. 

2# 

3# This file is part of Telescope Time Allocation Tools (TTAT). 

4# 

5# TTAT is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# any later version. 

9# 

10# TTAT is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License 

16# along with TTAT. If not, see <https://www.gnu.org/licenses/>. 

17# 

18from pyramid.httpexceptions import HTTPUnauthorized 

19from pyramid.request import Request 

20 

21from common.domain_layer.entities.user import User 

22 

23from .jwtauth import JWTAuth 

24 

25 

26def includeme(config): 

27 # this adds a method to the pyramid Request object used to set up the policy in a way we can pass it parameters 

28 config.add_directive("set_jwt_security_policy", set_jwt_security_policy, action_wrap=True) 

29 

30 

31def create_jwt_security_policy( 

32 config, 

33 private_key=None, 

34 public_key=None, 

35 algorithm=None, 

36 expiration=None, 

37 leeway=None, 

38 http_header=None, 

39 auth_type=None, 

40 callback=None, 

41 audience=None, 

42): 

43 """ 

44 This instantiates the JWTAuth class 

45 :param config: the Pyramid config object 

46 :param private_key: the private key used to encode/validate JWT's 

47 :param public_key: a public key, if we go that way in the future 

48 :param algorithm: the encoding algorithm to use in the JWT's 

49 :param expiration: how long from being issues should a JWT be good for - datetime.timedelta 

50 :param leeway: how many seconds of leeway we will allow for the JWT expiration 

51 :param http_header: the http header where we expect to find the JWT 

52 :param auth_type: the type of value in the above mention http_header 

53 :param callback: optional callback 

54 :param audience: the value to use in the JWT's "aud" (audience) claim 

55 :return: a JWTAuth instance 

56 """ 

57 settings = config.get_settings() 

58 private_key = private_key or settings.get("jwt.private_key") 

59 audience = audience or settings.get("jwt.audience") 

60 algorithm = algorithm or settings.get("jwt.algorithm") or "HS512" 

61 if not algorithm.startswith("HS"): 

62 public_key = public_key or settings.get("jwt.public_key") 

63 else: 

64 public_key = None 

65 if expiration is None and "jwt.expiration" in settings: 

66 expiration = int(settings.get("jwt.expiration")) 

67 leeway = int(settings.get("jwt.leeway", 0)) if leeway is None else leeway 

68 http_header = http_header or settings.get("jwt.http_header") or "Authorization" 

69 if http_header.lower() == "authorization": 

70 auth_type = auth_type or settings.get("jwt.auth_type") or "JWT" 

71 else: 

72 auth_type = None 

73 return JWTAuth( 

74 private_key=private_key, 

75 public_key=public_key, 

76 algorithm=algorithm, 

77 leeway=leeway, 

78 expiration=expiration, 

79 http_header=http_header, 

80 auth_type=auth_type, 

81 callback=callback, 

82 audience=audience, 

83 ) 

84 

85 

86def _request_create_token(request: Request, user: User, **claims): 

87 """ 

88 helper function to allow access to creating tokens from the pyramid Request object 

89 :param request: the request object 

90 :param user: a User to create a JWT from 

91 :param claims: and additional claims to add to the JWT 

92 :return: a JWT string (via security_policy.create_token) 

93 """ 

94 return request.security_policy.create_token(user, **claims) 

95 

96 

97def _request_claims(request): 

98 """ 

99 helper function allow access to JWT claims from the pyramid Request object 

100 :param request: the request object 

101 :return: a dict of claims from the JWT 

102 """ 

103 return request.security_policy.get_claims(request) 

104 

105 

106def _forbidden(request: Request): 

107 """ 

108 Override the default Pyramid "forbidden view" - what to show when a security policy fails to validate a view 

109 :param request: the Pyramid request 

110 :return: a Response 

111 """ 

112 return HTTPUnauthorized( 

113 body=f"Please log in again, or ensure you have a valid JWT in the request. {str(request.exception)}" 

114 ) 

115 

116 

117def _configure(config, auth_policy): 

118 """ 

119 sets the auth_policy create (JWTAuth) and adds it to the Pyramid Config 

120 :param config: the config object 

121 :param auth_policy: the auth_policy object 

122 :return: None 

123 """ 

124 config.set_security_policy(auth_policy) 

125 config.add_request_method(lambda request: auth_policy, "security_policy", reify=True) 

126 config.add_request_method(_request_claims, "jwt_claims", reify=True) 

127 config.add_request_method(_request_create_token, "create_jwt_token") 

128 config.add_forbidden_view(_forbidden) 

129 

130 

131def set_jwt_security_policy( 

132 config, 

133 private_key=None, 

134 public_key=None, 

135 algorithm=None, 

136 expiration=None, 

137 leeway=None, 

138 http_header=None, 

139 auth_type=None, 

140 callback=None, 

141 audience=None, 

142): 

143 policy = create_jwt_security_policy( 

144 config, 

145 private_key, 

146 public_key, 

147 algorithm, 

148 expiration, 

149 leeway, 

150 http_header, 

151 auth_type, 

152 callback, 

153 audience, 

154 ) 

155 

156 _configure(config, policy)