Coverage for src / superset_io / session.py: 99%
97 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-02 13:07 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-02 13:07 +0100
1from __future__ import annotations
3import base64
4import json
5import logging
6import re
7from typing import Self, cast
9import requests
11log = logging.getLogger("superset_io")
class SupersetApiSession(requests.Session):
    """A ``requests.Session`` bound to a single Superset instance.

    Relative URLs passed to any request method are resolved against
    ``base_url``. Known tokens are installed as default headers
    (``Authorization: Bearer ...`` and ``X-CSRFToken``).

    Use :meth:`from_credentials` or :meth:`from_token` to get a session that
    has already tried to obtain the tokens it needs.
    """

    # Prefix that relative request URLs are resolved against.
    base_url: str
    # Access token sent as "Authorization: Bearer ...", or None if not set.
    bearer_token: str | None
    # Token sent as "X-CSRFToken", or None if none was obtained.
    csrf_token: str | None

    def __init__(
        self,
        base_url: str,
        bearer_token: str | None = None,
        csrf_token: str | None = None,
        *args,
        **kwargs,
    ):
        """Create the session and install default headers; no network I/O.

        Args:
            base_url: Prefix for every relative request URL.
            bearer_token: If truthy, sent as ``Authorization: Bearer <token>``.
            csrf_token: If truthy, sent as ``X-CSRFToken``.
            *args: Forwarded to ``requests.Session.__init__``.
            **kwargs: Forwarded to ``requests.Session.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.base_url = base_url
        self.bearer_token = bearer_token
        self.csrf_token = csrf_token

        headers = {"User-Agent": "coasti-superset-import-export/1.0.0"}
        if bearer_token:
            headers["Authorization"] = f"Bearer {bearer_token}"
        if csrf_token:
            headers["X-CSRFToken"] = csrf_token

        self.headers.update(headers)

    def request(self, method: str | bytes, url: str | bytes, *args, **kwargs):
        """Send a request, resolving relative ``str`` URLs against ``base_url``.

        A URL counts as absolute only if it starts with ``http://`` or
        ``https://`` (case-insensitive). Anything else is appended to
        ``base_url`` with exactly one ``/`` in between, so neither a trailing
        slash on ``base_url`` nor a missing leading slash on ``url`` produces a
        malformed address. ``bytes`` URLs are passed through unchanged.
        """
        if isinstance(url, str) and not url.lower().startswith(
            ("http://", "https://")
        ):
            url = f"{self.base_url.rstrip('/')}/{url.lstrip('/')}"
        return super().request(method, url, *args, **kwargs)

    @classmethod
    def from_credentials(
        cls,
        base_url: str,
        username: str,
        password: str,
    ) -> Self:
        """Authenticate and return an authenticated SupersetApiSession.

        Steps:
            1. Log in via the API to obtain a bearer token. An HTTP error here
               is only logged; the session then stays without a bearer token.
            2. Fetch a CSRF token through :meth:`from_token`.
            3. If step 2 raises ``RuntimeError`` (server rejected the JWT),
               log it and fall back to the cookie-based login.

        Errors raised by the cookie fallback propagate to the caller.
        """
        session = cls(base_url=base_url)

        try:
            bearer_token = session._get_bearer_token(username, password)
            session.bearer_token = bearer_token
            session.headers["Authorization"] = f"Bearer {bearer_token}"
        except requests.HTTPError:
            log.debug("Failed to get bearer token")
            bearer_token = cast(str, None)
            # Some features work without authentication, in particular connection test.
            # A bit dirty, we should not pass None to `from_token`, but I do not want
            # to change the type hint of the public classmethod.

        try:
            session = cls.from_token(base_url, bearer_token, session=session)
        except RuntimeError as e:
            # this is our custom error for jwt config errors on server
            log.error(e)
            session._get_csrf_via_session_cookie(username, password)

        return session

    @classmethod
    def from_token(
        cls,
        base_url: str,
        bearer_token: str,
        session: Self | None = None,
    ) -> Self:
        """Create a SupersetApiSession from an existing access token.

        Args:
            base_url: Prefix for relative request URLs (used only when a new
                session is created).
            bearer_token: Access token for the new session. Not used when
                ``session`` is given; that session keeps its own headers.
            session: Existing session to reuse instead of creating one.

        Returns:
            The session. Its CSRF token is set if it could be fetched; an HTTP
            error while fetching is only logged.

        Raises:
            RuntimeError: Propagated from :meth:`_get_csrf_via_bearer` when the
                server rejects the JWT algorithm.
        """

        if session is None:
            session = cls(base_url=base_url, bearer_token=bearer_token)

        # get csrf for api writes
        try:
            csrf_token = session._get_csrf_via_bearer()
            session.csrf_token = csrf_token
            session.headers["X-CSRFToken"] = csrf_token
        except requests.HTTPError:
            log.debug("Failed to get csrf token")
        return session

    def _get_bearer_token(self, username: str, password: str) -> str:
        """Get a bearer access token

        Posts the credentials to ``/api/v1/security/login`` (provider ``db``)
        and returns the ``access_token`` field of the JSON response.

        Raises:
            requests.HTTPError: If the login request returns an error status.
        """
        log.debug("Obtaining bearer access token")
        res = self.post(
            "/api/v1/security/login",
            headers={"Content-Type": "application/json"},
            json={
                "username": username,
                "password": password,
                "provider": "db",
                "refresh": False,
            },
            # NOTE(review): passed explicitly for this request only; presumably
            # to make sure credentials are never sent unverified - confirm.
            verify=True,
        )

        res.raise_for_status()

        token: str = res.json()["access_token"]
        # Purely diagnostic: a token whose header cannot be decoded must not
        # turn a successful login into a failure.
        log.debug("access token algorithm %s", self._jwt_header_for_log(token))

        return token

    def _get_csrf_via_bearer(self) -> str:
        """Fetch a CSRF token from ``/api/v1/security/csrf_token/``.

        Returns:
            The ``result`` field of the JSON response.

        Raises:
            RuntimeError: On a 422 response whose body says the JWT ``alg``
                value is not allowed (server-side JWT configuration problem).
            requests.HTTPError: On any other error status.
        """
        log.debug("Obtaining csrf token via bearer")
        res = self.get("/api/v1/security/csrf_token/")

        try:
            res.raise_for_status()
        except requests.HTTPError as err:
            if (
                res.status_code == 422
                and "The specified alg value is not allowed" in res.text
            ):
                # Use the non-raising header helper so that building the
                # message can never mask this RuntimeError, which callers
                # rely on to trigger the cookie fallback.
                raise RuntimeError(
                    "Superset rejected the Bearer JWT: "
                    f"{self._jwt_header_for_log(self.bearer_token)}. "
                    "Fix server config (e.g., JWT_ALGORITHM/allowed algorithms) "
                    "or use cookie/session auth."
                ) from err
            raise

        return res.json()["result"]

    def _get_csrf_via_session_cookie(self, username: str, password: str) -> None:
        """
        Try to get a working csrf token via login page and session cookie.

        Loads ``/login/``, submits the login form together with the CSRF token
        found in the page, then requests ``/api/v1/security/csrf_token/`` and
        stores the result on the session (attribute and ``X-CSRFToken`` header).

        Raises:
            RuntimeError: If no CSRF token is found on the login page.
            requests.HTTPError: If any of the three requests returns an error
                status.

        TODO: Verify, and this will likely be the path for keycloak.
        """

        log.debug("Obtaining csrf token via session cookie")
        res = self.get("/login/")
        res.raise_for_status()
        csrf = self._extract_csrf_from_login_html(res.text)
        if not csrf:
            raise RuntimeError(
                "Could not find csrf_token field on /login/ page. "
                "Cookie-based login fallback may not be supported/enabled."
            )

        # POST login form (Flask-AppBuilder default fields)
        res = self.post(
            "/login/",
            data={
                "username": username,
                "password": password,
                "csrf_token": csrf,
            },
            allow_redirects=True,
            headers={"Accept": "text/html,application/xhtml+xml"},
        )
        res.raise_for_status()

        res = self.get("/api/v1/security/csrf_token/")
        res.raise_for_status()
        csrf = res.json()["result"]  # TODO: Check, is this the same csrf as before?

        self.headers["X-CSRFToken"] = csrf
        self.csrf_token = csrf

    @classmethod
    def _jwt_header(cls, token: str) -> dict:
        """Decode and return the header (first dot-separated segment) of a JWT.

        The segment is base64url-decoded (padding is restored first) and
        parsed as JSON. The signature is not checked.

        Raises:
            ValueError: If the segment is not valid base64, UTF-8 or JSON.
        """
        header_b64 = token.split(".")[0]
        # JWTs drop base64 padding; restore it to a multiple of four.
        header_b64 += "=" * (-len(header_b64) % 4)
        return json.loads(base64.urlsafe_b64decode(header_b64).decode("utf-8"))

    @classmethod
    def _jwt_header_for_log(cls, token: str | None) -> dict | str:
        """Best-effort variant of :meth:`_jwt_header` for diagnostics.

        Never raises: returns the decoded header, or a placeholder string when
        the token is missing or its header cannot be decoded.
        """
        try:
            return cls._jwt_header(str(token))
        except ValueError:
            # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are
            # all ValueError subclasses.
            return "<undecodable JWT header>"

    @classmethod
    def _extract_csrf_from_login_html(cls, html: str) -> str | None:
        """
        Superset's /login/ page typically includes a CSRF token in a hidden input
        called "csrf_token". This is not guaranteed across all themes/versions,
        but works in many default deployments.

        The default attribute layout (name, type, value) is tried first; if it
        does not match, every ``<input>`` tag is inspected so that a different
        attribute order or single quotes are accepted too.

        Returns the token, or None if no non-empty value is found.

        PS: not verified.
        """

        match = re.search(r'name="csrf_token"\s+type="hidden"\s+value="([^"]+)"', html)
        if match:
            return match.group(1)

        # Fallback: attribute-order independent scan of <input ...> tags.
        for tag in re.finditer(r"<input\b[^>]*>", html, flags=re.IGNORECASE):
            attrs = tag.group(0)
            if not re.search(r"""(?<![\w-])name\s*=\s*["']csrf_token["']""", attrs):
                continue
            value = re.search(r"""(?<![\w-])value\s*=\s*(["'])([^"']+)\1""", attrs)
            if value:
                return value.group(2)

        return None