.. _groups-provider:

Groups Provider
===============

The Groups Provider system in auth-middleware allows you to implement custom authorization logic by retrieving user groups from various sources. This enables flexible role-based access control (RBAC) in your application.

Overview
========

Groups providers implement the ``GroupsProvider`` interface and are responsible for fetching user groups based on JWT token information. The middleware uses these groups to determine user permissions and access levels.

.. note::
   Groups are typically used for role-based authorization, while permissions provide more granular access control. Both can be used together for comprehensive authorization systems.

Built-in Providers
==================

CognitoGroupsProvider
---------------------

Extracts groups directly from AWS Cognito JWT tokens.

**Features:**

- Reads groups from the ``cognito:groups`` claim
- Falls back to the ``scope`` claim in single-group scenarios
- Requires no external database queries
- Resolves groups from the token itself, with no additional network round trip

**Usage:**

.. code-block:: python

   from auth_middleware.providers.authz.cognito_groups_provider import CognitoGroupsProvider
   from auth_middleware.jwt_auth_middleware import JwtAuthMiddleware
   from auth_middleware.providers.authn.cognito_provider import CognitoProvider

   # Configure the authentication provider
   auth_provider = CognitoProvider(settings=auth_settings)
   
   # Configure the groups provider
   groups_provider = CognitoGroupsProvider()

   # Add middleware with groups provider
   app.add_middleware(
       JwtAuthMiddleware,
       auth_provider=auth_provider,
       groups_provider=groups_provider,
   )

**Token Format:**

The provider expects JWT tokens with group information in one of these formats:

.. code-block:: json

   {
     "sub": "user123",
     "username": "john.doe",
     "cognito:groups": ["admin", "user", "moderator"],
     "exp": 1234567890
   }

Or, for single-scope scenarios:

.. code-block:: json

   {
     "sub": "user123", 
     "username": "john.doe",
     "scope": "api/admin",
     "exp": 1234567890
   }
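
The claim lookup described above can be sketched in plain Python. This is a minimal illustration, not the provider's actual code: the function name ``extract_groups`` is hypothetical, and the rule for turning a ``scope`` value such as ``api/admin`` into a group name (taking the last path segment) is an assumption made for the example.

.. code-block:: python

   from typing import Any


   def extract_groups(claims: dict[str, Any]) -> list[str]:
       """Return groups from decoded JWT claims (hypothetical helper)."""
       groups = claims.get("cognito:groups")
       if groups:
           # Preferred source: the list in the "cognito:groups" claim.
           return list(groups)

       scope = claims.get("scope")
       if scope:
           # Assumption: a scope like "api/admin" maps to the single group "admin".
           return [str(scope).split("/")[-1]]

       # Neither claim is present: the user has no groups.
       return []

If your tokens encode scopes differently, only the ``scope`` branch would need to change.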

SqlGroupsProvider
-----------------

Retrieves groups from a SQL database using SQLAlchemy.

**Features:**

- Stores group memberships in database
- Supports multiple database backends (PostgreSQL, MySQL, SQLite)
- Async database operations
- Configurable database connection

**Database Schema:**

.. code-block:: sql

   CREATE TABLE authz_groups (
       id VARCHAR(27) PRIMARY KEY,
       username VARCHAR(500) NOT NULL,
       -- "group" is a reserved word, so it must be quoted (MySQL uses backticks)
       "group" VARCHAR(100) NOT NULL
   );

   CREATE INDEX idx_authz_groups_username ON authz_groups(username);

**Usage:**

.. code-block:: python

   from auth_middleware.jwt_auth_middleware import JwtAuthMiddleware
   from auth_middleware.providers.authz.sql_groups_provider import SqlGroupsProvider
   from auth_middleware.providers.authz.async_database import AsyncDatabase
   from auth_middleware.providers.authz.async_database_settings import AsyncDatabaseSettings

   # Configure database connection
   db_settings = AsyncDatabaseSettings(
       database_url="postgresql+asyncpg://user:pass@localhost/mydb"
   )
   AsyncDatabase.configure(db_settings)

   # Configure the groups provider
   groups_provider = SqlGroupsProvider()

   # Add middleware with groups provider
   # (auth_provider is configured as in the CognitoGroupsProvider example)
   app.add_middleware(
       JwtAuthMiddleware,
       auth_provider=auth_provider,
       groups_provider=groups_provider,
   )

**Managing Groups:**

Add users to groups by inserting records:

.. code-block:: python

   from auth_middleware.providers.authz.sql_groups_provider import GroupsModel
   from auth_middleware.providers.authz.async_database import AsyncDatabase

   async def add_user_to_group(username: str, group: str):
       async with AsyncDatabase.get_session() as session:
           group_record = GroupsModel(username=username, group=group)
           session.add(group_record)
           await session.commit()

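   # Example usage: call from async code (for example, a startup handler),
   # because "await" is not valid at module level
   async def seed_groups():
       await add_user_to_group("john.doe", "admin")
       await add_user_to_group("john.doe", "user")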

Using Groups in Your Application
=================================

Once configured, groups are automatically available in your endpoints through the user object:

.. code-block:: python

   from fastapi import Depends, FastAPI
   from auth_middleware.functions import require_groups, get_current_user
   from auth_middleware.types.user import User

   app = FastAPI()

   @app.get("/admin-only")
   async def admin_endpoint(user: User = Depends(require_groups("admin"))):
       return {"message": f"Hello admin {user.username}"}

   @app.get("/user-info")
   async def user_info(user: User = Depends(get_current_user())):
       # Access groups directly
       groups = await user.groups
       return {"username": user.username, "groups": groups}

   @app.get("/multi-role")
   async def multi_role(user: User = Depends(require_groups(["admin", "moderator"]))):
       return {"message": "Admin or moderator access"}
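
Conceptually, a group requirement reduces to a set-membership test. The sketch below assumes "any of" semantics, as the ``/multi-role`` message suggests. ``has_any_group`` is a hypothetical helper, not part of auth-middleware, and the library's actual check may differ.

.. code-block:: python

   from typing import Iterable, Union


   def has_any_group(
       user_groups: Iterable[str],
       required: Union[str, Iterable[str]],
   ) -> bool:
       """Return True if the user belongs to at least one required group."""
       # Accept a single group name or a collection of names.
       required_set = {required} if isinstance(required, str) else set(required)
       # Exact, case-sensitive comparison against the user's groups.
       return not required_set.isdisjoint(user_groups)

A user with groups ``["moderator"]`` would pass a requirement of ``["admin", "moderator"]`` under this rule, while a user with only ``["user"]`` would not.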

Custom Groups Provider
======================

You can create a custom groups provider by subclassing ``GroupsProvider`` and implementing its ``fetch_groups`` coroutine.

**Basic Implementation:**

.. code-block:: python

   from auth_middleware.providers.authz.groups_provider import GroupsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class CustomGroupsProvider(GroupsProvider):
       """Custom groups provider implementation."""
       
       def __init__(self, api_client):
           self.api_client = api_client
       
       async def fetch_groups(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch groups from custom source."""
           username = token.claims.get("username")
           if not username:
               # No username claim: nothing to look up
               return []

           # Implement your custom logic here
           groups = await self.api_client.get_user_groups(username)

           return groups

**Advanced Example - Redis Groups Provider:**

.. code-block:: python

   import json
   import redis.asyncio as redis
   from auth_middleware.providers.authz.groups_provider import GroupsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class RedisGroupsProvider(GroupsProvider):
       """Groups provider using Redis for storage."""
       
       def __init__(self, redis_url: str = "redis://localhost:6379"):
           self.redis_url = redis_url
           self._redis = None
       
       async def _get_redis(self):
           if self._redis is None:
               self._redis = redis.from_url(self.redis_url)
           return self._redis
       
       async def fetch_groups(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch groups from Redis."""
           username = token.claims.get("username")
           if not username:
               return []
           
           redis_client = await self._get_redis()
           
           # Get groups from Redis hash
           groups_data = await redis_client.hget("user_groups", username)
           
           if groups_data:
               return json.loads(groups_data)
           
           return []
       
       async def close(self):
           """Clean up Redis connection."""
           if self._redis:
               await self._redis.close()

   # Usage
   groups_provider = RedisGroupsProvider("redis://localhost:6379")

**LDAP/Active Directory Groups Provider:**

.. code-block:: python

   import asyncio

   import ldap3
   from ldap3.utils.conv import escape_filter_chars
   from auth_middleware.providers.authz.groups_provider import GroupsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class LdapGroupsProvider(GroupsProvider):
       """Groups provider using LDAP/Active Directory."""

       def __init__(self, server_url: str, base_dn: str, bind_user: str, bind_password: str):
           self.server_url = server_url
           self.base_dn = base_dn
           self.bind_user = bind_user
           self.bind_password = bind_password

       def _search_groups(self, username: str) -> list[str]:
           """Blocking LDAP lookup; called from a worker thread."""
           # Note: This is a simplified example
           server = ldap3.Server(self.server_url)
           conn = ldap3.Connection(server, self.bind_user, self.bind_password)

           if not conn.bind():
               return []

           try:
               # Escape the username so it cannot alter the search filter
               safe_username = escape_filter_chars(username)
               search_filter = f"(&(objectClass=group)(member=cn={safe_username},{self.base_dn}))"
               conn.search(self.base_dn, search_filter, attributes=["cn"])
               return [entry.cn.value for entry in conn.entries]
           finally:
               conn.unbind()

       async def fetch_groups(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch groups from LDAP."""
           username = token.claims.get("username")
           if not username:
               return []

           # ldap3 is synchronous, so run the lookup in a thread
           # to avoid blocking the event loop
           return await asyncio.to_thread(self._search_groups, username)

**API-based Groups Provider:**

.. code-block:: python

   import logging
   from urllib.parse import quote

   import httpx
   from auth_middleware.providers.authz.groups_provider import GroupsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   logger = logging.getLogger(__name__)

   class ApiGroupsProvider(GroupsProvider):
       """Groups provider using external API."""

       def __init__(self, api_base_url: str, api_key: str):
           self.api_base_url = api_base_url
           self.api_key = api_key

       async def fetch_groups(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch groups from external API."""
           username = token.claims.get("username")
           if not username:
               return []

           # Percent-encode the username so it stays a single path segment
           url = f"{self.api_base_url}/users/{quote(username, safe='')}/groups"

           async with httpx.AsyncClient() as client:
               try:
                   response = await client.get(
                       url,
                       headers={"Authorization": f"Bearer {self.api_key}"},
                   )
                   response.raise_for_status()

                   data = response.json()
                   return data.get("groups", [])

               except (httpx.HTTPError, ValueError):
                   # Request failed or the body was not valid JSON:
                   # log the error and return empty groups
                   logger.exception("Failed to fetch groups from API")
                   return []

Configuration Examples
======================

**Multiple Groups Sources:**

.. code-block:: python

   from auth_middleware.providers.authz.groups_provider import GroupsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class HybridGroupsProvider(GroupsProvider):
       """Combines multiple groups sources."""

       def __init__(self, cognito_provider, sql_provider):
           self.cognito_provider = cognito_provider
           self.sql_provider = sql_provider

       async def fetch_groups(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch groups from multiple sources."""
           # Get groups from Cognito token
           cognito_groups = await self.cognito_provider.fetch_groups(token)

           # Get additional groups from database
           db_groups = await self.sql_provider.fetch_groups(token)

           # Combine and deduplicate, keeping first-seen order
           all_groups = list(dict.fromkeys(cognito_groups + db_groups))

           return all_groups
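
Because the two lookups are independent, they could also run concurrently. The following self-contained sketch shows the idea with ``asyncio.gather``. ``merge_groups`` and the two stand-in provider classes are hypothetical, they only mimic the ``fetch_groups`` coroutine, and the token is a plain dictionary rather than a ``JWTAuthorizationCredentials`` object.

.. code-block:: python

   import asyncio


   class FakeTokenProvider:
       """Stand-in for a token-based provider (hypothetical)."""

       async def fetch_groups(self, token: dict) -> list[str]:
           return list(token.get("cognito:groups", []))


   class FakeDbProvider:
       """Stand-in for a database-backed provider (hypothetical)."""

       async def fetch_groups(self, token: dict) -> list[str]:
           await asyncio.sleep(0)  # Simulate an I/O wait.
           return ["user", "billing"]


   async def merge_groups(token: dict, *providers) -> list[str]:
       """Query all providers concurrently and merge results without duplicates."""
       results = await asyncio.gather(*(p.fetch_groups(token) for p in providers))
       merged: dict[str, None] = {}
       for groups in results:
           # Dict keys keep first-seen order, which gives a stable result.
           merged.update(dict.fromkeys(groups))
       return list(merged)


   token = {"cognito:groups": ["admin", "user"]}
   groups = asyncio.run(merge_groups(token, FakeTokenProvider(), FakeDbProvider()))

Note that ``asyncio.gather`` propagates the first exception by default, so a failing source would fail the whole lookup unless it is handled separately.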

**Cached Groups Provider:**

.. code-block:: python

   import time

   from auth_middleware.providers.authz.groups_provider import GroupsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   class CachedGroupsProvider(GroupsProvider):
       """Groups provider with in-memory, per-process caching."""

       def __init__(self, base_provider: GroupsProvider, cache_ttl: int = 300):
           self.base_provider = base_provider
           self.cache_ttl = cache_ttl  # seconds
           self._cache: dict[str, tuple[list[str], float]] = {}

       async def fetch_groups(self, token: JWTAuthorizationCredentials) -> list[str]:
           """Fetch groups with caching."""
           username = token.claims.get("username")
           if not username:
               # Without a username there is no safe cache key; skip the cache
               return await self.base_provider.fetch_groups(token)

           cache_key = f"groups:{username}"

           # Return the cached entry if it has not expired
           if cache_key in self._cache:
               cached_groups, timestamp = self._cache[cache_key]
               if time.monotonic() - timestamp < self.cache_ttl:
                   return list(cached_groups)

           # Fetch from base provider
           groups = await self.base_provider.fetch_groups(token)

           # Cache a copy so later mutation by callers cannot alter the cache
           self._cache[cache_key] = (list(groups), time.monotonic())

           return groups

Testing Groups Providers
========================

**Unit Testing:**

.. code-block:: python

   import pytest
   from unittest.mock import AsyncMock
   from auth_middleware.types.jwt import JWTAuthorizationCredentials
   from your_app.providers import CustomGroupsProvider

   @pytest.mark.asyncio
   async def test_custom_groups_provider():
       # Setup
       mock_api_client = AsyncMock()
       mock_api_client.get_user_groups.return_value = ["admin", "user"]
       
       provider = CustomGroupsProvider(mock_api_client)
       
       # Create test token
       token = JWTAuthorizationCredentials(
           jwt_token="test_token",
           header={"alg": "HS256"},
           signature="signature",
           message="message",
           claims={"username": "testuser"}
       )
       
       # Test
       groups = await provider.fetch_groups(token)
       
       # Assertions
       assert groups == ["admin", "user"]
       mock_api_client.get_user_groups.assert_called_once_with("testuser")
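
For tests that should not touch an external system, an in-memory stand-in can replace the real provider. The sketch below is an illustration under stated assumptions: ``InMemoryGroupsProvider`` is a hypothetical name, it does not subclass ``GroupsProvider`` so that the snippet runs without the library installed, and the token is faked with ``SimpleNamespace`` exposing only a ``claims`` dictionary.

.. code-block:: python

   import asyncio
   from types import SimpleNamespace


   class InMemoryGroupsProvider:
       """Hypothetical test double that serves groups from a dictionary."""

       def __init__(self, groups_by_user: dict[str, list[str]]):
           self._groups_by_user = groups_by_user

       async def fetch_groups(self, token) -> list[str]:
           username = token.claims.get("username")
           # Unknown or missing users get no groups.
           return list(self._groups_by_user.get(username, []))


   provider = InMemoryGroupsProvider({"testuser": ["admin", "user"]})
   known = SimpleNamespace(claims={"username": "testuser"})
   unknown = SimpleNamespace(claims={"username": "nobody"})

   known_groups = asyncio.run(provider.fetch_groups(known))
   unknown_groups = asyncio.run(provider.fetch_groups(unknown))

In a real test suite the class would presumably inherit from ``GroupsProvider`` and be passed to the middleware in place of the production provider.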

**Integration Testing:**

.. code-block:: python

   from fastapi.testclient import TestClient
   from your_app.main import app

   def test_groups_authorization():
       client = TestClient(app)
       
       # Test with admin token
       admin_token = "valid_admin_jwt_token"
       response = client.get(
           "/admin-only",
           headers={"Authorization": f"Bearer {admin_token}"}
       )
       assert response.status_code == 200
       
       # Test with user token
       user_token = "valid_user_jwt_token"
       response = client.get(
           "/admin-only", 
           headers={"Authorization": f"Bearer {user_token}"}
       )
       assert response.status_code == 403

Best Practices
==============

**Performance Considerations:**

1. **Caching**: Implement caching for frequently accessed groups
2. **Connection Pooling**: Use connection pools for database providers
3. **Async Operations**: Always use async/await for I/O operations
4. **Error Handling**: Gracefully handle provider failures

**Security Best Practices:**

1. **Input Validation**: Validate usernames and group names
2. **SQL Injection Prevention**: Use parameterized queries
3. **Rate Limiting**: Implement rate limiting for external API calls
4. **Logging**: Log security-relevant events without exposing sensitive data
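
The parameterized-query advice can be illustrated with the standard library's ``sqlite3`` module. This is a sketch under stated assumptions: the table ``demo_groups`` and its columns are made up for the example and are not the provider's schema, and ``groups_for`` is a hypothetical helper.

.. code-block:: python

   import sqlite3


   def groups_for(conn: sqlite3.Connection, username: str) -> list[str]:
       """Look up group names with a bound parameter (hypothetical helper)."""
       # The "?" placeholder passes the value separately from the SQL text,
       # so quotes in the username cannot change the query.
       rows = conn.execute(
           "SELECT group_name FROM demo_groups WHERE username = ? ORDER BY group_name",
           (username,),
       ).fetchall()
       return [row[0] for row in rows]


   conn = sqlite3.connect(":memory:")
   conn.execute(
       "CREATE TABLE demo_groups (username TEXT NOT NULL, group_name TEXT NOT NULL)"
   )
   conn.executemany(
       "INSERT INTO demo_groups (username, group_name) VALUES (?, ?)",
       [("john.doe", "admin"), ("john.doe", "user"), ("jane", "user")],
   )

   safe = groups_for(conn, "john.doe")
   attack = groups_for(conn, "x' OR '1'='1")

The injection-style input is treated as a literal username and matches no rows, whereas building the same query with string formatting would have returned every group.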

**Error Handling:**

.. code-block:: python

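   import logging

   from auth_middleware.providers.authz.groups_provider import GroupsProvider
   from auth_middleware.types.jwt import JWTAuthorizationCredentials

   logger = logging.getLogger(__name__)

   class RobustGroupsProvider(GroupsProvider):
       async def fetch_groups(self, token: JWTAuthorizationCredentials) -> list[str]:
           try:
               # _fetch_groups_internal is your own lookup logic (not shown)
               return await self._fetch_groups_internal(token)
           except Exception:
               # Log the error with its traceback, but don't expose it to the client
               logger.exception("Failed to fetch groups")
               # Fall back to a default group; return [] instead to deny access on failure
               return ["user"]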
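
Slow backends are another failure mode worth handling. One possible approach, sketched here with only the standard library, is to bound the lookup with ``asyncio.wait_for`` and return an empty list (fail closed) on timeout. ``fetch_with_timeout``, ``slow_lookup``, and ``fast_lookup`` are hypothetical names, and the timeout value is arbitrary.

.. code-block:: python

   import asyncio
   from typing import Awaitable, Callable


   async def fetch_with_timeout(
       fetch: Callable[[], Awaitable[list[str]]],
       timeout: float,
   ) -> list[str]:
       """Run a group lookup, returning no groups if it exceeds the timeout."""
       try:
           return await asyncio.wait_for(fetch(), timeout=timeout)
       except asyncio.TimeoutError:
           # Fail closed: an unavailable backend grants no groups.
           return []


   async def slow_lookup() -> list[str]:
       await asyncio.sleep(1)  # Simulates a backend that responds too late.
       return ["admin"]


   async def fast_lookup() -> list[str]:
       return ["user"]


   timed_out = asyncio.run(fetch_with_timeout(slow_lookup, timeout=0.05))
   in_time = asyncio.run(fetch_with_timeout(fast_lookup, timeout=0.05))

Whether to fail closed with an empty list or fall back to a default group is a policy choice that depends on what the default group is allowed to do.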

Migration and Deployment
========================

**Database Migrations:**

When using ``SqlGroupsProvider``, create the ``authz_groups`` table through your migration tool before deploying:

.. code-block:: python

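   # Alembic migration example
   from alembic import op
   import sqlalchemy as sa

   def upgrade():
       op.create_table('authz_groups',
           sa.Column('id', sa.String(27), primary_key=True),
           sa.Column('username', sa.String(500), nullable=False),
           sa.Column('group', sa.String(100), nullable=False)
       )
       op.create_index('idx_authz_groups_username', 'authz_groups', ['username'])

   def downgrade():
       # Reverse the upgrade: drop the index first, then the table
       op.drop_index('idx_authz_groups_username', table_name='authz_groups')
       op.drop_table('authz_groups')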

**Environment Configuration:**

.. code-block:: python

   import os
   from auth_middleware.providers.authz.sql_groups_provider import SqlGroupsProvider
   from auth_middleware.providers.authz.cognito_groups_provider import CognitoGroupsProvider

   def create_groups_provider():
       """Factory function for groups provider based on environment."""
       provider_type = os.getenv("GROUPS_PROVIDER", "cognito")
       
       if provider_type == "sql":
           return SqlGroupsProvider()
       elif provider_type == "cognito":
           return CognitoGroupsProvider()
       else:
           raise ValueError(f"Unknown groups provider: {provider_type}")

Troubleshooting
===============

**Common Issues:**

1. **Groups Not Loading**
   
   - Check token claims format
   - Verify database connectivity
   - Ensure proper provider configuration

2. **Performance Issues**
   
   - Implement caching
   - Check database query performance
   - Monitor external API response times

3. **Authorization Failures**
   
   - Verify group names match exactly
   - Check case sensitivity
   - Ensure groups are properly assigned
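
When exact matching or case sensitivity is the suspected cause, it can help to look for near misses while debugging. The helper below is a hypothetical sketch. Whether production checks should be case-insensitive is a policy decision this document does not make.

.. code-block:: python

   def find_near_misses(user_groups: list[str], required: str) -> list[str]:
       """Return user groups that match `required` only after normalization."""

       def normalize(name: str) -> str:
           # Ignore surrounding whitespace and letter case.
           return name.strip().casefold()

       target = normalize(required)
       # Exact matches are excluded: they are not the source of the failure.
       return [g for g in user_groups if g != required and normalize(g) == target]

A non-empty result points at a spelling mismatch between the assigned group and the group the endpoint requires.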

**Debugging:**

Enable debug logging to troubleshoot issues:

.. code-block:: python

   import logging
   logging.getLogger("auth_middleware").setLevel(logging.DEBUG)
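
To check the token claims format, the payload of a JWT can be decoded for inspection with the standard library. The sketch below does not verify the signature, so its output is suitable only for debugging, never for authorization decisions. ``decode_claims_unverified`` is a hypothetical helper, and the sample token is built inline just to keep the example self-contained.

.. code-block:: python

   import base64
   import json


   def decode_claims_unverified(jwt_token: str) -> dict:
       """Decode the payload segment of a JWT WITHOUT verifying its signature."""
       payload_b64 = jwt_token.split(".")[1]
       # JWT segments use unpadded base64url; restore padding before decoding.
       padded = payload_b64 + "=" * (-len(payload_b64) % 4)
       return json.loads(base64.urlsafe_b64decode(padded))


   def _b64url(data: dict) -> str:
       """Encode a dictionary as an unpadded base64url JSON segment."""
       raw = json.dumps(data).encode()
       return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


   # Build a throwaway token (header.payload.signature) for demonstration.
   sample = ".".join([
       _b64url({"alg": "none"}),
       _b64url({"username": "john.doe", "cognito:groups": ["admin"]}),
       "signature",
   ])
   claims = decode_claims_unverified(sample)

Comparing the decoded claims with the formats shown under Token Format makes it easy to spot a missing or misnamed claim.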

API Reference
=============

.. autoclass:: auth_middleware.providers.authz.groups_provider.GroupsProvider
   :members:

.. autoclass:: auth_middleware.providers.authz.cognito_groups_provider.CognitoGroupsProvider
   :members:

.. autoclass:: auth_middleware.providers.authz.sql_groups_provider.SqlGroupsProvider
   :members:

See Also
========

- :doc:`permissions-provider` - For granular permission-based authorization
- :doc:`../functions` - For using groups in endpoint dependencies
- :doc:`../middleware-configuration` - For middleware setup with groups providers
