Coverage for src / superset_io / session.py: 99%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-02 13:07 +0100

1from __future__ import annotations 

2 

3import base64 

4import json 

5import logging 

6import re 

7from typing import Self, cast 

8 

9import requests 

10 

11log = logging.getLogger("superset_io") 

12 

13 

14class SupersetApiSession(requests.Session): 

15 base_url: str 

16 bearer_token: str | None 

17 csrf_token: str | None 

18 

19 def __init__( 

20 self, 

21 base_url: str, 

22 bearer_token: str | None = None, 

23 csrf_token: str | None = None, 

24 *args, 

25 **kwargs, 

26 ): 

27 super().__init__(*args, **kwargs) 

28 self.base_url = base_url 

29 self.bearer_token = bearer_token 

30 self.csrf_token = csrf_token 

31 

32 headers = {"User-Agent": "coasti-superset-import-export/1.0.0"} 

33 if bearer_token: 

34 headers["Authorization"] = f"Bearer {bearer_token}" 

35 if csrf_token: 

36 headers["X-CSRFToken"] = csrf_token 

37 

38 self.headers.update(headers) 

39 

40 def request(self, method: str | bytes, url: str | bytes, *args, **kwargs): 

41 if isinstance(url, str) and not url.startswith("http"): 

42 url = f"{self.base_url}{url}" 

43 return super().request(method, url, *args, **kwargs) 

44 

45 @classmethod 

46 def from_credentials( 

47 cls, 

48 base_url: str, 

49 username: str, 

50 password: str, 

51 ) -> Self: 

52 """Authenticate and return an authenticated SupersetApiSession.""" 

53 session = cls(base_url=base_url) 

54 

55 try: 

56 bearer_token = session._get_bearer_token(username, password) 

57 session.bearer_token = bearer_token 

58 session.headers["Authorization"] = f"Bearer {bearer_token}" 

59 except requests.HTTPError: 

60 log.debug("Failed to get bearer token") 

61 bearer_token = cast(str, None) 

62 # Some features work without authentication, in particular connection test. 

63 # A bit dirty, we should not pass None to `from_token`, but I do not want 

64 # to change the type hint of the public classmethod. 

65 

66 try: 

67 session = cls.from_token(base_url, bearer_token, session=session) 

68 except RuntimeError as e: 

69 # this is our custom error for jwt config errors on server 

70 log.error(e) 

71 session._get_csrf_via_session_cookie(username, password) 

72 

73 return session 

74 

75 @classmethod 

76 def from_token( 

77 cls, 

78 base_url: str, 

79 bearer_token: str, 

80 session: Self | None = None, 

81 ) -> Self: 

82 """Create a SupersetApiSession from an existing access token.""" 

83 

84 if session is None: 

85 session = cls(base_url=base_url, bearer_token=bearer_token) 

86 

87 # get csrf for api writes 

88 try: 

89 csrf_token = session._get_csrf_via_bearer() 

90 session.csrf_token = csrf_token 

91 session.headers["X-CSRFToken"] = csrf_token 

92 except requests.HTTPError: 

93 log.debug("Failed to get csrf token") 

94 return session 

95 

96 def _get_bearer_token(self, username: str, password: str) -> str: 

97 """Get a bearer access token""" 

98 log.debug("Obtaining bearer access token") 

99 res = self.post( 

100 "/api/v1/security/login", 

101 headers={"Content-Type": "application/json"}, 

102 json={ 

103 "username": username, 

104 "password": password, 

105 "provider": "db", 

106 "refresh": False, 

107 }, 

108 verify=True, 

109 ) 

110 

111 res.raise_for_status() 

112 

113 token: str = res.json()["access_token"] 

114 log.debug(f"access token algorithm {self._jwt_header(token)}") 

115 

116 return token 

117 

118 def _get_csrf_via_bearer(self) -> str: 

119 log.debug("Obtaining csrf token via bearer") 

120 res = self.get("/api/v1/security/csrf_token/") 

121 

122 try: 

123 res.raise_for_status() 

124 except requests.HTTPError: 

125 if ( 

126 res.status_code == 422 

127 and "The specified alg value is not allowed" in res.text 

128 ): 

129 raise RuntimeError( 

130 "Superset rejected the Bearer JWT: " 

131 f"{self._jwt_header(str(self.bearer_token))}." 

132 "Fix server config (e.g., JWT_ALGORITHM/allowed algorithms) " 

133 "or use cookie/session auth." 

134 ) 

135 raise 

136 

137 return res.json()["result"] 

138 

139 def _get_csrf_via_session_cookie(self, username: str, password: str) -> None: 

140 """ 

141 Try to get a working csrf token via login page and session cookie. 

142 

143 TODO: Verify, and this will likely be the path for keycloak. 

144 """ 

145 

146 log.debug("Obtaining csrf token via session cookie") 

147 res = self.get("/login/") 

148 res.raise_for_status() 

149 csrf = self._extract_csrf_from_login_html(res.text) 

150 if not csrf: 

151 raise RuntimeError( 

152 "Could not find csrf_token field on /login/ page. " 

153 "Cookie-based login fallback may not be supported/enabled." 

154 ) 

155 

156 # POST login form (Flask-AppBuilder default fields) 

157 res = self.post( 

158 "/login/", 

159 data={ 

160 "username": username, 

161 "password": password, 

162 "csrf_token": csrf, 

163 }, 

164 allow_redirects=True, 

165 headers={"Accept": "text/html,application/xhtml+xml"}, 

166 ) 

167 res.raise_for_status() 

168 

169 res = self.get("/api/v1/security/csrf_token/") 

170 res.raise_for_status() 

171 csrf = res.json()["result"] # TODO: Check, is this the same csrf as before? 

172 

173 self.headers["X-CSRFToken"] = csrf 

174 self.csrf_token = csrf 

175 

176 @classmethod 

177 def _jwt_header(cls, token: str) -> dict: 

178 header_b64 = token.split(".")[0] 

179 header_b64 += "=" * (-len(header_b64) % 4) 

180 return json.loads(base64.urlsafe_b64decode(header_b64).decode("utf-8")) 

181 

182 @classmethod 

183 def _extract_csrf_from_login_html(cls, html: str) -> str | None: 

184 """ 

185 Superset's /login/ page typically includes a CSRF token in a hidden input 

186 called "csrf_token". This is not guaranteed across all themes/versions, 

187 but works in many default deployments. 

188 

189 PS: not verified. 

190 """ 

191 

192 match = re.search(r'name="csrf_token"\s+type="hidden"\s+value="([^"]+)"', html) 

193 csrf: str | None = None 

194 if match: 

195 csrf = match.group(1) 

196 

197 return csrf