Coverage for /Users/antonigmitruk/golf/src/golf/auth/factory.py: 0%
131 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-16 18:46 +0200
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-16 18:46 +0200
1"""Factory functions for creating FastMCP authentication providers."""
3import os
4from typing import Any
# FastMCP types are imported only for type checking; the factory functions
# import them lazily at runtime to avoid import errors during Golf installation
7from typing import TYPE_CHECKING
9if TYPE_CHECKING:
10 from fastmcp.server.auth.auth import AuthProvider
11 from fastmcp.server.auth import JWTVerifier, StaticTokenVerifier
12from mcp.server.auth.settings import RevocationOptions
14from .providers import (
15 AuthConfig,
16 JWTAuthConfig,
17 StaticTokenConfig,
18 OAuthServerConfig,
19 RemoteAuthConfig,
20)
21from .registry import (
22 get_provider_registry,
23 create_auth_provider_from_registry,
24)
def create_auth_provider(config: AuthConfig) -> "AuthProvider":
    """Build the FastMCP auth provider described by a Golf auth configuration.

    The registry-based factory is tried first. If that call raises
    ``KeyError``, the legacy factories defined in this module are selected by
    ``config.provider_type`` so that existing code keeps working during the
    transition to the registry.

    Args:
        config: Golf authentication configuration.

    Returns:
        Configured FastMCP AuthProvider instance.

    Raises:
        ValueError: If the provider type is unknown to the legacy dispatch,
            or a legacy factory rejects the configuration.
        ImportError: If a legacy factory cannot import its FastMCP class.
    """
    try:
        return create_auth_provider_from_registry(config)
    except KeyError:
        # Legacy dispatch, kept inside the handler so the fallback only runs
        # after the registry has reported a KeyError.
        legacy_factories = (
            ("jwt", _create_jwt_provider),
            ("static", _create_static_provider),
            ("oauth_server", _create_oauth_server_provider),
            ("remote", _create_remote_provider),
        )
        for type_name, build in legacy_factories:
            if config.provider_type == type_name:
                return build(config)
        raise ValueError(f"Unknown provider type: {config.provider_type}") from None
def _create_jwt_provider(config: JWTAuthConfig) -> "JWTVerifier":
    """Create JWT token verifier from configuration.

    ``public_key``, ``jwks_uri``, ``issuer`` and ``audience`` are taken from
    the config unless the matching ``*_env_var`` names an environment variable
    holding a non-blank value, in which case the environment value wins.
    Unset, empty and whitespace-only environment values are ignored so they
    cannot replace a valid configured value or bypass validation.

    An audience read from the environment may be a comma-separated list;
    entries are stripped and empty entries (e.g. from a trailing comma) are
    dropped. A value containing a comma yields a list, otherwise a string.

    Args:
        config: JWT authentication configuration.

    Returns:
        Configured JWTVerifier instance.

    Raises:
        ValueError: If neither or both of ``public_key`` and ``jwks_uri``
            are available after environment resolution.
        ImportError: If ``JWTVerifier`` cannot be imported from fastmcp.
    """

    def _env_override(var_name: str | None, fallback: Any) -> Any:
        """Return the non-blank environment value of ``var_name``, else ``fallback``."""
        if not var_name:
            return fallback
        raw = os.environ.get(var_name)
        if raw is None or not raw.strip():
            return fallback
        return raw

    # Resolve runtime values from environment variables
    public_key = _env_override(config.public_key_env_var, config.public_key)
    jwks_uri = _env_override(config.jwks_uri_env_var, config.jwks_uri)
    issuer = _env_override(config.issuer_env_var, config.issuer)

    audience = config.audience
    raw_audience = _env_override(config.audience_env_var, None)
    if raw_audience is not None:
        # Handle both a single string and a comma-separated list
        entries = [part.strip() for part in raw_audience.split(",")]
        entries = [entry for entry in entries if entry]
        if entries:
            audience = entries if "," in raw_audience else entries[0]

    # Validate configuration before touching the optional dependency
    if not public_key and not jwks_uri:
        raise ValueError("Either public_key or jwks_uri must be provided for JWT verification")

    if public_key and jwks_uri:
        raise ValueError("Provide either public_key or jwks_uri, not both")

    try:
        from fastmcp.server.auth import JWTVerifier
    except ImportError as e:
        raise ImportError("JWTVerifier not available. Please install fastmcp>=2.11.0") from e

    return JWTVerifier(
        public_key=public_key,
        jwks_uri=jwks_uri,
        issuer=issuer,
        audience=audience,
        algorithm=config.algorithm,
        required_scopes=config.required_scopes,
    )
def _create_static_provider(config: StaticTokenConfig) -> "StaticTokenVerifier":
    """Build a StaticTokenVerifier from a static-token configuration.

    Args:
        config: Static token configuration; ``tokens`` must be non-empty.

    Returns:
        StaticTokenVerifier built from ``config.tokens`` and
        ``config.required_scopes``.

    Raises:
        ValueError: If ``config.tokens`` is empty or ``None``.
        ImportError: If ``StaticTokenVerifier`` cannot be imported from fastmcp.
    """
    token_table = config.tokens
    if token_table:
        # Import lazily so the module can be loaded without fastmcp installed.
        try:
            from fastmcp.server.auth import StaticTokenVerifier as verifier_cls
        except ImportError as import_error:
            raise ImportError("StaticTokenVerifier not available. Please install fastmcp>=2.11.0") from import_error

        return verifier_cls(tokens=token_table, required_scopes=config.required_scopes)

    raise ValueError("Static token provider requires at least one token")
def _is_production_env() -> bool:
    """Report whether GOLF_ENV, NODE_ENV or ENVIRONMENT marks this process as production."""
    return (
        os.environ.get("GOLF_ENV", "").lower() in ("prod", "production")
        or os.environ.get("NODE_ENV", "").lower() == "production"
        or os.environ.get("ENVIRONMENT", "").lower() in ("prod", "production")
    )


def _base_url_from_env(env_var: str, raw_value: str, is_production: bool) -> str:
    """Validate a base URL read from ``env_var`` and return it with whitespace stripped."""
    from urllib.parse import urlparse

    prefix = f"Invalid base URL from environment variable {env_var}"
    candidate = raw_value.strip()

    try:
        parsed = urlparse(candidate)
    except ValueError as e:
        # urlparse itself rejects some inputs (e.g. malformed IPv6 brackets).
        raise ValueError(f"{prefix}: {e}") from e

    # Each failure carries the prefix exactly once; previously a blanket
    # ``except Exception`` re-wrapped these errors and duplicated it.
    if not parsed.scheme or not parsed.netloc:
        raise ValueError(f"{prefix}: '{candidate}'")

    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"{prefix}: Base URL from environment must use http/https: '{candidate}'")

    if is_production and parsed.scheme == "http":
        raise ValueError(f"{prefix}: Base URL must use HTTPS in production: '{candidate}'")

    return candidate


def _create_oauth_server_provider(config: OAuthServerConfig) -> "AuthProvider":
    """Create OAuth authorization server provider from configuration.

    The base URL comes from ``config.base_url`` unless ``config.base_url_env_var``
    names an environment variable with a non-empty value; that value is
    stripped and must be an absolute http/https URL (https only when the
    environment is marked as production). The final base URL must have a
    scheme and host, and in production must not point at localhost or a
    loopback/unspecified address.

    Configuration is validated before the optional ``fastmcp`` import, so an
    invalid configuration is reported as ``ValueError`` even when fastmcp is
    missing (matching the JWT and static factories).

    Args:
        config: OAuth server configuration.

    Returns:
        Configured ``OAuthProvider`` instance. Dynamic client registration
        is always left disabled.

    Raises:
        ValueError: If the base URL (from config or environment) is invalid.
        ImportError: If ``OAuthProvider`` cannot be imported from fastmcp.
    """
    from urllib.parse import urlparse

    is_production = _is_production_env()

    # Resolve runtime values from environment variables with validation
    base_url = config.base_url
    if config.base_url_env_var:
        env_value = os.environ.get(config.base_url_env_var)
        if env_value:
            base_url = _base_url_from_env(config.base_url_env_var, env_value, is_production)

    # Validate final base_url
    parsed_base = urlparse(base_url)
    if not parsed_base.scheme or not parsed_base.netloc:
        raise ValueError(f"Invalid base URL: '{base_url}'")

    # Security check: prevent localhost/loopback in production ("::1" is the
    # IPv6 loopback, reported by urlparse without its brackets).
    if is_production and parsed_base.hostname in ("localhost", "127.0.0.1", "0.0.0.0", "::1"):
        raise ValueError(f"Cannot use localhost/loopback addresses in production: '{base_url}'")

    try:
        from fastmcp.server.auth import OAuthProvider
    except ImportError as e:
        raise ImportError(
            "OAuthProvider not available in this FastMCP version. Please upgrade to FastMCP 2.11.0 or later."
        ) from e

    # Client registration options - always disabled for security
    client_reg_options = None

    # Create revocation options
    revocation_options = None
    if config.allow_token_revocation:
        revocation_options = RevocationOptions(enabled=True)

    return OAuthProvider(
        base_url=base_url,
        issuer_url=config.issuer_url,
        service_documentation_url=config.service_documentation_url,
        client_registration_options=client_reg_options,
        revocation_options=revocation_options,
        required_scopes=config.required_scopes,
    )
def _create_remote_provider(config: RemoteAuthConfig) -> "AuthProvider":
    """Create remote auth provider from configuration.

    ``authorization_servers`` and ``resource_server_url`` come from the config
    unless the matching ``*_env_var`` names an environment variable with a
    usable value. The authorization servers variable is a comma-separated
    list; entries are stripped and empty entries (stray or trailing commas)
    are dropped. If nothing usable remains, the configured value is kept.
    A blank resource server URL in the environment is likewise ignored.

    The underlying token verifier is built by ``create_auth_provider`` from
    ``config.token_verifier_config`` and must expose ``verify_token``.

    Args:
        config: Remote authentication configuration.

    Returns:
        Configured ``RemoteAuthProvider`` instance.

    Raises:
        ImportError: If ``RemoteAuthProvider`` cannot be imported from fastmcp.
        ValueError: If the created token verifier has no ``verify_token``
            attribute.
    """
    try:
        from fastmcp.server.auth import RemoteAuthProvider
    except ImportError as e:
        raise ImportError(
            "RemoteAuthProvider not available in this FastMCP version. Please upgrade to FastMCP 2.11.0 or later."
        ) from e

    # Resolve runtime values from environment variables
    authorization_servers = config.authorization_servers
    if config.authorization_servers_env_var:
        env_value = os.environ.get(config.authorization_servers_env_var)
        if env_value:
            # Split comma-separated values, strip whitespace, drop empties
            servers_from_env = [
                server for server in (part.strip() for part in env_value.split(",")) if server
            ]
            if servers_from_env:
                authorization_servers = servers_from_env

    resource_server_url = config.resource_server_url
    if config.resource_server_url_env_var:
        env_value = os.environ.get(config.resource_server_url_env_var)
        if env_value and env_value.strip():
            resource_server_url = env_value.strip()

    # Create the underlying token verifier
    token_verifier = create_auth_provider(config.token_verifier_config)

    # Ensure it's actually a TokenVerifier
    if not hasattr(token_verifier, "verify_token"):
        raise ValueError(f"Remote auth provider requires a TokenVerifier, got {type(token_verifier).__name__}")

    return RemoteAuthProvider(
        token_verifier=token_verifier,
        authorization_servers=authorization_servers,
        resource_server_url=resource_server_url,
    )
def create_simple_jwt_provider(
    *,
    jwks_uri: str | None = None,
    public_key: str | None = None,
    issuer: str | None = None,
    audience: str | list[str] | None = None,
    required_scopes: list[str] | None = None,
) -> "JWTVerifier":
    """Build a JWT verifier directly from keyword arguments.

    Convenience wrapper: the arguments are packed into a ``JWTAuthConfig``
    and handed to the JWT factory, so callers do not need to construct the
    configuration object themselves.

    Args:
        jwks_uri: JWKS URI for key fetching.
        public_key: Static public key (PEM format).
        issuer: Expected issuer claim.
        audience: Expected audience claim(s).
        required_scopes: Required scopes for all requests; a missing or
            empty value becomes an empty list.

    Returns:
        Configured JWTVerifier instance.
    """
    scopes = required_scopes if required_scopes else []
    settings = {
        "jwks_uri": jwks_uri,
        "public_key": public_key,
        "issuer": issuer,
        "audience": audience,
        "required_scopes": scopes,
    }
    return _create_jwt_provider(JWTAuthConfig(**settings))
def create_dev_token_provider(
    tokens: dict[str, Any] | None = None,
    required_scopes: list[str] | None = None,
) -> "StaticTokenVerifier":
    """Build a static token verifier intended for development use.

    Args:
        tokens: Token dictionary. When ``None``, two built-in development
            tokens are used: ``dev-token-123`` (read/write) and
            ``admin-token-456`` (read/write/admin).
        required_scopes: Required scopes for all requests; a missing or
            empty value becomes an empty list.

    Returns:
        Configured StaticTokenVerifier instance.
    """
    if tokens is not None:
        token_table = tokens
    else:
        # Default development tokens, rebuilt on every call so callers never
        # share mutable state.
        token_table = {
            "dev-token-123": {
                "client_id": "dev-client",
                "scopes": ["read", "write"],
            },
            "admin-token-456": {
                "client_id": "admin-client",
                "scopes": ["read", "write", "admin"],
            },
        }

    scopes = required_scopes if required_scopes else []
    dev_config = StaticTokenConfig(tokens=token_table, required_scopes=scopes)
    return _create_static_provider(dev_config)
def register_builtin_providers() -> None:
    """Register the factories defined in this module with the provider registry.

    The following provider types are registered, in this order:
    - jwt: JWT token verification
    - static: Static token verification (development)
    - oauth_server: Full OAuth authorization server
    - remote: Remote authorization server integration
    """
    registry = get_provider_registry()

    # Provider type name paired with the factory that builds it
    builtin_factories = (
        ("jwt", _create_jwt_provider),
        ("static", _create_static_provider),
        ("oauth_server", _create_oauth_server_provider),
        ("remote", _create_remote_provider),
    )
    for provider_type, factory in builtin_factories:
        registry.register_factory(provider_type, factory)
# Register built-in providers when the module is imported, so the "jwt",
# "static", "oauth_server" and "remote" factories are in the registry as soon
# as this module has been loaded.
register_builtin_providers()