Metadata-Version: 2.4
Name: huace-aigc-auth-client
Version: 1.1.26
Summary: 华策AIGC Auth Client - 提供 Token 验证、用户信息获取、权限检查、旧系统接入等功能
Author-email: Huace <support@huace.com>
License: MIT
Project-URL: Homepage, https://github.com/huace/huace-aigc-auth-client
Project-URL: Repository, https://github.com/huace/huace-aigc-auth-client
Keywords: aigc,auth,huace,sdk,authentication
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.7
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: requests>=2.20.0
Dynamic: license-file

# AIGC Auth Python SDK

[![PyPI version](https://badge.fury.io/py/huace-aigc-auth-client.svg)](https://pypi.org/project/huace-aigc-auth-client/)
[![Python Version](https://img.shields.io/pypi/pyversions/huace-aigc-auth-client.svg)](https://pypi.org/project/huace-aigc-auth-client/)

Python 后端服务接入华策 AIGC 鉴权中心的 SDK 工具包。

## 安装

```bash
pip install huace-aigc-auth-client
```

## 环境变量配置

在项目根目录创建 `.env` 文件：

```bash
# 必填：应用 ID 和密钥（在鉴权中心创建应用后获取）
AIGC_AUTH_APP_ID=your_app_id
AIGC_AUTH_APP_SECRET=your_app_secret
# 鉴权服务地址（默认为生产环境）
AIGC_AUTH_BASE_URL=your-auth-api-url-prefix
```

如需通过 Nginx 代理：

```nginx
location /aigc-auth/ {
    proxy_pass https://aigc-auth.huacemedia.com/aigc-auth/;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
}
```

## 快速开始

### 1. 初始化客户端

```python
from huace_aigc_auth_client import AigcAuthClient

# 方式一：从环境变量读取配置
client = AigcAuthClient()

# 方式二：直接传入参数
client = AigcAuthClient(
    app_id="your_app_id",
    app_secret="your_app_secret",
    base_url="your-auth-api-url-prefix"
)
```

### 2. 验证 Token

```python
result = client.verify_token(token)

if result.valid:
    print(f"用户 ID: {result.user_id}")
    print(f"用户名: {result.username}")
    print(f"过期时间: {result.expires_at}")
else:
    print("Token 无效")
```

### 3. 获取用户信息

```python
from huace_aigc_auth_client import AigcAuthClient, AigcAuthError

client = AigcAuthClient()

try:
    user = client.get_user_info(token)
    
    print(f"用户名: {user.username}")
    print(f"昵称: {user.nickname}")
    print(f"邮箱: {user.email}")
    print(f"角色: {user.roles}")
    print(f"权限: {user.permissions}")
    
    # 检查角色(在 Auth 里面配置)
    if user.has_role("admin") or user.is_admin:
        print("是管理员")
    
    # 检查权限(在 Auth 里面配置)
    if user.has_permission("user:write"):
        print("有用户写权限")
        
except AigcAuthError as e:
    print(f"错误: {e.message}")
```

### 4. 批量检查权限

```python
results = client.check_permissions(token, ["user:read", "user:write", "admin:access"])

for permission, has_permission in results.items():
    print(f"{permission}: {'✓' if has_permission else '✗'}")
```

## FastAPI 集成

### 方式一：使用中间件（推荐）

```python
from fastapi import FastAPI, Request, HTTPException
from huace_aigc_auth_client import AigcAuthClient, AuthMiddleware

app = FastAPI()

# 初始化客户端和中间件
client = AigcAuthClient()
auth_middleware = AuthMiddleware(
    client,
    exclude_paths=["/health", "/docs", "/openapi.json"],
    exclude_prefixes=["/public/"]
)

# 注册中间件
@app.middleware("http")
async def auth(request: Request, call_next):
    return await auth_middleware.fastapi_middleware(request, call_next)

# 在路由中获取用户信息
@app.get("/me")
async def get_current_user(request: Request):
    user = request.state.user_info
    return {
        "username": user.username,
        "roles": user.roles
    }

@app.get("/admin")
async def admin_only(request: Request):
    user = request.state.user_info
    if not user.has_role("admin"):
        raise HTTPException(status_code=403, detail="需要管理员权限")
    return {"message": "欢迎管理员"}
```

### 方式二：使用依赖注入

```python
from fastapi import FastAPI, Depends, HTTPException
from huace_aigc_auth_client import AigcAuthClient, create_fastapi_auth_dependency, UserInfo

app = FastAPI()
client = AigcAuthClient()

# 创建认证依赖
get_current_user = create_fastapi_auth_dependency(client)

@app.get("/me")
async def get_me(user: UserInfo = Depends(get_current_user)):
    return {"username": user.username}

# 创建权限检查依赖
def require_permission(permission: str):
    async def check(user: UserInfo = Depends(get_current_user)):
        if not user.has_permission(permission):
            raise HTTPException(status_code=403, detail="权限不足")
        return user
    return check

@app.get("/users")
async def list_users(user: UserInfo = Depends(require_permission("user:read"))):
    return {"users": [...]}
```

### 方式三：使用装饰器

```python
from fastapi import FastAPI, Request
from huace_aigc_auth_client import AigcAuthClient, require_auth, UserInfo

app = FastAPI()
client = AigcAuthClient()

@app.get("/protected")
@require_auth(client)
async def protected_route(request: Request, user_info: UserInfo):
    return {"user": user_info.username}

@app.get("/admin")
@require_auth(client, permissions=["admin:access"])
async def admin_route(request: Request, user_info: UserInfo):
    return {"admin": True}

# 只需要任意一个权限
@app.get("/editor")
@require_auth(client, permissions=["article:write", "article:edit"], any_permission=True)
async def editor_route(request: Request, user_info: UserInfo):
    return {"editor": True}
```

## Flask 集成

```python
from flask import Flask, g, jsonify
from functools import wraps
from huace_aigc_auth_client import AigcAuthClient, AuthMiddleware

app = Flask(__name__)

# 初始化客户端和中间件
client = AigcAuthClient()
auth_middleware = AuthMiddleware(
    client,
    exclude_paths=["/health", "/login"],
    exclude_prefixes=["/public/"]
)

# 注册 before_request
@app.before_request
def before_request():
    return auth_middleware.flask_before_request()

# 在路由中获取用户信息
@app.route("/me")
def get_me():
    user = g.user_info
    return jsonify({
        "username": user.username,
        "roles": user.roles
    })

# 权限检查装饰器
def require_permission(permission):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user = g.user_info
            if not user.has_permission(permission):
                return jsonify({"error": "权限不足"}), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator

@app.route("/admin")
@require_permission("admin:access")
def admin_only():
    return jsonify({"message": "欢迎管理员"})
```

## 上下文管理

SDK 提供了上下文管理功能，可以在任意位置（Service、Utility 等）获取当前请求的用户信息，无需层层传递参数。
支持 Flask 和 FastAPI，兼容同步和异步环境。

注意：使用此功能前必须先注册 `AuthMiddleware`。

```python
from huace_aigc_auth_client import get_current_user

def do_something_logic():
    # 获取当前用户信息（返回字典，非 UserInfo 对象）
    user = get_current_user()
    
    if user:
        print(f"当前操作用户ID: {user['id']}")
        print(f"当前操作用户名: {user['username']}")
        
        # 也可以获取 app_id 等信息（如果未过滤）
        if 'app_id' in user:
            print(f"应用ID: {user['app_id']}")
    else:
        print("无用户上下文")
```

## API 参考

### UserInfo 对象

| 属性 | 类型 | 说明 |
|------|------|------|
| `id` | int | 用户 ID |
| `username` | str | 用户名 |
| `nickname` | str | 昵称 |
| `email` | str | 邮箱 |
| `phone` | str | 手机号 |
| `avatar` | str | 头像 URL |
| `roles` | List[str] | 角色代码列表 |
| `permissions` | List[str] | 权限代码列表 |
| `department` | str | 部门 |
| `company` | str | 公司 |

| 方法 | 说明 |
|------|------|
| `has_role(role)` | 检查是否拥有指定角色 |
| `has_permission(permission)` | 检查是否拥有指定权限 |
| `has_any_permission(permissions)` | 检查是否拥有任意一个权限 |
| `has_all_permissions(permissions)` | 检查是否拥有所有权限 |

### 异常处理

```python
from huace_aigc_auth_client import AigcAuthClient, AigcAuthError

client = AigcAuthClient()

try:
    user = client.get_user_info(token)
except AigcAuthError as e:
    print(f"错误码: {e.code}")
    print(f"错误信息: {e.message}")
```

| 错误码 | 说明 |
|--------|------|
| 401 | 未授权（Token 无效或已过期） |
| 403 | 禁止访问（用户被禁用或权限不足） |
| 404 | 用户不存在 |
| -1 | 网络请求失败 |

---

## 旧系统接入（用户同步）

如果你的系统已有用户表，可通过 SDK 提供的「旧系统适配器」实现低成本接入，**无需修改历史代码和表结构**。

### 接入原理

```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   aigc-auth     │────▶│   SDK 同步层     │────▶│   旧系统         │
│   (鉴权中心)     │     │   (字段映射)     │     │   (用户表)       │
└─────────────────┘     └─────────────────┘     └─────────────────┘
        │                       │                       │
        │   1. 初始化同步        │                       │
        │◀──────────────────────────────────────────────│
        │   (批量同步旧用户到 auth)                       │
        │                       │                       │
        │   2. 增量同步          │                       │
        │──────────────────────▶│──────────────────────▶│
        │   (auth 新用户自动同步到旧系统)                  │
        │                       │                       │
        │   3. Webhook 推送      │                       │
        │──────────────────────────────────────────────▶│
        │   (用户变更主动通知)                            │
```

### 快速开始

#### 1. 配置环境变量

```bash
# .env 文件
AIGC_AUTH_APP_ID=your_app_id
AIGC_AUTH_APP_SECRET=your_app_secret
AIGC_AUTH_BASE_URL=your-auth-api-url-prefix

# 同步配置
AIGC_AUTH_SYNC_ENABLED=true
AIGC_AUTH_SYNC_PASSWORD=通用密码
AIGC_AUTH_WEBHOOK_URL=https://your-domain.com/api/v1/webhook/auth
AIGC_AUTH_WEBHOOK_SECRET=your_secret
```

#### 2. 创建字段映射

```python
from huace_aigc_auth_client import (
    FieldMapping,
    SyncConfig,
    PasswordMode,
    create_sync_config
)

# 定义字段映射
field_mappings = [
    FieldMapping(
        auth_field="username",
        legacy_field="username",
        required=True
    ),
    FieldMapping(
        auth_field="email",
        legacy_field="email"
    ),
    FieldMapping(
        auth_field="nickname",
        legacy_field="nickname"
    ),
    # 状态字段转换：auth 的 status 映射到旧系统的 is_active
    FieldMapping(
        auth_field="status",
        legacy_field="is_active",
        transform_to_legacy=lambda s: s == "active",
        transform_to_auth=lambda a: "active" if a else "disabled"
    ),
    # 角色字段转换：auth 的 roles 列表映射到旧系统的 role 字符串
    FieldMapping(
        auth_field="roles",
        legacy_field="role",
        transform_to_legacy=lambda r: r[0] if r else "viewer",
        transform_to_auth=lambda r: [r] if r else ["viewer"]
    ),
]

# 创建同步配置
sync_config = create_sync_config(
    field_mappings=field_mappings,
    password_mode=PasswordMode.UNIFIED,
    unified_password="通用密码",
    webhook_url="https://your-domain.com/api/v1/webhook/auth",
)
```

#### 3. 实现旧系统适配器

```python
from huace_aigc_auth_client import LegacySystemAdapter, LegacyUserData

class MyLegacyAdapter(LegacySystemAdapter):
    """实现与旧系统用户表的交互"""
    
    def __init__(self, db, sync_config, auth_client=None):
        super().__init__(sync_config, auth_client)
        self.db = db
    
    def get_user_by_unique_field(self, username: str):
        """通过用户名获取旧系统用户"""
        user = self.db.query(User).filter(User.username == username).first()
        if not user:
            return None
        return LegacyUserData({
            "id": user.id,
            "username": user.username,
            "email": user.email,
            # ... 其他字段
        })

    async def get_user_by_username_async(self, username: str) -> Optional[LegacyUserData]:
        """异步通过用户名获取旧系统用户"""
        user = await self.db.execute(
            select(User).where(User.username == username)
        )
        user = user.scalars().first()
        if not user:
            return None
        return LegacyUserData({
            "id": user.id,
            "username": user.username,
            "email": user.email,
            # ... 其他字段
        })
    
    async def _create_user_async(self, user_data: Dict[str, Any]) -> Optional[Any]:
        """在旧系统创建用户"""
        user = User(
            username=user_data["username"],
            email=user_data.get("email"),
            hashed_password=hash_password(user_data["password"]),
            # ... 其他字段
        )
        self.db.add(user)
        await self.db.commit()
        return user.id
    
    async def _update_user_async(self, username: str, user_data: Dict[str, Any]) -> bool:
        """异步更新旧系统用户"""
        user = await self.db.execute(
            select(User).where(User.username == username)
        )
        user = user.scalars().first()
        if user:
            for key, value in user_data.items():
                setattr(user, key, value)
            await self.db.commit()
            return True
        return False

    async def _delete_user_async(self, username: str) -> bool:
        """异步删除旧系统用户"""
        user = await self.db.execute(
            select(User).where(User.username == username)
        )
        user = user.scalars().first()
        if user:
            await self.db.delete(user) 
            # 或者软删除：user.is_active = False
            await self.db.commit()
            return True
        return False
    
    async def get_all_users_async(self) -> List[LegacyUserData]:
        """异步获取所有用户（用于初始化同步）"""
        result = await self.db.execute(select(User))
        users = result.scalars().all()
        return [LegacyUserData({"username": u.username, ...}) for u in users]
```

#### 4. 集成到登录流程

```python
from huace_aigc_auth_client import AigcAuthClient, UserSyncService

client = AigcAuthClient()
adapter = MyLegacyAdapter(db, sync_config, client)
sync_service = UserSyncService(client, adapter)

# 在获取用户信息后调用同步
@app.middleware("http")
async def auth_middleware(request, call_next):
    response = await auth.fastapi_middleware(request, call_next)
    
    # 登录成功后，同步用户到旧系统
    if hasattr(request.state, "user_info"):
        user_info = request.state.user_info
        await sync_service.sync_on_login_async(user_info)
    
    return response
```

#### 5. 添加 Webhook 接收端点

**推荐方式：使用 SDK 提供的通用 Webhook 路由**

```python
from fastapi import APIRouter
from huace_aigc_auth_client import register_webhook_router

api_router = APIRouter()
client = AigcAuthClient()
adapter = MyLegacyAdapter(db, sync_config, client)

# 定义 webhook 处理函数
async def handle_auth_webhook(request_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    独立的 webhook 处理函数（用于 SDK 集成）
    
    这个函数不需要 db 参数，会在内部创建数据库会话。
    适用于通过 SDK 的 register_webhook_router 注册。
    """
    from app.db.session import get_db
    
    async for db in get_db():
        try:
            event = request_data.get("event")
            data = request_data.get("data", {})
            logger.info(f"Received webhook event: {event} with data: {data}")
            
            # 调用适配器的 handle_webhook 方法
            return await adapter.handle_webhook(event, data)
        except Exception as e:
            logger.exception(f"Failed to handle webhook: {e}")
            raise

# 注册 webhook 路由（自动处理签名验证）
register_webhook_router(
    api_router,
    handler=handle_auth_webhook,
    prefix="/webhook",  # 可选，默认 "/webhook"
    secret_env_key="aigc-auth-webhook-secret",  # 可选，在 auth 后台配置
    tags=["Webhook"]  # 可选，默认 ["Webhook"]
)

# webhook 端点将自动创建在: /webhook/auth
```

**Webhook 签名验证说明**

SDK 的 `register_webhook_router` 会自动处理签名验证：
- 从请求头 `X-Webhook-Signature` 获取签名
- 从环境变量读取密钥（默认 `AIGC_AUTH_WEBHOOK_SECRET`）
- 使用 HMAC-SHA256 算法验证签名
- 签名不匹配时返回 401 错误

#### 6. 执行初始化同步

```python
# 一次性脚本：将旧系统用户同步到 aigc-auth
async def init_sync():
    adapter = MyLegacyAdapter(db, sync_config, client)
    users = adapter.get_all_users()
    
    for user in users:
        auth_data = adapter.transform_legacy_to_auth(user)
        
        # 获取密码（支持元组返回格式）
        password_result = adapter.get_password_for_sync(user)
        if isinstance(password_result, tuple):
            password, is_hashed = password_result
        else:
            password, is_hashed = password_result, False
        
        # 根据是否已加密选择不同的字段
        if is_hashed:
            auth_data["password_hashed"] = password  # 直接传递已加密密码
        else:
            auth_data["password"] = password  # 传递明文密码，服务端会加密
        
        client.sync_user_to_auth(auth_data)
    
    print(f"同步完成：{len(users)} 个用户")

# 运行
asyncio.run(init_sync())
```

### 密码处理策略

| 模式 | 说明 | 适用场景 |
|------|------|----------|
| `UNIFIED` | 统一初始密码 | 新系统接入，安全性高 |
| `CUSTOM_MAPPING` | 自定义映射函数 | 需要保留原密码时 |
| `CUSTOM_MAPPING` + `password_is_hashed=True` | 直接同步已加密密码 | **推荐**，两系统使用相同加密方式 (bcrypt) |

```python
from huace_aigc_auth_client import PasswordMode, create_sync_config

# 统一密码模式
sync_config = create_sync_config(
    password_mode=PasswordMode.UNIFIED,
    unified_password="Abc@123456"
)

# 自定义映射模式（返回明文密码）
def password_mapper(user_data):
    return user_data.get("password", "default")

sync_config = create_sync_config(
    password_mode=PasswordMode.CUSTOM_MAPPING,
    password_mapper=password_mapper
)

# 直接同步已加密密码（推荐，适用于两系统使用相同的 bcrypt 加密）
def hashed_password_mapper(user_data):
    return user_data.get("hashed_password", "")

sync_config = create_sync_config(
    password_mode=PasswordMode.CUSTOM_MAPPING,
    password_mapper=hashed_password_mapper,
    password_is_hashed=True  # 标记返回的是已加密密码
)
```

> **注意**：当 `password_is_hashed=True` 时，SDK 会使用 `password_hashed` 字段传递到服务端，
> 服务端会直接使用该哈希值而不再重新加密。

### 同步方向配置

```python
from huace_aigc_auth_client import SyncDirection, create_sync_config

# 仅从 auth 同步到旧系统（默认）
sync_config = create_sync_config(
    direction=SyncDirection.AUTH_TO_LEGACY
)

# 仅从旧系统同步到 auth（初始化用）
sync_config = create_sync_config(
    direction=SyncDirection.LEGACY_TO_AUTH
)

# 双向同步（需谨慎使用）
sync_config = create_sync_config(
    direction=SyncDirection.BIDIRECTIONAL
)
```

---

## 导出清单

```python
from huace_aigc_auth_client import (
    # 核心类
    AigcAuthClient,
    require_auth,
    AuthMiddleware,
    UserInfo,
    TokenVerifyResult,
    AigcAuthError,
    create_fastapi_auth_dependency,
    
    # 旧系统接入
    LegacySystemAdapter,
    LegacyUserData,
    SyncConfig,
    SyncDirection,
    PasswordMode,
    FieldMapping,
    UserSyncService,
    WebhookSender,
    SyncResult,
    create_sync_config,
    create_default_field_mappings,
    
    # Webhook 接收
    register_webhook_router,
    verify_webhook_signature,
)
```

---5 (2026-01-15)

#### 新增功能

1. **Webhook 接收功能**
   - 新增 `register_webhook_router()` 函数，快速注册 webhook 接收路由
   - 自动处理签名验证、请求解析和错误处理
   - 支持自定义前缀、密钥环境变量和标签
   - 新增 `verify_webhook_signature()` 工具函数

#### 使用示例

```python
from fastapi import APIRouter
from huace_aigc_auth_client import register_webhook_router

api_router = APIRouter()

async def my_handler(data: dict) -> dict:
    event = data.get("event")
    # 处理逻辑...
    return {"status": "ok"}

# 注册 webhook 路由
register_webhook_router(api_router, my_handler)
```

### v1.1.

## API 变更日志

### v1.1.0 (2026-01-13)

#### 新增功能

1. **支持直接同步已加密密码**
   - `SyncConfig` 新增 `password_is_hashed` 参数
   - `SyncUserRequest` 新增 `password_hashed` 字段
   - 当两个系统使用相同的密码加密方式（bcrypt）时，可直接同步已加密密码

2. **`_request` 方法支持自定义 headers**
   - `sync_user_to_auth(user_data, headers=None)` 支持传入自定义请求头
   - `batch_sync_users_to_auth(users, headers=None)` 支持传入自定义请求头

3. **`get_password_for_sync` 返回格式变更**
   - 旧版：返回 `str`（密码字符串）
   - 新版：返回 `tuple[str, bool]`（密码字符串, 是否已加密）

#### 使用示例

```python
# 直接同步已加密密码
sync_config = create_sync_config(
    password_mode=PasswordMode.CUSTOM_MAPPING,
    password_mapper=lambda user: user.get("hashed_password"),
    password_is_hashed=True,  # 新增参数
)

# SDK 调用
client.sync_user_to_auth({
    "username": "test",
    "password_hashed": "$2b$12$xxxxx...",  # 直接传递 bcrypt 哈希值
    "email": "test@example.com"
})
```

## 许可证

MIT License
