Coverage for /Users/antonigmitruk/golf/src/golf/auth/factory.py: 0%

131 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-08-16 18:46 +0200

1"""Factory functions for creating FastMCP authentication providers.""" 

2 

3import os 

4from typing import Any 

5 

6# Import these at runtime to avoid import errors during Golf installation 

7from typing import TYPE_CHECKING 

8 

9if TYPE_CHECKING: 

10 from fastmcp.server.auth.auth import AuthProvider 

11 from fastmcp.server.auth import JWTVerifier, StaticTokenVerifier 

12from mcp.server.auth.settings import RevocationOptions 

13 

14from .providers import ( 

15 AuthConfig, 

16 JWTAuthConfig, 

17 StaticTokenConfig, 

18 OAuthServerConfig, 

19 RemoteAuthConfig, 

20) 

21from .registry import ( 

22 get_provider_registry, 

23 create_auth_provider_from_registry, 

24) 

25 

26 

27def create_auth_provider(config: AuthConfig) -> "AuthProvider": 

28 """Create a FastMCP AuthProvider from Golf auth configuration. 

29 

30 This function uses the provider registry system to allow extensibility. 

31 Built-in providers are automatically registered, and custom providers 

32 can be added via the registry system. 

33 

34 Args: 

35 config: Golf authentication configuration 

36 

37 Returns: 

38 Configured FastMCP AuthProvider instance 

39 

40 Raises: 

41 ValueError: If configuration is invalid 

42 ImportError: If required dependencies are missing 

43 KeyError: If provider type is not registered 

44 """ 

45 try: 

46 return create_auth_provider_from_registry(config) 

47 except KeyError: 

48 # Fall back to legacy dispatch for backward compatibility 

49 # This ensures existing code continues to work during transition 

50 if config.provider_type == "jwt": 

51 return _create_jwt_provider(config) 

52 elif config.provider_type == "static": 

53 return _create_static_provider(config) 

54 elif config.provider_type == "oauth_server": 

55 return _create_oauth_server_provider(config) 

56 elif config.provider_type == "remote": 

57 return _create_remote_provider(config) 

58 else: 

59 raise ValueError(f"Unknown provider type: {config.provider_type}") from None 

60 

61 

62def _create_jwt_provider(config: JWTAuthConfig) -> "JWTVerifier": 

63 """Create JWT token verifier from configuration.""" 

64 # Resolve runtime values from environment variables 

65 public_key = config.public_key 

66 if config.public_key_env_var: 

67 env_value = os.environ.get(config.public_key_env_var) 

68 if env_value: 

69 public_key = env_value 

70 

71 jwks_uri = config.jwks_uri 

72 if config.jwks_uri_env_var: 

73 env_value = os.environ.get(config.jwks_uri_env_var) 

74 if env_value: 

75 jwks_uri = env_value 

76 

77 issuer = config.issuer 

78 if config.issuer_env_var: 

79 env_value = os.environ.get(config.issuer_env_var) 

80 if env_value: 

81 issuer = env_value 

82 

83 audience = config.audience 

84 if config.audience_env_var: 

85 env_value = os.environ.get(config.audience_env_var) 

86 if env_value: 

87 # Handle both string and comma-separated list 

88 if "," in env_value: 

89 audience = [s.strip() for s in env_value.split(",")] 

90 else: 

91 audience = env_value 

92 

93 # Validate configuration 

94 if not public_key and not jwks_uri: 

95 raise ValueError("Either public_key or jwks_uri must be provided for JWT verification") 

96 

97 if public_key and jwks_uri: 

98 raise ValueError("Provide either public_key or jwks_uri, not both") 

99 

100 try: 

101 from fastmcp.server.auth import JWTVerifier 

102 except ImportError as e: 

103 raise ImportError("JWTVerifier not available. Please install fastmcp>=2.11.0") from e 

104 

105 return JWTVerifier( 

106 public_key=public_key, 

107 jwks_uri=jwks_uri, 

108 issuer=issuer, 

109 audience=audience, 

110 algorithm=config.algorithm, 

111 required_scopes=config.required_scopes, 

112 ) 

113 

114 

115def _create_static_provider(config: StaticTokenConfig) -> "StaticTokenVerifier": 

116 """Create static token verifier from configuration.""" 

117 if not config.tokens: 

118 raise ValueError("Static token provider requires at least one token") 

119 

120 try: 

121 from fastmcp.server.auth import StaticTokenVerifier 

122 except ImportError as e: 

123 raise ImportError("StaticTokenVerifier not available. Please install fastmcp>=2.11.0") from e 

124 

125 return StaticTokenVerifier( 

126 tokens=config.tokens, 

127 required_scopes=config.required_scopes, 

128 ) 

129 

130 

131def _create_oauth_server_provider(config: OAuthServerConfig) -> "AuthProvider": 

132 """Create OAuth authorization server provider from configuration.""" 

133 try: 

134 from fastmcp.server.auth import OAuthProvider 

135 except ImportError as e: 

136 raise ImportError( 

137 "OAuthProvider not available in this FastMCP version. Please upgrade to FastMCP 2.11.0 or later." 

138 ) from e 

139 

140 # Resolve runtime values from environment variables with validation 

141 base_url = config.base_url 

142 if config.base_url_env_var: 

143 env_value = os.environ.get(config.base_url_env_var) 

144 if env_value: 

145 # Apply the same validation as the config field to env var value 

146 try: 

147 from urllib.parse import urlparse 

148 

149 env_value = env_value.strip() 

150 parsed = urlparse(env_value) 

151 

152 if not parsed.scheme or not parsed.netloc: 

153 raise ValueError( 

154 f"Invalid base URL from environment variable {config.base_url_env_var}: '{env_value}'" 

155 ) 

156 

157 if parsed.scheme not in ("http", "https"): 

158 raise ValueError(f"Base URL from environment must use http/https: '{env_value}'") 

159 

160 # Production HTTPS check 

161 is_production = ( 

162 os.environ.get("GOLF_ENV", "").lower() in ("prod", "production") 

163 or os.environ.get("NODE_ENV", "").lower() == "production" 

164 or os.environ.get("ENVIRONMENT", "").lower() in ("prod", "production") 

165 ) 

166 

167 if is_production and parsed.scheme == "http": 

168 raise ValueError(f"Base URL must use HTTPS in production: '{env_value}'") 

169 

170 base_url = env_value 

171 

172 except Exception as e: 

173 raise ValueError(f"Invalid base URL from environment variable {config.base_url_env_var}: {e}") from e 

174 

175 # Additional security validations before creating provider 

176 from urllib.parse import urlparse 

177 

178 # Validate final base_url 

179 parsed_base = urlparse(base_url) 

180 if not parsed_base.scheme or not parsed_base.netloc: 

181 raise ValueError(f"Invalid base URL: '{base_url}'") 

182 

183 # Security check: prevent localhost in production 

184 is_production = ( 

185 os.environ.get("GOLF_ENV", "").lower() in ("prod", "production") 

186 or os.environ.get("NODE_ENV", "").lower() == "production" 

187 or os.environ.get("ENVIRONMENT", "").lower() in ("prod", "production") 

188 ) 

189 

190 if is_production and parsed_base.hostname in ("localhost", "127.0.0.1", "0.0.0.0"): 

191 raise ValueError(f"Cannot use localhost/loopback addresses in production: '{base_url}'") 

192 

193 # Client registration options - always disabled for security 

194 client_reg_options = None 

195 

196 # Create revocation options 

197 revocation_options = None 

198 if config.allow_token_revocation: 

199 revocation_options = RevocationOptions(enabled=True) 

200 

201 return OAuthProvider( 

202 base_url=base_url, 

203 issuer_url=config.issuer_url, 

204 service_documentation_url=config.service_documentation_url, 

205 client_registration_options=client_reg_options, 

206 revocation_options=revocation_options, 

207 required_scopes=config.required_scopes, 

208 ) 

209 

210 

211def _create_remote_provider(config: RemoteAuthConfig) -> "AuthProvider": 

212 """Create remote auth provider from configuration.""" 

213 try: 

214 from fastmcp.server.auth import RemoteAuthProvider 

215 except ImportError as e: 

216 raise ImportError( 

217 "RemoteAuthProvider not available in this FastMCP version. Please upgrade to FastMCP 2.11.0 or later." 

218 ) from e 

219 

220 # Resolve runtime values from environment variables 

221 authorization_servers = config.authorization_servers 

222 if config.authorization_servers_env_var: 

223 env_value = os.environ.get(config.authorization_servers_env_var) 

224 if env_value: 

225 # Split comma-separated values and strip whitespace 

226 authorization_servers = [s.strip() for s in env_value.split(",")] 

227 

228 resource_server_url = config.resource_server_url 

229 if config.resource_server_url_env_var: 

230 env_value = os.environ.get(config.resource_server_url_env_var) 

231 if env_value: 

232 resource_server_url = env_value 

233 

234 # Create the underlying token verifier 

235 token_verifier = create_auth_provider(config.token_verifier_config) 

236 

237 # Ensure it's actually a TokenVerifier 

238 if not hasattr(token_verifier, "verify_token"): 

239 raise ValueError(f"Remote auth provider requires a TokenVerifier, got {type(token_verifier).__name__}") 

240 

241 return RemoteAuthProvider( 

242 token_verifier=token_verifier, 

243 authorization_servers=authorization_servers, 

244 resource_server_url=resource_server_url, 

245 ) 

246 

247 

248def create_simple_jwt_provider( 

249 *, 

250 jwks_uri: str | None = None, 

251 public_key: str | None = None, 

252 issuer: str | None = None, 

253 audience: str | list[str] | None = None, 

254 required_scopes: list[str] | None = None, 

255) -> "JWTVerifier": 

256 """Create a simple JWT provider for common use cases. 

257 

258 This is a convenience function for creating JWT providers without 

259 having to construct the full configuration objects. 

260 

261 Args: 

262 jwks_uri: JWKS URI for key fetching 

263 public_key: Static public key (PEM format) 

264 issuer: Expected issuer claim 

265 audience: Expected audience claim(s) 

266 required_scopes: Required scopes for all requests 

267 

268 Returns: 

269 Configured JWTVerifier instance 

270 """ 

271 config = JWTAuthConfig( 

272 jwks_uri=jwks_uri, 

273 public_key=public_key, 

274 issuer=issuer, 

275 audience=audience, 

276 required_scopes=required_scopes or [], 

277 ) 

278 return _create_jwt_provider(config) 

279 

280 

281def create_dev_token_provider( 

282 tokens: dict[str, Any] | None = None, 

283 required_scopes: list[str] | None = None, 

284) -> "StaticTokenVerifier": 

285 """Create a static token provider for development. 

286 

287 Args: 

288 tokens: Token dictionary or None for default dev tokens 

289 required_scopes: Required scopes for all requests 

290 

291 Returns: 

292 Configured StaticTokenVerifier instance 

293 """ 

294 if tokens is None: 

295 # Default development tokens 

296 tokens = { 

297 "dev-token-123": { 

298 "client_id": "dev-client", 

299 "scopes": ["read", "write"], 

300 }, 

301 "admin-token-456": { 

302 "client_id": "admin-client", 

303 "scopes": ["read", "write", "admin"], 

304 }, 

305 } 

306 

307 config = StaticTokenConfig( 

308 tokens=tokens, 

309 required_scopes=required_scopes or [], 

310 ) 

311 return _create_static_provider(config) 

312 

313 

314def register_builtin_providers() -> None: 

315 """Register built-in authentication providers in the registry. 

316 

317 This function registers the standard Golf authentication providers: 

318 - jwt: JWT token verification 

319 - static: Static token verification (development) 

320 - oauth_server: Full OAuth authorization server 

321 - remote: Remote authorization server integration 

322 """ 

323 registry = get_provider_registry() 

324 

325 # Register built-in provider factories 

326 registry.register_factory("jwt", _create_jwt_provider) 

327 registry.register_factory("static", _create_static_provider) 

328 registry.register_factory("oauth_server", _create_oauth_server_provider) 

329 registry.register_factory("remote", _create_remote_provider) 

330 

331 

332# Register built-in providers when module is imported 

333register_builtin_providers()