跳转至

AI 模块 API

FundAgent

Fund-CLI 智能分析 Agent

基于 LangGraph 构建的基金分析智能体,能够:

1. 理解用户的自然语言查询
2. 自主调用数据工具获取实时数据
3. 进行多轮对话并保持上下文
4. 提供专业的基金分析建议

使用示例

agent = FundAgent()
response = await agent.ainvoke("分析基金000001的投资价值")

源代码位于: src/fund_cli/ai/agent.py
class FundAgent:
    """
    Fund-CLI intelligent analysis agent.

    A fund-analysis agent built on LangGraph that can:
    1. Understand natural-language user queries
    2. Autonomously call data tools to fetch live data
    3. Hold multi-turn conversations while keeping context
    4. Provide professional fund-analysis advice

    Example:
        agent = FundAgent()
        response = await agent.ainvoke("分析基金000001的投资价值")
    """

    def __init__(
        self,
        llm: BaseChatModel | None = None,
        checkpointer: Any = None,
        enable_human_review: bool | None = None,
    ):
        """
        Initialize the Fund Agent.

        Args:
            llm: Language-model instance; created from config when omitted.
            checkpointer: State checkpoint store used to persist conversation memory.
            enable_human_review: Whether to enable human review; read from config when omitted.
        """
        self.llm = llm or self._create_default_llm()
        self.checkpointer = checkpointer or self._create_checkpointer()

        # Human-review setting: an explicit argument wins; otherwise read the
        # config, falling back to disabled when the config cannot be loaded.
        if enable_human_review is None:
            try:
                config = get_config()
                self.enable_human_review = config.agent.enable_human_review
            except Exception:
                self.enable_human_review = False
        else:
            self.enable_human_review = enable_human_review

        # Build and compile the workflow graph.
        self.workflow = self._build_workflow()
        self.app = self.workflow.compile(checkpointer=self.checkpointer)

    def _create_checkpointer(self):
        """Create a checkpointer according to configuration.

        Falls back to an in-memory saver when PostgreSQL is not configured,
        the optional dependency is missing, or config loading fails.
        """
        try:
            config = get_config()
            if config.database.use_postgres:
                try:
                    from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

                    logger.info("使用 PostgreSQL 持久化存储")
                    return AsyncPostgresSaver.from_conn_string(
                        config.database.connection_string
                    )
                except ImportError:
                    logger.warning("langgraph-checkpoint-postgres 未安装,回退到 MemorySaver")
                    return MemorySaver()
        except Exception:
            pass

        return MemorySaver()

    def _create_default_llm(self) -> BaseChatModel:
        """Create the default LLM from the ``ai`` section of the config."""
        config = get_config().ai

        if config.provider == "qwen":
            # Qwen is reached through its OpenAI-compatible endpoint.
            return ChatOpenAI(
                model=config.qwen_model or "qwen-max",
                temperature=config.temperature,
                max_tokens=config.max_tokens,
                api_key=config.qwen_api_key or config.api_key,
                base_url=config.qwen_base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1",
            )

        # "openai" and any unrecognized provider both use the OpenAI client
        # (the two branches in the original were byte-identical).
        return ChatOpenAI(
            model=config.model or "gpt-4",
            temperature=config.temperature,
            max_tokens=config.max_tokens,
            api_key=config.api_key,
        )

    def _build_workflow(self) -> StateGraph:
        """Build the agent workflow graph.

        Flow: START -> system -> llm -> (tools -> llm)* -> summary -> END,
        optionally inserting a human_review step before END.
        """
        workflow = StateGraph(FundAgentState)

        # Nodes
        workflow.add_node("system", create_system_node())
        workflow.add_node("llm", create_llm_node(self.llm))
        workflow.add_node("tools", create_tool_node())
        workflow.add_node("error_handler", create_error_handler_node())
        workflow.add_node("summary", create_summary_node())

        # Linear prefix of the flow.
        workflow.add_edge(START, "system")
        workflow.add_edge("system", "llm")

        # Conditional routing: the LLM either requests tool calls or ends.
        # (This wiring is identical with and without human review, so it is
        # hoisted out of the branch below.)
        workflow.add_conditional_edges(
            "llm",
            router_node,
            {
                "tools": "tools",
                "end": "summary"
            }
        )
        workflow.add_edge("tools", "llm")

        if self.enable_human_review:
            # Insert a human-review step between summary and END.
            workflow.add_node("human_review", create_human_input_node())
            workflow.add_edge("summary", "human_review")
            workflow.add_edge("human_review", END)
        else:
            workflow.add_edge("summary", END)

        # Error handling terminates the flow.
        workflow.add_edge("error_handler", END)

        return workflow

    async def ainvoke(
        self,
        user_input: str,
        user_id: str = "default",
        thread_id: str | None = None
    ) -> str:
        """
        Call the agent asynchronously.

        Args:
            user_input: User input text.
            user_id: User identifier, used to isolate memory per user.
            thread_id: Conversation identifier for multi-turn dialogue; a new
                session id is generated when omitted.

        Returns:
            The agent's response text.
        """
        if thread_id is None:
            thread_id = str(uuid.uuid4())

        config = {
            "configurable": {
                "user_id": user_id,
                "thread_id": thread_id
            }
        }

        # Initial workflow state for this invocation.
        initial_state: FundAgentState = {
            "messages": [HumanMessage(content=user_input)],
            "user_id": user_id,
            "thread_id": thread_id,
            "user_input": user_input,
            "current_step": "start",
            "tool_results": [],
            "needs_human_review": False,
            "human_feedback": None,
            "final_response": None,
            "error": None
        }

        try:
            # Run the compiled workflow.
            result = await self.app.ainvoke(initial_state, config)

            # Prefer the explicit final response produced by the summary node.
            if result.get("final_response"):
                return result["final_response"]

            # Otherwise fall back to the content of the last message.
            messages = result.get("messages", [])
            if messages:
                last_message = messages[-1]
                if hasattr(last_message, 'content'):
                    return last_message.content

            return "抱歉,我无法处理您的请求。"

        except Exception as e:
            return f"执行过程中出现错误: {str(e)}"

    def invoke(
        self,
        user_input: str,
        user_id: str = "default",
        thread_id: str | None = None
    ) -> str:
        """
        Call the agent synchronously.

        Args:
            user_input: User input text.
            user_id: User identifier.
            thread_id: Conversation identifier.

        Returns:
            The agent's response text.
        """
        import asyncio

        # asyncio.run() creates a fresh event loop and cleans it up afterwards.
        # The previous get_event_loop()/run_until_complete() pattern is
        # deprecated since Python 3.10 and breaks when a loop is already
        # running; asyncio.run raises the same RuntimeError in that situation.
        return asyncio.run(self.ainvoke(user_input, user_id, thread_id))

    def get_history(self, user_id: str, thread_id: str) -> list:
        """
        Fetch the conversation history for one user/thread.

        Args:
            user_id: User identifier.
            thread_id: Conversation identifier.

        Returns:
            List of recorded state snapshots (empty on any failure).
        """
        config = {
            "configurable": {
                "user_id": user_id,
                "thread_id": thread_id
            }
        }

        try:
            history = list(self.app.get_state_history(config))
            return history
        except Exception:
            return []

    def clear_history(self, user_id: str, thread_id: str) -> bool:
        """
        Clear the conversation history for one user/thread.

        Args:
            user_id: User identifier.
            thread_id: Conversation identifier.

        Returns:
            True, meaning the request was accepted.
        """
        # MemorySaver offers no deletion API, so this is a no-op that reports
        # acceptance; production should use a persistent saver (e.g.
        # PostgresSaver) that supports real cleanup.
        return True

__init__

__init__(
    llm: BaseChatModel | None = None,
    checkpointer: Any = None,
    enable_human_review: bool | None = None,
)

初始化 Fund Agent

参数:

名称 类型 描述 默认
llm BaseChatModel | None

语言模型实例,不传则使用配置创建

None
checkpointer Any

状态检查点存储,用于持久化对话记忆

None
enable_human_review bool | None

是否启用人工审核,不传则读取配置

None
源代码位于: src/fund_cli/ai/agent.py
def __init__(
    self,
    llm: BaseChatModel | None = None,
    checkpointer: Any = None,
    enable_human_review: bool | None = None,
):
    """
    Initialize the Fund Agent.

    Args:
        llm: Language-model instance; created from config when omitted.
        checkpointer: State checkpoint store used to persist conversation memory.
        enable_human_review: Whether to enable human review; read from config when omitted.
    """
    self.llm = llm or self._create_default_llm()
    self.checkpointer = checkpointer or self._create_checkpointer()

    # Human-review setting: explicit argument wins; otherwise read the
    # config, falling back to disabled when the config cannot be loaded.
    if enable_human_review is None:
        try:
            config = get_config()
            self.enable_human_review = config.agent.enable_human_review
        except Exception:
            self.enable_human_review = False
    else:
        self.enable_human_review = enable_human_review

    # Build and compile the workflow graph.
    self.workflow = self._build_workflow()
    self.app = self.workflow.compile(checkpointer=self.checkpointer)

ainvoke async

ainvoke(
    user_input: str,
    user_id: str = "default",
    thread_id: str | None = None,
) -> str

异步调用 Agent

参数:

名称 类型 描述 默认
user_input str

用户输入

必需
user_id str

用户标识,用于记忆隔离

'default'
thread_id str | None

会话标识,用于多轮对话。不传则自动生成新会话

None

返回:

类型 描述
str

Agent 响应文本

源代码位于: src/fund_cli/ai/agent.py
async def ainvoke(
    self,
    user_input: str,
    user_id: str = "default",
    thread_id: str | None = None
) -> str:
    """
    Call the agent asynchronously.

    Args:
        user_input: User input text.
        user_id: User identifier, used to isolate memory per user.
        thread_id: Conversation identifier for multi-turn dialogue; a new
            session id is generated when omitted.

    Returns:
        The agent's response text.
    """
    if thread_id is None:
        thread_id = str(uuid.uuid4())

    config = {
        "configurable": {
            "user_id": user_id,
            "thread_id": thread_id
        }
    }

    # Initial workflow state for this invocation.
    initial_state: FundAgentState = {
        "messages": [HumanMessage(content=user_input)],
        "user_id": user_id,
        "thread_id": thread_id,
        "user_input": user_input,
        "current_step": "start",
        "tool_results": [],
        "needs_human_review": False,
        "human_feedback": None,
        "final_response": None,
        "error": None
    }

    try:
        # Run the compiled workflow.
        result = await self.app.ainvoke(initial_state, config)

        # Prefer the explicit final response produced by the summary node.
        if result.get("final_response"):
            return result["final_response"]

        # Otherwise fall back to the content of the last message.
        messages = result.get("messages", [])
        if messages:
            last_message = messages[-1]
            if hasattr(last_message, 'content'):
                return last_message.content

        return "抱歉,我无法处理您的请求。"

    except Exception as e:
        return f"执行过程中出现错误: {str(e)}"

invoke

invoke(
    user_input: str,
    user_id: str = "default",
    thread_id: str | None = None,
) -> str

同步调用 Agent

参数:

名称 类型 描述 默认
user_input str

用户输入

必需
user_id str

用户标识

'default'
thread_id str | None

会话标识

None

返回:

类型 描述
str

Agent 响应文本

源代码位于: src/fund_cli/ai/agent.py
def invoke(
    self,
    user_input: str,
    user_id: str = "default",
    thread_id: str | None = None
) -> str:
    """
    Call the agent synchronously.

    Args:
        user_input: User input text.
        user_id: User identifier.
        thread_id: Conversation identifier.

    Returns:
        The agent's response text.
    """
    import asyncio

    # asyncio.run() creates a fresh event loop and cleans it up afterwards.
    # The previous get_event_loop()/run_until_complete() pattern is
    # deprecated since Python 3.10 and breaks when a loop is already
    # running; asyncio.run raises the same RuntimeError in that situation.
    return asyncio.run(self.ainvoke(user_input, user_id, thread_id))

get_history

get_history(user_id: str, thread_id: str) -> list

获取对话历史

参数:

名称 类型 描述 默认
user_id str

用户标识

必需
thread_id str

会话标识

必需

返回:

类型 描述
list

对话历史列表

源代码位于: src/fund_cli/ai/agent.py
def get_history(self, user_id: str, thread_id: str) -> list:
    """
    Fetch the recorded conversation history for one user/thread.

    Args:
        user_id: User identifier.
        thread_id: Conversation identifier.

    Returns:
        List of state snapshots, or an empty list on any failure.
    """
    cfg = {"configurable": {"user_id": user_id, "thread_id": thread_id}}
    try:
        return list(self.app.get_state_history(cfg))
    except Exception:
        # Any retrieval error degrades to "no history".
        return []

clear_history

clear_history(user_id: str, thread_id: str) -> bool

清除对话历史

参数:

名称 类型 描述 默认
user_id str

用户标识

必需
thread_id str

会话标识

必需

返回:

类型 描述
bool

是否成功清除

源代码位于: src/fund_cli/ai/agent.py
def clear_history(self, user_id: str, thread_id: str) -> bool:
    """
    Clear the conversation history for one user/thread.

    Args:
        user_id: User identifier.
        thread_id: Conversation identifier.

    Returns:
        True, meaning the request was accepted.
    """
    # MemorySaver has no deletion API, so this is a no-op that reports
    # acceptance; a production deployment should use a persistent saver
    # (e.g. PostgresSaver) that supports real cleanup.
    return True

AIAnalyzer

AI 分析服务

使用 LLM 进行基金分析、报告生成等功能。

源代码位于: src/fund_cli/ai/analyzer.py
class AIAnalyzer:
    """
    AI analysis service.

    Uses an LLM for fund analysis, report generation and related features.
    """

    def __init__(self, provider: LLMProvider | None = None):
        """
        Initialize the AI analysis service.

        Args:
            provider: LLM provider instance; the default one is created from
                configuration when None.
        """
        self.provider = provider or get_provider()
        self.prompts = PromptTemplates()

    def _summary_prompt_data(
        self, fund_code: str, info: dict[str, Any], metrics: dict[str, Any]
    ) -> dict[str, Any]:
        """Build the summary-prompt payload shared by summarize_fund and
        generate_report (previously duplicated in both methods)."""
        return {
            "fund_code": fund_code,
            "fund_name": info.get("name", "未知"),
            "fund_type": info.get("type", "未知"),
            "manager": info.get("manager", "未知"),
            "total_return": metrics.get("total_return", 0),
            "cagr": metrics.get("cagr", 0),
            "sharpe": metrics.get("sharpe_ratio", 0),
            "max_drawdown": metrics.get("max_drawdown", 0),
            "volatility": metrics.get("volatility", 0),
        }

    def summarize_fund(self, fund_code: str, fund_data: dict[str, Any]) -> str:
        """
        Generate an analysis summary for one fund.

        Args:
            fund_code: Fund code.
            fund_data: Fund data dict containing info, nav, metrics, etc.

        Returns:
            AI-generated fund summary text.
        """
        prompt_data = self._summary_prompt_data(
            fund_code, fund_data.get("info", {}), fund_data.get("metrics", {})
        )
        prompt = self.prompts.format_summary_prompt(prompt_data)
        return self.provider.generate(prompt)

    def compare_funds(self, fund_codes: list[str], funds_data: list[dict]) -> str:
        """
        Compare and analyze multiple funds.

        Args:
            fund_codes: List of fund codes.
            funds_data: List of fund data dicts.

        Returns:
            AI-generated comparison report.
        """
        funds_text = []
        for i, data in enumerate(funds_data):
            # Fall back to the code embedded in the data when the code list
            # is shorter than the data list.
            code = fund_codes[i] if i < len(fund_codes) else data.get("code", "")
            info = data.get("info", {})
            metrics = data.get("metrics", {})

            fund_text = f"""
基金 {i + 1}: {code}
- 名称: {info.get('name', '未知')}
- 类型: {info.get('type', '未知')}
- 年化收益: {metrics.get('cagr', 0)}%
- 夏普比率: {metrics.get('sharpe_ratio', 0)}
- 最大回撤: {metrics.get('max_drawdown', 0)}%
- 波动率: {metrics.get('volatility', 0)}%
"""
            funds_text.append(fund_text)

        prompt = self.prompts.format_compare_prompt("\n".join(funds_text))
        return self.provider.generate(prompt)

    def investment_advice(
        self, fund_code: str, fund_data: dict[str, Any], risk_profile: str
    ) -> dict[str, str]:
        """
        Generate investment advice.

        Args:
            fund_code: Fund code.
            fund_data: Fund data.
            risk_profile: Risk appetite (conservative/moderate/aggressive).

        Returns:
            Structured investment-advice dict.
        """
        info = fund_data.get("info", {})
        metrics = fund_data.get("metrics", {})

        fund_info = {
            "name": info.get("name", "未知"),
            "code": fund_code,
            "type": info.get("type", "未知"),
            "cagr": metrics.get("cagr", 0),
            "sharpe": metrics.get("sharpe_ratio", 0),
            "max_drawdown": metrics.get("max_drawdown", 0),
        }

        prompt = self.prompts.format_investment_advice_prompt(fund_info, risk_profile)
        response = self.provider.generate(prompt)

        return self._parse_json_response(response)

    def risk_assessment(
        self, fund_code: str, fund_data: dict[str, Any], detailed: bool = False
    ) -> dict[str, str]:
        """
        In-depth risk assessment.

        Args:
            fund_code: Fund code.
            fund_data: Fund data.
            detailed: Whether to produce a detailed analysis.

        Returns:
            Risk-assessment result dict.
        """
        info = fund_data.get("info", {})
        nav_data = fund_data.get("nav", [])

        # Compute risk metrics from the NAV series.
        risk_metrics = self._calculate_risk_metrics(nav_data)

        fund_info = {
            "name": info.get("name", "未知"),
            "code": fund_code,
            "max_drawdown": risk_metrics.get("max_drawdown", 0),
            "volatility": risk_metrics.get("volatility", 0),
            "downside_deviation": risk_metrics.get("downside_deviation", 0),
            "sortino_ratio": risk_metrics.get("sortino_ratio", 0),
            "beta": risk_metrics.get("beta", 0),
            "risk_events": "无重大风险事件",  # could be extracted from history later
        }

        prompt = self.prompts.format_risk_assessment_prompt(fund_info, detailed)
        response = self.provider.generate(prompt)

        return self._parse_json_response(response)

    def market_insight(
        self, fund_code: str, fund_data: dict[str, Any], market_context: str | None = None
    ) -> str:
        """
        Market-context analysis for one fund.

        Args:
            fund_code: Fund code.
            fund_data: Fund data.
            market_context: Optional description of the market environment.

        Returns:
            Market-insight text.
        """
        info = fund_data.get("info", {})
        nav_data = fund_data.get("nav", [])

        # Recent return over the NAV window; also guard against a zero base
        # value, which previously raised ZeroDivisionError.
        recent_return = 0
        if len(nav_data) >= 2 and nav_data[0]:
            recent_return = (nav_data[-1] - nav_data[0]) / nav_data[0] * 100

        fund_info = {
            "name": info.get("name", "未知"),
            "code": fund_code,
            "recent_return": recent_return,
            "rank_in_category": "前30%",  # placeholder; could come from data
        }

        prompt = self.prompts.format_market_insight_prompt(fund_info, market_context)
        return self.provider.generate(prompt)

    def portfolio_review(self, portfolio_data: dict[str, Any]) -> dict[str, str]:
        """
        Portfolio analysis.

        Args:
            portfolio_data: Portfolio data with holdings, weights,
                historical returns, etc.

        Returns:
            Portfolio-analysis result dict.
        """
        # Compute aggregate portfolio metrics.
        portfolio_metrics = self._calculate_portfolio_metrics(portfolio_data)

        portfolio_info = {
            "funds": portfolio_data.get("funds", []),
            "weights": portfolio_data.get("weights", []),
            "expected_return": portfolio_metrics.get("expected_return", 0),
            "expected_volatility": portfolio_metrics.get("expected_volatility", 0),
            "portfolio_sharpe": portfolio_metrics.get("portfolio_sharpe", 0),
        }

        prompt = self.prompts.format_portfolio_review_prompt(portfolio_info)
        response = self.provider.generate(prompt)

        return self._parse_json_response(response)

    def generate_report(
        self, fund_code: str, fund_info: dict[str, Any], metrics: dict[str, Any]
    ) -> str:
        """
        Generate an analysis report.

        Args:
            fund_code: Fund code.
            fund_info: Fund information.
            metrics: Analysis metrics.

        Returns:
            Report text.
        """
        prompt_data = self._summary_prompt_data(fund_code, fund_info, metrics)
        prompt = self.prompts.format_summary_prompt(prompt_data)
        return self.provider.generate(prompt)

    def _parse_json_response(self, response: str) -> dict[str, str]:
        """
        Parse a JSON-formatted AI response.

        Args:
            response: AI response text.

        Returns:
            Parsed dict, or {"raw_response": ...} when no JSON is found.
        """
        # Extract the outermost {...} span, tolerating surrounding prose.
        json_match = re.search(r"\{[\s\S]*\}", response)
        if json_match:
            try:
                return json.loads(json_match.group())
            except json.JSONDecodeError:
                pass

        # JSON parse failed: hand back the raw text.
        return {"raw_response": response}

    def _calculate_risk_metrics(self, nav_data: list) -> dict[str, float]:
        """
        Compute risk metrics from a net-asset-value series.

        Args:
            nav_data: List of NAV values.

        Returns:
            Dict of risk metrics (all zeros when the series is too short).
        """
        if not nav_data or len(nav_data) < 2:
            return {
                "max_drawdown": 0,
                "volatility": 0,
                "downside_deviation": 0,
                "sortino_ratio": 0,
                "beta": 0,
            }

        import numpy as np

        nav = np.asarray(nav_data, dtype=float)

        # Period-over-period returns.
        returns = np.diff(nav) / nav[:-1]

        # Maximum drawdown as a percentage of the running peak.
        cumulative = np.cumprod(1 + returns)
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        max_drawdown = np.min(drawdown) * 100 if len(drawdown) > 0 else 0

        # Annualized volatility (assumes 252 trading days).
        volatility = np.std(returns) * np.sqrt(252) * 100 if len(returns) > 0 else 0

        # Downside deviation: annualized std of negative returns only.
        downside_returns = returns[returns < 0]
        downside_deviation = (
            np.std(downside_returns) * np.sqrt(252) * 100 if len(downside_returns) > 0 else 0
        )

        # Sortino ratio: annualized mean return over downside deviation.
        avg_return = np.mean(returns) * 252 * 100 if len(returns) > 0 else 0
        sortino_ratio = avg_return / downside_deviation if downside_deviation > 0 else 0

        # Beta (simplified: no benchmark series is available here).
        beta = 1.0

        # Cast numpy scalars to plain floats so the result matches the
        # declared dict[str, float] and serializes cleanly.
        return {
            "max_drawdown": float(abs(max_drawdown)),
            "volatility": float(volatility),
            "downside_deviation": float(downside_deviation),
            "sortino_ratio": float(sortino_ratio),
            "beta": beta,
        }

    def _calculate_portfolio_metrics(self, portfolio_data: dict) -> dict[str, float]:
        """
        Compute aggregate portfolio metrics.

        Args:
            portfolio_data: Portfolio data with "funds" and "weights".

        Returns:
            Dict of portfolio metrics (zeros when funds/weights are missing).
        """
        funds = portfolio_data.get("funds", [])
        weights = portfolio_data.get("weights", [])

        if not funds or not weights:
            return {
                "expected_return": 0,
                "expected_volatility": 0,
                "portfolio_sharpe": 0,
            }

        # Weight-average the per-fund return and Sharpe ratio.
        total_return = 0
        total_sharpe = 0

        for i, fund in enumerate(funds):
            weight = weights[i] if i < len(weights) else 0
            metrics = fund.get("metrics", {})
            total_return += metrics.get("cagr", 0) * weight
            total_sharpe += metrics.get("sharpe_ratio", 0) * weight

        # Simplification: assume a fixed 10% portfolio volatility.
        volatility = 10.0

        return {
            "expected_return": total_return,
            "expected_volatility": volatility,
            "portfolio_sharpe": total_sharpe,
        }

__init__

__init__(provider: LLMProvider | None = None)

初始化 AI 分析服务

参数:

名称 类型 描述 默认
provider LLMProvider | None

LLM 提供商实例,如果为None则使用默认配置创建

None
源代码位于: src/fund_cli/ai/analyzer.py
def __init__(self, provider: LLMProvider | None = None):
    """
    Initialize the AI analysis service.

    Args:
        provider: LLM provider instance; the default one is created from
            configuration when None.
    """
    self.provider = provider or get_provider()
    self.prompts = PromptTemplates()

summarize_fund

summarize_fund(
    fund_code: str, fund_data: dict[str, Any]
) -> str

生成基金分析摘要

参数:

名称 类型 描述 默认
fund_code str

基金代码

必需
fund_data dict[str, Any]

基金数据字典,包含info、nav、metrics等

必需

返回:

类型 描述
str

AI生成的基金摘要文本

源代码位于: src/fund_cli/ai/analyzer.py
def summarize_fund(self, fund_code: str, fund_data: dict[str, Any]) -> str:
    """
    Generate an analysis summary for one fund.

    Args:
        fund_code: Fund code.
        fund_data: Fund data dict containing info, nav, metrics, etc.

    Returns:
        AI-generated fund summary text.
    """
    info = fund_data.get("info", {})
    metrics = fund_data.get("metrics", {})

    # Payload for the summary prompt template; missing fields degrade
    # to neutral defaults rather than raising.
    prompt_data = {
        "fund_code": fund_code,
        "fund_name": info.get("name", "未知"),
        "fund_type": info.get("type", "未知"),
        "manager": info.get("manager", "未知"),
        "total_return": metrics.get("total_return", 0),
        "cagr": metrics.get("cagr", 0),
        "sharpe": metrics.get("sharpe_ratio", 0),
        "max_drawdown": metrics.get("max_drawdown", 0),
        "volatility": metrics.get("volatility", 0),
    }

    prompt = self.prompts.format_summary_prompt(prompt_data)
    return self.provider.generate(prompt)

compare_funds

compare_funds(
    fund_codes: list[str], funds_data: list[dict]
) -> str

对比分析多只基金

参数:

名称 类型 描述 默认
fund_codes list[str]

基金代码列表

必需
funds_data list[dict]

基金数据列表

必需

返回:

类型 描述
str

AI生成的对比分析报告

源代码位于: src/fund_cli/ai/analyzer.py
    def compare_funds(self, fund_codes: list[str], funds_data: list[dict]) -> str:
        """
        Compare and analyze multiple funds.

        Args:
            fund_codes: List of fund codes.
            funds_data: List of fund data dicts.

        Returns:
            AI-generated comparison report.
        """
        funds_text = []
        for i, data in enumerate(funds_data):
            # Fall back to the code embedded in the data when the code list
            # is shorter than the data list.
            code = fund_codes[i] if i < len(fund_codes) else data.get("code", "")
            info = data.get("info", {})
            metrics = data.get("metrics", {})

            fund_text = f"""
基金 {i + 1}: {code}
- 名称: {info.get('name', '未知')}
- 类型: {info.get('type', '未知')}
- 年化收益: {metrics.get('cagr', 0)}%
- 夏普比率: {metrics.get('sharpe_ratio', 0)}
- 最大回撤: {metrics.get('max_drawdown', 0)}%
- 波动率: {metrics.get('volatility', 0)}%
"""
            funds_text.append(fund_text)

        prompt = self.prompts.format_compare_prompt("\n".join(funds_text))
        return self.provider.generate(prompt)

investment_advice

investment_advice(
    fund_code: str,
    fund_data: dict[str, Any],
    risk_profile: str,
) -> dict[str, str]

生成投资建议

参数:

名称 类型 描述 默认
fund_code str

基金代码

必需
fund_data dict[str, Any]

基金数据

必需
risk_profile str

风险偏好 (conservative/moderate/aggressive)

必需

返回:

类型 描述
dict[str, str]

结构化投资建议字典

源代码位于: src/fund_cli/ai/analyzer.py
def investment_advice(
    self, fund_code: str, fund_data: dict[str, Any], risk_profile: str
) -> dict[str, str]:
    """
    Generate investment advice.

    Args:
        fund_code: Fund code.
        fund_data: Fund data.
        risk_profile: Risk appetite (conservative/moderate/aggressive).

    Returns:
        Structured investment-advice dict.
    """
    info = fund_data.get("info", {})
    metrics = fund_data.get("metrics", {})

    fund_info = {
        "name": info.get("name", "未知"),
        "code": fund_code,
        "type": info.get("type", "未知"),
        "cagr": metrics.get("cagr", 0),
        "sharpe": metrics.get("sharpe_ratio", 0),
        "max_drawdown": metrics.get("max_drawdown", 0),
    }

    prompt = self.prompts.format_investment_advice_prompt(fund_info, risk_profile)
    response = self.provider.generate(prompt)

    return self._parse_json_response(response)

risk_assessment

risk_assessment(
    fund_code: str,
    fund_data: dict[str, Any],
    detailed: bool = False,
) -> dict[str, str]

深度风险评估

参数:

名称 类型 描述 默认
fund_code str

基金代码

必需
fund_data dict[str, Any]

基金数据

必需
detailed bool

是否生成详细分析

False

返回:

类型 描述
dict[str, str]

风险评估结果字典

源代码位于: src/fund_cli/ai/analyzer.py
def risk_assessment(
    self, fund_code: str, fund_data: dict[str, Any], detailed: bool = False
) -> dict[str, str]:
    """
    In-depth risk assessment.

    Args:
        fund_code: Fund code.
        fund_data: Fund data.
        detailed: Whether to produce a detailed analysis.

    Returns:
        Risk-assessment result dict.
    """
    info = fund_data.get("info", {})
    nav_data = fund_data.get("nav", [])

    # Compute risk metrics from the NAV series.
    risk_metrics = self._calculate_risk_metrics(nav_data)

    fund_info = {
        "name": info.get("name", "未知"),
        "code": fund_code,
        "max_drawdown": risk_metrics.get("max_drawdown", 0),
        "volatility": risk_metrics.get("volatility", 0),
        "downside_deviation": risk_metrics.get("downside_deviation", 0),
        "sortino_ratio": risk_metrics.get("sortino_ratio", 0),
        "beta": risk_metrics.get("beta", 0),
        "risk_events": "无重大风险事件",  # could be extracted from historical data
    }

    prompt = self.prompts.format_risk_assessment_prompt(fund_info, detailed)
    response = self.provider.generate(prompt)

    return self._parse_json_response(response)

market_insight

market_insight(
    fund_code: str,
    fund_data: dict[str, Any],
    market_context: str | None = None,
) -> str

市场解读分析

参数:

名称 类型 描述 默认
fund_code str

基金代码

必需
fund_data dict[str, Any]

基金数据

必需
market_context str | None

市场环境描述(可选)

None

返回:

类型 描述
str

市场解读文本

源代码位于: src/fund_cli/ai/analyzer.py
def market_insight(
    self, fund_code: str, fund_data: dict[str, Any], market_context: str | None = None
) -> str:
    """
    Market-context analysis for one fund.

    Args:
        fund_code: Fund code.
        fund_data: Fund data.
        market_context: Optional description of the market environment.

    Returns:
        Market-insight text.
    """
    info = fund_data.get("info", {})
    nav_data = fund_data.get("nav", [])

    # Recent return over the NAV window.
    # NOTE(review): raises ZeroDivisionError if nav_data[0] == 0 — confirm
    # upstream guarantees a positive base NAV.
    recent_return = 0
    if len(nav_data) >= 2:
        recent_return = (nav_data[-1] - nav_data[0]) / nav_data[0] * 100

    fund_info = {
        "name": info.get("name", "未知"),
        "code": fund_code,
        "recent_return": recent_return,
        "rank_in_category": "前30%",  # placeholder; could come from data
    }

    prompt = self.prompts.format_market_insight_prompt(fund_info, market_context)
    return self.provider.generate(prompt)

portfolio_review

portfolio_review(
    portfolio_data: dict[str, Any],
) -> dict[str, str]

投资组合分析

参数:

名称 类型 描述 默认
portfolio_data dict[str, Any]

组合数据,包含持仓、权重、历史收益等

必需

返回:

类型 描述
dict[str, str]

组合分析结果

源代码位于: src/fund_cli/ai/analyzer.py
def portfolio_review(self, portfolio_data: dict[str, Any]) -> dict[str, str]:
    """
    Portfolio analysis.

    Args:
        portfolio_data: Portfolio data with holdings, weights,
            historical returns, etc.

    Returns:
        Portfolio-analysis result dict.
    """
    # Compute aggregate portfolio metrics.
    portfolio_metrics = self._calculate_portfolio_metrics(portfolio_data)

    portfolio_info = {
        "funds": portfolio_data.get("funds", []),
        "weights": portfolio_data.get("weights", []),
        "expected_return": portfolio_metrics.get("expected_return", 0),
        "expected_volatility": portfolio_metrics.get("expected_volatility", 0),
        "portfolio_sharpe": portfolio_metrics.get("portfolio_sharpe", 0),
    }

    prompt = self.prompts.format_portfolio_review_prompt(portfolio_info)
    response = self.provider.generate(prompt)

    return self._parse_json_response(response)

generate_report

generate_report(
    fund_code: str,
    fund_info: dict[str, Any],
    metrics: dict[str, Any],
) -> str

生成分析报告

参数:

名称 类型 描述 默认
fund_code str

基金代码

必需
fund_info dict[str, Any]

基金信息

必需
metrics dict[str, Any]

分析指标

必需

返回:

类型 描述
str

报告文本

源代码位于: src/fund_cli/ai/analyzer.py
def generate_report(
    self, fund_code: str, fund_info: dict[str, Any], metrics: dict[str, Any]
) -> str:
    """
    Generate an analysis report.

    Args:
        fund_code: Fund code.
        fund_info: Fund information.
        metrics: Analysis metrics.

    Returns:
        Report text.
    """
    # Same payload shape as summarize_fund, fed to the summary template.
    prompt_data = {
        "fund_code": fund_code,
        "fund_name": fund_info.get("name", "未知"),
        "fund_type": fund_info.get("type", "未知"),
        "manager": fund_info.get("manager", "未知"),
        "total_return": metrics.get("total_return", 0),
        "cagr": metrics.get("cagr", 0),
        "sharpe": metrics.get("sharpe_ratio", 0),
        "max_drawdown": metrics.get("max_drawdown", 0),
        "volatility": metrics.get("volatility", 0),
    }

    prompt = self.prompts.format_summary_prompt(prompt_data)
    return self.provider.generate(prompt)

LLMProvider

Bases: ABC


              flowchart TD
              fund_cli.ai.providers.LLMProvider[LLMProvider]

              

              click fund_cli.ai.providers.LLMProvider href "" "fund_cli.ai.providers.LLMProvider"
            

LLM 提供商抽象基类

源代码位于: src/fund_cli/ai/providers.py
class LLMProvider(ABC):
    """Abstract base class shared by every LLM provider implementation."""

    def __init__(self, config: AIConfig):
        # Keep the full AI configuration so subclasses can read model,
        # key, timeout and retry settings from a single place.
        self.config = config

    @abstractmethod
    def generate(self, prompt: str, **kwargs: Any) -> str:
        """Produce text for the given prompt.

        Args:
            prompt: Input prompt text.
            **kwargs: Provider-specific overrides (e.g. model, temperature).

        Returns:
            The generated text.
        """
        ...

    @abstractmethod
    def is_available(self) -> bool:
        """Report whether the provider can currently serve requests.

        Returns:
            True when the provider is reachable and usable.
        """
        ...

    @abstractmethod
    def validate_config(self) -> bool:
        """Report whether the stored configuration is usable.

        Returns:
            True when the configuration is valid.
        """
        ...

generate abstractmethod

generate(prompt: str, **kwargs: Any) -> str

生成文本

参数:

名称 类型 描述 默认
prompt str

输入提示词

必需
**kwargs Any

额外参数

{}

返回:

类型 描述
str

生成的文本

源代码位于: src/fund_cli/ai/providers.py
@abstractmethod
def generate(self, prompt: str, **kwargs: Any) -> str:
    """Produce text for the given prompt.

    Args:
        prompt: Input prompt text.
        **kwargs: Provider-specific overrides (e.g. model, temperature).

    Returns:
        The generated text.
    """
    ...

is_available abstractmethod

is_available() -> bool

检查提供商是否可用

返回:

类型 描述
bool

是否可用

源代码位于: src/fund_cli/ai/providers.py
@abstractmethod
def is_available(self) -> bool:
    """Report whether the provider can currently serve requests.

    Returns:
        True when the provider is reachable and usable.
    """
    ...

validate_config abstractmethod

validate_config() -> bool

验证配置是否有效

返回:

类型 描述
bool

配置是否有效

源代码位于: src/fund_cli/ai/providers.py
@abstractmethod
def validate_config(self) -> bool:
    """Report whether the stored configuration is usable.

    Returns:
        True when the configuration is valid.
    """
    ...

OpenAIProvider

Bases: LLMProvider


              flowchart TD
              fund_cli.ai.providers.OpenAIProvider[OpenAIProvider]
              fund_cli.ai.providers.LLMProvider[LLMProvider]

                              fund_cli.ai.providers.LLMProvider --> fund_cli.ai.providers.OpenAIProvider
                


              click fund_cli.ai.providers.OpenAIProvider href "" "fund_cli.ai.providers.OpenAIProvider"
              click fund_cli.ai.providers.LLMProvider href "" "fund_cli.ai.providers.LLMProvider"
            

OpenAI 提供商

源代码位于: src/fund_cli/ai/providers.py
class OpenAIProvider(LLMProvider):
    """OpenAI chat-completions provider."""

    def __init__(self, config: AIConfig):
        super().__init__(config)
        self.api_key = config.api_key
        self.model = config.model or "gpt-4"
        self.base_url = config.api_base or "https://api.openai.com/v1"

    def generate(self, prompt: str, **kwargs: Any) -> str:
        """Generate text via the OpenAI chat-completions API.

        Args:
            prompt: Input prompt text.
            **kwargs: Optional per-call overrides: model, temperature, max_tokens.

        Returns:
            The generated message content.

        Raises:
            ValueError: If no API key is configured.
            RuntimeError: If the API call fails or the payload is malformed.
        """
        if not self.api_key:
            raise ValueError("OpenAI API key not configured")

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        payload = {
            "model": kwargs.get("model", self.model),
            "messages": [{"role": "user", "content": prompt}],
            "temperature": kwargs.get("temperature", self.config.temperature),
            "max_tokens": kwargs.get("max_tokens", self.config.max_tokens),
        }

        response = self._call_with_retry(f"{self.base_url}/chat/completions", headers, payload)

        # Guard against a missing AND an empty "choices" list so a malformed
        # payload raises RuntimeError rather than IndexError.
        if response and response.get("choices"):
            return response["choices"][0]["message"]["content"]
        raise RuntimeError("Invalid response from OpenAI API")

    def is_available(self) -> bool:
        """Probe the model-list endpoint; any failure counts as unavailable."""
        if not self.api_key:
            return False
        try:
            headers = {"Authorization": f"Bearer {self.api_key}"}
            response = requests.get(
                f"{self.base_url}/models",
                headers=headers,
                timeout=self.config.timeout,
            )
            return response.status_code == 200
        except Exception:
            # Best-effort probe: network/auth errors simply mean "not available".
            return False

    def validate_config(self) -> bool:
        """Return True when both an API key and a model name are set."""
        return bool(self.api_key and self.model)

    def _call_with_retry(self, url: str, headers: dict, payload: dict) -> dict:
        """POST *payload* to *url*, retrying with exponential backoff.

        Returns:
            The decoded JSON response (never None — failures raise).

        Raises:
            RuntimeError: After exhausting ``config.retry_count`` attempts.
        """
        last_error = None
        for attempt in range(self.config.retry_count):
            try:
                response = requests.post(
                    url,
                    headers=headers,
                    json=payload,
                    timeout=self.config.timeout,
                )
                response.raise_for_status()
                return response.json()
            except requests.RequestException as e:
                last_error = e
                if attempt < self.config.retry_count - 1:
                    time.sleep(2**attempt)  # exponential backoff
        raise RuntimeError(
            f"API call failed after {self.config.retry_count} attempts: {last_error}"
        )

generate

generate(prompt: str, **kwargs: Any) -> str

使用 OpenAI API 生成文本

源代码位于: src/fund_cli/ai/providers.py
def generate(self, prompt: str, **kwargs: Any) -> str:
    """Generate text via the OpenAI chat-completions API.

    Args:
        prompt: Input prompt text.
        **kwargs: Optional per-call overrides: model, temperature, max_tokens.

    Returns:
        The generated message content.

    Raises:
        ValueError: If no API key is configured.
        RuntimeError: If the API call fails or the payload is malformed.
    """
    if not self.api_key:
        raise ValueError("OpenAI API key not configured")

    headers = {
        "Authorization": f"Bearer {self.api_key}",
        "Content-Type": "application/json",
    }

    payload = {
        "model": kwargs.get("model", self.model),
        "messages": [{"role": "user", "content": prompt}],
        "temperature": kwargs.get("temperature", self.config.temperature),
        "max_tokens": kwargs.get("max_tokens", self.config.max_tokens),
    }

    response = self._call_with_retry(f"{self.base_url}/chat/completions", headers, payload)

    # Guard against a missing AND an empty "choices" list so a malformed
    # payload raises RuntimeError rather than IndexError.
    if response and response.get("choices"):
        return response["choices"][0]["message"]["content"]
    raise RuntimeError("Invalid response from OpenAI API")

is_available

is_available() -> bool

检查OpenAI API是否可用

源代码位于: src/fund_cli/ai/providers.py
def is_available(self) -> bool:
    """Probe the OpenAI model-list endpoint; any failure means unavailable."""
    if not self.api_key:
        return False
    auth = {"Authorization": f"Bearer {self.api_key}"}
    try:
        reply = requests.get(
            f"{self.base_url}/models",
            headers=auth,
            timeout=self.config.timeout,
        )
        return reply.status_code == 200
    except Exception:
        # Best-effort probe: any error just means "not available".
        return False

validate_config

validate_config() -> bool

验证配置

源代码位于: src/fund_cli/ai/providers.py
def validate_config(self) -> bool:
    """A usable configuration needs both an API key and a model name."""
    if not self.api_key:
        return False
    return bool(self.model)

QwenProvider

Bases: LLMProvider


              flowchart TD
              fund_cli.ai.providers.QwenProvider[QwenProvider]
              fund_cli.ai.providers.LLMProvider[LLMProvider]

                              fund_cli.ai.providers.LLMProvider --> fund_cli.ai.providers.QwenProvider
                


              click fund_cli.ai.providers.QwenProvider href "" "fund_cli.ai.providers.QwenProvider"
              click fund_cli.ai.providers.LLMProvider href "" "fund_cli.ai.providers.LLMProvider"
            

阿里云Qwen 提供商

源代码位于: src/fund_cli/ai/providers.py
class QwenProvider(LLMProvider):
    """Alibaba Cloud Qwen (DashScope) provider."""

    def __init__(self, config: AIConfig):
        super().__init__(config)
        # Fall back to the generic key so a shared config still works.
        self.api_key = config.qwen_api_key or config.api_key
        self.model = config.qwen_model or "qwen-max"
        self.base_url = config.qwen_base_url or "https://dashscope.aliyuncs.com/api/v1"

    def generate(self, prompt: str, **kwargs: Any) -> str:
        """Generate text via the Qwen text-generation API.

        Args:
            prompt: Input prompt text.
            **kwargs: Optional per-call overrides: model, temperature, max_tokens.

        Returns:
            The generated message content.

        Raises:
            ValueError: If no API key is configured.
            RuntimeError: If the API call fails or the payload is malformed.
        """
        if not self.api_key:
            raise ValueError("Qwen API key not configured")

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        payload = {
            "model": kwargs.get("model", self.model),
            "input": {"messages": [{"role": "user", "content": prompt}]},
            "parameters": {
                "temperature": kwargs.get("temperature", self.config.temperature),
                "max_tokens": kwargs.get("max_tokens", self.config.max_tokens),
                "result_format": "message",
            },
        }

        response = self._call_with_retry(
            f"{self.base_url}/services/aigc/text-generation/generation",
            headers,
            payload,
        )

        # Guard the whole output/choices path so a malformed payload raises
        # RuntimeError instead of KeyError/IndexError.
        choices = ((response or {}).get("output") or {}).get("choices") or []
        if choices:
            return choices[0]["message"]["content"]
        raise RuntimeError("Invalid response from Qwen API")

    def is_available(self) -> bool:
        """Probe the model-list endpoint; any failure counts as unavailable."""
        if not self.api_key:
            return False
        try:
            headers = {"Authorization": f"Bearer {self.api_key}"}
            # Lightweight reachability check via the model-list endpoint.
            response = requests.get(
                f"{self.base_url}/models",
                headers=headers,
                timeout=self.config.timeout,
            )
            return response.status_code == 200
        except Exception:
            # Best-effort probe: treat any error as "not available".
            return False

    def validate_config(self) -> bool:
        """Return True when both an API key and a model name are set."""
        return bool(self.api_key and self.model)

    def _call_with_retry(self, url: str, headers: dict, payload: dict) -> dict:
        """POST *payload* to *url*, retrying with exponential backoff.

        Returns:
            The decoded JSON response (never None — failures raise).

        Raises:
            RuntimeError: After exhausting ``config.retry_count`` attempts.
        """
        last_error = None
        for attempt in range(self.config.retry_count):
            try:
                response = requests.post(
                    url,
                    headers=headers,
                    json=payload,
                    timeout=self.config.timeout,
                )
                response.raise_for_status()
                return response.json()
            except requests.RequestException as e:
                last_error = e
                if attempt < self.config.retry_count - 1:
                    time.sleep(2**attempt)  # exponential backoff
        raise RuntimeError(
            f"API call failed after {self.config.retry_count} attempts: {last_error}"
        )

generate

generate(prompt: str, **kwargs: Any) -> str

使用 Qwen API 生成文本

源代码位于: src/fund_cli/ai/providers.py
def generate(self, prompt: str, **kwargs: Any) -> str:
    """Generate text via the Qwen text-generation API.

    Args:
        prompt: Input prompt text.
        **kwargs: Optional per-call overrides: model, temperature, max_tokens.

    Returns:
        The generated message content.

    Raises:
        ValueError: If no API key is configured.
        RuntimeError: If the API call fails or the payload is malformed.
    """
    if not self.api_key:
        raise ValueError("Qwen API key not configured")

    headers = {
        "Authorization": f"Bearer {self.api_key}",
        "Content-Type": "application/json",
    }

    payload = {
        "model": kwargs.get("model", self.model),
        "input": {"messages": [{"role": "user", "content": prompt}]},
        "parameters": {
            "temperature": kwargs.get("temperature", self.config.temperature),
            "max_tokens": kwargs.get("max_tokens", self.config.max_tokens),
            "result_format": "message",
        },
    }

    response = self._call_with_retry(
        f"{self.base_url}/services/aigc/text-generation/generation",
        headers,
        payload,
    )

    # Guard the whole output/choices path so a malformed payload raises
    # RuntimeError instead of KeyError/IndexError.
    choices = ((response or {}).get("output") or {}).get("choices") or []
    if choices:
        return choices[0]["message"]["content"]
    raise RuntimeError("Invalid response from Qwen API")

is_available

is_available() -> bool

检查Qwen API是否可用

源代码位于: src/fund_cli/ai/providers.py
def is_available(self) -> bool:
    """Probe the Qwen model-list endpoint; any failure means unavailable."""
    if not self.api_key:
        return False
    auth = {"Authorization": f"Bearer {self.api_key}"}
    try:
        # Lightweight reachability check via the model-list endpoint.
        reply = requests.get(
            f"{self.base_url}/models",
            headers=auth,
            timeout=self.config.timeout,
        )
        return reply.status_code == 200
    except Exception:
        # Best-effort probe: any error just means "not available".
        return False

validate_config

validate_config() -> bool

验证配置

源代码位于: src/fund_cli/ai/providers.py
def validate_config(self) -> bool:
    """A usable configuration needs both an API key and a model name."""
    if not self.api_key:
        return False
    return bool(self.model)

LiteLLMProvider

Bases: LLMProvider


              flowchart TD
              fund_cli.ai.providers.LiteLLMProvider[LiteLLMProvider]
              fund_cli.ai.providers.LLMProvider[LLMProvider]

                              fund_cli.ai.providers.LLMProvider --> fund_cli.ai.providers.LiteLLMProvider
                


              click fund_cli.ai.providers.LiteLLMProvider href "" "fund_cli.ai.providers.LiteLLMProvider"
              click fund_cli.ai.providers.LLMProvider href "" "fund_cli.ai.providers.LLMProvider"
            

LiteLLM 统一封装提供商

源代码位于: src/fund_cli/ai/providers.py
class LiteLLMProvider(LLMProvider):
    """Provider that delegates to the LiteLLM unified client library."""

    def __init__(self, config: AIConfig):
        super().__init__(config)
        self.api_key = config.api_key
        self.model = config.model or "gpt-4"

    def generate(self, prompt: str, **kwargs: Any) -> str:
        """Generate text through ``litellm.completion``.

        Raises:
            RuntimeError: When litellm is not installed or the call fails.
        """
        try:
            import litellm

            litellm.api_key = self.api_key

            reply = litellm.completion(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=kwargs.get("temperature", self.config.temperature),
                max_tokens=kwargs.get("max_tokens", self.config.max_tokens),
            )
            return reply.choices[0].message.content
        except ImportError as err:
            raise RuntimeError("litellm not installed. Install with: pip install litellm") from err
        except Exception as e:
            raise RuntimeError(f"LiteLLM API error: {e}") from e

    def is_available(self) -> bool:
        """Available only when litellm is importable and an API key is set."""
        try:
            import importlib.util

            if importlib.util.find_spec("litellm") is None:
                return False
        except ImportError:
            return False
        return bool(self.api_key)

    def validate_config(self) -> bool:
        """A usable configuration needs both an API key and a model name."""
        if not self.api_key:
            return False
        return bool(self.model)

generate

generate(prompt: str, **kwargs: Any) -> str

使用 LiteLLM 生成文本

源代码位于: src/fund_cli/ai/providers.py
def generate(self, prompt: str, **kwargs: Any) -> str:
    """Generate text through ``litellm.completion``.

    Raises:
        RuntimeError: When litellm is not installed or the call fails.
    """
    try:
        import litellm

        litellm.api_key = self.api_key

        reply = litellm.completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=kwargs.get("temperature", self.config.temperature),
            max_tokens=kwargs.get("max_tokens", self.config.max_tokens),
        )
        return reply.choices[0].message.content
    except ImportError as err:
        raise RuntimeError("litellm not installed. Install with: pip install litellm") from err
    except Exception as e:
        raise RuntimeError(f"LiteLLM API error: {e}") from e

is_available

is_available() -> bool

检查LiteLLM是否可用

源代码位于: src/fund_cli/ai/providers.py
def is_available(self) -> bool:
    """Available only when litellm is importable and an API key is set."""
    try:
        import importlib.util

        if importlib.util.find_spec("litellm") is None:
            return False
    except ImportError:
        return False
    return bool(self.api_key)

validate_config

validate_config() -> bool

验证配置

源代码位于: src/fund_cli/ai/providers.py
def validate_config(self) -> bool:
    """A usable configuration needs both an API key and a model name."""
    if not self.api_key:
        return False
    return bool(self.model)