.. _permissions-provider:

Permissions Provider
====================

The Permissions Provider system in auth-middleware enables fine-grained authorization by retrieving user permissions from various sources. This allows you to implement detailed access control beyond simple role-based systems.

Overview
========

Permissions providers implement the ``PermissionsProvider`` interface and fetch a user's permissions based on the claims in the JWT. The middleware uses these permissions to control access to specific resources and actions.

.. note::
   Permissions provide more granular control than groups. While groups typically represent roles (admin, user), permissions represent specific actions (read:posts, write:comments, delete:users).

Built-in Providers
==================

SqlPermissionsProvider
----------------------

Retrieves permissions from a SQL database using SQLAlchemy.

**Features:**

- Stores user permissions in a database
- Supports multiple database backends (PostgreSQL, MySQL, SQLite)
- Async database operations
- Configurable database connection

**Database Schema:**

.. code-block:: sql

   CREATE TABLE authz_permissions (
       id VARCHAR(27) PRIMARY KEY,
       username VARCHAR(500) NOT NULL,
       permission VARCHAR(100) NOT NULL
   );

   CREATE INDEX idx_authz_permissions_username ON authz_permissions(username);
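
To make the schema concrete, the following sketch creates the table in an in-memory SQLite database with the standard-library ``sqlite3`` module and runs the kind of per-user lookup the index supports. The ``permissions_for`` helper and the literal ``id`` values are illustrative assumptions; how ``SqlPermissionsProvider`` queries the table internally is not shown here.

.. code-block:: python

   import sqlite3

   SCHEMA = """
   CREATE TABLE authz_permissions (
       id VARCHAR(27) PRIMARY KEY,
       username VARCHAR(500) NOT NULL,
       permission VARCHAR(100) NOT NULL
   );
   CREATE INDEX idx_authz_permissions_username ON authz_permissions(username);
   """

   def permissions_for(conn: sqlite3.Connection, username: str) -> list[str]:
       """Return one user's permissions, sorted for stable output (hypothetical helper)."""
       rows = conn.execute(
           "SELECT permission FROM authz_permissions "
           "WHERE username = ? ORDER BY permission",
           (username,),
       )
       return [row[0] for row in rows]

   conn = sqlite3.connect(":memory:")
   conn.executescript(SCHEMA)
   conn.executemany(
       "INSERT INTO authz_permissions (id, username, permission) VALUES (?, ?, ?)",
       [
           ("perm-0001", "john.doe", "read:posts"),   # ids are made up for the demo
           ("perm-0002", "john.doe", "write:posts"),
           ("perm-0003", "admin", "delete:posts"),
       ],
   )
   print(permissions_for(conn, "john.doe"))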

**Usage:**

.. code-block:: python

   from auth_middleware.providers.authz.sql_permissions_provider import SqlPermissionsProvider
   from auth_middleware.providers.authz.async_database import AsyncDatabase
   from auth_middleware.providers.authz.async_database_settings import AsyncDatabaseSettings

   # Configure database connection
   db_settings = AsyncDatabaseSettings(
       database_url="postgresql+asyncpg://user:pass@localhost/mydb"
   )
   AsyncDatabase.configure(db_settings)

   # Configure the permissions provider
   permissions_provider = SqlPermissionsProvider()

   # Add middleware with permissions provider
   # (app, auth_provider and the JwtAuthMiddleware import are assumed to be set up elsewhere)
   app.add_middleware(
       JwtAuthMiddleware,
       auth_provider=auth_provider,
       permissions_provider=permissions_provider,
   )

**Managing Permissions:**

Add permissions to users by inserting records:

.. code-block:: python

   from sqlalchemy import select
   from auth_middleware.providers.authz.sql_permissions_provider import PermissionsModel
   from auth_middleware.providers.authz.async_database import AsyncDatabase

   async def grant_permission(username: str, permission: str):
       async with AsyncDatabase.get_session() as session:
           permission_record = PermissionsModel(username=username, permission=permission)
           session.add(permission_record)
           await session.commit()

   async def revoke_permission(username: str, permission: str):
       async with AsyncDatabase.get_session() as session:
           query = select(PermissionsModel).filter(
               PermissionsModel.username == username,
               PermissionsModel.permission == permission
           )
           result = await session.execute(query)
           permission_record = result.scalar_one_or_none()
           
           if permission_record:
               await session.delete(permission_record)
               await session.commit()

   # Example usage (run inside an async function or other running event loop)
   await grant_permission("john.doe", "read:posts")
   await grant_permission("john.doe", "write:posts")
   await grant_permission("admin", "delete:posts")

Using Permissions in Your Application
=====================================

Once configured, permissions are automatically available in your endpoints:

.. code-block:: python

   from fastapi import Depends, FastAPI
   from auth_middleware.functions import require_permissions, get_current_user
   from auth_middleware.types.user import User

   app = FastAPI()

   @app.get("/posts")
   async def read_posts(user: User = Depends(require_permissions("read:posts"))):
       return {"posts": [...]}

   @app.post("/posts")
   async def create_post(user: User = Depends(require_permissions("write:posts"))):
       return {"message": "Post created"}

   @app.delete("/posts/{post_id}")
   async def delete_post(
       post_id: int,
       user: User = Depends(require_permissions("delete:posts"))
   ):
       return {"message": f"Post {post_id} deleted"}

   @app.get("/user-permissions")
   async def user_permissions(user: User = Depends(get_current_user())):
       # Access permissions directly
       permissions = await user.permissions
       return {"username": user.username, "permissions": permissions}

   @app.get("/admin-posts")
   async def admin_posts(
       user: User = Depends(require_permissions(["read:posts", "admin:posts"]))
   ):
       return {"message": "Admin access to posts"}

Permission Patterns
===================

**Common Permission Formats:**

.. code-block:: python

   # Resource-based permissions
   "read:posts"
   "write:posts"
   "delete:posts"
   
   # Action-based permissions
   "create:user"
   "update:user"
   "delete:user"
   
   # Hierarchical permissions
   "admin:system"
   "admin:users"
   "admin:posts"
   
   # Fine-grained permissions
   "read:posts:published"
   "write:posts:draft"
   "approve:posts:pending"

**Permission Inheritance:**

.. code-block:: python

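   from auth_middleware.providers.authz.permissions_provider import PermissionsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class HierarchicalPermissionsProvider(PermissionsProvider):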
       """Permissions provider with inheritance support."""
       
       PERMISSION_HIERARCHY = {
           "admin": ["admin:*"],
           "admin:posts": ["read:posts", "write:posts", "delete:posts"],
           "admin:users": ["read:users", "write:users", "delete:users"],
           "editor": ["read:posts", "write:posts"],
           "viewer": ["read:posts"]
       }
       
       async def fetch_permissions(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch permissions with hierarchy resolution."""
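           # Get base permissions (_fetch_base_permissions is a placeholder
           # for your own lookup; it is not defined in this example)
           base_permissions = await self._fetch_base_permissions(token)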
           
           # Expand hierarchical permissions
           expanded_permissions = set()
           for permission in base_permissions:
               expanded_permissions.add(permission)
               
               # Add inherited permissions
               if permission in self.PERMISSION_HIERARCHY:
                   expanded_permissions.update(self.PERMISSION_HIERARCHY[permission])
           
           # Handle wildcard permissions
           final_permissions = []
           for permission in expanded_permissions:
               if permission.endswith(":*"):
                   # Grant all permissions for this resource
                   # (_get_all_permissions_for_resource is a placeholder to implement)
                   resource = permission[:-2]
                   final_permissions.extend(self._get_all_permissions_for_resource(resource))
               else:
                   final_permissions.append(permission)
           
           return list(set(final_permissions))
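
The expansion step can be tried out without a provider class. The sketch below is one possible variant, not the library's behavior: it resolves the hierarchy transitively, so a permission inherited from an inherited permission is also included. ``expand_permissions`` is a hypothetical helper, and the hierarchy is an adapted version of the one above in which the ``admin:*`` wildcard is replaced by explicit entries.

.. code-block:: python

   PERMISSION_HIERARCHY = {
       "admin": ["admin:posts", "admin:users"],  # assumption: explicit, no wildcard
       "admin:posts": ["read:posts", "write:posts", "delete:posts"],
       "admin:users": ["read:users", "write:users", "delete:users"],
       "editor": ["read:posts", "write:posts"],
       "viewer": ["read:posts"],
   }

   def expand_permissions(base: list[str], hierarchy: dict[str, list[str]]) -> set[str]:
       """Return the base permissions plus everything they inherit, at any depth."""
       expanded: set[str] = set()
       pending = list(base)
       while pending:
           permission = pending.pop()
           if permission in expanded:
               continue  # already handled; this also guards against cycles
           expanded.add(permission)
           pending.extend(hierarchy.get(permission, []))
       return expanded

   print(sorted(expand_permissions(["editor"], PERMISSION_HIERARCHY)))
   print("delete:users" in expand_permissions(["admin"], PERMISSION_HIERARCHY))

Using a work list with a visited set keeps the function iterative and makes a cyclic hierarchy terminate instead of recursing forever.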

Custom Permissions Provider
===========================

Create custom permissions providers by implementing the ``PermissionsProvider`` interface:

**Basic Implementation:**

.. code-block:: python

   from auth_middleware.providers.authz.permissions_provider import PermissionsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class CustomPermissionsProvider(PermissionsProvider):
       """Custom permissions provider implementation."""
       
       def __init__(self, api_client):
           self.api_client = api_client
       
       async def fetch_permissions(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch permissions from custom source."""
           username = token.claims.get("username")
           
           # Implement your custom logic here
           permissions = await self.api_client.get_user_permissions(username)
           
           return permissions

**Redis Permissions Provider:**

.. code-block:: python

   import json
   import redis.asyncio as redis
   from auth_middleware.providers.authz.permissions_provider import PermissionsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class RedisPermissionsProvider(PermissionsProvider):
       """Permissions provider using Redis for storage."""
       
       def __init__(self, redis_url: str = "redis://localhost:6379"):
           self.redis_url = redis_url
           self._redis = None
       
       async def _get_redis(self):
           if self._redis is None:
               self._redis = redis.from_url(self.redis_url)
           return self._redis
       
       async def fetch_permissions(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch permissions from Redis."""
           username = token.claims.get("username")
           if not username:
               return []
           
           redis_client = await self._get_redis()
           
           # Get direct permissions
           direct_permissions = await redis_client.smembers(f"user_permissions:{username}")
           
           # Get role-based permissions
           user_roles = await redis_client.smembers(f"user_roles:{username}")
           role_permissions = []
           
           for role in user_roles:
               role_perms = await redis_client.smembers(f"role_permissions:{role}")
               role_permissions.extend(role_perms)
           
           # Combine and deduplicate
           all_permissions = list(set(direct_permissions) | set(role_permissions))
           
           return [perm.decode() if isinstance(perm, bytes) else perm for perm in all_permissions]

**JWT Claims-based Provider:**

.. code-block:: python

   from auth_middleware.providers.authz.permissions_provider import PermissionsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class JwtPermissionsProvider(PermissionsProvider):
       """Extract permissions directly from JWT claims."""
       
       def __init__(self, permissions_claim: str = "permissions"):
           self.permissions_claim = permissions_claim
       
       async def fetch_permissions(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Extract permissions from JWT claims."""
           permissions = token.claims.get(self.permissions_claim, [])
           
           # Handle different claim formats
           if isinstance(permissions, str):
               # Space-separated permissions
               return permissions.split()
           elif isinstance(permissions, list):
               # List of permissions
               return permissions
           else:
               return []

**API-based Permissions Provider:**

.. code-block:: python

   import logging

   import httpx
   from auth_middleware.providers.authz.permissions_provider import PermissionsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   logger = logging.getLogger(__name__)

   class ApiPermissionsProvider(PermissionsProvider):
       """Permissions provider using external API."""
       
       def __init__(self, api_base_url: str, api_key: str, timeout: int = 10):
           self.api_base_url = api_base_url
           self.api_key = api_key
           self.timeout = timeout
       
       async def fetch_permissions(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch permissions from external API."""
           username = token.claims.get("username")
           user_id = token.claims.get("sub")
           
           if not username and not user_id:
               return []
           
           identifier = username or user_id
           
           async with httpx.AsyncClient(timeout=self.timeout) as client:
               try:
                   response = await client.get(
                       f"{self.api_base_url}/users/{identifier}/permissions",
                       headers={"Authorization": f"Bearer {self.api_key}"}
                   )
                   response.raise_for_status()
                   
                   data = response.json()
                   return data.get("permissions", [])
               
               except httpx.HTTPError as e:
                   # Log error and return empty permissions
                   logger.error(f"Failed to fetch permissions: {e}")
                   return []

**File-based Permissions Provider:**

.. code-block:: python

   import json
   from pathlib import Path
   from auth_middleware.providers.authz.permissions_provider import PermissionsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class FilePermissionsProvider(PermissionsProvider):
       """Permissions provider using JSON file storage."""
       
       def __init__(self, permissions_file: str = "permissions.json"):
           self.permissions_file = Path(permissions_file)
           self._permissions_cache = None
           self._last_modified = None
       
       async def _load_permissions(self):
           """Load permissions from file with caching."""
           if not self.permissions_file.exists():
               return {}
           
           current_modified = self.permissions_file.stat().st_mtime
           
           if (self._permissions_cache is None or 
               self._last_modified != current_modified):
               
               with open(self.permissions_file) as f:
                   self._permissions_cache = json.load(f)
               self._last_modified = current_modified
           
           return self._permissions_cache
       
       async def fetch_permissions(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch permissions from JSON file."""
           username = token.claims.get("username")
           if not username:
               return []
           
           permissions_data = await self._load_permissions()
           
           # Get direct user permissions
           user_permissions = permissions_data.get("users", {}).get(username, [])
           
           # Get role-based permissions
           user_roles = permissions_data.get("user_roles", {}).get(username, [])
           role_permissions = []
           
           for role in user_roles:
               role_perms = permissions_data.get("roles", {}).get(role, [])
               role_permissions.extend(role_perms)
           
           # Combine and deduplicate
           all_permissions = list(set(user_permissions + role_permissions))
           
           return all_permissions

**Example permissions.json file:**

.. code-block:: json

   {
     "users": {
       "admin": ["admin:*"],
       "john.doe": ["read:posts", "write:posts"]
     },
     "roles": {
       "editor": ["read:posts", "write:posts", "edit:posts"],
       "viewer": ["read:posts"],
       "admin": ["admin:*"]
     },
     "user_roles": {
       "john.doe": ["editor"],
       "jane.smith": ["viewer"],
       "admin": ["admin"]
     }
   }
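
To see how direct and role-based entries combine for this file, the following standalone sketch applies the same resolution logic as ``fetch_permissions`` above to the example data. ``effective_permissions`` is a hypothetical helper written for illustration; the JSON is embedded as a string so the snippet runs without a file on disk.

.. code-block:: python

   import json

   PERMISSIONS_JSON = """
   {
     "users": {"admin": ["admin:*"], "john.doe": ["read:posts", "write:posts"]},
     "roles": {
       "editor": ["read:posts", "write:posts", "edit:posts"],
       "viewer": ["read:posts"],
       "admin": ["admin:*"]
     },
     "user_roles": {"john.doe": ["editor"], "jane.smith": ["viewer"], "admin": ["admin"]}
   }
   """

   def effective_permissions(data: dict, username: str) -> list[str]:
       """Merge a user's direct permissions with those granted through roles."""
       direct = data.get("users", {}).get(username, [])
       roles = data.get("user_roles", {}).get(username, [])
       from_roles = [
           permission
           for role in roles
           for permission in data.get("roles", {}).get(role, [])
       ]
       # Sorting makes the deduplicated result deterministic
       return sorted(set(direct) | set(from_roles))

   data = json.loads(PERMISSIONS_JSON)
   for user in ("john.doe", "jane.smith", "admin"):
       print(user, effective_permissions(data, user))

Note that ``john.doe`` gains ``edit:posts`` only through the ``editor`` role, while ``jane.smith`` has no direct entry at all and relies entirely on the ``viewer`` role.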

Advanced Features
=================

**Conditional Permissions:**

.. code-block:: python

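   from datetime import datetime
   from auth_middleware.providers.authz.permissions_provider import PermissionsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class ConditionalPermissionsProvider(PermissionsProvider):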
       """Permissions provider with conditional logic."""
       
       async def fetch_permissions(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch permissions with conditional logic."""
           username = token.claims.get("username")
           permissions = await self._get_base_permissions(username)
           
           # Add time-based permissions (server local time; covers 09:00 through 17:59)
           current_hour = datetime.now().hour
           if 9 <= current_hour <= 17:  # Business hours
               permissions.append("business_hours:access")
           
           # Add location-based permissions (from token claims)
           location = token.claims.get("location")
           if location == "headquarters":
               permissions.append("onsite:access")
           
           # Add temporary permissions
           temp_permissions = await self._get_temporary_permissions(username)
           permissions.extend(temp_permissions)
           
           return permissions

**Cached Permissions Provider:**

.. code-block:: python

   from datetime import datetime, timedelta
   from auth_middleware.providers.authz.permissions_provider import PermissionsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class CachedPermissionsProvider(PermissionsProvider):
       """Permissions provider with caching support."""
       
       def __init__(self, base_provider: PermissionsProvider, cache_ttl: int = 300):
           self.base_provider = base_provider
           self.cache_ttl = cache_ttl
           self._cache = {}
       
       async def fetch_permissions(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch permissions with caching."""
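           username = token.claims.get("username")
           if not username:
               # Without a username every such token would share one cache entry,
               # so bypass the cache entirely
               return await self.base_provider.fetch_permissions(token)
           cache_key = f"permissions:{username}"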
           
           # Check cache
           if cache_key in self._cache:
               cached_data, timestamp = self._cache[cache_key]
               if datetime.now() - timestamp < timedelta(seconds=self.cache_ttl):
                   return cached_data
           
           # Fetch from base provider
           permissions = await self.base_provider.fetch_permissions(token)
           
           # Cache result
           self._cache[cache_key] = (permissions, datetime.now())
           
           return permissions
       
       def clear_cache(self, username: str = None):
           """Clear cache for specific user or all users."""
           if username:
               cache_key = f"permissions:{username}"
               self._cache.pop(cache_key, None)
           else:
               self._cache.clear()
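
Expiry logic like this is easiest to verify when the clock can be controlled. The following sketch isolates a TTL cache from the provider and defaults to ``time.monotonic``, which is unaffected by wall-clock adjustments. ``TTLCache`` and its ``clock`` parameter are hypothetical names, not part of auth-middleware.

.. code-block:: python

   import time
   from typing import Callable, Optional

   class TTLCache:
       """Minimal time-to-live cache for permission lists (illustrative only)."""

       def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
           self.ttl_seconds = ttl_seconds
           self._clock = clock
           self._entries: dict[str, tuple[list[str], float]] = {}

       def get(self, key: str) -> Optional[list[str]]:
           entry = self._entries.get(key)
           if entry is None:
               return None
           value, stored_at = entry
           if self._clock() - stored_at >= self.ttl_seconds:
               del self._entries[key]  # expired: drop the stale entry
               return None
           return list(value)  # copy so callers cannot mutate the cached list

       def set(self, key: str, value: list[str]) -> None:
           self._entries[key] = (list(value), self._clock())

   # Drive the cache with a fake clock instead of sleeping
   now = [0.0]
   cache = TTLCache(ttl_seconds=300, clock=lambda: now[0])
   cache.set("permissions:john.doe", ["read:posts"])
   now[0] = 299.0
   print(cache.get("permissions:john.doe"))  # still within the TTL
   now[0] = 300.0
   print(cache.get("permissions:john.doe"))  # TTL reached, entry is gone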

Permission Management
=====================

**Permission Management API:**

.. code-block:: python

   from fastapi import Depends, FastAPI
   from pydantic import BaseModel
   from auth_middleware.functions import require_permissions
   from auth_middleware.types.user import User

   # grant_permission and revoke_permission are the helper coroutines shown under
   # "Managing Permissions"; import them from wherever you define them.

   app = FastAPI()

   class PermissionRequest(BaseModel):
       username: str
       permission: str

   @app.post("/api/permissions/grant")
   async def grant_permission_endpoint(
       request: PermissionRequest,
       admin: User = Depends(require_permissions("admin:permissions"))
   ):
       """Grant permission to user."""
       # The endpoint has its own name so this call reaches the helper, not itself
       await grant_permission(request.username, request.permission)
       return {"message": f"Permission {request.permission} granted to {request.username}"}

   @app.post("/api/permissions/revoke")
   async def revoke_permission_endpoint(
       request: PermissionRequest,
       admin: User = Depends(require_permissions("admin:permissions"))
   ):
       """Revoke permission from user."""
       await revoke_permission(request.username, request.permission)
       return {"message": f"Permission {request.permission} revoked from {request.username}"}

   @app.get("/api/permissions/{username}")
   async def get_user_permissions(
       username: str,
       admin: User = Depends(require_permissions("admin:permissions"))
   ):
       """Get all permissions for a user."""
       # This would need to be implemented based on your provider
       permissions = await get_user_permissions_from_db(username)
       return {"username": username, "permissions": permissions}

Testing Permissions Providers
=============================

**Unit Testing:**

.. code-block:: python

   import pytest
   from unittest.mock import AsyncMock
   from auth_middleware.types.jwt import JWTAuthorizationCredentials
   from your_app.providers import CustomPermissionsProvider

   @pytest.mark.asyncio
   async def test_custom_permissions_provider():
       # Setup
       mock_api_client = AsyncMock()
       mock_api_client.get_user_permissions.return_value = ["read:posts", "write:posts"]
       
       provider = CustomPermissionsProvider(mock_api_client)
       
       # Create test token
       token = JWTAuthorizationCredentials(
           jwt_token="test_token",
           header={"alg": "HS256"},
           signature="signature",
           message="message",
           claims={"username": "testuser"}
       )
       
       # Test
       permissions = await provider.fetch_permissions(token)
       
       # Assertions
       assert permissions == ["read:posts", "write:posts"]
       mock_api_client.get_user_permissions.assert_called_once_with("testuser")

**Integration Testing:**

.. code-block:: python

   from fastapi.testclient import TestClient
   from your_app.main import app

   def test_permissions_authorization():
       client = TestClient(app)
       
       # Test with sufficient permissions
       user_token = "valid_jwt_with_read_posts_permission"
       response = client.get(
           "/posts",
           headers={"Authorization": f"Bearer {user_token}"}
       )
       assert response.status_code == 200
       
       # Test with insufficient permissions
       limited_token = "valid_jwt_without_read_posts_permission"
       response = client.get(
           "/posts",
           headers={"Authorization": f"Bearer {limited_token}"}
       )
       assert response.status_code == 403

Best Practices
==============

**Security Considerations:**

1. **Principle of Least Privilege**: Grant minimum necessary permissions
2. **Regular Audits**: Regularly review and clean up permissions
3. **Permission Expiration**: Implement time-based permission expiration
4. **Audit Logging**: Log all permission grants and revocations
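
As a minimal sketch of the permission expiration item, assuming each grant carries an optional expiry timestamp, expired grants can be filtered out before the list is returned. The ``Grant`` dataclass and ``active_permissions`` function are hypothetical; the SQL schema shown earlier has no expiry column, so storing the timestamp is left to your own model.

.. code-block:: python

   from dataclasses import dataclass
   from datetime import datetime, timedelta, timezone
   from typing import Optional

   @dataclass(frozen=True)
   class Grant:
       permission: str
       expires_at: Optional[datetime] = None  # None means the grant never expires

   def active_permissions(grants: list[Grant], now: datetime) -> list[str]:
       """Keep only grants that have no expiry or expire strictly after `now`."""
       return [
           grant.permission
           for grant in grants
           if grant.expires_at is None or grant.expires_at > now
       ]

   now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
   grants = [
       Grant("read:posts"),                                           # permanent
       Grant("write:posts", expires_at=now + timedelta(hours=1)),     # still valid
       Grant("delete:posts", expires_at=now - timedelta(minutes=1)),  # expired
   ]
   print(active_permissions(grants, now))

Passing ``now`` explicitly, and using timezone-aware datetimes throughout, keeps the check deterministic in tests and avoids comparing naive and aware values.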

**Performance Optimization:**

1. **Caching**: Cache frequently accessed permissions
2. **Batch Operations**: Batch permission checks when possible
3. **Database Indexes**: Ensure proper indexing on username columns
4. **Connection Pooling**: Use database connection pooling
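
One simple reading of the batch operations item is to fetch a user's permissions once and answer several checks from that single result. The sketch below assumes exact string matching; ``check_permissions`` is a hypothetical helper and not an auth-middleware function.

.. code-block:: python

   def check_permissions(granted: list[str], required: list[str]) -> dict[str, bool]:
       """Check several required permissions against one fetched permission list."""
       granted_set = set(granted)  # convert once, then each lookup is constant time
       return {permission: permission in granted_set for permission in required}

   granted = ["read:posts", "write:posts"]
   result = check_permissions(granted, ["read:posts", "delete:posts"])
   print(result)

   # Collect what is missing, for example to build a helpful 403 message
   missing = [permission for permission, ok in result.items() if not ok]
   print(missing)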

**Error Handling:**

.. code-block:: python

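   import logging

   from auth_middleware.providers.authz.permissions_provider import PermissionsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   logger = logging.getLogger(__name__)

   class RobustPermissionsProvider(PermissionsProvider):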
       async def fetch_permissions(self, token: JWTAuthorizationCredentials) -> list[str]:
           try:
               return await self._fetch_permissions_internal(token)
           except Exception as e:
               logger.error(f"Failed to fetch permissions: {e}")
               # Return minimal permissions or raise exception
               return []  # Or raise an exception based on your security model

Troubleshooting
===============

**Common Issues:**

1. **Permissions Not Loading**
   
   - Check database connectivity
   - Verify permission format and naming
   - Ensure proper provider configuration

2. **Performance Issues**
   
   - Implement caching for frequently accessed permissions
   - Check database query performance
   - Monitor external API response times

3. **Authorization Failures**
   
   - Verify permission names match exactly
   - Check case sensitivity
   - Ensure permissions are properly granted
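
When an authorization failure comes down to naming, a small diagnostic can surface granted permissions that differ from the required one only by case or surrounding whitespace. This is a debugging sketch under the assumption that permissions are compared as exact strings; ``find_near_misses`` is a hypothetical helper.

.. code-block:: python

   def find_near_misses(required: str, granted: list[str]) -> list[str]:
       """Return granted permissions that equal `required` only after normalization."""

       def normalize(permission: str) -> str:
           return permission.strip().casefold()

       target = normalize(required)
       return [
           permission
           for permission in granted
           if permission != required and normalize(permission) == target
       ]

   granted = ["Read:Posts", "write:posts ", "delete:posts"]
   print(find_near_misses("read:posts", granted))    # differs by case
   print(find_near_misses("write:posts", granted))   # differs by trailing space
   print(find_near_misses("delete:posts", granted))  # exact match, so no near miss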

API Reference
=============

.. autoclass:: auth_middleware.providers.authz.permissions_provider.PermissionsProvider
   :members:

.. autoclass:: auth_middleware.providers.authz.sql_permissions_provider.SqlPermissionsProvider
   :members:

See Also
========

- :doc:`groups-provider` - For role-based authorization
- :doc:`../functions` - For using permissions in endpoint dependencies
- :doc:`../middleware-configuration` - For middleware setup with permissions providers
