跳转至

分析模块 API

PerformanceAnalyzer

Bases: Analyzer


              flowchart TD
              fund_cli.analysis.performance.PerformanceAnalyzer[PerformanceAnalyzer]
              fund_cli.core.analyzer.Analyzer[Analyzer]

                              fund_cli.core.analyzer.Analyzer --> fund_cli.analysis.performance.PerformanceAnalyzer
                


              click fund_cli.analysis.performance.PerformanceAnalyzer href "" "fund_cli.analysis.performance.PerformanceAnalyzer"
              click fund_cli.core.analyzer.Analyzer href "" "fund_cli.core.analyzer.Analyzer"
            

业绩分析引擎

使用 QuantStats 库计算专业业绩指标,包括: - 收益指标:总收益、年化收益、累计收益 - 风险指标:波动率、VaR、CVaR - 风险调整收益:夏普比率、索提诺比率、卡玛比率 - 相对指标:Alpha、Beta、信息比率、跟踪误差

源代码位于: src/fund_cli/analysis/performance.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
class PerformanceAnalyzer(Analyzer):
    """
    业绩分析引擎

    使用 QuantStats 库计算专业业绩指标,包括:
    - 收益指标:总收益、年化收益、累计收益
    - 风险指标:波动率、VaR、CVaR
    - 风险调整收益:夏普比率、索提诺比率、卡玛比率
    - 相对指标:Alpha、Beta、信息比率、跟踪误差
    """

    def __init__(self, risk_free_rate: float | None = None):
        """
        初始化业绩分析引擎

        Args:
            risk_free_rate: 无风险利率,默认从配置读取
        """
        config = get_config()
        self.risk_free_rate = risk_free_rate or config.analysis.risk_free_rate
        self._qs = None

    def _get_quantstats(self):
        """延迟加载 QuantStats"""
        if self._qs is None:
            try:
                import quantstats as qs

                self._qs = qs
            except ImportError as e:
                raise ImportError("QuantStats 未安装,请运行: pip install quantstats") from e
        return self._qs

    def analyze(  # type: ignore[override]
        self,
        returns: pd.Series,
        benchmark: pd.Series | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        执行业绩分析

        Args:
            returns: 收益率序列(日频)
            benchmark: 基准收益率序列(可选)
            **kwargs: 额外参数

        Returns:
            分析结果字典
        """
        qs = self._get_quantstats()

        # 确保输入为 Series
        if isinstance(returns, pd.DataFrame):
            returns = returns.iloc[:, 0]

        # 清理数据
        returns = returns.dropna()

        # 基础收益指标
        metrics = {
            # 收益指标
            "total_return": self._safe_calc(qs.stats.comp, returns) * 100,
            "cagr": self._safe_calc(qs.stats.cagr, returns) * 100,
            "mean_return": returns.mean() * 252 * 100,
            # 风险指标
            "volatility": self._safe_calc(qs.stats.volatility, returns) * 100,
            "max_drawdown": self._safe_calc(qs.stats.max_drawdown, returns) * 100,
            "var_95": self._safe_calc(qs.stats.var, returns) * 100,
            "cvar_95": self._safe_calc(qs.stats.cvar, returns) * 100,
            # 风险调整收益
            "sharpe": self._safe_calc(qs.stats.sharpe, returns, rf=self.risk_free_rate),
            "sortino": self._safe_calc(qs.stats.sortino, returns),
            "calmar": self._safe_calc(qs.stats.calmar, returns),
            # 其他指标
            "skew": self._safe_calc(qs.stats.skew, returns),
            "kurtosis": self._safe_calc(qs.stats.kurtosis, returns),
            "best_day": returns.max() * 100 if not returns.empty else 0,
            "worst_day": returns.min() * 100 if not returns.empty else 0,
            "avg_win": self._safe_calc(qs.stats.avg_win, returns) * 100,
            "avg_loss": self._safe_calc(qs.stats.avg_loss, returns) * 100,
            "win_rate": self._safe_calc(qs.stats.win_rate, returns) * 100,
        }

        # 相对指标(如果有基准)
        if benchmark is not None:
            if isinstance(benchmark, pd.DataFrame):
                benchmark = benchmark.iloc[:, 0]
            benchmark = benchmark.dropna()

            # 对齐日期
            common_dates = returns.index.intersection(benchmark.index)
            if len(common_dates) > 0:
                returns_aligned = returns.loc[common_dates]
                benchmark_aligned = benchmark.loc[common_dates]

                try:
                    greeks = qs.stats.greeks(returns_aligned, benchmark_aligned)
                    if greeks is not None:
                        metrics["alpha"] = float(greeks.iloc[0])
                        metrics["beta"] = float(greeks.iloc[1])
                    else:
                        metrics["alpha"] = None
                        metrics["beta"] = None
                except Exception:
                    metrics["alpha"] = None
                    metrics["beta"] = None

                # tracking_error / information_ratio / r_squared 在部分版本不可用
                # 使用 hasattr 检查,因为属性访问时会抛出 AttributeError
                if hasattr(qs.stats, "tracking_error"):
                    metrics["tracking_error"] = self._safe_calc(
                        qs.stats.tracking_error, returns_aligned, benchmark_aligned
                    )
                    if metrics["tracking_error"] is not None and not (
                        isinstance(metrics["tracking_error"], float)
                        and metrics["tracking_error"] != metrics["tracking_error"]
                    ):
                        metrics["tracking_error"] = metrics["tracking_error"] * 100
                    else:
                        metrics["tracking_error"] = None
                else:
                    metrics["tracking_error"] = None

                if metrics["tracking_error"] is None:
                    # 手动计算 tracking_error
                    excess = returns_aligned - benchmark_aligned
                    metrics["tracking_error"] = float(excess.std() * np.sqrt(252)) * 100

                if hasattr(qs.stats, "information_ratio"):
                    metrics["information_ratio"] = self._safe_calc(
                        qs.stats.information_ratio, returns_aligned, benchmark_aligned
                    )
                else:
                    metrics["information_ratio"] = None

                if metrics["information_ratio"] is None or (
                    isinstance(metrics["information_ratio"], float)
                    and metrics["information_ratio"] != metrics["information_ratio"]
                ):
                    # 手动计算 information_ratio
                    excess = returns_aligned - benchmark_aligned
                    te = excess.std() * np.sqrt(252)
                    metrics["information_ratio"] = (
                        float(excess.mean() * 252 / te) if te > 0 else None
                    )

                if hasattr(qs.stats, "r_squared"):
                    metrics["r_squared"] = self._safe_calc(
                        qs.stats.r_squared, returns_aligned, benchmark_aligned
                    )
                else:
                    metrics["r_squared"] = None

                if metrics["r_squared"] is None or (
                    isinstance(metrics["r_squared"], float)
                    and metrics["r_squared"] != metrics["r_squared"]
                ):
                    metrics["r_squared"] = float(returns_aligned.corr(benchmark_aligned) ** 2)

        return metrics

    def _safe_calc(self, func, *args, **kwargs) -> Any:
        """安全计算,捕获异常"""
        try:
            result = func(*args, **kwargs)
            if result is None:
                return float("nan")
            return result
        except Exception:
            return float("nan")

    def get_metrics(self) -> list[str]:
        """
        获取可计算的指标列表

        Returns:
            指标名称列表
        """
        return [
            "total_return",
            "cagr",
            "volatility",
            "max_drawdown",
            "sharpe",
            "sortino",
            "calmar",
            "var_95",
            "cvar_95",
            "alpha",
            "beta",
            "tracking_error",
            "information_ratio",
        ]

    def calculate_returns(
        self,
        nav_data: pd.DataFrame,
        nav_column: str = "unit_nav",
    ) -> pd.Series:
        """
        从净值数据计算收益率

        Args:
            nav_data: 净值数据 DataFrame
            nav_column: 净值列名

        Returns:
            日收益率序列
        """
        nav = nav_data.set_index("nav_date")[nav_column]
        returns = nav.pct_change().dropna()
        returns.name = "daily_return"
        return returns

    def calculate_cumulative_return(
        self,
        returns: pd.Series,
    ) -> pd.Series:
        """
        计算累计收益率

        Args:
            returns: 日收益率序列

        Returns:
            累计收益率序列
        """
        return (1 + returns).cumprod() - 1

    def calculate_drawdown(
        self,
        returns: pd.Series,
    ) -> pd.Series:
        """
        计算回撤序列

        Args:
            returns: 日收益率序列

        Returns:
            回撤序列
        """
        wealth = (1 + returns).cumprod()
        rolling_max = wealth.cummax()
        drawdown = (wealth - rolling_max) / rolling_max
        return drawdown

    def rolling_performance(self, returns: pd.Series, window: int = 60) -> pd.DataFrame:
        """
        滚动业绩分析 (FUND-ANALYZE-006)

        Args:
            returns: 日收益率序列
            window: 滚动窗口(交易日)

        Returns:
            滚动指标 DataFrame,包含 rolling_return, rolling_sharpe, rolling_volatility, rolling_max_drawdown
        """
        if len(returns) < window:
            return pd.DataFrame()

        rolling_ret = returns.rolling(window=window).apply(lambda x: (1 + x).prod() - 1) * 100
        rolling_vol = returns.rolling(window=window).std() * np.sqrt(252) * 100
        rolling_sharpe = (
            (rolling_ret / rolling_vol) if rolling_vol.notna().any() else pd.Series(dtype=float)
        )

        def _rolling_mdd(x):
            if len(x) == 0:
                return 0
            cumprod = (1 + x).cumprod()
            cummax = cumprod.cummax()
            return ((cummax - cumprod) / cummax).min() * 100

        rolling_mdd = returns.rolling(window=window).apply(_rolling_mdd)

        return pd.DataFrame(
            {
                "rolling_return": rolling_ret,
                "rolling_volatility": rolling_vol,
                "rolling_sharpe": rolling_sharpe,
                "rolling_max_drawdown": rolling_mdd,
            }
        ).dropna()

    def monthly_return_distribution(self, returns: pd.Series) -> dict[str, Any]:
        """
        月度收益分布 (FUND-ANALYZE-008)

        Args:
            returns: 日收益率序列

        Returns:
            月度分布统计字典
        """
        if returns.empty:
            return {
                "monthly_returns": [],
                "positive_months": 0,
                "negative_months": 0,
                "avg_monthly_return": 0,
                "max_month": 0,
                "min_month": 0,
            }

        monthly = returns.resample("ME").apply(lambda x: (1 + x).prod() - 1) * 100
        monthly = monthly.dropna()

        positive = (monthly > 0).sum()
        negative = (monthly < 0).sum()

        return {
            "total_months": len(monthly),
            "positive_months": int(positive),
            "negative_months": int(negative),
            "win_rate": round(positive / len(monthly) * 100, 1) if len(monthly) > 0 else 0,
            "avg_monthly_return": round(monthly.mean(), 4),
            "std_monthly_return": round(monthly.std(), 4),
            "max_month": round(monthly.max(), 4),
            "min_month": round(monthly.min(), 4),
            "monthly_returns": monthly.to_dict(),
        }

    def scenario_analysis(
        self, returns: pd.Series, scenarios: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        """
        情景分析 (FUND-ANALYZE-009)

        预定义情景:牛市(+20%年化)、熊市(-20%年化)、震荡市(0%年化)

        Args:
            returns: 日收益率序列
            scenarios: 自定义情景 {名称: 年化收益率}

        Returns:
            情景分析结果
        """
        if scenarios is None:
            scenarios = {
                "牛市": 0.20,
                "温和牛市": 0.10,
                "震荡市": 0.0,
                "温和熊市": -0.10,
                "熊市": -0.20,
            }

        results = {}
        for name, annual_return in scenarios.items():
            daily_return = (1 + annual_return) ** (1 / 252) - 1
            n_days = len(returns)
            simulated = np.random.normal(daily_return, returns.std(), n_days)
            total = (1 + pd.Series(simulated)).prod() - 1
            vol = pd.Series(simulated).std() * np.sqrt(252)
            results[name] = {
                "annual_return": round(annual_return * 100, 2),
                "simulated_total_return": round(total * 100, 2),
                "simulated_volatility": round(vol * 100, 2),
            }

        return results

    def performance_persistence(
        self, returns: pd.Series, periods_per_year: int = 12
    ) -> dict[str, Any]:
        """
        业绩持续性分析 (FUND-ANALYZE-010)

        Args:
            returns: 日收益率序列
            periods_per_year: 每年周期数

        Returns:
            持续性分析结果
        """
        if len(returns) < periods_per_year * 2:
            return {"persistence_score": 0, "message": "数据不足"}

        monthly = returns.resample("ME").apply(lambda x: (1 + x).prod() - 1)
        monthly = monthly.dropna()

        if len(monthly) < 6:
            return {"persistence_score": 0, "message": "月度数据不足"}

        # 计算排名相关性(相邻周期排名的相关系数)
        half = len(monthly) // 2
        first_half = monthly.iloc[:half]
        second_half = monthly.iloc[half : 2 * half]

        if len(first_half) > 1 and len(second_half) > 1:
            rank_corr = first_half.rank().corr(second_half.rank())
        else:
            rank_corr = 0

        # 胜率
        win_rate = (monthly > 0).sum() / len(monthly) * 100

        # 连续正/负月数
        max_positive_streak = 0
        max_negative_streak = 0
        current_streak = 0
        current_sign = 1
        for val in monthly:
            sign = 1 if val > 0 else -1
            if sign == current_sign:
                current_streak += 1
            else:
                current_streak = 1
                current_sign = sign
            if sign == 1:
                max_positive_streak = max(max_positive_streak, current_streak)
            else:
                max_negative_streak = max(max_negative_streak, current_streak)

        persistence_score = max(0, min(100, (rank_corr + 1) * 50))

        return {
            "persistence_score": round(persistence_score, 1),
            "rank_correlation": round(rank_corr, 4),
            "monthly_win_rate": round(win_rate, 1),
            "max_positive_streak": max_positive_streak,
            "max_negative_streak": max_negative_streak,
            "total_months": len(monthly),
        }

__init__

__init__(risk_free_rate: float | None = None)

初始化业绩分析引擎

参数:

名称 类型 描述 默认
risk_free_rate float | None

无风险利率,默认从配置读取

None
源代码位于: src/fund_cli/analysis/performance.py
def __init__(self, risk_free_rate: float | None = None):
    """
    初始化业绩分析引擎

    Args:
        risk_free_rate: 无风险利率,默认从配置读取
    """
    config = get_config()
    self.risk_free_rate = risk_free_rate or config.analysis.risk_free_rate
    self._qs = None

analyze

analyze(
    returns: Series,
    benchmark: Series | None = None,
    **kwargs: Any,
) -> dict[str, Any]

执行业绩分析

参数:

名称 类型 描述 默认
returns Series

收益率序列(日频)

必需
benchmark Series | None

基准收益率序列(可选)

None
**kwargs Any

额外参数

{}

返回:

类型 描述
dict[str, Any]

分析结果字典

源代码位于: src/fund_cli/analysis/performance.py
def analyze(  # type: ignore[override]
    self,
    returns: pd.Series,
    benchmark: pd.Series | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    执行业绩分析

    Args:
        returns: 收益率序列(日频)
        benchmark: 基准收益率序列(可选)
        **kwargs: 额外参数

    Returns:
        分析结果字典
    """
    qs = self._get_quantstats()

    # 确保输入为 Series
    if isinstance(returns, pd.DataFrame):
        returns = returns.iloc[:, 0]

    # 清理数据
    returns = returns.dropna()

    # 基础收益指标
    metrics = {
        # 收益指标
        "total_return": self._safe_calc(qs.stats.comp, returns) * 100,
        "cagr": self._safe_calc(qs.stats.cagr, returns) * 100,
        "mean_return": returns.mean() * 252 * 100,
        # 风险指标
        "volatility": self._safe_calc(qs.stats.volatility, returns) * 100,
        "max_drawdown": self._safe_calc(qs.stats.max_drawdown, returns) * 100,
        "var_95": self._safe_calc(qs.stats.var, returns) * 100,
        "cvar_95": self._safe_calc(qs.stats.cvar, returns) * 100,
        # 风险调整收益
        "sharpe": self._safe_calc(qs.stats.sharpe, returns, rf=self.risk_free_rate),
        "sortino": self._safe_calc(qs.stats.sortino, returns),
        "calmar": self._safe_calc(qs.stats.calmar, returns),
        # 其他指标
        "skew": self._safe_calc(qs.stats.skew, returns),
        "kurtosis": self._safe_calc(qs.stats.kurtosis, returns),
        "best_day": returns.max() * 100 if not returns.empty else 0,
        "worst_day": returns.min() * 100 if not returns.empty else 0,
        "avg_win": self._safe_calc(qs.stats.avg_win, returns) * 100,
        "avg_loss": self._safe_calc(qs.stats.avg_loss, returns) * 100,
        "win_rate": self._safe_calc(qs.stats.win_rate, returns) * 100,
    }

    # 相对指标(如果有基准)
    if benchmark is not None:
        if isinstance(benchmark, pd.DataFrame):
            benchmark = benchmark.iloc[:, 0]
        benchmark = benchmark.dropna()

        # 对齐日期
        common_dates = returns.index.intersection(benchmark.index)
        if len(common_dates) > 0:
            returns_aligned = returns.loc[common_dates]
            benchmark_aligned = benchmark.loc[common_dates]

            try:
                greeks = qs.stats.greeks(returns_aligned, benchmark_aligned)
                if greeks is not None:
                    metrics["alpha"] = float(greeks.iloc[0])
                    metrics["beta"] = float(greeks.iloc[1])
                else:
                    metrics["alpha"] = None
                    metrics["beta"] = None
            except Exception:
                metrics["alpha"] = None
                metrics["beta"] = None

            # tracking_error / information_ratio / r_squared 在部分版本不可用
            # 使用 hasattr 检查,因为属性访问时会抛出 AttributeError
            if hasattr(qs.stats, "tracking_error"):
                metrics["tracking_error"] = self._safe_calc(
                    qs.stats.tracking_error, returns_aligned, benchmark_aligned
                )
                if metrics["tracking_error"] is not None and not (
                    isinstance(metrics["tracking_error"], float)
                    and metrics["tracking_error"] != metrics["tracking_error"]
                ):
                    metrics["tracking_error"] = metrics["tracking_error"] * 100
                else:
                    metrics["tracking_error"] = None
            else:
                metrics["tracking_error"] = None

            if metrics["tracking_error"] is None:
                # 手动计算 tracking_error
                excess = returns_aligned - benchmark_aligned
                metrics["tracking_error"] = float(excess.std() * np.sqrt(252)) * 100

            if hasattr(qs.stats, "information_ratio"):
                metrics["information_ratio"] = self._safe_calc(
                    qs.stats.information_ratio, returns_aligned, benchmark_aligned
                )
            else:
                metrics["information_ratio"] = None

            if metrics["information_ratio"] is None or (
                isinstance(metrics["information_ratio"], float)
                and metrics["information_ratio"] != metrics["information_ratio"]
            ):
                # 手动计算 information_ratio
                excess = returns_aligned - benchmark_aligned
                te = excess.std() * np.sqrt(252)
                metrics["information_ratio"] = (
                    float(excess.mean() * 252 / te) if te > 0 else None
                )

            if hasattr(qs.stats, "r_squared"):
                metrics["r_squared"] = self._safe_calc(
                    qs.stats.r_squared, returns_aligned, benchmark_aligned
                )
            else:
                metrics["r_squared"] = None

            if metrics["r_squared"] is None or (
                isinstance(metrics["r_squared"], float)
                and metrics["r_squared"] != metrics["r_squared"]
            ):
                metrics["r_squared"] = float(returns_aligned.corr(benchmark_aligned) ** 2)

    return metrics

get_metrics

get_metrics() -> list[str]

获取可计算的指标列表

返回:

类型 描述
list[str]

指标名称列表

源代码位于: src/fund_cli/analysis/performance.py
def get_metrics(self) -> list[str]:
    """
    获取可计算的指标列表

    Returns:
        指标名称列表
    """
    return [
        "total_return",
        "cagr",
        "volatility",
        "max_drawdown",
        "sharpe",
        "sortino",
        "calmar",
        "var_95",
        "cvar_95",
        "alpha",
        "beta",
        "tracking_error",
        "information_ratio",
    ]

calculate_returns

calculate_returns(
    nav_data: DataFrame, nav_column: str = "unit_nav"
) -> pd.Series

从净值数据计算收益率

参数:

名称 类型 描述 默认
nav_data DataFrame

净值数据 DataFrame

必需
nav_column str

净值列名

'unit_nav'

返回:

类型 描述
Series

日收益率序列

源代码位于: src/fund_cli/analysis/performance.py
def calculate_returns(
    self,
    nav_data: pd.DataFrame,
    nav_column: str = "unit_nav",
) -> pd.Series:
    """
    从净值数据计算收益率

    Args:
        nav_data: 净值数据 DataFrame
        nav_column: 净值列名

    Returns:
        日收益率序列
    """
    nav = nav_data.set_index("nav_date")[nav_column]
    returns = nav.pct_change().dropna()
    returns.name = "daily_return"
    return returns

calculate_cumulative_return

calculate_cumulative_return(returns: Series) -> pd.Series

计算累计收益率

参数:

名称 类型 描述 默认
returns Series

日收益率序列

必需

返回:

类型 描述
Series

累计收益率序列

源代码位于: src/fund_cli/analysis/performance.py
def calculate_cumulative_return(
    self,
    returns: pd.Series,
) -> pd.Series:
    """
    计算累计收益率

    Args:
        returns: 日收益率序列

    Returns:
        累计收益率序列
    """
    return (1 + returns).cumprod() - 1

calculate_drawdown

calculate_drawdown(returns: Series) -> pd.Series

计算回撤序列

参数:

名称 类型 描述 默认
returns Series

日收益率序列

必需

返回:

类型 描述
Series

回撤序列

源代码位于: src/fund_cli/analysis/performance.py
def calculate_drawdown(
    self,
    returns: pd.Series,
) -> pd.Series:
    """
    计算回撤序列

    Args:
        returns: 日收益率序列

    Returns:
        回撤序列
    """
    wealth = (1 + returns).cumprod()
    rolling_max = wealth.cummax()
    drawdown = (wealth - rolling_max) / rolling_max
    return drawdown

rolling_performance

rolling_performance(
    returns: Series, window: int = 60
) -> pd.DataFrame

滚动业绩分析 (FUND-ANALYZE-006)

参数:

名称 类型 描述 默认
returns Series

日收益率序列

必需
window int

滚动窗口(交易日)

60

返回:

类型 描述
DataFrame

滚动指标 DataFrame,包含 rolling_return, rolling_sharpe, rolling_volatility, rolling_max_drawdown

源代码位于: src/fund_cli/analysis/performance.py
def rolling_performance(self, returns: pd.Series, window: int = 60) -> pd.DataFrame:
    """
    滚动业绩分析 (FUND-ANALYZE-006)

    Args:
        returns: 日收益率序列
        window: 滚动窗口(交易日)

    Returns:
        滚动指标 DataFrame,包含 rolling_return, rolling_sharpe, rolling_volatility, rolling_max_drawdown
    """
    if len(returns) < window:
        return pd.DataFrame()

    rolling_ret = returns.rolling(window=window).apply(lambda x: (1 + x).prod() - 1) * 100
    rolling_vol = returns.rolling(window=window).std() * np.sqrt(252) * 100
    rolling_sharpe = (
        (rolling_ret / rolling_vol) if rolling_vol.notna().any() else pd.Series(dtype=float)
    )

    def _rolling_mdd(x):
        if len(x) == 0:
            return 0
        cumprod = (1 + x).cumprod()
        cummax = cumprod.cummax()
        return ((cummax - cumprod) / cummax).min() * 100

    rolling_mdd = returns.rolling(window=window).apply(_rolling_mdd)

    return pd.DataFrame(
        {
            "rolling_return": rolling_ret,
            "rolling_volatility": rolling_vol,
            "rolling_sharpe": rolling_sharpe,
            "rolling_max_drawdown": rolling_mdd,
        }
    ).dropna()

monthly_return_distribution

monthly_return_distribution(
    returns: Series,
) -> dict[str, Any]

月度收益分布 (FUND-ANALYZE-008)

参数:

名称 类型 描述 默认
returns Series

日收益率序列

必需

返回:

类型 描述
dict[str, Any]

月度分布统计字典

源代码位于: src/fund_cli/analysis/performance.py
def monthly_return_distribution(self, returns: pd.Series) -> dict[str, Any]:
    """
    月度收益分布 (FUND-ANALYZE-008)

    Args:
        returns: 日收益率序列

    Returns:
        月度分布统计字典
    """
    if returns.empty:
        return {
            "monthly_returns": [],
            "positive_months": 0,
            "negative_months": 0,
            "avg_monthly_return": 0,
            "max_month": 0,
            "min_month": 0,
        }

    monthly = returns.resample("ME").apply(lambda x: (1 + x).prod() - 1) * 100
    monthly = monthly.dropna()

    positive = (monthly > 0).sum()
    negative = (monthly < 0).sum()

    return {
        "total_months": len(monthly),
        "positive_months": int(positive),
        "negative_months": int(negative),
        "win_rate": round(positive / len(monthly) * 100, 1) if len(monthly) > 0 else 0,
        "avg_monthly_return": round(monthly.mean(), 4),
        "std_monthly_return": round(monthly.std(), 4),
        "max_month": round(monthly.max(), 4),
        "min_month": round(monthly.min(), 4),
        "monthly_returns": monthly.to_dict(),
    }

scenario_analysis

scenario_analysis(
    returns: Series, scenarios: dict[str, Any] | None = None
) -> dict[str, Any]

情景分析 (FUND-ANALYZE-009)

预定义情景:牛市(+20%年化)、熊市(-20%年化)、震荡市(0%年化)

参数:

名称 类型 描述 默认
returns Series

日收益率序列

必需
scenarios dict[str, Any] | None

自定义情景 {名称: 年化收益率}

None

返回:

类型 描述
dict[str, Any]

情景分析结果

源代码位于: src/fund_cli/analysis/performance.py
def scenario_analysis(
    self, returns: pd.Series, scenarios: dict[str, Any] | None = None
) -> dict[str, Any]:
    """
    情景分析 (FUND-ANALYZE-009)

    预定义情景:牛市(+20%年化)、熊市(-20%年化)、震荡市(0%年化)

    Args:
        returns: 日收益率序列
        scenarios: 自定义情景 {名称: 年化收益率}

    Returns:
        情景分析结果
    """
    if scenarios is None:
        scenarios = {
            "牛市": 0.20,
            "温和牛市": 0.10,
            "震荡市": 0.0,
            "温和熊市": -0.10,
            "熊市": -0.20,
        }

    results = {}
    for name, annual_return in scenarios.items():
        daily_return = (1 + annual_return) ** (1 / 252) - 1
        n_days = len(returns)
        simulated = np.random.normal(daily_return, returns.std(), n_days)
        total = (1 + pd.Series(simulated)).prod() - 1
        vol = pd.Series(simulated).std() * np.sqrt(252)
        results[name] = {
            "annual_return": round(annual_return * 100, 2),
            "simulated_total_return": round(total * 100, 2),
            "simulated_volatility": round(vol * 100, 2),
        }

    return results

performance_persistence

performance_persistence(
    returns: Series, periods_per_year: int = 12
) -> dict[str, Any]

业绩持续性分析 (FUND-ANALYZE-010)

参数:

名称 类型 描述 默认
returns Series

日收益率序列

必需
periods_per_year int

每年周期数

12

返回:

类型 描述
dict[str, Any]

持续性分析结果

源代码位于: src/fund_cli/analysis/performance.py
def performance_persistence(
    self, returns: pd.Series, periods_per_year: int = 12
) -> dict[str, Any]:
    """
    业绩持续性分析 (FUND-ANALYZE-010)

    Args:
        returns: 日收益率序列
        periods_per_year: 每年周期数

    Returns:
        持续性分析结果
    """
    if len(returns) < periods_per_year * 2:
        return {"persistence_score": 0, "message": "数据不足"}

    monthly = returns.resample("ME").apply(lambda x: (1 + x).prod() - 1)
    monthly = monthly.dropna()

    if len(monthly) < 6:
        return {"persistence_score": 0, "message": "月度数据不足"}

    # 计算排名相关性(相邻周期排名的相关系数)
    half = len(monthly) // 2
    first_half = monthly.iloc[:half]
    second_half = monthly.iloc[half : 2 * half]

    if len(first_half) > 1 and len(second_half) > 1:
        rank_corr = first_half.rank().corr(second_half.rank())
    else:
        rank_corr = 0

    # 胜率
    win_rate = (monthly > 0).sum() / len(monthly) * 100

    # 连续正/负月数
    max_positive_streak = 0
    max_negative_streak = 0
    current_streak = 0
    current_sign = 1
    for val in monthly:
        sign = 1 if val > 0 else -1
        if sign == current_sign:
            current_streak += 1
        else:
            current_streak = 1
            current_sign = sign
        if sign == 1:
            max_positive_streak = max(max_positive_streak, current_streak)
        else:
            max_negative_streak = max(max_negative_streak, current_streak)

    persistence_score = max(0, min(100, (rank_corr + 1) * 50))

    return {
        "persistence_score": round(persistence_score, 1),
        "rank_correlation": round(rank_corr, 4),
        "monthly_win_rate": round(win_rate, 1),
        "max_positive_streak": max_positive_streak,
        "max_negative_streak": max_negative_streak,
        "total_months": len(monthly),
    }

RiskAnalyzer

Bases: Analyzer


              flowchart TD
              fund_cli.analysis.risk.RiskAnalyzer[RiskAnalyzer]
              fund_cli.core.analyzer.Analyzer[Analyzer]

                              fund_cli.core.analyzer.Analyzer --> fund_cli.analysis.risk.RiskAnalyzer
                


              click fund_cli.analysis.risk.RiskAnalyzer href "" "fund_cli.analysis.risk.RiskAnalyzer"
              click fund_cli.core.analyzer.Analyzer href "" "fund_cli.core.analyzer.Analyzer"
            

风险分析引擎

计算各类风险指标,包括: - 波动率风险:年化波动率、下行波动率 - 回撤风险:最大回撤、平均回撤、回撤持续时间 - 尾部风险:VaR、CVaR、偏度、峰度 - 相关性风险:相关性矩阵、Beta

源代码位于: src/fund_cli/analysis/risk.py
class RiskAnalyzer(Analyzer):
    """
    风险分析引擎

    计算各类风险指标,包括:
    - 波动率风险:年化波动率、下行波动率
    - 回撤风险:最大回撤、平均回撤、回撤持续时间
    - 尾部风险:VaR、CVaR、偏度、峰度
    - 相关性风险:相关性矩阵、Beta
    """

    def __init__(
        self,
        confidence_level: float = 0.95,
        periods_per_year: int = 252,
    ):
        """
        初始化风险分析引擎

        Args:
            confidence_level: VaR 置信水平
            periods_per_year: 年交易日数
        """
        self.confidence_level = confidence_level
        self.periods_per_year = periods_per_year

    def analyze(  # type: ignore[override]
        self,
        returns: pd.Series,
        benchmark: pd.Series | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        执行风险分析

        Args:
            returns: 收益率序列
            benchmark: 基准收益率序列(可选)
            **kwargs: 额外参数

        Returns:
            风险分析结果字典
        """
        # 确保输入为 Series
        if isinstance(returns, pd.DataFrame):
            returns = returns.iloc[:, 0]

        returns = returns.dropna()

        metrics = {
            # 波动率风险
            "volatility_annual": self.annualized_volatility(returns) * 100,
            "downside_volatility": self.downside_volatility(returns) * 100,
            # 回撤风险
            "max_drawdown": self.max_drawdown(returns) * 100,
            "avg_drawdown": self.avg_drawdown(returns) * 100,
            "max_drawdown_duration": self.max_drawdown_duration(returns),
            # 尾部风险
            "var_95": self.var(returns, 0.95) * 100,
            "var_99": self.var(returns, 0.99) * 100,
            "cvar_95": self.cvar(returns, 0.95) * 100,
            "skewness": self.skewness(returns),
            "kurtosis": self.kurtosis(returns),
            # 分布特征
            "best_day": returns.max() * 100 if not returns.empty else 0,
            "worst_day": returns.min() * 100 if not returns.empty else 0,
            "std_dev": returns.std() * 100 if not returns.empty else 0,
        }

        # 相对风险指标
        if benchmark is not None:
            if isinstance(benchmark, pd.DataFrame):
                benchmark = benchmark.iloc[:, 0]
            benchmark = benchmark.dropna()

            common_dates = returns.index.intersection(benchmark.index)
            if len(common_dates) > 0:
                returns_aligned = returns.loc[common_dates]
                benchmark_aligned = benchmark.loc[common_dates]

                metrics["beta"] = self.beta(returns_aligned, benchmark_aligned)
                metrics["correlation"] = self.correlation(returns_aligned, benchmark_aligned)
                metrics["tracking_error"] = (
                    self.tracking_error(returns_aligned, benchmark_aligned) * 100
                )

        return metrics

    def get_metrics(self) -> list[str]:
        """获取可计算的指标列表"""
        return [
            "volatility_annual",
            "downside_volatility",
            "max_drawdown",
            "var_95",
            "var_99",
            "cvar_95",
            "skewness",
            "kurtosis",
            "beta",
            "correlation",
            "tracking_error",
        ]

    # ========== 波动率计算 ==========

    def annualized_volatility(self, returns: pd.Series) -> float:
        """计算年化波动率"""
        if returns.empty:
            return 0.0
        return returns.std() * np.sqrt(self.periods_per_year)

    def downside_volatility(
        self,
        returns: pd.Series,
        mar: float = 0.0,
    ) -> float:
        """
        计算下行波动率

        Args:
            returns: 收益率序列
            mar: 最低可接受收益率

        Returns:
            下行波动率
        """
        if returns.empty:
            return 0.0

        downside_returns = returns[returns < mar] - mar
        return np.sqrt((downside_returns**2).mean()) * np.sqrt(self.periods_per_year)

    # ========== 回撤计算 ==========

    def max_drawdown(self, returns: pd.Series) -> float:
        """计算最大回撤"""
        if returns.empty:
            return 0.0

        cumulative = (1 + returns).cumprod()
        running_max = cumulative.cummax()
        drawdown = (cumulative - running_max) / running_max
        return drawdown.min()

    def avg_drawdown(self, returns: pd.Series) -> float:
        """计算平均回撤"""
        if returns.empty:
            return 0.0

        cumulative = (1 + returns).cumprod()
        running_max = cumulative.cummax()
        drawdown = (cumulative - running_max) / running_max
        return drawdown.mean()

    def max_drawdown_duration(self, returns: pd.Series) -> int:
        """
        计算最大回撤持续天数

        Returns:
            最大回撤持续天数
        """
        if returns.empty:
            return 0

        cumulative = (1 + returns).cumprod()
        running_max = cumulative.cummax()

        # 找到回撤期间
        in_drawdown = cumulative < running_max

        if not in_drawdown.any():
            return 0

        # 计算最长回撤期
        drawdown_periods = []
        current_period = 0

        for is_dd in in_drawdown:
            if is_dd:
                current_period += 1
            else:
                if current_period > 0:
                    drawdown_periods.append(current_period)
                current_period = 0

        if current_period > 0:
            drawdown_periods.append(current_period)

        return max(drawdown_periods) if drawdown_periods else 0

    # ========== 尾部风险计算 ==========

    def var(
        self,
        returns: pd.Series,
        confidence: float = 0.95,
    ) -> float:
        """
        计算风险价值 (Value at Risk)

        Args:
            returns: 收益率序列
            confidence: 置信水平

        Returns:
            VaR 值(负数表示损失)
        """
        if returns.empty:
            return 0.0

        return float(np.percentile(returns, (1 - confidence) * 100))

    def cvar(
        self,
        returns: pd.Series,
        confidence: float = 0.95,
    ) -> float:
        """
        计算条件风险价值 (Conditional VaR / Expected Shortfall)

        Args:
            returns: 收益率序列
            confidence: 置信水平

        Returns:
            CVaR 值
        """
        if returns.empty:
            return 0.0

        var = self.var(returns, confidence)
        return returns[returns <= var].mean()

    def skewness(self, returns: pd.Series) -> float:
        """计算偏度"""
        if returns.empty:
            return 0.0
        return returns.skew()

    def kurtosis(self, returns: pd.Series) -> float:
        """计算峰度"""
        if returns.empty:
            return 0.0
        return returns.kurtosis()

    # ========== 相对风险计算 ==========

    def beta(
        self,
        returns: pd.Series,
        benchmark: pd.Series,
    ) -> float:
        """计算 Beta"""
        if returns.empty or benchmark.empty:
            return 0.0

        covariance = returns.cov(benchmark)
        variance = benchmark.var()

        if variance == 0:
            return 0.0

        return covariance / variance

    def correlation(
        self,
        returns: pd.Series,
        benchmark: pd.Series,
    ) -> float:
        """计算相关系数"""
        if returns.empty or benchmark.empty:
            return 0.0
        return returns.corr(benchmark)

    def tracking_error(
        self,
        returns: pd.Series,
        benchmark: pd.Series,
    ) -> float:
        """计算跟踪误差"""
        if returns.empty or benchmark.empty:
            return 0.0

        excess_returns = returns - benchmark
        return excess_returns.std() * np.sqrt(self.periods_per_year)

__init__

__init__(
    confidence_level: float = 0.95,
    periods_per_year: int = 252,
)

初始化风险分析引擎

参数:

名称 类型 描述 默认
confidence_level float

VaR 置信水平

0.95
periods_per_year int

年交易日数

252
源代码位于: src/fund_cli/analysis/risk.py
def __init__(
    self,
    confidence_level: float = 0.95,
    periods_per_year: int = 252,
):
    """
    初始化风险分析引擎

    Args:
        confidence_level: VaR 置信水平
        periods_per_year: 年交易日数
    """
    self.confidence_level = confidence_level
    self.periods_per_year = periods_per_year

analyze

analyze(
    returns: Series,
    benchmark: Series | None = None,
    **kwargs: Any,
) -> dict[str, Any]

执行风险分析

参数:

名称 类型 描述 默认
returns Series

收益率序列

必需
benchmark Series | None

基准收益率序列(可选)

None
**kwargs Any

额外参数

{}

返回:

类型 描述
dict[str, Any]

风险分析结果字典

源代码位于: src/fund_cli/analysis/risk.py
def analyze(  # type: ignore[override]
    self,
    returns: pd.Series,
    benchmark: pd.Series | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    执行风险分析

    Args:
        returns: 收益率序列
        benchmark: 基准收益率序列(可选)
        **kwargs: 额外参数

    Returns:
        风险分析结果字典
    """
    # 确保输入为 Series
    if isinstance(returns, pd.DataFrame):
        returns = returns.iloc[:, 0]

    returns = returns.dropna()

    metrics = {
        # 波动率风险
        "volatility_annual": self.annualized_volatility(returns) * 100,
        "downside_volatility": self.downside_volatility(returns) * 100,
        # 回撤风险
        "max_drawdown": self.max_drawdown(returns) * 100,
        "avg_drawdown": self.avg_drawdown(returns) * 100,
        "max_drawdown_duration": self.max_drawdown_duration(returns),
        # 尾部风险
        "var_95": self.var(returns, 0.95) * 100,
        "var_99": self.var(returns, 0.99) * 100,
        "cvar_95": self.cvar(returns, 0.95) * 100,
        "skewness": self.skewness(returns),
        "kurtosis": self.kurtosis(returns),
        # 分布特征
        "best_day": returns.max() * 100 if not returns.empty else 0,
        "worst_day": returns.min() * 100 if not returns.empty else 0,
        "std_dev": returns.std() * 100 if not returns.empty else 0,
    }

    # 相对风险指标
    if benchmark is not None:
        if isinstance(benchmark, pd.DataFrame):
            benchmark = benchmark.iloc[:, 0]
        benchmark = benchmark.dropna()

        common_dates = returns.index.intersection(benchmark.index)
        if len(common_dates) > 0:
            returns_aligned = returns.loc[common_dates]
            benchmark_aligned = benchmark.loc[common_dates]

            metrics["beta"] = self.beta(returns_aligned, benchmark_aligned)
            metrics["correlation"] = self.correlation(returns_aligned, benchmark_aligned)
            metrics["tracking_error"] = (
                self.tracking_error(returns_aligned, benchmark_aligned) * 100
            )

    return metrics

get_metrics

get_metrics() -> list[str]

获取可计算的指标列表

源代码位于: src/fund_cli/analysis/risk.py
def get_metrics(self) -> list[str]:
    """获取可计算的指标列表"""
    return [
        "volatility_annual",
        "downside_volatility",
        "max_drawdown",
        "var_95",
        "var_99",
        "cvar_95",
        "skewness",
        "kurtosis",
        "beta",
        "correlation",
        "tracking_error",
    ]

annualized_volatility

annualized_volatility(returns: Series) -> float

计算年化波动率

源代码位于: src/fund_cli/analysis/risk.py
def annualized_volatility(self, returns: pd.Series) -> float:
    """计算年化波动率"""
    if returns.empty:
        return 0.0
    return returns.std() * np.sqrt(self.periods_per_year)

downside_volatility

downside_volatility(
    returns: Series, mar: float = 0.0
) -> float

计算下行波动率

参数:

名称 类型 描述 默认
returns Series

收益率序列

必需
mar float

最低可接受收益率

0.0

返回:

类型 描述
float

下行波动率

源代码位于: src/fund_cli/analysis/risk.py
def downside_volatility(
    self,
    returns: pd.Series,
    mar: float = 0.0,
) -> float:
    """
    计算下行波动率

    Args:
        returns: 收益率序列
        mar: 最低可接受收益率

    Returns:
        下行波动率
    """
    if returns.empty:
        return 0.0

    downside_returns = returns[returns < mar] - mar
    return np.sqrt((downside_returns**2).mean()) * np.sqrt(self.periods_per_year)

max_drawdown

max_drawdown(returns: Series) -> float

计算最大回撤

源代码位于: src/fund_cli/analysis/risk.py
def max_drawdown(self, returns: pd.Series) -> float:
    """计算最大回撤"""
    if returns.empty:
        return 0.0

    cumulative = (1 + returns).cumprod()
    running_max = cumulative.cummax()
    drawdown = (cumulative - running_max) / running_max
    return drawdown.min()

avg_drawdown

avg_drawdown(returns: Series) -> float

计算平均回撤

源代码位于: src/fund_cli/analysis/risk.py
def avg_drawdown(self, returns: pd.Series) -> float:
    """计算平均回撤"""
    if returns.empty:
        return 0.0

    cumulative = (1 + returns).cumprod()
    running_max = cumulative.cummax()
    drawdown = (cumulative - running_max) / running_max
    return drawdown.mean()

max_drawdown_duration

max_drawdown_duration(returns: Series) -> int

计算最大回撤持续天数

返回:

类型 描述
int

最大回撤持续天数

源代码位于: src/fund_cli/analysis/risk.py
def max_drawdown_duration(self, returns: pd.Series) -> int:
    """
    计算最大回撤持续天数

    Returns:
        最大回撤持续天数
    """
    if returns.empty:
        return 0

    cumulative = (1 + returns).cumprod()
    running_max = cumulative.cummax()

    # 找到回撤期间
    in_drawdown = cumulative < running_max

    if not in_drawdown.any():
        return 0

    # 计算最长回撤期
    drawdown_periods = []
    current_period = 0

    for is_dd in in_drawdown:
        if is_dd:
            current_period += 1
        else:
            if current_period > 0:
                drawdown_periods.append(current_period)
            current_period = 0

    if current_period > 0:
        drawdown_periods.append(current_period)

    return max(drawdown_periods) if drawdown_periods else 0

var

var(returns: Series, confidence: float = 0.95) -> float

计算风险价值 (Value at Risk)

参数:

名称 类型 描述 默认
returns Series

收益率序列

必需
confidence float

置信水平

0.95

返回:

类型 描述
float

VaR 值(负数表示损失)

源代码位于: src/fund_cli/analysis/risk.py
def var(
    self,
    returns: pd.Series,
    confidence: float = 0.95,
) -> float:
    """
    计算风险价值 (Value at Risk)

    Args:
        returns: 收益率序列
        confidence: 置信水平

    Returns:
        VaR 值(负数表示损失)
    """
    if returns.empty:
        return 0.0

    return float(np.percentile(returns, (1 - confidence) * 100))

cvar

cvar(returns: Series, confidence: float = 0.95) -> float

计算条件风险价值 (Conditional VaR / Expected Shortfall)

参数:

名称 类型 描述 默认
returns Series

收益率序列

必需
confidence float

置信水平

0.95

返回:

类型 描述
float

CVaR 值

源代码位于: src/fund_cli/analysis/risk.py
def cvar(
    self,
    returns: pd.Series,
    confidence: float = 0.95,
) -> float:
    """
    计算条件风险价值 (Conditional VaR / Expected Shortfall)

    Args:
        returns: 收益率序列
        confidence: 置信水平

    Returns:
        CVaR 值
    """
    if returns.empty:
        return 0.0

    var = self.var(returns, confidence)
    return returns[returns <= var].mean()

skewness

skewness(returns: Series) -> float

计算偏度

源代码位于: src/fund_cli/analysis/risk.py
def skewness(self, returns: pd.Series) -> float:
    """计算偏度"""
    if returns.empty:
        return 0.0
    return returns.skew()

kurtosis

kurtosis(returns: Series) -> float

计算峰度

源代码位于: src/fund_cli/analysis/risk.py
def kurtosis(self, returns: pd.Series) -> float:
    """计算峰度"""
    if returns.empty:
        return 0.0
    return returns.kurtosis()

beta

beta(returns: Series, benchmark: Series) -> float

计算 Beta

源代码位于: src/fund_cli/analysis/risk.py
def beta(
    self,
    returns: pd.Series,
    benchmark: pd.Series,
) -> float:
    """计算 Beta"""
    if returns.empty or benchmark.empty:
        return 0.0

    covariance = returns.cov(benchmark)
    variance = benchmark.var()

    if variance == 0:
        return 0.0

    return covariance / variance

correlation

correlation(returns: Series, benchmark: Series) -> float

计算相关系数

源代码位于: src/fund_cli/analysis/risk.py
def correlation(
    self,
    returns: pd.Series,
    benchmark: pd.Series,
) -> float:
    """计算相关系数"""
    if returns.empty or benchmark.empty:
        return 0.0
    return returns.corr(benchmark)

tracking_error

tracking_error(returns: Series, benchmark: Series) -> float

计算跟踪误差

源代码位于: src/fund_cli/analysis/risk.py
def tracking_error(
    self,
    returns: pd.Series,
    benchmark: pd.Series,
) -> float:
    """计算跟踪误差"""
    if returns.empty or benchmark.empty:
        return 0.0

    excess_returns = returns - benchmark
    return excess_returns.std() * np.sqrt(self.periods_per_year)

AttributionAnalyzer

Bases: Analyzer


              flowchart TD
              fund_cli.analysis.attribution.AttributionAnalyzer[AttributionAnalyzer]
              fund_cli.core.analyzer.Analyzer[Analyzer]

                              fund_cli.core.analyzer.Analyzer --> fund_cli.analysis.attribution.AttributionAnalyzer
                


              click fund_cli.analysis.attribution.AttributionAnalyzer href "" "fund_cli.analysis.attribution.AttributionAnalyzer"
              click fund_cli.core.analyzer.Analyzer href "" "fund_cli.core.analyzer.Analyzer"
            

归因分析引擎

支持: - Brinson 归因分析(配置效应、选择效应、交互效应) - 收益率分解

源代码位于: src/fund_cli/analysis/attribution.py
class AttributionAnalyzer(Analyzer):
    """
    归因分析引擎

    支持:
    - Brinson 归因分析(配置效应、选择效应、交互效应)
    - 收益率分解
    """

    def analyze(
        self,
        data: pd.DataFrame,
        benchmark_weights: dict[str, float] | None = None,
        portfolio_weights: dict[str, float] | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        执行归因分析

        Args:
            data: 包含各资产收益率的 DataFrame
            benchmark_weights: 基准组合权重 {资产名: 权重}
            portfolio_weights: 投资组合权重 {资产名: 权重}
            **kwargs: 额外参数

        Returns:
            归因分析结果字典
        """
        if benchmark_weights is None or portfolio_weights is None:
            return self._simple_decomposition(data)

        return self._brinson_attribution(data, benchmark_weights, portfolio_weights)

    def _simple_decomposition(self, returns: pd.DataFrame) -> dict[str, Any]:
        """
        简单收益率分解

        Args:
            returns: 收益率 DataFrame

        Returns:
            分解结果
        """
        if isinstance(returns, pd.Series):
            returns = returns.to_frame("fund")

        result = {}
        for col in returns.columns:
            col_data = returns[col].dropna()
            if len(col_data) == 0:
                continue

            cumulative = (1 + col_data).prod() - 1
            annualized = (1 + cumulative) ** (252 / len(col_data)) - 1

            result[col] = {
                "total_return": float(cumulative * 100),
                "annualized_return": float(annualized * 100),
                "volatility": float(col_data.std() * np.sqrt(252) * 100),
                "sharpe": float(
                    col_data.mean() / col_data.std() * np.sqrt(252) if col_data.std() > 0 else 0
                ),
            }

        return result

    def _brinson_attribution(
        self,
        returns: pd.DataFrame,
        benchmark_weights: dict[str, float],
        portfolio_weights: dict[str, float],
    ) -> dict[str, Any]:
        """
        Brinson 归因分析

        将组合收益与基准收益的差异分解为:
        - 配置效应(Allocation Effect)
        - 选择效应(Selection Effect)
        - 交互效应(Interaction Effect)

        Args:
            returns: 各资产收益率 DataFrame
            benchmark_weights: 基准权重
            portfolio_weights: 组合权重

        Returns:
            Brinson 归因结果
        """
        common_assets = (
            set(benchmark_weights.keys()) & set(portfolio_weights.keys()) & set(returns.columns)
        )

        if not common_assets:
            return {
                "allocation_effect": 0.0,
                "selection_effect": 0.0,
                "interaction_effect": 0.0,
                "total_active_return": 0.0,
            }

        # 计算各资产平均收益率
        asset_returns = {}
        for asset in common_assets:
            if asset in returns.columns:
                asset_returns[asset] = returns[asset].mean()

        # Brinson 归因分解
        allocation_effect = 0.0
        selection_effect = 0.0
        interaction_effect = 0.0

        benchmark_total_return = 0.0
        portfolio_total_return = 0.0

        for asset in common_assets:
            wp = portfolio_weights.get(asset, 0)
            wb = benchmark_weights.get(asset, 0)
            rp = asset_returns.get(asset, 0)
            rb = rp  # 简化:假设基准收益率等于资产收益率

            allocation_effect += (wp - wb) * rb
            selection_effect += wb * (rp - rb)
            interaction_effect += (wp - wb) * (rp - rb)

            portfolio_total_return += wp * rp
            benchmark_total_return += wb * rb

        total_active = portfolio_total_return - benchmark_total_return

        return {
            "allocation_effect": float(allocation_effect * 252 * 100),
            "selection_effect": float(selection_effect * 252 * 100),
            "interaction_effect": float(interaction_effect * 252 * 100),
            "total_active_return": float(total_active * 252 * 100),
            "portfolio_return": float(portfolio_total_return * 252 * 100),
            "benchmark_return": float(benchmark_total_return * 252 * 100),
            "asset_count": len(common_assets),
        }

    def get_metrics(self) -> list[str]:
        """获取可计算的指标列表"""
        return [
            "allocation_effect",
            "selection_effect",
            "interaction_effect",
            "total_active_return",
        ]

analyze

analyze(
    data: DataFrame,
    benchmark_weights: dict[str, float] | None = None,
    portfolio_weights: dict[str, float] | None = None,
    **kwargs: Any,
) -> dict[str, Any]

执行归因分析

参数:

名称 类型 描述 默认
data DataFrame

包含各资产收益率的 DataFrame

必需
benchmark_weights dict[str, float] | None

基准组合权重 {资产名: 权重}

None
portfolio_weights dict[str, float] | None

投资组合权重 {资产名: 权重}

None
**kwargs Any

额外参数

{}

返回:

类型 描述
dict[str, Any]

归因分析结果字典

源代码位于: src/fund_cli/analysis/attribution.py
def analyze(
    self,
    data: pd.DataFrame,
    benchmark_weights: dict[str, float] | None = None,
    portfolio_weights: dict[str, float] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    执行归因分析

    Args:
        data: 包含各资产收益率的 DataFrame
        benchmark_weights: 基准组合权重 {资产名: 权重}
        portfolio_weights: 投资组合权重 {资产名: 权重}
        **kwargs: 额外参数

    Returns:
        归因分析结果字典
    """
    if benchmark_weights is None or portfolio_weights is None:
        return self._simple_decomposition(data)

    return self._brinson_attribution(data, benchmark_weights, portfolio_weights)

get_metrics

get_metrics() -> list[str]

获取可计算的指标列表

源代码位于: src/fund_cli/analysis/attribution.py
def get_metrics(self) -> list[str]:
    """获取可计算的指标列表"""
    return [
        "allocation_effect",
        "selection_effect",
        "interaction_effect",
        "total_active_return",
    ]

PortfolioAnalyzer

Bases: Analyzer


              flowchart TD
              fund_cli.analysis.portfolio.PortfolioAnalyzer[PortfolioAnalyzer]
              fund_cli.core.analyzer.Analyzer[Analyzer]

                              fund_cli.core.analyzer.Analyzer --> fund_cli.analysis.portfolio.PortfolioAnalyzer
                


              click fund_cli.analysis.portfolio.PortfolioAnalyzer href "" "fund_cli.analysis.portfolio.PortfolioAnalyzer"
              click fund_cli.core.analyzer.Analyzer href "" "fund_cli.core.analyzer.Analyzer"
            

投资组合分析引擎

支持: - 组合权重分析 - 组合风险分散度 - 资产相关性分析 - 组合收益贡献分析

源代码位于: src/fund_cli/analysis/portfolio.py
class PortfolioAnalyzer(Analyzer):
    """
    投资组合分析引擎

    支持:
    - 组合权重分析
    - 组合风险分散度
    - 资产相关性分析
    - 组合收益贡献分析
    """

    def analyze(
        self,
        data: pd.DataFrame,
        weights: dict[str, float] | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        执行组合分析

        Args:
            data: 多资产收益率 DataFrame,每列一个资产
            weights: 各资产权重字典
            **kwargs: 额外参数

        Returns:
            组合分析结果字典
        """
        if isinstance(data, pd.Series):
            data = data.to_frame("asset")

        if weights is None:
            # 等权配置
            n = data.shape[1]
            weights = dict.fromkeys(data.columns, 1.0 / n)

        result = {
            "asset_count": len(weights),
            "weights": weights,
        }

        # 组合收益率
        portfolio_returns = self._calculate_portfolio_returns(data, weights)
        result["portfolio_return"] = float(portfolio_returns.mean() * 252 * 100)
        result["portfolio_volatility"] = float(portfolio_returns.std() * np.sqrt(252) * 100)
        result["portfolio_sharpe"] = float(
            portfolio_returns.mean() / portfolio_returns.std() * np.sqrt(252)
            if portfolio_returns.std() > 0
            else 0
        )

        # 相关性分析
        result["correlation_matrix"] = data.corr().to_dict()
        avg_correlation = self._average_correlation(data.corr())
        result["average_correlation"] = float(avg_correlation)

        # 风险分散度(DR)
        result["diversification_ratio"] = float(self._diversification_ratio(data, weights))

        # 各资产贡献
        result["contribution"] = self._return_contribution(data, weights)

        return result

    def _calculate_portfolio_returns(
        self,
        returns: pd.DataFrame,
        weights: dict[str, float],
    ) -> pd.Series:
        """计算组合收益率序列"""
        valid_cols = [c for c in weights.keys() if c in returns.columns]
        w = np.array([weights[c] for c in valid_cols])
        return returns[valid_cols].dot(w)

    def _average_correlation(self, corr_matrix: pd.DataFrame) -> float:
        """计算平均相关系数"""
        n = len(corr_matrix)
        if n <= 1:
            return 1.0

        # 取上三角(不含对角线)
        mask = np.triu(np.ones((n, n), dtype=bool), k=1)
        values = corr_matrix.values[mask]

        return float(np.mean(values)) if len(values) > 0 else 1.0

    def _diversification_ratio(
        self,
        returns: pd.DataFrame,
        weights: dict[str, float],
    ) -> float:
        """
        计算风险分散度(Diversification Ratio)

        DR = 加权平均波动率 / 组合波动率
        DR > 1 表示组合具有分散化效果
        """
        valid_cols = [c for c in weights.keys() if c in returns.columns]
        if not valid_cols:
            return 1.0

        w = np.array([weights[c] for c in valid_cols])
        vols = returns[valid_cols].std() * np.sqrt(252)

        weighted_vol = float(np.dot(w, vols))
        portfolio_vol = float(returns[valid_cols].dot(w).std() * np.sqrt(252))

        if portfolio_vol == 0:
            return 1.0

        return weighted_vol / portfolio_vol

    def _return_contribution(
        self,
        returns: pd.DataFrame,
        weights: dict[str, float],
    ) -> dict[str, float]:
        """计算各资产收益贡献"""
        contribution = {}
        for asset, weight in weights.items():
            if asset in returns.columns:
                annual_return = returns[asset].mean() * 252 * 100
                contribution[asset] = {
                    "weight": weight,
                    "return": float(annual_return),
                    "contribution": float(weight * annual_return),
                }
        return contribution  # type: ignore[return-value]

    def get_metrics(self) -> list[str]:
        """获取可计算的指标列表"""
        return [
            "portfolio_return",
            "portfolio_volatility",
            "portfolio_sharpe",
            "average_correlation",
            "diversification_ratio",
        ]

analyze

analyze(
    data: DataFrame,
    weights: dict[str, float] | None = None,
    **kwargs: Any,
) -> dict[str, Any]

执行组合分析

参数:

名称 类型 描述 默认
data DataFrame

多资产收益率 DataFrame,每列一个资产

必需
weights dict[str, float] | None

各资产权重字典

None
**kwargs Any

额外参数

{}

返回:

类型 描述
dict[str, Any]

组合分析结果字典

源代码位于: src/fund_cli/analysis/portfolio.py
def analyze(
    self,
    data: pd.DataFrame,
    weights: dict[str, float] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    执行组合分析

    Args:
        data: 多资产收益率 DataFrame,每列一个资产
        weights: 各资产权重字典
        **kwargs: 额外参数

    Returns:
        组合分析结果字典
    """
    if isinstance(data, pd.Series):
        data = data.to_frame("asset")

    if weights is None:
        # 等权配置
        n = data.shape[1]
        weights = dict.fromkeys(data.columns, 1.0 / n)

    result = {
        "asset_count": len(weights),
        "weights": weights,
    }

    # 组合收益率
    portfolio_returns = self._calculate_portfolio_returns(data, weights)
    result["portfolio_return"] = float(portfolio_returns.mean() * 252 * 100)
    result["portfolio_volatility"] = float(portfolio_returns.std() * np.sqrt(252) * 100)
    result["portfolio_sharpe"] = float(
        portfolio_returns.mean() / portfolio_returns.std() * np.sqrt(252)
        if portfolio_returns.std() > 0
        else 0
    )

    # 相关性分析
    result["correlation_matrix"] = data.corr().to_dict()
    avg_correlation = self._average_correlation(data.corr())
    result["average_correlation"] = float(avg_correlation)

    # 风险分散度(DR)
    result["diversification_ratio"] = float(self._diversification_ratio(data, weights))

    # 各资产贡献
    result["contribution"] = self._return_contribution(data, weights)

    return result

get_metrics

get_metrics() -> list[str]

获取可计算的指标列表

源代码位于: src/fund_cli/analysis/portfolio.py
def get_metrics(self) -> list[str]:
    """获取可计算的指标列表"""
    return [
        "portfolio_return",
        "portfolio_volatility",
        "portfolio_sharpe",
        "average_correlation",
        "diversification_ratio",
    ]

ManagerAnalyzer

Bases: Analyzer


              flowchart TD
              fund_cli.analysis.manager.ManagerAnalyzer[ManagerAnalyzer]
              fund_cli.core.analyzer.Analyzer[Analyzer]

                              fund_cli.core.analyzer.Analyzer --> fund_cli.analysis.manager.ManagerAnalyzer
                


              click fund_cli.analysis.manager.ManagerAnalyzer href "" "fund_cli.analysis.manager.ManagerAnalyzer"
              click fund_cli.core.analyzer.Analyzer href "" "fund_cli.core.analyzer.Analyzer"
            

基金经理分析引擎

分析基金经理的综合信息,包括: - 经理基本信息查询 - 业绩统计(管理基金数、平均收益率等) - 稳定性分析(任职年限、管理规模变化)

源代码位于: src/fund_cli/analysis/manager.py
class ManagerAnalyzer(Analyzer):
    """
    基金经理分析引擎

    分析基金经理的综合信息,包括:
    - 经理基本信息查询
    - 业绩统计(管理基金数、平均收益率等)
    - 稳定性分析(任职年限、管理规模变化)
    """

    def analyze(self, data: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
        """
        执行综合经理分析

        Args:
            data: 经理数据字典,需包含 name, fund_code, fund_name 等字段

        Returns:
            综合分析结果
        """
        result = {}
        result["info"] = self.manager_info(data)
        result["performance"] = self.performance_stats(data)
        result["stability"] = self.stability_analysis(data)
        return result

    def manager_info(self, data: dict[str, Any]) -> dict[str, Any]:
        """
        经理信息查询 (FUND-MANAGER-001)

        Args:
            data: 经理数据字典

        Returns:
            经理信息字典
        """
        return {
            "name": data.get("name", ""),
            "fund_code": data.get("fund_code", ""),
            "fund_name": data.get("fund_name", ""),
            "company": data.get("company", ""),
            "start_date": str(data.get("start_date", "")),
            "tenure_days": data.get("tenure_days", 0),
        }

    def performance_stats(self, data: dict[str, Any]) -> dict[str, Any]:
        """
        经理业绩统计 (FUND-MANAGER-002)

        Args:
            data: 经理数据字典,可包含 total_return, annual_return, funds 列表

        Returns:
            业绩统计字典
        """
        funds = data.get("funds", [])
        if funds:
            returns = [f.get("total_return", 0) for f in funds if f.get("total_return") is not None]
            avg_return = sum(returns) / len(returns) if returns else 0
            best_fund = max(funds, key=lambda f: f.get("total_return", float("-inf")), default=None)
            worst_fund = min(funds, key=lambda f: f.get("total_return", float("inf")), default=None)
            return {
                "total_funds": len(funds),
                "avg_return": round(avg_return, 2),
                "best_fund": best_fund.get("fund_name", "") if best_fund else "",
                "best_return": best_fund.get("total_return", 0) if best_fund else 0,
                "worst_fund": worst_fund.get("fund_name", "") if worst_fund else "",
                "worst_return": worst_fund.get("total_return", 0) if worst_fund else 0,
            }
        else:
            return {
                "total_funds": 1,
                "avg_return": data.get("annual_return", 0),
                "best_fund": data.get("fund_name", ""),
                "best_return": data.get("total_return", 0),
                "worst_fund": data.get("fund_name", ""),
                "worst_return": data.get("total_return", 0),
            }

    def stability_analysis(self, data: dict[str, Any]) -> dict[str, Any]:
        """
        经理稳定性分析 (FUND-MANAGER-003)

        Args:
            data: 经理数据字典,可包含 tenure_days, start_date, funds 列表

        Returns:
            稳定性分析字典
        """
        tenure_days = data.get("tenure_days", 0)
        tenure_years = tenure_days / 365.25 if tenure_days else 0

        # 稳定性评级
        if tenure_years >= 5:
            stability_level = "非常稳定"
            stability_score = 5
        elif tenure_years >= 3:
            stability_level = "稳定"
            stability_score = 4
        elif tenure_years >= 1:
            stability_level = "一般"
            stability_score = 3
        else:
            stability_level = "较新"
            stability_score = 2

        # 多基金管理分析
        funds = data.get("funds", [])
        multi_fund = len(funds) > 1

        return {
            "tenure_days": tenure_days,
            "tenure_years": round(tenure_years, 1),
            "stability_level": stability_level,
            "stability_score": stability_score,
            "multi_fund_manager": multi_fund,
            "managed_fund_count": len(funds) if funds else 1,
        }

    def get_metrics(self) -> list[str]:
        """返回支持的指标列表"""
        return ["manager_info", "performance_stats", "stability_analysis"]

analyze

analyze(
    data: dict[str, Any], **kwargs: Any
) -> dict[str, Any]

执行综合经理分析

参数:

名称 类型 描述 默认
data dict[str, Any]

经理数据字典,需包含 name, fund_code, fund_name 等字段

必需

返回:

类型 描述
dict[str, Any]

综合分析结果

源代码位于: src/fund_cli/analysis/manager.py
def analyze(self, data: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
    """
    执行综合经理分析

    Args:
        data: 经理数据字典,需包含 name, fund_code, fund_name 等字段

    Returns:
        综合分析结果
    """
    result = {}
    result["info"] = self.manager_info(data)
    result["performance"] = self.performance_stats(data)
    result["stability"] = self.stability_analysis(data)
    return result

manager_info

manager_info(data: dict[str, Any]) -> dict[str, Any]

经理信息查询 (FUND-MANAGER-001)

参数:

名称 类型 描述 默认
data dict[str, Any]

经理数据字典

必需

返回:

类型 描述
dict[str, Any]

经理信息字典

源代码位于: src/fund_cli/analysis/manager.py
def manager_info(self, data: dict[str, Any]) -> dict[str, Any]:
    """
    经理信息查询 (FUND-MANAGER-001)

    Args:
        data: 经理数据字典

    Returns:
        经理信息字典
    """
    return {
        "name": data.get("name", ""),
        "fund_code": data.get("fund_code", ""),
        "fund_name": data.get("fund_name", ""),
        "company": data.get("company", ""),
        "start_date": str(data.get("start_date", "")),
        "tenure_days": data.get("tenure_days", 0),
    }

performance_stats

performance_stats(data: dict[str, Any]) -> dict[str, Any]

经理业绩统计 (FUND-MANAGER-002)

参数:

名称 类型 描述 默认
data dict[str, Any]

经理数据字典,可包含 total_return, annual_return, funds 列表

必需

返回:

类型 描述
dict[str, Any]

业绩统计字典

源代码位于: src/fund_cli/analysis/manager.py
def performance_stats(self, data: dict[str, Any]) -> dict[str, Any]:
    """
    经理业绩统计 (FUND-MANAGER-002)

    Args:
        data: 经理数据字典,可包含 total_return, annual_return, funds 列表

    Returns:
        业绩统计字典
    """
    funds = data.get("funds", [])
    if funds:
        returns = [f.get("total_return", 0) for f in funds if f.get("total_return") is not None]
        avg_return = sum(returns) / len(returns) if returns else 0
        best_fund = max(funds, key=lambda f: f.get("total_return", float("-inf")), default=None)
        worst_fund = min(funds, key=lambda f: f.get("total_return", float("inf")), default=None)
        return {
            "total_funds": len(funds),
            "avg_return": round(avg_return, 2),
            "best_fund": best_fund.get("fund_name", "") if best_fund else "",
            "best_return": best_fund.get("total_return", 0) if best_fund else 0,
            "worst_fund": worst_fund.get("fund_name", "") if worst_fund else "",
            "worst_return": worst_fund.get("total_return", 0) if worst_fund else 0,
        }
    else:
        return {
            "total_funds": 1,
            "avg_return": data.get("annual_return", 0),
            "best_fund": data.get("fund_name", ""),
            "best_return": data.get("total_return", 0),
            "worst_fund": data.get("fund_name", ""),
            "worst_return": data.get("total_return", 0),
        }

stability_analysis

stability_analysis(data: dict[str, Any]) -> dict[str, Any]

经理稳定性分析 (FUND-MANAGER-003)

参数:

名称 类型 描述 默认
data dict[str, Any]

经理数据字典,可包含 tenure_days, start_date, funds 列表

必需

返回:

类型 描述
dict[str, Any]

稳定性分析字典

源代码位于: src/fund_cli/analysis/manager.py
def stability_analysis(self, data: dict[str, Any]) -> dict[str, Any]:
    """
    经理稳定性分析 (FUND-MANAGER-003)

    Args:
        data: 经理数据字典,可包含 tenure_days, start_date, funds 列表

    Returns:
        稳定性分析字典
    """
    tenure_days = data.get("tenure_days", 0)
    tenure_years = tenure_days / 365.25 if tenure_days else 0

    # 稳定性评级
    if tenure_years >= 5:
        stability_level = "非常稳定"
        stability_score = 5
    elif tenure_years >= 3:
        stability_level = "稳定"
        stability_score = 4
    elif tenure_years >= 1:
        stability_level = "一般"
        stability_score = 3
    else:
        stability_level = "较新"
        stability_score = 2

    # 多基金管理分析
    funds = data.get("funds", [])
    multi_fund = len(funds) > 1

    return {
        "tenure_days": tenure_days,
        "tenure_years": round(tenure_years, 1),
        "stability_level": stability_level,
        "stability_score": stability_score,
        "multi_fund_manager": multi_fund,
        "managed_fund_count": len(funds) if funds else 1,
    }

get_metrics

get_metrics() -> list[str]

返回支持的指标列表

源代码位于: src/fund_cli/analysis/manager.py
def get_metrics(self) -> list[str]:
    """返回支持的指标列表"""
    return ["manager_info", "performance_stats", "stability_analysis"]

HoldingAnalyzer

Bases: Analyzer


              flowchart TD
              fund_cli.analysis.holding.HoldingAnalyzer[HoldingAnalyzer]
              fund_cli.core.analyzer.Analyzer[Analyzer]

                              fund_cli.core.analyzer.Analyzer --> fund_cli.analysis.holding.HoldingAnalyzer
                


              click fund_cli.analysis.holding.HoldingAnalyzer href "" "fund_cli.analysis.holding.HoldingAnalyzer"
              click fund_cli.core.analyzer.Analyzer href "" "fund_cli.core.analyzer.Analyzer"
            

持仓分析引擎

分析基金持仓数据,提供: - 行业配置分析 - 重仓股分析 - 持仓集中度(HHI指数) - 持仓变化追踪 - 风格分析(九宫格)

源代码位于: src/fund_cli/analysis/holding.py
class HoldingAnalyzer(Analyzer):
    """
    持仓分析引擎

    分析基金持仓数据,提供:
    - 行业配置分析
    - 重仓股分析
    - 持仓集中度(HHI指数)
    - 持仓变化追踪
    - 风格分析(九宫格)
    """

    def analyze(self, data: pd.DataFrame, **kwargs: Any) -> dict[str, Any]:
        """
        执行综合持仓分析

        Args:
            data: 持仓数据 DataFrame,需包含 stock_code, stock_name, weight, industry 列

        Returns:
            综合分析结果字典
        """
        result: dict[str, Any] = {}
        if "industry" in data.columns:
            result["industry_distribution"] = self.industry_distribution(data)
        result["top_holdings"] = self.top_holdings(data)
        result["concentration_hhi"] = self.concentration_hhi(data)
        result["concentration_level"] = self._hhi_level(result["concentration_hhi"])
        if "industry" in data.columns:
            result["style_analysis"] = self.style_analysis(data)
        return result

    def industry_distribution(self, holdings: pd.DataFrame) -> dict[str, float]:
        """
        行业配置分析 (FUND-HOLDING-002)

        Args:
            holdings: 持仓数据,需包含 industry 和 weight 列

        Returns:
            行业分布字典 {行业名称: 占比(%)}

        Raises:
            ValueError: 数据缺少 industry 列
        """
        if "industry" not in holdings.columns:
            raise ValueError("持仓数据缺少 industry 列")
        distribution = holdings.groupby("industry")["weight"].sum()
        return distribution.sort_values(ascending=False).to_dict()

    def top_holdings(self, holdings: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
        """
        重仓股分析 (FUND-HOLDING-003)

        Args:
            holdings: 持仓数据
            top_n: 返回前N大持仓,默认10

        Returns:
            前N大持仓 DataFrame
        """
        df = holdings.copy()
        if "weight" in df.columns:
            df = df.sort_values("weight", ascending=False)
        return df.head(top_n).reset_index(drop=True)

    def concentration_hhi(self, holdings: pd.DataFrame) -> float:
        """
        持仓集中度 - HHI指数 (FUND-HOLDING-004)

        HHI = sum(weight_i^2),其中 weight_i 为第i只股票占净值比例(小数形式)

        Args:
            holdings: 持仓数据,需包含 weight 列

        Returns:
            HHI指数值。>0.25 高度集中, 0.15~0.25 中度集中, <0.15 分散
        """
        if holdings.empty or "weight" not in holdings.columns:
            return 0.0
        weights = holdings["weight"].values / 100.0  # 转为小数
        return float(np.sum(weights**2))

    def track_changes(
        self,
        current: pd.DataFrame,
        previous: pd.DataFrame,
    ) -> pd.DataFrame:
        """
        持仓变化追踪 (FUND-HOLDING-005)

        对比两期持仓数据,标识新增、删除、增持、减持的股票。

        Args:
            current: 当期持仓数据
            previous: 上期持仓数据

        Returns:
            变化分析 DataFrame,包含 change_type 列(新增/删除/增持/减持/不变)
        """
        if current.empty or previous.empty:
            return pd.DataFrame()

        curr = (
            current[["stock_code", "stock_name", "weight"]].copy()
            if "stock_code" in current.columns
            else current.copy()
        )
        prev = (
            previous[["stock_code", "weight"]].copy()
            if "stock_code" in previous.columns
            else previous.copy()
        )

        merged = curr.merge(prev, on="stock_code", how="outer", suffixes=("_curr", "_prev"))

        def classify_change(row: pd.Series) -> str:
            if pd.isna(row.get("weight_prev")):
                return "新增"
            if pd.isna(row.get("weight_curr")):
                return "删除"
            diff = row["weight_curr"] - row["weight_prev"]
            if abs(diff) < 0.01:
                return "不变"
            return "增持" if diff > 0 else "减持"

        merged["change_type"] = merged.apply(classify_change, axis=1)
        merged["weight_change"] = merged.get("weight_curr", 0) - merged.get("weight_prev", 0)
        return merged.sort_values("weight_change", ascending=False, na_position="last").reset_index(
            drop=True
        )

    def style_analysis(self, holdings: pd.DataFrame) -> dict[str, Any]:
        """
        风格分析 (FUND-HOLDING-006)

        基于持仓股票的行业分布进行风格分析,输出风格九宫格位置。
        简化实现:基于行业分布判断大盘/小盘和价值/成长倾向。

        Args:
            holdings: 持仓数据,需包含 industry 列

        Returns:
            风格分析结果字典
        """
        if "industry" not in holdings.columns:
            return {"market_cap_style": "未知", "investment_style": "未知", "grid_position": "中"}

        # 基于行业分布判断风格
        distribution = self.industry_distribution(holdings)
        total = sum(distribution.values()) or 1.0

        # 大盘行业权重
        large_cap_industries = {"银行", "非银金融", "食品饮料", "医药生物", "电子", "电力设备"}
        large_cap_weight = sum(distribution.get(ind, 0) for ind in large_cap_industries) / total

        # 价值行业权重
        value_industries = {
            "银行",
            "房地产",
            "建筑装饰",
            "公用事业",
            "交通运输",
            "煤炭",
            "石油石化",
        }
        value_weight = sum(distribution.get(ind, 0) for ind in value_industries) / total

        market_cap_style = (
            "大盘" if large_cap_weight > 0.5 else ("小盘" if large_cap_weight < 0.3 else "中盘")
        )
        investment_style = (
            "价值" if value_weight > 0.3 else ("成长" if value_weight < 0.15 else "平衡")
        )

        return {
            "market_cap_style": market_cap_style,
            "investment_style": investment_style,
            "grid_position": f"{market_cap_style}{investment_style}",
            "large_cap_weight": round(large_cap_weight * 100, 2),
            "value_weight": round(value_weight * 100, 2),
            "industry_distribution": distribution,
        }

    def get_metrics(self) -> list[str]:
        """返回支持的指标列表"""
        return [
            "industry_distribution",
            "top_holdings",
            "concentration_hhi",
            "concentration_level",
            "style_analysis",
        ]

    @staticmethod
    def _hhi_level(hhi: float) -> str:
        """根据HHI值返回集中度等级"""
        if hhi >= 0.25:
            return "高度集中"
        elif hhi >= 0.15:
            return "中度集中"
        else:
            return "分散"

analyze

analyze(data: DataFrame, **kwargs: Any) -> dict[str, Any]

执行综合持仓分析

参数:

名称 类型 描述 默认
data DataFrame

持仓数据 DataFrame,需包含 stock_code, stock_name, weight, industry 列

必需

返回:

类型 描述
dict[str, Any]

综合分析结果字典

源代码位于: src/fund_cli/analysis/holding.py
def analyze(self, data: pd.DataFrame, **kwargs: Any) -> dict[str, Any]:
    """
    执行综合持仓分析

    Args:
        data: 持仓数据 DataFrame,需包含 stock_code, stock_name, weight, industry 列

    Returns:
        综合分析结果字典
    """
    result: dict[str, Any] = {}
    if "industry" in data.columns:
        result["industry_distribution"] = self.industry_distribution(data)
    result["top_holdings"] = self.top_holdings(data)
    result["concentration_hhi"] = self.concentration_hhi(data)
    result["concentration_level"] = self._hhi_level(result["concentration_hhi"])
    if "industry" in data.columns:
        result["style_analysis"] = self.style_analysis(data)
    return result

industry_distribution

industry_distribution(
    holdings: DataFrame,
) -> dict[str, float]

行业配置分析 (FUND-HOLDING-002)

参数:

名称 类型 描述 默认
holdings DataFrame

持仓数据,需包含 industry 和 weight 列

必需

返回:

类型 描述
dict[str, float]

行业分布字典 {行业名称: 占比(%)}

引发:

类型 描述
ValueError

数据缺少 industry 列

源代码位于: src/fund_cli/analysis/holding.py
def industry_distribution(self, holdings: pd.DataFrame) -> dict[str, float]:
    """
    行业配置分析 (FUND-HOLDING-002)

    Args:
        holdings: 持仓数据,需包含 industry 和 weight 列

    Returns:
        行业分布字典 {行业名称: 占比(%)}

    Raises:
        ValueError: 数据缺少 industry 列
    """
    if "industry" not in holdings.columns:
        raise ValueError("持仓数据缺少 industry 列")
    distribution = holdings.groupby("industry")["weight"].sum()
    return distribution.sort_values(ascending=False).to_dict()

top_holdings

top_holdings(
    holdings: DataFrame, top_n: int = 10
) -> pd.DataFrame

重仓股分析 (FUND-HOLDING-003)

参数:

名称 类型 描述 默认
holdings DataFrame

持仓数据

必需
top_n int

返回前N大持仓,默认10

10

返回:

类型 描述
DataFrame

前N大持仓 DataFrame

源代码位于: src/fund_cli/analysis/holding.py
def top_holdings(self, holdings: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
    """
    重仓股分析 (FUND-HOLDING-003)

    Args:
        holdings: 持仓数据
        top_n: 返回前N大持仓,默认10

    Returns:
        前N大持仓 DataFrame
    """
    df = holdings.copy()
    if "weight" in df.columns:
        df = df.sort_values("weight", ascending=False)
    return df.head(top_n).reset_index(drop=True)

concentration_hhi

concentration_hhi(holdings: DataFrame) -> float

持仓集中度 - HHI指数 (FUND-HOLDING-004)

HHI = sum(weight_i^2),其中 weight_i 为第i只股票占净值比例(小数形式)

参数:

名称 类型 描述 默认
holdings DataFrame

持仓数据,需包含 weight 列

必需

返回:

类型 描述
float

HHI指数值。>0.25 高度集中, 0.15~0.25 中度集中, <0.15 分散

源代码位于: src/fund_cli/analysis/holding.py
def concentration_hhi(self, holdings: pd.DataFrame) -> float:
    """
    持仓集中度 - HHI指数 (FUND-HOLDING-004)

    HHI = sum(weight_i^2),其中 weight_i 为第i只股票占净值比例(小数形式)

    Args:
        holdings: 持仓数据,需包含 weight 列

    Returns:
        HHI指数值。>0.25 高度集中, 0.15~0.25 中度集中, <0.15 分散
    """
    if holdings.empty or "weight" not in holdings.columns:
        return 0.0
    weights = holdings["weight"].values / 100.0  # 转为小数
    return float(np.sum(weights**2))

track_changes

track_changes(
    current: DataFrame, previous: DataFrame
) -> pd.DataFrame

持仓变化追踪 (FUND-HOLDING-005)

对比两期持仓数据,标识新增、删除、增持、减持的股票。

参数:

名称 类型 描述 默认
current DataFrame

当期持仓数据

必需
previous DataFrame

上期持仓数据

必需

返回:

类型 描述
DataFrame

变化分析 DataFrame,包含 change_type 列(新增/删除/增持/减持/不变)

源代码位于: src/fund_cli/analysis/holding.py
def track_changes(
    self,
    current: pd.DataFrame,
    previous: pd.DataFrame,
) -> pd.DataFrame:
    """
    持仓变化追踪 (FUND-HOLDING-005)

    对比两期持仓数据,标识新增、删除、增持、减持的股票。

    Args:
        current: 当期持仓数据
        previous: 上期持仓数据

    Returns:
        变化分析 DataFrame,包含 change_type 列(新增/删除/增持/减持/不变)
    """
    if current.empty or previous.empty:
        return pd.DataFrame()

    curr = (
        current[["stock_code", "stock_name", "weight"]].copy()
        if "stock_code" in current.columns
        else current.copy()
    )
    prev = (
        previous[["stock_code", "weight"]].copy()
        if "stock_code" in previous.columns
        else previous.copy()
    )

    merged = curr.merge(prev, on="stock_code", how="outer", suffixes=("_curr", "_prev"))

    def classify_change(row: pd.Series) -> str:
        if pd.isna(row.get("weight_prev")):
            return "新增"
        if pd.isna(row.get("weight_curr")):
            return "删除"
        diff = row["weight_curr"] - row["weight_prev"]
        if abs(diff) < 0.01:
            return "不变"
        return "增持" if diff > 0 else "减持"

    merged["change_type"] = merged.apply(classify_change, axis=1)
    merged["weight_change"] = merged.get("weight_curr", 0) - merged.get("weight_prev", 0)
    return merged.sort_values("weight_change", ascending=False, na_position="last").reset_index(
        drop=True
    )

style_analysis

style_analysis(holdings: DataFrame) -> dict[str, Any]

风格分析 (FUND-HOLDING-006)

基于持仓股票的行业分布进行风格分析,输出风格九宫格位置。 简化实现:基于行业分布判断大盘/小盘和价值/成长倾向。

参数:

名称 类型 描述 默认
holdings DataFrame

持仓数据,需包含 industry 列

必需

返回:

类型 描述
dict[str, Any]

风格分析结果字典

源代码位于: src/fund_cli/analysis/holding.py
def style_analysis(self, holdings: pd.DataFrame) -> dict[str, Any]:
    """
    风格分析 (FUND-HOLDING-006)

    基于持仓股票的行业分布进行风格分析,输出风格九宫格位置。
    简化实现:基于行业分布判断大盘/小盘和价值/成长倾向。

    Args:
        holdings: 持仓数据,需包含 industry 列

    Returns:
        风格分析结果字典
    """
    if "industry" not in holdings.columns:
        return {"market_cap_style": "未知", "investment_style": "未知", "grid_position": "中"}

    # 基于行业分布判断风格
    distribution = self.industry_distribution(holdings)
    total = sum(distribution.values()) or 1.0

    # 大盘行业权重
    large_cap_industries = {"银行", "非银金融", "食品饮料", "医药生物", "电子", "电力设备"}
    large_cap_weight = sum(distribution.get(ind, 0) for ind in large_cap_industries) / total

    # 价值行业权重
    value_industries = {
        "银行",
        "房地产",
        "建筑装饰",
        "公用事业",
        "交通运输",
        "煤炭",
        "石油石化",
    }
    value_weight = sum(distribution.get(ind, 0) for ind in value_industries) / total

    market_cap_style = (
        "大盘" if large_cap_weight > 0.5 else ("小盘" if large_cap_weight < 0.3 else "中盘")
    )
    investment_style = (
        "价值" if value_weight > 0.3 else ("成长" if value_weight < 0.15 else "平衡")
    )

    return {
        "market_cap_style": market_cap_style,
        "investment_style": investment_style,
        "grid_position": f"{market_cap_style}{investment_style}",
        "large_cap_weight": round(large_cap_weight * 100, 2),
        "value_weight": round(value_weight * 100, 2),
        "industry_distribution": distribution,
    }

get_metrics

get_metrics() -> list[str]

返回支持的指标列表

源代码位于: src/fund_cli/analysis/holding.py
def get_metrics(self) -> list[str]:
    """返回支持的指标列表"""
    return [
        "industry_distribution",
        "top_holdings",
        "concentration_hhi",
        "concentration_level",
        "style_analysis",
    ]