跳转至

核心模块 API

核心模块提供数据管理、分析引擎、报告生成等基础功能。

DataManager

数据管理器

统一管理多个数据源,提供: - 自动数据源选择 - 数据缓存 - 统一的数据访问接口 - 多数据源注册和切换

源代码位于: src/fund_cli/core/data_manager.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
class DataManager:
    """
    数据管理器

    统一管理多个数据源,提供:
    - 自动数据源选择
    - 数据缓存
    - 统一的数据访问接口
    - 多数据源注册和切换
    """

    def __init__(
        self,
        cache: DataCache | None = None,
        primary_source: str | None = None,
    ):
        """
        初始化数据管理器

        Args:
            cache: 缓存管理器
            primary_source: 主数据源名称,默认使用配置中的设置
        """
        self.config = get_config()
        self._cache = cache or DataCache(
            cache_dir=self.config.data.cache_dir,
            default_ttl=self.config.data.cache_ttl,
        )
        self._primary_source = primary_source or self.config.data.primary_source
        self._adapters: dict[str, DataSourceAdapter] = {}
        self._gateway = DataSourceGateway()

        # 初始化数据源
        self._init_adapters()

    def _init_adapters(self) -> None:
        """
        初始化数据源适配器

        根据配置自动检测并注册可用的数据源适配器:
        - AKShare(默认数据源,零配置)
        - Tushare(需要 FUND_DATA_TUSHARE_TOKEN)
        - Wind(需要 WindPy 库且 wind_enabled=True)
        """
        # AKShare(默认数据源,零配置)
        if self.config.data.akshare_enabled:
            try:
                self._adapters["akshare"] = AKShareAdapter(cache=self._cache)
                self._gateway.register_adapter("akshare", self._adapters["akshare"])
                print("[DataManager] 已注册 AKShareAdapter")
            except Exception as e:
                print(f"[DataManager] 注册 AKShareAdapter 失败: {e}")

        # Tushare(需要 Token)
        if self.config.data.tushare_token:
            try:
                from fund_cli.data.adapters.tushare_adapter import TushareAdapter
                self._adapters["tushare"] = TushareAdapter(cache=self._cache)
                self._gateway.register_adapter("tushare", self._adapters["tushare"])
                print("[DataManager] 已注册 TushareAdapter")
            except ImportError:
                print("[DataManager] Tushare 未安装,跳过注册")
            except Exception as e:
                print(f"[DataManager] 注册 TushareAdapter 失败: {e}")
        else:
            print("[DataManager] Tushare Token 未配置,跳过注册")

        # Wind(需要 wind_enabled=True)
        if self.config.data.wind_enabled:
            try:
                from fund_cli.data.adapters.wind_adapter import WindAdapter
                wind_adapter = WindAdapter(cache=self._cache)
                if wind_adapter.is_available():
                    self._adapters["wind"] = wind_adapter
                    self._gateway.register_adapter("wind", wind_adapter)
                    print("[DataManager] 已注册 WindAdapter")
                else:
                    print("[DataManager] Wind 不可用,跳过注册")
            except ImportError:
                print("[DataManager] WindPy 未安装,跳过注册")
            except Exception as e:
                print(f"[DataManager] 注册 WindAdapter 失败: {e}")

        # 设置主数据源
        if self._primary_source not in self._adapters:
            # 如果配置的主数据源不可用,选择第一个可用的
            if self._adapters:
                self._primary_source = list(self._adapters.keys())[0]
                print(f"[DataManager] 主数据源 {self._primary_source} 不可用,使用 {self._primary_source}")

        print(f"[DataManager] 当前可用数据源: {list(self._adapters.keys())}, 主数据源: {self._primary_source}")

    def register_adapter(self, name: str, adapter: DataSourceAdapter) -> None:
        """
        注册新的数据源适配器

        Args:
            name: 适配器名称
            adapter: 适配器实例
        """
        self._adapters[name] = adapter
        print(f"[DataManager] 已注册适配器: {name}")

    @property
    def available_sources(self) -> list[str]:
        """获取可用数据源列表"""
        return list(self._adapters.keys())

    @property
    def source_priority(self) -> list[str]:
        """获取数据源优先级列表"""
        return self.config.data.source_priority_list

    @property
    def gateway(self) -> DataSourceGateway:
        """获取数据源网关实例"""
        return self._gateway

    def get_adapter(self, source: str | None = None) -> DataSourceAdapter:
        """
        获取数据源适配器

        Args:
            source: 数据源名称,默认使用主数据源

        Returns:
            数据源适配器实例

        Raises:
            DataSourceError: 数据源不可用
        """
        source_name = source or self._primary_source

        if source_name not in self._adapters:
            raise DataSourceError(f"数据源 {source_name} 未配置或不可用")

        adapter = self._adapters[source_name]

        if not adapter.is_available():
            raise DataSourceError(f"数据源 {source_name} 不可用")

        return adapter

    @property
    def _adapter(self) -> DataSourceAdapter:
        """获取主数据源适配器(便捷属性)"""
        return self.get_adapter()

    # ========== 基础基金数据接口 ==========

    def get_fund_info(self, fund_code: str) -> dict[str, Any]:
        """
        获取基金基础信息

        Args:
            fund_code: 基金代码

        Returns:
            基金信息字典
        """
        return self._adapter.get_fund_info(fund_code)

    def get_fund_nav(
        self,
        fund_code: str,
        start_date: date | None = None,
        end_date: date | None = None,
    ) -> pd.DataFrame:
        """
        获取基金净值数据

        Args:
            fund_code: 基金代码
            start_date: 开始日期
            end_date: 结束日期

        Returns:
            净值数据 DataFrame
        """
        return self._adapter.get_fund_nav(fund_code, start_date, end_date)

    def search_funds(
        self,
        fund_type: str | None = None,
        company: str | None = None,
        min_scale: float | None = None,
        max_scale: float | None = None,
        keyword: str | None = None,
        limit: int = 100,
    ) -> pd.DataFrame:
        """
        搜索基金

        Args:
            fund_type: 基金类型
            company: 基金公司
            min_scale: 最小规模
            max_scale: 最大规模
            keyword: 关键词
            limit: 返回数量限制

        Returns:
            基金列表 DataFrame
        """
        return self._adapter.search_funds(
            fund_type=fund_type,
            company=company,
            min_scale=min_scale,
            max_scale=max_scale,
            keyword=keyword,
            limit=limit,
        )

    def get_fund_list(self, fund_type: str | None = None) -> pd.DataFrame:
        """
        获取基金列表

        Args:
            fund_type: 基金类型筛选

        Returns:
            基金列表 DataFrame
        """
        return self._adapter.get_fund_list(fund_type)

    def get_benchmark_nav(
        self,
        benchmark_code: str,
        start_date: date | None = None,
        end_date: date | None = None,
    ) -> pd.DataFrame:
        """
        获取基准指数数据

        Args:
            benchmark_code: 基准指数代码
            start_date: 开始日期
            end_date: 结束日期

        Returns:
            基准数据 DataFrame
        """
        return self._adapter.get_benchmark_nav(benchmark_code, start_date, end_date)

    def get_fund_holdings(
        self,
        fund_code: str,
        report_date: date | None = None,
    ) -> pd.DataFrame:
        """获取基金持仓数据"""
        return self._adapter.get_fund_holdings(fund_code, report_date)

    def get_fund_manager(self, fund_code: str) -> dict[str, Any]:
        """获取基金经理信息"""
        return self._adapter.get_fund_manager(fund_code)

    def get_fund_fee(self, fund_code: str) -> dict[str, Any]:
        """获取基金费率信息"""
        return self._adapter.get_fund_fee(fund_code)

    def get_fund_rating(self, fund_code: str) -> int | None:
        """获取基金评级"""
        return self._adapter.get_fund_rating(fund_code)

    def batch_get_fund_nav(
        self,
        fund_codes: list[str],
        start_date: date | None = None,
        end_date: date | None = None,
    ) -> dict[str, pd.DataFrame]:
        """批量获取基金净值"""
        return self._adapter.batch_get_fund_nav(fund_codes, start_date, end_date)

    # ========== P0 级别接口 (18个) ==========

    def get_all_fund_names(self) -> pd.DataFrame:
        """获取所有基金名称列表"""
        return self._adapter.get_all_fund_names()

    def get_fund_info_ths(self, code: str) -> dict[str, Any]:
        """获取同花顺基金基本信息"""
        return self._adapter.get_fund_info_ths(code)

    def get_index_fund_info(
        self,
        category: str = "全部",
        indicator: str = "全部",
    ) -> pd.DataFrame:
        """获取指数型基金基本信息"""
        return self._adapter.get_index_fund_info(category, indicator)

    def get_fund_overview(self, code: str) -> dict[str, Any]:
        """获取基金档案基本概况"""
        return self._adapter.get_fund_overview(code)

    def get_fund_purchase_status(self) -> pd.DataFrame:
        """获取基金申购/赎回状态"""
        return self._adapter.get_fund_purchase_status()

    def get_fund_daily_nav(self) -> pd.DataFrame:
        """获取开放式基金每日净值(全部)"""
        return self._adapter.get_fund_daily_nav()

    def get_etf_spot(self) -> pd.DataFrame:
        """获取ETF实时行情"""
        return self._adapter.get_etf_spot()

    def get_fund_category_spot(
        self,
        category: str = "",
        date: str | None = None,
    ) -> pd.DataFrame:
        """获取同花顺基金实时行情(按类型)"""
        return self._adapter.get_fund_category_spot(category, date)

    def get_etf_spot_ths(self, date: str | None = None) -> pd.DataFrame:
        """获取同花顺ETF实时行情"""
        return self._adapter.get_etf_spot_ths(date)

    def get_lof_spot(self) -> pd.DataFrame:
        """获取LOF实时行情"""
        return self._adapter.get_lof_spot()

    def get_etf_hist(
        self,
        code: str,
        period: str = "daily",
        start: str | None = None,
        end: str | None = None,
    ) -> pd.DataFrame:
        """获取ETF历史行情"""
        return self._adapter.get_etf_hist(code, period, start, end)

    def get_lof_hist(
        self,
        code: str,
        period: str = "daily",
        start: str | None = None,
        end: str | None = None,
    ) -> pd.DataFrame:
        """获取LOF历史行情"""
        return self._adapter.get_lof_hist(code, period, start, end)

    def get_etf_minute(
        self,
        code: str,
        period: str = "1",
        start: str | None = None,
        end: str | None = None,
    ) -> pd.DataFrame:
        """获取ETF分时行情"""
        return self._adapter.get_etf_minute(code, period, start, end)

    def get_lof_minute(
        self,
        code: str,
        period: str = "1",
        start: str | None = None,
        end: str | None = None,
    ) -> pd.DataFrame:
        """获取LOF分时行情"""
        return self._adapter.get_lof_minute(code, period, start, end)

    def get_fund_bond_holdings(
        self,
        code: str,
        year: int | None = None,
    ) -> pd.DataFrame:
        """获取基金债券持仓"""
        return self._adapter.get_fund_bond_holdings(code, year)

    def get_fund_industry_allocation(
        self,
        code: str,
        year: int | None = None,
    ) -> pd.DataFrame:
        """获取基金行业配置"""
        return self._adapter.get_fund_industry_allocation(code, year)

    def get_fund_portfolio_change(
        self,
        code: str,
        indicator: str = "累计买入",
        year: int | None = None,
    ) -> pd.DataFrame:
        """获取基金重大变动(累计买入/卖出)"""
        return self._adapter.get_fund_portfolio_change(code, indicator, year)

    def get_all_fund_managers(self) -> pd.DataFrame:
        """获取基金经理大全"""
        return self._adapter.get_all_fund_managers()

    # ========== P1 级别接口 (25个) ==========

    # ---------- 基金公司/规模 (5个) ----------

    def get_fund_company_aum(self) -> pd.DataFrame:
        """基金公司管理规模排名"""
        return self._adapter.get_fund_company_aum()

    def get_fund_aum_trend(self) -> pd.DataFrame:
        """基金市场管理规模走势"""
        return self._adapter.get_fund_aum_trend()

    def get_fund_company_aum_history(self, year: int | None = None) -> pd.DataFrame:
        """基金公司历年管理规模"""
        return self._adapter.get_fund_company_aum_history(year)

    def get_fund_scale_change(self) -> pd.DataFrame:
        """规模变动(全市场汇总)"""
        return self._adapter.get_fund_scale_change()

    def get_fund_holder_structure(self) -> pd.DataFrame:
        """持有人结构"""
        return self._adapter.get_fund_holder_structure()

    # ---------- 基金评级 (4个) ----------

    def get_fund_ratings(self) -> pd.DataFrame:
        """基金评级总汇"""
        return self._adapter.get_fund_ratings()

    def get_fund_rating_sh(self, date: str | None = None) -> pd.DataFrame:
        """上海证券评级"""
        return self._adapter.get_fund_rating_sh(date)

    def get_fund_rating_zs(self, date: str | None = None) -> pd.DataFrame:
        """招商证券评级"""
        return self._adapter.get_fund_rating_zs(date)

    def get_fund_rating_ja(self, date: str | None = None) -> pd.DataFrame:
        """济安金信评级"""
        return self._adapter.get_fund_rating_ja(date)

    # ---------- 基金分红/拆分 (3个) ----------

    def get_fund_dividends(
        self,
        year: int | None = None,
        fund_type: str = "",  # type: ignore[assignment]
        page: int = -1,
    ) -> pd.DataFrame:
        """基金分红"""
        return self._adapter.get_fund_dividends(year, fund_type, page)

    def get_fund_splits(
        self,
        year: int | None = None,
        fund_type: str = "",  # type: ignore[assignment]
        page: int = -1,
    ) -> pd.DataFrame:
        """基金拆分"""
        return self._adapter.get_fund_splits(year, fund_type, page)

    def get_fund_dividend_rank(self) -> pd.DataFrame:
        """累计分红排行"""
        return self._adapter.get_fund_dividend_rank()

    # ---------- 基金排行 (5个) ----------

    def get_fund_rank_by_type(self, fund_type: str = "全部") -> pd.DataFrame:
        """开放式基金排行"""
        return self._adapter.get_fund_rank_by_type(fund_type)

    def get_exchange_fund_rank(self) -> pd.DataFrame:
        """场内交易基金排行"""
        return self._adapter.get_exchange_fund_rank()

    def get_money_fund_rank(self) -> pd.DataFrame:
        """货币型基金排行"""
        return self._adapter.get_money_fund_rank()

    def get_lcx_fund_rank(self) -> pd.DataFrame:
        """理财基金排行"""
        return self._adapter.get_lcx_fund_rank()

    def get_hk_fund_rank(self) -> pd.DataFrame:
        """香港基金排行"""
        return self._adapter.get_hk_fund_rank()

    # ---------- 基金业绩/分析 (3个) ----------

    def get_fund_achievement(self, code: str) -> pd.DataFrame:
        """基金业绩(年度+阶段)"""
        return self._adapter.get_fund_achievement(code)

    def get_fund_risk_analysis(self, code: str) -> pd.DataFrame:
        """基金数据分析(夏普/回撤)"""
        return self._adapter.get_fund_risk_analysis(code)

    def get_fund_profit_probability(self, code: str) -> pd.DataFrame:
        """盈利概率"""
        return self._adapter.get_fund_profit_probability(code)

    # ---------- 资产配置 (1个) ----------

    def get_fund_asset_allocation(
        self, code: str, date: str | None = None
    ) -> pd.DataFrame:
        """基金资产配置"""
        return self._adapter.get_fund_asset_allocation(code, date)

    # ---------- 市场指数扩展 (6个) ----------

    def get_index_spot_em(self, category: str = "沪深重要指数") -> pd.DataFrame:
        """东财指数实时行情"""
        return self._adapter.get_index_spot_em(category)

    def get_index_spot_sina(self) -> pd.DataFrame:
        """新浪指数实时行情"""
        return self._adapter.get_index_spot_sina()

    def get_index_daily_tx(
        self, code: str, start: str | None = None, end: str | None = None
    ) -> pd.DataFrame:
        """腾讯指数历史"""
        return self._adapter.get_index_daily_tx(code, start, end)

    def get_index_daily_em(
        self, code: str, start: str | None = None, end: str | None = None
    ) -> pd.DataFrame:
        """东财指数历史"""
        return self._adapter.get_index_daily_em(code, start, end)

    def get_index_hist(
        self,
        code: str,
        period: str = "daily",
        start: str | None = None,
        end: str | None = None,
    ) -> pd.DataFrame:
        """指数通用历史"""
        return self._adapter.get_index_hist(code, period, start, end)

    def get_index_minute(
        self,
        code: str,
        period: str = "1",
        start: str | None = None,
        end: str | None = None,
    ) -> pd.DataFrame:
        """指数分时"""
        return self._adapter.get_index_minute(code, period, start, end)

    # ========== P2 级别接口 (57个) ==========

    # ---------- 宏观经济数据 (22个) ----------

    def get_macro_leverage_ratio(self) -> pd.DataFrame:
        """获取中国宏观杠杆率数据"""
        return self._adapter.get_macro_leverage_ratio()

    def get_enterprise_price_index(self) -> pd.DataFrame:
        """获取企业商品价格指数"""
        return self._adapter.get_enterprise_price_index()

    def get_fdi_data(self) -> pd.DataFrame:
        """获取外商直接投资数据"""
        return self._adapter.get_fdi_data()

    def get_lpr_data(self) -> pd.DataFrame:
        """获取LPR品种数据"""
        return self._adapter.get_lpr_data()

    def get_urban_unemployment(self) -> pd.DataFrame:
        """获取城镇调查失业率"""
        return self._adapter.get_urban_unemployment()

    def get_social_financing(self) -> pd.DataFrame:
        """获取社会融资规模增量统计"""
        return self._adapter.get_social_financing()

    def get_gdp_yearly(self) -> pd.DataFrame:
        """获取中国GDP年率数据"""
        return self._adapter.get_gdp_yearly()

    def get_gdp_quarterly(self) -> pd.DataFrame:
        """获取中国GDP季度数据"""
        return self._adapter.get_gdp_quarterly()

    def get_cpi_yearly(self) -> pd.DataFrame:
        """获取中国CPI年率数据"""
        return self._adapter.get_cpi_yearly()

    def get_cpi_monthly(self) -> pd.DataFrame:
        """获取中国CPI月率数据"""
        return self._adapter.get_cpi_monthly()

    def get_ppi_yearly(self) -> pd.DataFrame:
        """获取中国PPI年率数据"""
        return self._adapter.get_ppi_yearly()

    def get_ppi_monthly(self) -> pd.DataFrame:
        """获取中国PPI月率数据"""
        return self._adapter.get_ppi_monthly()

    def get_exports_yearly(self) -> pd.DataFrame:
        """获取出口年率数据"""
        return self._adapter.get_exports_yearly()

    def get_imports_yearly(self) -> pd.DataFrame:
        """获取进口年率数据"""
        return self._adapter.get_imports_yearly()

    def get_trade_balance(self) -> pd.DataFrame:
        """获取贸易帐数据"""
        return self._adapter.get_trade_balance()

    def get_industrial_production(self) -> pd.DataFrame:
        """获取工业增加值增长数据"""
        return self._adapter.get_industrial_production()

    def get_pmi_official(self) -> pd.DataFrame:
        """获取官方制造业PMI数据"""
        return self._adapter.get_pmi_official()

    def get_pmi_caixin(self) -> pd.DataFrame:
        """获取财新制造业PMI数据"""
        return self._adapter.get_pmi_caixin()

    def get_services_pmi(self) -> pd.DataFrame:
        """获取财新服务业PMI数据"""
        return self._adapter.get_services_pmi()

    def get_non_manufacturing_pmi(self) -> pd.DataFrame:
        """获取官方非制造业PMI数据"""
        return self._adapter.get_non_manufacturing_pmi()

    def get_m2_yearly(self) -> pd.DataFrame:
        """获取M2货币供应年率数据"""
        return self._adapter.get_m2_yearly()

    def get_new_loan(self) -> pd.DataFrame:
        """获取新增人民币贷款数据"""
        return self._adapter.get_new_loan()

    def get_retail_sales_yearly(self) -> pd.DataFrame:
        """获取社会消费品零售总额年率数据"""
        return self._adapter.get_retail_sales_yearly()

    def get_fixed_asset_investment(self) -> pd.DataFrame:
        """获取固定资产投资年率数据"""
        return self._adapter.get_fixed_asset_investment()

    # ---------- 利率数据 (8个) ----------

    def get_china_interest_rate(self) -> pd.DataFrame:
        """获取中国央行利率决议数据"""
        return self._adapter.get_china_interest_rate()

    def get_usa_interest_rate(self) -> pd.DataFrame:
        """获取美联储利率决议数据"""
        return self._adapter.get_usa_interest_rate()

    def get_euro_interest_rate(self) -> pd.DataFrame:
        """获取欧洲央行利率决议数据"""
        return self._adapter.get_euro_interest_rate()

    def get_japan_interest_rate(self) -> pd.DataFrame:
        """获取日本央行利率决议数据"""
        return self._adapter.get_japan_interest_rate()

    def get_uk_interest_rate(self) -> pd.DataFrame:
        """获取英国央行利率决议数据"""
        return self._adapter.get_uk_interest_rate()

    def get_shibor(self) -> pd.DataFrame:
        """获取SHIBOR利率数据"""
        return self._adapter.get_shibor()

    def get_shibor_lpr(self) -> pd.DataFrame:
        """获取SHIBOR-LPR数据"""
        return self._adapter.get_shibor_lpr()

    def get_hibor(self) -> pd.DataFrame:
        """获取HIBOR利率数据"""
        return self._adapter.get_hibor()

    # ---------- 行业板块 (5个) ----------

    def get_industry_boards(self) -> pd.DataFrame:
        """获取行业板块名称列表"""
        return self._adapter.get_industry_boards()

    def get_industry_board_hist(
        self,
        code: str,
        period: str = "daily",
        start_date: str | None = None,
        end_date: str | None = None,
    ) -> pd.DataFrame:
        """获取行业板块历史行情"""
        return self._adapter.get_industry_board_hist(code, period, start_date, end_date)

    def get_concept_boards(self) -> pd.DataFrame:
        """获取概念板块名称列表"""
        return self._adapter.get_concept_boards()

    def get_concept_board_hist(
        self,
        code: str,
        period: str = "daily",
        start_date: str | None = None,
        end_date: str | None = None,
    ) -> pd.DataFrame:
        """获取概念板块历史行情"""
        return self._adapter.get_concept_board_hist(code, period, start_date, end_date)

    def get_sector_fund_flow(self, period: str = "今日") -> pd.DataFrame:
        """获取板块资金流向"""
        return self._adapter.get_sector_fund_flow(period)

    # ---------- 债券数据 (7个) ----------

    def get_china_us_bond_yield(self) -> pd.DataFrame:
        """获取中美国债收益率数据"""
        return self._adapter.get_china_us_bond_yield()

    def get_bond_yield_curve(
        self,
        bond_type: str = "国债",
        period: str = "daily",
        start_date: str | None = None,
        end_date: str | None = None,
    ) -> pd.DataFrame:
        """获取收盘收益率曲线历史数据"""
        return self._adapter.get_bond_yield_curve(bond_type, period, start_date, end_date)

    def get_bond_spot_quote(self) -> pd.DataFrame:
        """获取现券市场做市报价"""
        return self._adapter.get_bond_spot_quote()

    def get_convertible_bonds(self) -> pd.DataFrame:
        """获取可转债数据一览表"""
        return self._adapter.get_convertible_bonds()

    def get_convertible_bond_detail(self, code: str) -> pd.DataFrame:
        """获取可转债详情数据"""
        return self._adapter.get_convertible_bond_detail(code)

    def get_bond_spot(self, code: str) -> pd.DataFrame:
        """获取沪深债券实时行情"""
        return self._adapter.get_bond_spot(code)

    def get_bond_hist(
        self,
        code: str,
        period: str = "daily",
        start_date: str | None = None,
        end_date: str | None = None,
    ) -> pd.DataFrame:
        """获取沪深债券历史行情"""
        return self._adapter.get_bond_hist(code, period, start_date, end_date)

    # ---------- 估值指标 (5个) ----------

    def get_a_share_valuation(self) -> pd.DataFrame:
        """获取A股等权重与中位数市盈率/市净率"""
        return self._adapter.get_a_share_valuation()

    def get_stock_valuation_lg(self, code: str) -> pd.DataFrame:
        """获取个股估值数据(乐咕乐股)"""
        return self._adapter.get_stock_valuation_lg(code)

    def get_index_valuation(
        self, code: str, indicator: str = "pe"
    ) -> pd.DataFrame:
        """获取指数估值历史数据(乐咕乐股)"""
        return self._adapter.get_index_valuation(code, indicator)

    def get_market_pe_lg(self, code: str) -> pd.DataFrame:
        """获取指数市盈率数据(乐咕乐股)"""
        return self._adapter.get_market_pe_lg(code)

    def get_market_pb_lg(self, code: str) -> pd.DataFrame:
        """获取指数市净率数据(乐咕乐股)"""
        return self._adapter.get_market_pb_lg(code)

    # ---------- 资金流向 (3个) ----------

    def get_market_fund_flow(self) -> pd.DataFrame:
        """获取大盘资金流向数据"""
        return self._adapter.get_market_fund_flow()

    def get_stock_fund_flow(self, code: str, market: str = "sh") -> pd.DataFrame:
        """获取个股资金流向数据"""
        return self._adapter.get_stock_fund_flow(code, market)

    def get_north_fund_flow(self, market: str = "北向资金") -> pd.DataFrame:
        """获取北向资金流向数据"""
        return self._adapter.get_north_fund_flow(market)

    # ========== 缓存管理 ==========

    def clear_cache(self) -> None:
        """清空缓存"""
        self._cache.clear()

    def get_cache_stats(self) -> dict:
        """获取缓存统计"""
        return self._cache.get_stats()

    def __repr__(self) -> str:
        sources = list(self._adapters.keys())
        return f"DataManager(sources={sources}, primary={self._primary_source})"

available_sources property

available_sources: list[str]

获取可用数据源列表

source_priority property

source_priority: list[str]

获取数据源优先级列表

gateway property

gateway: DataSourceGateway

获取数据源网关实例

__init__

__init__(
    cache: DataCache | None = None,
    primary_source: str | None = None,
)

初始化数据管理器

参数:

名称 类型 描述 默认
cache DataCache | None

缓存管理器

None
primary_source str | None

主数据源名称,默认使用配置中的设置

None
源代码位于: src/fund_cli/core/data_manager.py
def __init__(
    self,
    cache: DataCache | None = None,
    primary_source: str | None = None,
):
    """
    初始化数据管理器

    Args:
        cache: 缓存管理器
        primary_source: 主数据源名称,默认使用配置中的设置
    """
    self.config = get_config()
    self._cache = cache or DataCache(
        cache_dir=self.config.data.cache_dir,
        default_ttl=self.config.data.cache_ttl,
    )
    self._primary_source = primary_source or self.config.data.primary_source
    self._adapters: dict[str, DataSourceAdapter] = {}
    self._gateway = DataSourceGateway()

    # 初始化数据源
    self._init_adapters()

register_adapter

register_adapter(
    name: str, adapter: DataSourceAdapter
) -> None

注册新的数据源适配器

参数:

名称 类型 描述 默认
name str

适配器名称

必需
adapter DataSourceAdapter

适配器实例

必需
源代码位于: src/fund_cli/core/data_manager.py
def register_adapter(self, name: str, adapter: DataSourceAdapter) -> None:
    """
    注册新的数据源适配器

    Args:
        name: 适配器名称
        adapter: 适配器实例
    """
    self._adapters[name] = adapter
    print(f"[DataManager] 已注册适配器: {name}")

get_adapter

get_adapter(source: str | None = None) -> DataSourceAdapter

获取数据源适配器

参数:

名称 类型 描述 默认
source str | None

数据源名称,默认使用主数据源

None

返回:

类型 描述
DataSourceAdapter

数据源适配器实例

引发:

类型 描述
DataSourceError

数据源不可用

源代码位于: src/fund_cli/core/data_manager.py
def get_adapter(self, source: str | None = None) -> DataSourceAdapter:
    """
    获取数据源适配器

    Args:
        source: 数据源名称,默认使用主数据源

    Returns:
        数据源适配器实例

    Raises:
        DataSourceError: 数据源不可用
    """
    source_name = source or self._primary_source

    if source_name not in self._adapters:
        raise DataSourceError(f"数据源 {source_name} 未配置或不可用")

    adapter = self._adapters[source_name]

    if not adapter.is_available():
        raise DataSourceError(f"数据源 {source_name} 不可用")

    return adapter

get_fund_info

get_fund_info(fund_code: str) -> dict[str, Any]

获取基金基础信息

参数:

名称 类型 描述 默认
fund_code str

基金代码

必需

返回:

类型 描述
dict[str, Any]

基金信息字典

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_info(self, fund_code: str) -> dict[str, Any]:
    """
    获取基金基础信息

    Args:
        fund_code: 基金代码

    Returns:
        基金信息字典
    """
    return self._adapter.get_fund_info(fund_code)

get_fund_nav

get_fund_nav(
    fund_code: str,
    start_date: date | None = None,
    end_date: date | None = None,
) -> pd.DataFrame

获取基金净值数据

参数:

名称 类型 描述 默认
fund_code str

基金代码

必需
start_date date | None

开始日期

None
end_date date | None

结束日期

None

返回:

类型 描述
DataFrame

净值数据 DataFrame

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_nav(
    self,
    fund_code: str,
    start_date: date | None = None,
    end_date: date | None = None,
) -> pd.DataFrame:
    """
    获取基金净值数据

    Args:
        fund_code: 基金代码
        start_date: 开始日期
        end_date: 结束日期

    Returns:
        净值数据 DataFrame
    """
    return self._adapter.get_fund_nav(fund_code, start_date, end_date)

search_funds

search_funds(
    fund_type: str | None = None,
    company: str | None = None,
    min_scale: float | None = None,
    max_scale: float | None = None,
    keyword: str | None = None,
    limit: int = 100,
) -> pd.DataFrame

搜索基金

参数:

名称 类型 描述 默认
fund_type str | None

基金类型

None
company str | None

基金公司

None
min_scale float | None

最小规模

None
max_scale float | None

最大规模

None
keyword str | None

关键词

None
limit int

返回数量限制

100

返回:

类型 描述
DataFrame

基金列表 DataFrame

源代码位于: src/fund_cli/core/data_manager.py
def search_funds(
    self,
    fund_type: str | None = None,
    company: str | None = None,
    min_scale: float | None = None,
    max_scale: float | None = None,
    keyword: str | None = None,
    limit: int = 100,
) -> pd.DataFrame:
    """
    搜索基金

    Args:
        fund_type: 基金类型
        company: 基金公司
        min_scale: 最小规模
        max_scale: 最大规模
        keyword: 关键词
        limit: 返回数量限制

    Returns:
        基金列表 DataFrame
    """
    return self._adapter.search_funds(
        fund_type=fund_type,
        company=company,
        min_scale=min_scale,
        max_scale=max_scale,
        keyword=keyword,
        limit=limit,
    )

get_fund_list

get_fund_list(fund_type: str | None = None) -> pd.DataFrame

获取基金列表

参数:

名称 类型 描述 默认
fund_type str | None

基金类型筛选

None

返回:

类型 描述
DataFrame

基金列表 DataFrame

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_list(self, fund_type: str | None = None) -> pd.DataFrame:
    """
    获取基金列表

    Args:
        fund_type: 基金类型筛选

    Returns:
        基金列表 DataFrame
    """
    return self._adapter.get_fund_list(fund_type)

get_benchmark_nav

get_benchmark_nav(
    benchmark_code: str,
    start_date: date | None = None,
    end_date: date | None = None,
) -> pd.DataFrame

获取基准指数数据

参数:

名称 类型 描述 默认
benchmark_code str

基准指数代码

必需
start_date date | None

开始日期

None
end_date date | None

结束日期

None

返回:

类型 描述
DataFrame

基准数据 DataFrame

源代码位于: src/fund_cli/core/data_manager.py
def get_benchmark_nav(
    self,
    benchmark_code: str,
    start_date: date | None = None,
    end_date: date | None = None,
) -> pd.DataFrame:
    """
    获取基准指数数据

    Args:
        benchmark_code: 基准指数代码
        start_date: 开始日期
        end_date: 结束日期

    Returns:
        基准数据 DataFrame
    """
    return self._adapter.get_benchmark_nav(benchmark_code, start_date, end_date)

get_fund_holdings

get_fund_holdings(
    fund_code: str, report_date: date | None = None
) -> pd.DataFrame

获取基金持仓数据

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_holdings(
    self,
    fund_code: str,
    report_date: date | None = None,
) -> pd.DataFrame:
    """获取基金持仓数据"""
    return self._adapter.get_fund_holdings(fund_code, report_date)

get_fund_manager

get_fund_manager(fund_code: str) -> dict[str, Any]

获取基金经理信息

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_manager(self, fund_code: str) -> dict[str, Any]:
    """获取基金经理信息"""
    return self._adapter.get_fund_manager(fund_code)

get_fund_fee

get_fund_fee(fund_code: str) -> dict[str, Any]

获取基金费率信息

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_fee(self, fund_code: str) -> dict[str, Any]:
    """获取基金费率信息"""
    return self._adapter.get_fund_fee(fund_code)

get_fund_rating

get_fund_rating(fund_code: str) -> int | None

获取基金评级

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_rating(self, fund_code: str) -> int | None:
    """获取基金评级"""
    return self._adapter.get_fund_rating(fund_code)

batch_get_fund_nav

batch_get_fund_nav(
    fund_codes: list[str],
    start_date: date | None = None,
    end_date: date | None = None,
) -> dict[str, pd.DataFrame]

批量获取基金净值

源代码位于: src/fund_cli/core/data_manager.py
def batch_get_fund_nav(
    self,
    fund_codes: list[str],
    start_date: date | None = None,
    end_date: date | None = None,
) -> dict[str, pd.DataFrame]:
    """批量获取基金净值"""
    return self._adapter.batch_get_fund_nav(fund_codes, start_date, end_date)

get_all_fund_names

get_all_fund_names() -> pd.DataFrame

获取所有基金名称列表

源代码位于: src/fund_cli/core/data_manager.py
def get_all_fund_names(self) -> pd.DataFrame:
    """获取所有基金名称列表"""
    return self._adapter.get_all_fund_names()

get_fund_info_ths

get_fund_info_ths(code: str) -> dict[str, Any]

获取同花顺基金基本信息

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_info_ths(self, code: str) -> dict[str, Any]:
    """获取同花顺基金基本信息"""
    return self._adapter.get_fund_info_ths(code)

get_index_fund_info

get_index_fund_info(
    category: str = "全部", indicator: str = "全部"
) -> pd.DataFrame

获取指数型基金基本信息

源代码位于: src/fund_cli/core/data_manager.py
def get_index_fund_info(
    self,
    category: str = "全部",
    indicator: str = "全部",
) -> pd.DataFrame:
    """获取指数型基金基本信息"""
    return self._adapter.get_index_fund_info(category, indicator)

get_fund_overview

get_fund_overview(code: str) -> dict[str, Any]

获取基金档案基本概况

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_overview(self, code: str) -> dict[str, Any]:
    """获取基金档案基本概况"""
    return self._adapter.get_fund_overview(code)

get_fund_purchase_status

get_fund_purchase_status() -> pd.DataFrame

获取基金申购/赎回状态

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_purchase_status(self) -> pd.DataFrame:
    """获取基金申购/赎回状态"""
    return self._adapter.get_fund_purchase_status()

get_fund_daily_nav

get_fund_daily_nav() -> pd.DataFrame

获取开放式基金每日净值(全部)

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_daily_nav(self) -> pd.DataFrame:
    """获取开放式基金每日净值(全部)"""
    return self._adapter.get_fund_daily_nav()

get_etf_spot

get_etf_spot() -> pd.DataFrame

获取ETF实时行情

源代码位于: src/fund_cli/core/data_manager.py
def get_etf_spot(self) -> pd.DataFrame:
    """获取ETF实时行情"""
    return self._adapter.get_etf_spot()

get_fund_category_spot

get_fund_category_spot(
    category: str = "", date: str | None = None
) -> pd.DataFrame

获取同花顺基金实时行情(按类型)

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_category_spot(
    self,
    category: str = "",
    date: str | None = None,
) -> pd.DataFrame:
    """获取同花顺基金实时行情(按类型)"""
    return self._adapter.get_fund_category_spot(category, date)

get_etf_spot_ths

get_etf_spot_ths(date: str | None = None) -> pd.DataFrame

获取同花顺ETF实时行情

源代码位于: src/fund_cli/core/data_manager.py
def get_etf_spot_ths(self, date: str | None = None) -> pd.DataFrame:
    """获取同花顺ETF实时行情"""
    return self._adapter.get_etf_spot_ths(date)

get_lof_spot

get_lof_spot() -> pd.DataFrame

获取LOF实时行情

源代码位于: src/fund_cli/core/data_manager.py
def get_lof_spot(self) -> pd.DataFrame:
    """获取LOF实时行情"""
    return self._adapter.get_lof_spot()

get_etf_hist

get_etf_hist(
    code: str,
    period: str = "daily",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame

获取ETF历史行情

源代码位于: src/fund_cli/core/data_manager.py
def get_etf_hist(
    self,
    code: str,
    period: str = "daily",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame:
    """获取ETF历史行情"""
    return self._adapter.get_etf_hist(code, period, start, end)

get_lof_hist

get_lof_hist(
    code: str,
    period: str = "daily",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame

获取LOF历史行情

源代码位于: src/fund_cli/core/data_manager.py
def get_lof_hist(
    self,
    code: str,
    period: str = "daily",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame:
    """获取LOF历史行情"""
    return self._adapter.get_lof_hist(code, period, start, end)

get_etf_minute

get_etf_minute(
    code: str,
    period: str = "1",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame

获取ETF分时行情

源代码位于: src/fund_cli/core/data_manager.py
def get_etf_minute(
    self,
    code: str,
    period: str = "1",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame:
    """获取ETF分时行情"""
    return self._adapter.get_etf_minute(code, period, start, end)

get_lof_minute

get_lof_minute(
    code: str,
    period: str = "1",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame

获取LOF分时行情

源代码位于: src/fund_cli/core/data_manager.py
def get_lof_minute(
    self,
    code: str,
    period: str = "1",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame:
    """获取LOF分时行情"""
    return self._adapter.get_lof_minute(code, period, start, end)

get_fund_bond_holdings

get_fund_bond_holdings(
    code: str, year: int | None = None
) -> pd.DataFrame

获取基金债券持仓

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_bond_holdings(
    self,
    code: str,
    year: int | None = None,
) -> pd.DataFrame:
    """获取基金债券持仓"""
    return self._adapter.get_fund_bond_holdings(code, year)

get_fund_industry_allocation

get_fund_industry_allocation(
    code: str, year: int | None = None
) -> pd.DataFrame

获取基金行业配置

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_industry_allocation(
    self,
    code: str,
    year: int | None = None,
) -> pd.DataFrame:
    """获取基金行业配置"""
    return self._adapter.get_fund_industry_allocation(code, year)

get_fund_portfolio_change

get_fund_portfolio_change(
    code: str,
    indicator: str = "累计买入",
    year: int | None = None,
) -> pd.DataFrame

获取基金重大变动(累计买入/卖出)

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_portfolio_change(
    self,
    code: str,
    indicator: str = "累计买入",
    year: int | None = None,
) -> pd.DataFrame:
    """获取基金重大变动(累计买入/卖出)"""
    return self._adapter.get_fund_portfolio_change(code, indicator, year)

get_all_fund_managers

get_all_fund_managers() -> pd.DataFrame

获取基金经理大全

源代码位于: src/fund_cli/core/data_manager.py
def get_all_fund_managers(self) -> pd.DataFrame:
    """获取基金经理大全"""
    return self._adapter.get_all_fund_managers()

get_fund_company_aum

get_fund_company_aum() -> pd.DataFrame

基金公司管理规模排名

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_company_aum(self) -> pd.DataFrame:
    """基金公司管理规模排名"""
    return self._adapter.get_fund_company_aum()

get_fund_aum_trend

get_fund_aum_trend() -> pd.DataFrame

基金市场管理规模走势

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_aum_trend(self) -> pd.DataFrame:
    """基金市场管理规模走势"""
    return self._adapter.get_fund_aum_trend()

get_fund_company_aum_history

get_fund_company_aum_history(
    year: int | None = None,
) -> pd.DataFrame

基金公司历年管理规模

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_company_aum_history(self, year: int | None = None) -> pd.DataFrame:
    """基金公司历年管理规模"""
    return self._adapter.get_fund_company_aum_history(year)

get_fund_scale_change

get_fund_scale_change() -> pd.DataFrame

规模变动(全市场汇总)

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_scale_change(self) -> pd.DataFrame:
    """规模变动(全市场汇总)"""
    return self._adapter.get_fund_scale_change()

get_fund_holder_structure

get_fund_holder_structure() -> pd.DataFrame

持有人结构

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_holder_structure(self) -> pd.DataFrame:
    """持有人结构"""
    return self._adapter.get_fund_holder_structure()

get_fund_ratings

get_fund_ratings() -> pd.DataFrame

基金评级总汇

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_ratings(self) -> pd.DataFrame:
    """基金评级总汇"""
    return self._adapter.get_fund_ratings()

get_fund_rating_sh

get_fund_rating_sh(date: str | None = None) -> pd.DataFrame

上海证券评级

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_rating_sh(self, date: str | None = None) -> pd.DataFrame:
    """上海证券评级"""
    return self._adapter.get_fund_rating_sh(date)

get_fund_rating_zs

get_fund_rating_zs(date: str | None = None) -> pd.DataFrame

招商证券评级

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_rating_zs(self, date: str | None = None) -> pd.DataFrame:
    """招商证券评级"""
    return self._adapter.get_fund_rating_zs(date)

get_fund_rating_ja

get_fund_rating_ja(date: str | None = None) -> pd.DataFrame

济安金信评级

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_rating_ja(self, date: str | None = None) -> pd.DataFrame:
    """济安金信评级"""
    return self._adapter.get_fund_rating_ja(date)

get_fund_dividends

get_fund_dividends(
    year: int | None = None,
    fund_type: str = "",
    page: int = -1,
) -> pd.DataFrame

基金分红

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_dividends(
    self,
    year: int | None = None,
    fund_type: str = "",  # type: ignore[assignment]
    page: int = -1,
) -> pd.DataFrame:
    """基金分红"""
    return self._adapter.get_fund_dividends(year, fund_type, page)

get_fund_splits

get_fund_splits(
    year: int | None = None,
    fund_type: str = "",
    page: int = -1,
) -> pd.DataFrame

基金拆分

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_splits(
    self,
    year: int | None = None,
    fund_type: str = "",  # type: ignore[assignment]
    page: int = -1,
) -> pd.DataFrame:
    """基金拆分"""
    return self._adapter.get_fund_splits(year, fund_type, page)

get_fund_dividend_rank

get_fund_dividend_rank() -> pd.DataFrame

累计分红排行

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_dividend_rank(self) -> pd.DataFrame:
    """累计分红排行"""
    return self._adapter.get_fund_dividend_rank()

get_fund_rank_by_type

get_fund_rank_by_type(
    fund_type: str = "全部",
) -> pd.DataFrame

开放式基金排行

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_rank_by_type(self, fund_type: str = "全部") -> pd.DataFrame:
    """开放式基金排行"""
    return self._adapter.get_fund_rank_by_type(fund_type)

get_exchange_fund_rank

get_exchange_fund_rank() -> pd.DataFrame

场内交易基金排行

源代码位于: src/fund_cli/core/data_manager.py
def get_exchange_fund_rank(self) -> pd.DataFrame:
    """场内交易基金排行"""
    return self._adapter.get_exchange_fund_rank()

get_money_fund_rank

get_money_fund_rank() -> pd.DataFrame

货币型基金排行

源代码位于: src/fund_cli/core/data_manager.py
def get_money_fund_rank(self) -> pd.DataFrame:
    """货币型基金排行"""
    return self._adapter.get_money_fund_rank()

get_lcx_fund_rank

get_lcx_fund_rank() -> pd.DataFrame

理财基金排行

源代码位于: src/fund_cli/core/data_manager.py
def get_lcx_fund_rank(self) -> pd.DataFrame:
    """理财基金排行"""
    return self._adapter.get_lcx_fund_rank()

get_hk_fund_rank

get_hk_fund_rank() -> pd.DataFrame

香港基金排行

源代码位于: src/fund_cli/core/data_manager.py
def get_hk_fund_rank(self) -> pd.DataFrame:
    """香港基金排行"""
    return self._adapter.get_hk_fund_rank()

get_fund_achievement

get_fund_achievement(code: str) -> pd.DataFrame

基金业绩(年度+阶段)

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_achievement(self, code: str) -> pd.DataFrame:
    """基金业绩(年度+阶段)"""
    return self._adapter.get_fund_achievement(code)

get_fund_risk_analysis

get_fund_risk_analysis(code: str) -> pd.DataFrame

基金数据分析(夏普/回撤)

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_risk_analysis(self, code: str) -> pd.DataFrame:
    """基金数据分析(夏普/回撤)"""
    return self._adapter.get_fund_risk_analysis(code)

get_fund_profit_probability

get_fund_profit_probability(code: str) -> pd.DataFrame

盈利概率

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_profit_probability(self, code: str) -> pd.DataFrame:
    """盈利概率"""
    return self._adapter.get_fund_profit_probability(code)

get_fund_asset_allocation

get_fund_asset_allocation(
    code: str, date: str | None = None
) -> pd.DataFrame

基金资产配置

源代码位于: src/fund_cli/core/data_manager.py
def get_fund_asset_allocation(
    self, code: str, date: str | None = None
) -> pd.DataFrame:
    """基金资产配置"""
    return self._adapter.get_fund_asset_allocation(code, date)

get_index_spot_em

get_index_spot_em(category: str = '沪深重要指数') -> pd.DataFrame

东财指数实时行情

源代码位于: src/fund_cli/core/data_manager.py
def get_index_spot_em(self, category: str = "沪深重要指数") -> pd.DataFrame:
    """东财指数实时行情"""
    return self._adapter.get_index_spot_em(category)

get_index_spot_sina

get_index_spot_sina() -> pd.DataFrame

新浪指数实时行情

源代码位于: src/fund_cli/core/data_manager.py
def get_index_spot_sina(self) -> pd.DataFrame:
    """新浪指数实时行情"""
    return self._adapter.get_index_spot_sina()

get_index_daily_tx

get_index_daily_tx(
    code: str,
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame

腾讯指数历史

源代码位于: src/fund_cli/core/data_manager.py
def get_index_daily_tx(
    self, code: str, start: str | None = None, end: str | None = None
) -> pd.DataFrame:
    """腾讯指数历史"""
    return self._adapter.get_index_daily_tx(code, start, end)

get_index_daily_em

get_index_daily_em(
    code: str,
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame

东财指数历史

源代码位于: src/fund_cli/core/data_manager.py
def get_index_daily_em(
    self, code: str, start: str | None = None, end: str | None = None
) -> pd.DataFrame:
    """东财指数历史"""
    return self._adapter.get_index_daily_em(code, start, end)

get_index_hist

get_index_hist(
    code: str,
    period: str = "daily",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame

指数通用历史

源代码位于: src/fund_cli/core/data_manager.py
def get_index_hist(
    self,
    code: str,
    period: str = "daily",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame:
    """指数通用历史"""
    return self._adapter.get_index_hist(code, period, start, end)

get_index_minute

get_index_minute(
    code: str,
    period: str = "1",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame

指数分时

源代码位于: src/fund_cli/core/data_manager.py
def get_index_minute(
    self,
    code: str,
    period: str = "1",
    start: str | None = None,
    end: str | None = None,
) -> pd.DataFrame:
    """指数分时"""
    return self._adapter.get_index_minute(code, period, start, end)

get_macro_leverage_ratio

get_macro_leverage_ratio() -> pd.DataFrame

获取中国宏观杠杆率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_macro_leverage_ratio(self) -> pd.DataFrame:
    """获取中国宏观杠杆率数据"""
    return self._adapter.get_macro_leverage_ratio()

get_enterprise_price_index

get_enterprise_price_index() -> pd.DataFrame

获取企业商品价格指数

源代码位于: src/fund_cli/core/data_manager.py
def get_enterprise_price_index(self) -> pd.DataFrame:
    """获取企业商品价格指数"""
    return self._adapter.get_enterprise_price_index()

get_fdi_data

get_fdi_data() -> pd.DataFrame

获取外商直接投资数据

源代码位于: src/fund_cli/core/data_manager.py
def get_fdi_data(self) -> pd.DataFrame:
    """获取外商直接投资数据"""
    return self._adapter.get_fdi_data()

get_lpr_data

get_lpr_data() -> pd.DataFrame

获取LPR品种数据

源代码位于: src/fund_cli/core/data_manager.py
def get_lpr_data(self) -> pd.DataFrame:
    """获取LPR品种数据"""
    return self._adapter.get_lpr_data()

get_urban_unemployment

get_urban_unemployment() -> pd.DataFrame

获取城镇调查失业率

源代码位于: src/fund_cli/core/data_manager.py
def get_urban_unemployment(self) -> pd.DataFrame:
    """获取城镇调查失业率"""
    return self._adapter.get_urban_unemployment()

get_social_financing

get_social_financing() -> pd.DataFrame

获取社会融资规模增量统计

源代码位于: src/fund_cli/core/data_manager.py
def get_social_financing(self) -> pd.DataFrame:
    """获取社会融资规模增量统计"""
    return self._adapter.get_social_financing()

get_gdp_yearly

get_gdp_yearly() -> pd.DataFrame

获取中国GDP年率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_gdp_yearly(self) -> pd.DataFrame:
    """获取中国GDP年率数据"""
    return self._adapter.get_gdp_yearly()

get_gdp_quarterly

get_gdp_quarterly() -> pd.DataFrame

获取中国GDP季度数据

源代码位于: src/fund_cli/core/data_manager.py
def get_gdp_quarterly(self) -> pd.DataFrame:
    """获取中国GDP季度数据"""
    return self._adapter.get_gdp_quarterly()

get_cpi_yearly

get_cpi_yearly() -> pd.DataFrame

获取中国CPI年率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_cpi_yearly(self) -> pd.DataFrame:
    """获取中国CPI年率数据"""
    return self._adapter.get_cpi_yearly()

get_cpi_monthly

get_cpi_monthly() -> pd.DataFrame

获取中国CPI月率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_cpi_monthly(self) -> pd.DataFrame:
    """获取中国CPI月率数据"""
    return self._adapter.get_cpi_monthly()

get_ppi_yearly

get_ppi_yearly() -> pd.DataFrame

获取中国PPI年率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_ppi_yearly(self) -> pd.DataFrame:
    """获取中国PPI年率数据"""
    return self._adapter.get_ppi_yearly()

get_ppi_monthly

get_ppi_monthly() -> pd.DataFrame

获取中国PPI月率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_ppi_monthly(self) -> pd.DataFrame:
    """获取中国PPI月率数据"""
    return self._adapter.get_ppi_monthly()

get_exports_yearly

get_exports_yearly() -> pd.DataFrame

获取出口年率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_exports_yearly(self) -> pd.DataFrame:
    """获取出口年率数据"""
    return self._adapter.get_exports_yearly()

get_imports_yearly

get_imports_yearly() -> pd.DataFrame

获取进口年率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_imports_yearly(self) -> pd.DataFrame:
    """获取进口年率数据"""
    return self._adapter.get_imports_yearly()

get_trade_balance

get_trade_balance() -> pd.DataFrame

获取贸易帐数据

源代码位于: src/fund_cli/core/data_manager.py
def get_trade_balance(self) -> pd.DataFrame:
    """获取贸易帐数据"""
    return self._adapter.get_trade_balance()

get_industrial_production

get_industrial_production() -> pd.DataFrame

获取工业增加值增长数据

源代码位于: src/fund_cli/core/data_manager.py
def get_industrial_production(self) -> pd.DataFrame:
    """获取工业增加值增长数据"""
    return self._adapter.get_industrial_production()

get_pmi_official

get_pmi_official() -> pd.DataFrame

获取官方制造业PMI数据

源代码位于: src/fund_cli/core/data_manager.py
def get_pmi_official(self) -> pd.DataFrame:
    """获取官方制造业PMI数据"""
    return self._adapter.get_pmi_official()

get_pmi_caixin

get_pmi_caixin() -> pd.DataFrame

获取财新制造业PMI数据

源代码位于: src/fund_cli/core/data_manager.py
def get_pmi_caixin(self) -> pd.DataFrame:
    """获取财新制造业PMI数据"""
    return self._adapter.get_pmi_caixin()

get_services_pmi

get_services_pmi() -> pd.DataFrame

获取财新服务业PMI数据

源代码位于: src/fund_cli/core/data_manager.py
def get_services_pmi(self) -> pd.DataFrame:
    """获取财新服务业PMI数据"""
    return self._adapter.get_services_pmi()

get_non_manufacturing_pmi

get_non_manufacturing_pmi() -> pd.DataFrame

获取官方非制造业PMI数据

源代码位于: src/fund_cli/core/data_manager.py
def get_non_manufacturing_pmi(self) -> pd.DataFrame:
    """获取官方非制造业PMI数据"""
    return self._adapter.get_non_manufacturing_pmi()

get_m2_yearly

get_m2_yearly() -> pd.DataFrame

获取M2货币供应年率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_m2_yearly(self) -> pd.DataFrame:
    """获取M2货币供应年率数据"""
    return self._adapter.get_m2_yearly()

get_new_loan

get_new_loan() -> pd.DataFrame

获取新增人民币贷款数据

源代码位于: src/fund_cli/core/data_manager.py
def get_new_loan(self) -> pd.DataFrame:
    """获取新增人民币贷款数据"""
    return self._adapter.get_new_loan()

get_retail_sales_yearly

get_retail_sales_yearly() -> pd.DataFrame

获取社会消费品零售总额年率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_retail_sales_yearly(self) -> pd.DataFrame:
    """获取社会消费品零售总额年率数据"""
    return self._adapter.get_retail_sales_yearly()

get_fixed_asset_investment

get_fixed_asset_investment() -> pd.DataFrame

获取固定资产投资年率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_fixed_asset_investment(self) -> pd.DataFrame:
    """获取固定资产投资年率数据"""
    return self._adapter.get_fixed_asset_investment()

get_china_interest_rate

get_china_interest_rate() -> pd.DataFrame

获取中国央行利率决议数据

源代码位于: src/fund_cli/core/data_manager.py
def get_china_interest_rate(self) -> pd.DataFrame:
    """获取中国央行利率决议数据"""
    return self._adapter.get_china_interest_rate()

get_usa_interest_rate

get_usa_interest_rate() -> pd.DataFrame

获取美联储利率决议数据

源代码位于: src/fund_cli/core/data_manager.py
def get_usa_interest_rate(self) -> pd.DataFrame:
    """获取美联储利率决议数据"""
    return self._adapter.get_usa_interest_rate()

get_euro_interest_rate

get_euro_interest_rate() -> pd.DataFrame

获取欧洲央行利率决议数据

源代码位于: src/fund_cli/core/data_manager.py
def get_euro_interest_rate(self) -> pd.DataFrame:
    """获取欧洲央行利率决议数据"""
    return self._adapter.get_euro_interest_rate()

get_japan_interest_rate

get_japan_interest_rate() -> pd.DataFrame

获取日本央行利率决议数据

源代码位于: src/fund_cli/core/data_manager.py
def get_japan_interest_rate(self) -> pd.DataFrame:
    """获取日本央行利率决议数据"""
    return self._adapter.get_japan_interest_rate()

get_uk_interest_rate

get_uk_interest_rate() -> pd.DataFrame

获取英国央行利率决议数据

源代码位于: src/fund_cli/core/data_manager.py
def get_uk_interest_rate(self) -> pd.DataFrame:
    """获取英国央行利率决议数据"""
    return self._adapter.get_uk_interest_rate()

get_shibor

get_shibor() -> pd.DataFrame

获取SHIBOR利率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_shibor(self) -> pd.DataFrame:
    """获取SHIBOR利率数据"""
    return self._adapter.get_shibor()

get_shibor_lpr

get_shibor_lpr() -> pd.DataFrame

获取SHIBOR-LPR数据

源代码位于: src/fund_cli/core/data_manager.py
def get_shibor_lpr(self) -> pd.DataFrame:
    """获取SHIBOR-LPR数据"""
    return self._adapter.get_shibor_lpr()

get_hibor

get_hibor() -> pd.DataFrame

获取HIBOR利率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_hibor(self) -> pd.DataFrame:
    """获取HIBOR利率数据"""
    return self._adapter.get_hibor()

get_industry_boards

get_industry_boards() -> pd.DataFrame

获取行业板块名称列表

源代码位于: src/fund_cli/core/data_manager.py
def get_industry_boards(self) -> pd.DataFrame:
    """获取行业板块名称列表"""
    return self._adapter.get_industry_boards()

get_industry_board_hist

get_industry_board_hist(
    code: str,
    period: str = "daily",
    start_date: str | None = None,
    end_date: str | None = None,
) -> pd.DataFrame

获取行业板块历史行情

源代码位于: src/fund_cli/core/data_manager.py
def get_industry_board_hist(
    self,
    code: str,
    period: str = "daily",
    start_date: str | None = None,
    end_date: str | None = None,
) -> pd.DataFrame:
    """获取行业板块历史行情"""
    return self._adapter.get_industry_board_hist(code, period, start_date, end_date)

get_concept_boards

get_concept_boards() -> pd.DataFrame

获取概念板块名称列表

源代码位于: src/fund_cli/core/data_manager.py
def get_concept_boards(self) -> pd.DataFrame:
    """获取概念板块名称列表"""
    return self._adapter.get_concept_boards()

get_concept_board_hist

get_concept_board_hist(
    code: str,
    period: str = "daily",
    start_date: str | None = None,
    end_date: str | None = None,
) -> pd.DataFrame

获取概念板块历史行情

源代码位于: src/fund_cli/core/data_manager.py
def get_concept_board_hist(
    self,
    code: str,
    period: str = "daily",
    start_date: str | None = None,
    end_date: str | None = None,
) -> pd.DataFrame:
    """获取概念板块历史行情"""
    return self._adapter.get_concept_board_hist(code, period, start_date, end_date)

get_sector_fund_flow

get_sector_fund_flow(period: str = '今日') -> pd.DataFrame

获取板块资金流向

源代码位于: src/fund_cli/core/data_manager.py
def get_sector_fund_flow(self, period: str = "今日") -> pd.DataFrame:
    """获取板块资金流向"""
    return self._adapter.get_sector_fund_flow(period)

get_china_us_bond_yield

get_china_us_bond_yield() -> pd.DataFrame

获取中美国债收益率数据

源代码位于: src/fund_cli/core/data_manager.py
def get_china_us_bond_yield(self) -> pd.DataFrame:
    """获取中美国债收益率数据"""
    return self._adapter.get_china_us_bond_yield()

get_bond_yield_curve

get_bond_yield_curve(
    bond_type: str = "国债",
    period: str = "daily",
    start_date: str | None = None,
    end_date: str | None = None,
) -> pd.DataFrame

获取收盘收益率曲线历史数据

源代码位于: src/fund_cli/core/data_manager.py
def get_bond_yield_curve(
    self,
    bond_type: str = "国债",
    period: str = "daily",
    start_date: str | None = None,
    end_date: str | None = None,
) -> pd.DataFrame:
    """获取收盘收益率曲线历史数据"""
    return self._adapter.get_bond_yield_curve(bond_type, period, start_date, end_date)

get_bond_spot_quote

get_bond_spot_quote() -> pd.DataFrame

获取现券市场做市报价

源代码位于: src/fund_cli/core/data_manager.py
def get_bond_spot_quote(self) -> pd.DataFrame:
    """获取现券市场做市报价"""
    return self._adapter.get_bond_spot_quote()

get_convertible_bonds

get_convertible_bonds() -> pd.DataFrame

获取可转债数据一览表

源代码位于: src/fund_cli/core/data_manager.py
def get_convertible_bonds(self) -> pd.DataFrame:
    """获取可转债数据一览表"""
    return self._adapter.get_convertible_bonds()

get_convertible_bond_detail

get_convertible_bond_detail(code: str) -> pd.DataFrame

获取可转债详情数据

源代码位于: src/fund_cli/core/data_manager.py
def get_convertible_bond_detail(self, code: str) -> pd.DataFrame:
    """获取可转债详情数据"""
    return self._adapter.get_convertible_bond_detail(code)

get_bond_spot

get_bond_spot(code: str) -> pd.DataFrame

获取沪深债券实时行情

源代码位于: src/fund_cli/core/data_manager.py
def get_bond_spot(self, code: str) -> pd.DataFrame:
    """获取沪深债券实时行情"""
    return self._adapter.get_bond_spot(code)

get_bond_hist

get_bond_hist(
    code: str,
    period: str = "daily",
    start_date: str | None = None,
    end_date: str | None = None,
) -> pd.DataFrame

获取沪深债券历史行情

源代码位于: src/fund_cli/core/data_manager.py
def get_bond_hist(
    self,
    code: str,
    period: str = "daily",
    start_date: str | None = None,
    end_date: str | None = None,
) -> pd.DataFrame:
    """获取沪深债券历史行情"""
    return self._adapter.get_bond_hist(code, period, start_date, end_date)

get_a_share_valuation

get_a_share_valuation() -> pd.DataFrame

获取A股等权重与中位数市盈率/市净率

源代码位于: src/fund_cli/core/data_manager.py
def get_a_share_valuation(self) -> pd.DataFrame:
    """获取A股等权重与中位数市盈率/市净率"""
    return self._adapter.get_a_share_valuation()

get_stock_valuation_lg

get_stock_valuation_lg(code: str) -> pd.DataFrame

获取个股估值数据(乐咕乐股)

源代码位于: src/fund_cli/core/data_manager.py
def get_stock_valuation_lg(self, code: str) -> pd.DataFrame:
    """获取个股估值数据(乐咕乐股)"""
    return self._adapter.get_stock_valuation_lg(code)

get_index_valuation

get_index_valuation(
    code: str, indicator: str = "pe"
) -> pd.DataFrame

获取指数估值历史数据(乐咕乐股)

源代码位于: src/fund_cli/core/data_manager.py
def get_index_valuation(
    self, code: str, indicator: str = "pe"
) -> pd.DataFrame:
    """获取指数估值历史数据(乐咕乐股)"""
    return self._adapter.get_index_valuation(code, indicator)

get_market_pe_lg

get_market_pe_lg(code: str) -> pd.DataFrame

获取指数市盈率数据(乐咕乐股)

源代码位于: src/fund_cli/core/data_manager.py
def get_market_pe_lg(self, code: str) -> pd.DataFrame:
    """获取指数市盈率数据(乐咕乐股)"""
    return self._adapter.get_market_pe_lg(code)

get_market_pb_lg

get_market_pb_lg(code: str) -> pd.DataFrame

获取指数市净率数据(乐咕乐股)

源代码位于: src/fund_cli/core/data_manager.py
def get_market_pb_lg(self, code: str) -> pd.DataFrame:
    """获取指数市净率数据(乐咕乐股)"""
    return self._adapter.get_market_pb_lg(code)

get_market_fund_flow

get_market_fund_flow() -> pd.DataFrame

获取大盘资金流向数据

源代码位于: src/fund_cli/core/data_manager.py
def get_market_fund_flow(self) -> pd.DataFrame:
    """获取大盘资金流向数据"""
    return self._adapter.get_market_fund_flow()

get_stock_fund_flow

get_stock_fund_flow(
    code: str, market: str = "sh"
) -> pd.DataFrame

获取个股资金流向数据

源代码位于: src/fund_cli/core/data_manager.py
def get_stock_fund_flow(self, code: str, market: str = "sh") -> pd.DataFrame:
    """获取个股资金流向数据"""
    return self._adapter.get_stock_fund_flow(code, market)

get_north_fund_flow

get_north_fund_flow(market: str = '北向资金') -> pd.DataFrame

获取北向资金流向数据

源代码位于: src/fund_cli/core/data_manager.py
def get_north_fund_flow(self, market: str = "北向资金") -> pd.DataFrame:
    """获取北向资金流向数据"""
    return self._adapter.get_north_fund_flow(market)

clear_cache

clear_cache() -> None

清空缓存

源代码位于: src/fund_cli/core/data_manager.py
def clear_cache(self) -> None:
    """清空缓存"""
    self._cache.clear()

get_cache_stats

get_cache_stats() -> dict

获取缓存统计

源代码位于: src/fund_cli/core/data_manager.py
def get_cache_stats(self) -> dict:
    """获取缓存统计"""
    return self._cache.get_stats()

DataSourceGateway

数据源网关.

功能: - 管理多个数据源适配器 - 支持数据源优先级配置 - 实现降级切换逻辑 - 熔断与重试机制 - 统一的错误处理

源代码位于: src/fund_cli/core/data_gateway.py
class DataSourceGateway:
    """
    数据源网关.

    功能:
    - 管理多个数据源适配器
    - 支持数据源优先级配置
    - 实现降级切换逻辑
    - 熔断与重试机制
    - 统一的错误处理
    """

    def __init__(self):
        """初始化数据源网关."""
        self._adapters: dict[str, DataSourceAdapter] = {}
        self._priority: list[str] = []
        self._circuit_states: dict[str, CircuitState] = {}
        self._failure_counts: dict[str, int] = {}
        self._last_failure_time: dict[str, datetime] = {}
        self._success_counts: dict[str, int] = {}

        # 熔断配置
        self._failure_threshold = 5       # 连续失败阈值
        self._recovery_timeout = 60       # 熔断恢复时间(秒)
        self._half_open_max_calls = 3     # 半开状态最大调用次数

        # 请求缓存配置
        self._call_cache: dict[str, tuple[datetime, Any]] = {}
        self._cache_ttl = 300  # 5分钟缓存

        self._load_config()

    def _load_config(self) -> None:
        """加载配置."""
        config = get_config()
        self._priority = config.data.source_priority_list

    def register_adapter(self, name: str, adapter: DataSourceAdapter) -> None:
        """
        注册数据源适配器.

        Args:
            name: 适配器名称
            adapter: 适配器实例
        """
        self._adapters[name] = adapter
        self._circuit_states[name] = CircuitState.CLOSED
        self._failure_counts[name] = 0
        self._success_counts[name] = 0

    def get_adapter(self, name: str) -> DataSourceAdapter | None:
        """
        获取指定适配器.

        Args:
            name: 适配器名称

        Returns:
            适配器实例或None
        """
        return self._adapters.get(name)

    def get_available_adapters(self) -> list[str]:
        """
        获取可用的适配器列表.

        Returns:
            按优先级排序的可用适配器名称列表
        """
        available = []
        for name in self._priority:
            if name in self._adapters:
                adapter = self._adapters[name]
                if adapter.is_available() and self._circuit_states.get(name) != CircuitState.OPEN:
                    available.append(name)
        return available

    def _update_circuit_state(self, name: str, success: bool) -> None:
        """更新熔断器状态."""
        state = self._circuit_states.get(name, CircuitState.CLOSED)

        if state == CircuitState.CLOSED:
            if success:
                self._failure_counts[name] = 0
                self._success_counts[name] += 1
            else:
                self._failure_counts[name] += 1
                self._last_failure_time[name] = datetime.now()

                if self._failure_counts[name] >= self._failure_threshold:
                    self._circuit_states[name] = CircuitState.OPEN
                    print(f"[DataSourceGateway] {name} 熔断器打开")

        elif state == CircuitState.OPEN:
            # 检查是否到达恢复时间
            last_failure = self._last_failure_time.get(name)
            if last_failure and datetime.now() - last_failure > timedelta(seconds=self._recovery_timeout):
                self._circuit_states[name] = CircuitState.HALF_OPEN
                self._success_counts[name] = 0
                print(f"[DataSourceGateway] {name} 熔断器半开")

        elif state == CircuitState.HALF_OPEN:
            if success:
                self._success_counts[name] += 1
                if self._success_counts[name] >= self._half_open_max_calls:
                    self._circuit_states[name] = CircuitState.CLOSED
                    self._failure_counts[name] = 0
                    print(f"[DataSourceGateway] {name} 熔断器关闭")
            else:
                self._circuit_states[name] = CircuitState.OPEN
                self._failure_counts[name] += 1
                self._last_failure_time[name] = datetime.now()
                print(f"[DataSourceGateway] {name} 熔断器重新打开")

    def _call_with_retry(
        self,
        adapter_name: str,
        method: Callable[..., T],
        max_retries: int = 3,
        *args: Any,
        **kwargs: Any
    ) -> T:
        """
        带重试的调用.

        Args:
            adapter_name: 适配器名称
            method: 要调用的方法
            max_retries: 最大重试次数
            *args, **kwargs: 方法参数

        Returns:
            方法返回值

        Raises:
            DataSourceError: 所有重试都失败
        """
        last_error = None

        for attempt in range(max_retries):
            try:
                result = method(*args, **kwargs)
                self._update_circuit_state(adapter_name, True)
                return result
            except DataNotFoundError:
                # 数据不存在不重试
                raise
            except Exception as e:
                last_error = e
                self._update_circuit_state(adapter_name, False)
                print(f"[DataSourceGateway] {adapter_name} 调用失败 (尝试 {attempt + 1}/{max_retries}): {e}")

        raise DataSourceError(f"{adapter_name} 调用失败,已重试 {max_retries} 次: {last_error}") from last_error

    def call(
        self,
        method_name: str,
        *args: Any,
        fallback: bool = True,
        **kwargs: Any
    ) -> Any:
        """
        调用数据源方法.

        Args:
            method_name: 方法名称
            args: 方法位置参数
            kwargs: 方法关键字参数
            fallback: 是否启用降级切换

        Returns:
            方法返回值

        Raises:
            DataSourceError: 所有数据源都失败
        """
        available = self.get_available_adapters()

        if not available:
            raise DataSourceError("没有可用的数据源")

        last_error = None

        for adapter_name in available:
            adapter = self._adapters[adapter_name]
            method = getattr(adapter, method_name, None)

            if method is None:
                continue

            try:
                return self._call_with_retry(adapter_name, method, 3, *args, **kwargs)
            except DataNotFoundError:
                # 数据不存在,尝试下一个数据源
                continue
            except Exception as e:
                last_error = e
                if not fallback:
                    raise
                # 继续尝试下一个数据源
                continue

        if last_error:
            raise DataSourceError(f"所有数据源都失败: {last_error}") from last_error
        raise DataSourceError(f"方法 {method_name} 在所有数据源都不可用")

    def get_fund_info(self, fund_code: str) -> dict[str, Any]:
        """获取基金信息."""
        return self.call("get_fund_info", fund_code)

    def get_fund_nav(self, fund_code: str, start_date=None, end_date=None) -> Any:
        """获取基金净值."""
        return self.call("get_fund_nav", fund_code, start_date, end_date)

    def get_fund_manager(self, fund_code: str) -> Any:
        """获取基金经理."""
        return self.call("get_fund_manager", fund_code)

    def get_fund_holdings(self, fund_code: str, date=None) -> Any:
        """获取基金持仓."""
        return self.call("get_fund_holdings", fund_code, date)

    def get_fund_asset_allocation(self, fund_code: str) -> dict[str, Any]:
        """获取基金资产配置."""
        return self.call("get_fund_asset_allocation", fund_code)

    def get_fund_benchmark(self, fund_code: str) -> dict[str, Any]:
        """获取基金业绩基准."""
        return self.call("get_fund_benchmark", fund_code)

    def get_etf_spot(self) -> Any:
        """获取ETF实时行情."""
        return self.call("get_etf_spot")

    def get_lof_spot(self) -> Any:
        """获取LOF实时行情."""
        return self.call("get_lof_spot")

    def get_fund_purchase_status(self) -> Any:
        """获取基金申购状态."""
        return self.call("get_fund_purchase_status")

    def get_all_fund_names(self) -> Any:
        """获取所有基金名称."""
        return self.call("get_all_fund_names")

    def get_fund_daily_nav(self) -> Any:
        """获取基金每日净值."""
        return self.call("get_fund_daily_nav")

    def _get_cache_key(self, method_name: str, args: tuple, kwargs: dict) -> str:
        """生成缓存键."""
        kwargs_str = str(sorted(kwargs.items())) if kwargs else ""
        return f"{method_name}:{hash(str(args))}:{hash(kwargs_str)}"

    def _get_from_cache(self, key: str) -> Any | None:
        """从缓存获取."""
        if key in self._call_cache:
            timestamp, value = self._call_cache[key]
            if datetime.now() - timestamp < timedelta(seconds=self._cache_ttl):
                return value
            del self._call_cache[key]
        return None

    def _set_cache(self, key: str, value: Any) -> None:
        """设置缓存."""
        self._call_cache[key] = (datetime.now(), value)

    def clear_cache(self) -> None:
        """清空请求缓存."""
        self._call_cache.clear()

    def get_status(self) -> dict[str, Any]:
        """
        获取网关状态.

        Returns:
            状态信息字典
        """
        return {
            "adapters": {
                name: {
                    "available": adapter.is_available(),
                    "circuit_state": self._circuit_states.get(name, CircuitState.CLOSED).value,
                    "failure_count": self._failure_counts.get(name, 0),
                    "success_count": self._success_counts.get(name, 0),
                }
                for name, adapter in self._adapters.items()
            },
            "priority": self._priority,
            "available_adapters": self.get_available_adapters(),
            "cache_size": len(self._call_cache),
        }

__init__

__init__()

初始化数据源网关.

源代码位于: src/fund_cli/core/data_gateway.py
def __init__(self):
    """初始化数据源网关."""
    self._adapters: dict[str, DataSourceAdapter] = {}
    self._priority: list[str] = []
    self._circuit_states: dict[str, CircuitState] = {}
    self._failure_counts: dict[str, int] = {}
    self._last_failure_time: dict[str, datetime] = {}
    self._success_counts: dict[str, int] = {}

    # 熔断配置
    self._failure_threshold = 5       # 连续失败阈值
    self._recovery_timeout = 60       # 熔断恢复时间(秒)
    self._half_open_max_calls = 3     # 半开状态最大调用次数

    # 请求缓存配置
    self._call_cache: dict[str, tuple[datetime, Any]] = {}
    self._cache_ttl = 300  # 5分钟缓存

    self._load_config()

register_adapter

register_adapter(
    name: str, adapter: DataSourceAdapter
) -> None

注册数据源适配器.

参数:

名称 类型 描述 默认
name str

适配器名称

必需
adapter DataSourceAdapter

适配器实例

必需
源代码位于: src/fund_cli/core/data_gateway.py
def register_adapter(self, name: str, adapter: DataSourceAdapter) -> None:
    """
    注册数据源适配器.

    Args:
        name: 适配器名称
        adapter: 适配器实例
    """
    self._adapters[name] = adapter
    self._circuit_states[name] = CircuitState.CLOSED
    self._failure_counts[name] = 0
    self._success_counts[name] = 0

get_adapter

get_adapter(name: str) -> DataSourceAdapter | None

获取指定适配器.

参数:

名称 类型 描述 默认
name str

适配器名称

必需

返回:

类型 描述
DataSourceAdapter | None

适配器实例或None

源代码位于: src/fund_cli/core/data_gateway.py
def get_adapter(self, name: str) -> DataSourceAdapter | None:
    """
    获取指定适配器.

    Args:
        name: 适配器名称

    Returns:
        适配器实例或None
    """
    return self._adapters.get(name)

get_available_adapters

get_available_adapters() -> list[str]

获取可用的适配器列表.

返回:

类型 描述
list[str]

按优先级排序的可用适配器名称列表

源代码位于: src/fund_cli/core/data_gateway.py
def get_available_adapters(self) -> list[str]:
    """
    获取可用的适配器列表.

    Returns:
        按优先级排序的可用适配器名称列表
    """
    available = []
    for name in self._priority:
        if name in self._adapters:
            adapter = self._adapters[name]
            if adapter.is_available() and self._circuit_states.get(name) != CircuitState.OPEN:
                available.append(name)
    return available

call

call(
    method_name: str,
    *args: Any,
    fallback: bool = True,
    **kwargs: Any,
) -> Any

调用数据源方法.

参数:

名称 类型 描述 默认
method_name str

方法名称

必需
args Any

方法位置参数

()
kwargs Any

方法关键字参数

{}
fallback bool

是否启用降级切换

True

返回:

类型 描述
Any

方法返回值

引发:

类型 描述
DataSourceError

所有数据源都失败

源代码位于: src/fund_cli/core/data_gateway.py
def call(
    self,
    method_name: str,
    *args: Any,
    fallback: bool = True,
    **kwargs: Any
) -> Any:
    """
    调用数据源方法.

    Args:
        method_name: 方法名称
        args: 方法位置参数
        kwargs: 方法关键字参数
        fallback: 是否启用降级切换

    Returns:
        方法返回值

    Raises:
        DataSourceError: 所有数据源都失败
    """
    available = self.get_available_adapters()

    if not available:
        raise DataSourceError("没有可用的数据源")

    last_error = None

    for adapter_name in available:
        adapter = self._adapters[adapter_name]
        method = getattr(adapter, method_name, None)

        if method is None:
            continue

        try:
            return self._call_with_retry(adapter_name, method, 3, *args, **kwargs)
        except DataNotFoundError:
            # 数据不存在,尝试下一个数据源
            continue
        except Exception as e:
            last_error = e
            if not fallback:
                raise
            # 继续尝试下一个数据源
            continue

    if last_error:
        raise DataSourceError(f"所有数据源都失败: {last_error}") from last_error
    raise DataSourceError(f"方法 {method_name} 在所有数据源都不可用")

get_fund_info

get_fund_info(fund_code: str) -> dict[str, Any]

获取基金信息.

源代码位于: src/fund_cli/core/data_gateway.py
def get_fund_info(self, fund_code: str) -> dict[str, Any]:
    """获取基金信息."""
    return self.call("get_fund_info", fund_code)

get_fund_nav

get_fund_nav(
    fund_code: str, start_date=None, end_date=None
) -> Any

获取基金净值.

源代码位于: src/fund_cli/core/data_gateway.py
def get_fund_nav(self, fund_code: str, start_date=None, end_date=None) -> Any:
    """获取基金净值."""
    return self.call("get_fund_nav", fund_code, start_date, end_date)

get_fund_manager

get_fund_manager(fund_code: str) -> Any

获取基金经理.

源代码位于: src/fund_cli/core/data_gateway.py
def get_fund_manager(self, fund_code: str) -> Any:
    """获取基金经理."""
    return self.call("get_fund_manager", fund_code)

get_fund_holdings

get_fund_holdings(fund_code: str, date=None) -> Any

获取基金持仓.

源代码位于: src/fund_cli/core/data_gateway.py
def get_fund_holdings(self, fund_code: str, date=None) -> Any:
    """获取基金持仓."""
    return self.call("get_fund_holdings", fund_code, date)

get_fund_asset_allocation

get_fund_asset_allocation(fund_code: str) -> dict[str, Any]

获取基金资产配置.

源代码位于: src/fund_cli/core/data_gateway.py
def get_fund_asset_allocation(self, fund_code: str) -> dict[str, Any]:
    """获取基金资产配置."""
    return self.call("get_fund_asset_allocation", fund_code)

get_fund_benchmark

get_fund_benchmark(fund_code: str) -> dict[str, Any]

获取基金业绩基准.

源代码位于: src/fund_cli/core/data_gateway.py
def get_fund_benchmark(self, fund_code: str) -> dict[str, Any]:
    """获取基金业绩基准."""
    return self.call("get_fund_benchmark", fund_code)

get_etf_spot

get_etf_spot() -> Any

获取ETF实时行情.

源代码位于: src/fund_cli/core/data_gateway.py
def get_etf_spot(self) -> Any:
    """获取ETF实时行情."""
    return self.call("get_etf_spot")

get_lof_spot

get_lof_spot() -> Any

获取LOF实时行情.

源代码位于: src/fund_cli/core/data_gateway.py
def get_lof_spot(self) -> Any:
    """获取LOF实时行情."""
    return self.call("get_lof_spot")

get_fund_purchase_status

get_fund_purchase_status() -> Any

获取基金申购状态.

源代码位于: src/fund_cli/core/data_gateway.py
def get_fund_purchase_status(self) -> Any:
    """获取基金申购状态."""
    return self.call("get_fund_purchase_status")

get_all_fund_names

get_all_fund_names() -> Any

获取所有基金名称.

源代码位于: src/fund_cli/core/data_gateway.py
def get_all_fund_names(self) -> Any:
    """获取所有基金名称."""
    return self.call("get_all_fund_names")

get_fund_daily_nav

get_fund_daily_nav() -> Any

获取基金每日净值.

源代码位于: src/fund_cli/core/data_gateway.py
def get_fund_daily_nav(self) -> Any:
    """获取基金每日净值."""
    return self.call("get_fund_daily_nav")

clear_cache

clear_cache() -> None

清空请求缓存.

源代码位于: src/fund_cli/core/data_gateway.py
def clear_cache(self) -> None:
    """清空请求缓存."""
    self._call_cache.clear()

get_status

get_status() -> dict[str, Any]

获取网关状态.

返回:

类型 描述
dict[str, Any]

状态信息字典

源代码位于: src/fund_cli/core/data_gateway.py
def get_status(self) -> dict[str, Any]:
    """
    获取网关状态.

    Returns:
        状态信息字典
    """
    return {
        "adapters": {
            name: {
                "available": adapter.is_available(),
                "circuit_state": self._circuit_states.get(name, CircuitState.CLOSED).value,
                "failure_count": self._failure_counts.get(name, 0),
                "success_count": self._success_counts.get(name, 0),
            }
            for name, adapter in self._adapters.items()
        },
        "priority": self._priority,
        "available_adapters": self.get_available_adapters(),
        "cache_size": len(self._call_cache),
    }

AIAnalyzer

AI分析器.

源代码位于: src/fund_cli/core/ai_analyzer.py
class AIAnalyzer:
    """AI分析器."""

    def __init__(self, backend: AIBackend = AIBackend.RULE_BASED, **kwargs):
        self._backend = self._create_backend(backend, **kwargs)
        self._analysis_cache: dict[str, Any] = {}

    def _get_cache_key(self, fund_code: str, metrics: dict[str, Any]) -> str:
        """生成分析缓存键.

        基于基金代码和部分关键指标生成缓存键
        """
        # 选取关键指标进行缓存
        key_metrics = {k: metrics.get(k) for k in ['total_return', 'sharpe_ratio', 'max_drawdown'] if k in metrics}
        return f"{fund_code}:{hash(str(sorted(key_metrics.items())))}"

    def _create_backend(self, backend: AIBackend, **kwargs) -> AIBackendInterface:
        if backend == AIBackend.RULE_BASED:
            return RuleBasedBackend()
        elif backend == AIBackend.OPENAI:
            return OpenAIBackend(**kwargs)
        else:
            raise ValueError(f"不支持的AI后端: {backend}")

    def analyze_fund(
        self,
        fund_code: str,
        fund_name: str,
        metrics: dict[str, Any],
        holdings: list[dict[str, Any]] | None = None,
        asset_allocation: dict[str, float] | None = None,
        use_cache: bool = True,
    ) -> AnalysisResult:
        """分析单只基金.

        Args:
            fund_code: 基金代码
            fund_name: 基金名称
            metrics: 基金指标
            holdings: 持仓数据
            asset_allocation: 资产配置
            use_cache: 是否使用缓存
        """
        # 检查缓存
        if use_cache:
            cache_key = self._get_cache_key(fund_code, metrics)
            if cache_key in self._analysis_cache:
                return self._analysis_cache[cache_key]

        context = self._build_fund_context(fund_code, fund_name, metrics, holdings, asset_allocation)

        summary = self._backend.analyze(context, "请生成该基金的总体摘要")
        risk = self._backend.analyze(context, "请分析该基金的风险")
        advice = self._backend.analyze(context, "请给出投资建议")
        performance = self._backend.analyze(context, "请评价该基金的业绩表现")

        highlights = self._extract_highlights(metrics)
        concerns = self._extract_concerns(metrics)

        result = AnalysisResult(
            summary=summary,
            risk_warning=risk,
            investment_advice=advice,
            performance_comment=performance,
            highlights=highlights,
            concerns=concerns,
            confidence=0.7 if isinstance(self._backend, RuleBasedBackend) else 0.9,
            data_source=fund_code,
            analysis_date=date.today().strftime("%Y-%m-%d"),
        )

        # 存入缓存
        if use_cache:
            cache_key = self._get_cache_key(fund_code, metrics)
            self._analysis_cache[cache_key] = result

        return result

    def clear_cache(self) -> None:
        """清空分析缓存."""
        self._analysis_cache.clear()

    def analyze_portfolio(
        self,
        funds: list[dict[str, Any]],
        portfolio_metrics: dict[str, Any],
    ) -> AnalysisResult:
        """分析投资组合."""
        context = f"组合包含{len(funds)}只基金,组合指标:{portfolio_metrics}"

        summary = self._backend.analyze(context, "请生成该投资组合的总体摘要")
        risk = self._backend.analyze(context, "请分析该投资组合的风险")

        return AnalysisResult(
            summary=summary,
            risk_warning=risk,
            confidence=0.7 if isinstance(self._backend, RuleBasedBackend) else 0.9,
            analysis_date=date.today().strftime("%Y-%m-%d"),
        )

    def _build_fund_context(self, fund_code, fund_name, metrics, holdings, asset_allocation) -> str:
        """构建分析上下文."""
        parts = [f"基金{fund_name}({fund_code})"]
        parts.append(f"核心指标: {metrics}")
        if holdings:
            parts.append(f"前十大重仓: {holdings[:5]}")
        if asset_allocation:
            parts.append(f"资产配置: {asset_allocation}")
        return "\n".join(parts)

    def _extract_highlights(self, metrics: dict) -> list[str]:
        """提取亮点."""
        highlights = []
        total_return = metrics.get("total_return", 0)
        sharpe = metrics.get("sharpe_ratio", 0)
        max_dd = metrics.get("max_drawdown", 0)

        if total_return and total_return > 0.1:
            highlights.append(f"总收益率 {total_return:.2%},表现优异")
        if sharpe and sharpe > 1.5:
            highlights.append(f"夏普比率 {sharpe:.2f},风险调整收益优秀")
        if max_dd and max_dd > -0.1:
            highlights.append(f"最大回撤仅 {max_dd:.2%},风控良好")
        return highlights or ["基金运作平稳"]

    def _extract_concerns(self, metrics: dict) -> list[str]:
        """提取风险点."""
        concerns = []
        max_dd = metrics.get("max_drawdown", 0)
        vol = metrics.get("volatility", 0)
        sharpe = metrics.get("sharpe_ratio", 0)

        if max_dd and max_dd < -0.2:
            concerns.append(f"最大回撤 {max_dd:.2%},回撤较大")
        if vol and vol > 0.25:
            concerns.append(f"波动率 {vol:.2%},波动较高")
        if sharpe is not None and sharpe < 0.5:
            concerns.append(f"夏普比率 {sharpe:.2f},风险收益比不佳")
        return concerns or ["暂无显著风险点"]

analyze_fund

analyze_fund(
    fund_code: str,
    fund_name: str,
    metrics: dict[str, Any],
    holdings: list[dict[str, Any]] | None = None,
    asset_allocation: dict[str, float] | None = None,
    use_cache: bool = True,
) -> AnalysisResult

分析单只基金.

参数:

名称 类型 描述 默认
fund_code str

基金代码

必需
fund_name str

基金名称

必需
metrics dict[str, Any]

基金指标

必需
holdings list[dict[str, Any]] | None

持仓数据

None
asset_allocation dict[str, float] | None

资产配置

None
use_cache bool

是否使用缓存

True
源代码位于: src/fund_cli/core/ai_analyzer.py
def analyze_fund(
    self,
    fund_code: str,
    fund_name: str,
    metrics: dict[str, Any],
    holdings: list[dict[str, Any]] | None = None,
    asset_allocation: dict[str, float] | None = None,
    use_cache: bool = True,
) -> AnalysisResult:
    """分析单只基金.

    Args:
        fund_code: 基金代码
        fund_name: 基金名称
        metrics: 基金指标
        holdings: 持仓数据
        asset_allocation: 资产配置
        use_cache: 是否使用缓存
    """
    # 检查缓存
    if use_cache:
        cache_key = self._get_cache_key(fund_code, metrics)
        if cache_key in self._analysis_cache:
            return self._analysis_cache[cache_key]

    context = self._build_fund_context(fund_code, fund_name, metrics, holdings, asset_allocation)

    summary = self._backend.analyze(context, "请生成该基金的总体摘要")
    risk = self._backend.analyze(context, "请分析该基金的风险")
    advice = self._backend.analyze(context, "请给出投资建议")
    performance = self._backend.analyze(context, "请评价该基金的业绩表现")

    highlights = self._extract_highlights(metrics)
    concerns = self._extract_concerns(metrics)

    result = AnalysisResult(
        summary=summary,
        risk_warning=risk,
        investment_advice=advice,
        performance_comment=performance,
        highlights=highlights,
        concerns=concerns,
        confidence=0.7 if isinstance(self._backend, RuleBasedBackend) else 0.9,
        data_source=fund_code,
        analysis_date=date.today().strftime("%Y-%m-%d"),
    )

    # 存入缓存
    if use_cache:
        cache_key = self._get_cache_key(fund_code, metrics)
        self._analysis_cache[cache_key] = result

    return result

clear_cache

clear_cache() -> None

清空分析缓存.

源代码位于: src/fund_cli/core/ai_analyzer.py
def clear_cache(self) -> None:
    """清空分析缓存."""
    self._analysis_cache.clear()

analyze_portfolio

analyze_portfolio(
    funds: list[dict[str, Any]],
    portfolio_metrics: dict[str, Any],
) -> AnalysisResult

分析投资组合.

源代码位于: src/fund_cli/core/ai_analyzer.py
def analyze_portfolio(
    self,
    funds: list[dict[str, Any]],
    portfolio_metrics: dict[str, Any],
) -> AnalysisResult:
    """分析投资组合."""
    context = f"组合包含{len(funds)}只基金,组合指标:{portfolio_metrics}"

    summary = self._backend.analyze(context, "请生成该投资组合的总体摘要")
    risk = self._backend.analyze(context, "请分析该投资组合的风险")

    return AnalysisResult(
        summary=summary,
        risk_warning=risk,
        confidence=0.7 if isinstance(self._backend, RuleBasedBackend) else 0.9,
        analysis_date=date.today().strftime("%Y-%m-%d"),
    )

AnalysisResult

分析结果.

源代码位于: src/fund_cli/core/ai_analyzer.py
@dataclass
class AnalysisResult:
    """分析结果."""
    summary: str = ""                          # 总体摘要
    risk_warning: str = ""                     # 风险提示
    investment_advice: str = ""                # 投资建议
    performance_comment: str = ""              # 业绩评价
    highlights: list[str] = field(default_factory=list)  # 亮点
    concerns: list[str] = field(default_factory=list)    # 风险点
    confidence: float = 0.0                    # 置信度 0-1
    data_source: str = ""                      # 数据来源
    analysis_date: str = ""                    # 分析日期

TemplateEngine

报告模板引擎.

源代码位于: src/fund_cli/core/template_engine.py
class TemplateEngine:
    """报告模板引擎."""

    def __init__(self, template_dirs: list[str] | None = None):
        if template_dirs is None:
            # 默认模板目录
            base_dir = Path(__file__).parent.parent / "templates"
            template_dirs = [str(base_dir)]
        self._env = Environment(
            loader=FileSystemLoader(template_dirs),
            autoescape=select_autoescape(['html', 'xml']),
            cache_size=100,  # 启用模板缓存
        )
        self._register_filters()
        self._register_globals()

    def _register_filters(self):
        """注册自定义过滤器."""
        def percentage(value, decimals=2):
            if value is None:
                return "N/A"
            return f"{float(value) * 100:.{decimals}f}%"

        def format_number(value, decimals=4):
            if value is None:
                return "N/A"
            return f"{float(value):.{decimals}f}"

        def color_class(value):
            if value is None:
                return ""
            return "positive" if float(value) > 0 else "negative" if float(value) < 0 else ""

        self._env.filters['percentage'] = percentage
        self._env.filters['format_number'] = format_number
        self._env.filters['color_class'] = color_class

    def _register_globals(self):
        """注册全局函数."""
        from datetime import date

        self._env.globals['today'] = date.today()
        self._env.globals['now'] = lambda: date.today().strftime('%Y-%m-%d')

    def render(self, template_name: str, **context) -> str:
        """渲染模板."""
        template = self._env.get_template(template_name)
        return template.render(**context)

    def render_string(self, template_string: str, **context) -> str:
        """从字符串渲染模板."""
        template = self._env.from_string(template_string)
        return template.render(**context)

    def get_template(self, template_name: str):
        """获取模板对象."""
        return self._env.get_template(template_name)

    def list_templates(self, directory: str = "") -> list[str]:
        """列出可用模板."""
        return self._env.list_templates(
            filter_func=lambda x: x.startswith(directory) if directory else True
        )

render

render(template_name: str, **context) -> str

渲染模板.

源代码位于: src/fund_cli/core/template_engine.py
def render(self, template_name: str, **context) -> str:
    """渲染模板."""
    template = self._env.get_template(template_name)
    return template.render(**context)

render_string

render_string(template_string: str, **context) -> str

从字符串渲染模板.

源代码位于: src/fund_cli/core/template_engine.py
def render_string(self, template_string: str, **context) -> str:
    """从字符串渲染模板."""
    template = self._env.from_string(template_string)
    return template.render(**context)

get_template

get_template(template_name: str)

获取模板对象.

源代码位于: src/fund_cli/core/template_engine.py
def get_template(self, template_name: str):
    """获取模板对象."""
    return self._env.get_template(template_name)

list_templates

list_templates(directory: str = '') -> list[str]

列出可用模板.

源代码位于: src/fund_cli/core/template_engine.py
def list_templates(self, directory: str = "") -> list[str]:
    """列出可用模板."""
    return self._env.list_templates(
        filter_func=lambda x: x.startswith(directory) if directory else True
    )

Reporter

Bases: ABC


              flowchart TD
              fund_cli.core.reporter.Reporter[Reporter]

              

              click fund_cli.core.reporter.Reporter href "" "fund_cli.core.reporter.Reporter"
            

报告生成器基类.

所有报告生成器必须继承此类。

源代码位于: src/fund_cli/core/reporter.py
class Reporter(ABC):
    """
    报告生成器基类.

    所有报告生成器必须继承此类。
    """

    @abstractmethod
    def generate(
        self,
        fund_code: str,
        metrics: dict[str, Any],
        nav_data: pd.DataFrame | None = None,
        benchmark_data: pd.DataFrame | None = None,
        **kwargs: Any,
    ) -> str:
        """
        生成报告.

        Args:
            fund_code: 基金代码
            metrics: 分析指标字典
            nav_data: 净值数据
            benchmark_data: 基准数据
            **kwargs: 额外参数

        Returns:
            报告内容字符串
        """
        pass

    @abstractmethod
    def save(
        self,
        content: str,
        output_path: str,
    ) -> None:
        """
        保存报告到文件.

        Args:
            content: 报告内容
            output_path: 输出文件路径
        """
        pass

    @abstractmethod
    def get_formats(self) -> list:
        """
        获取支持的报告格式.

        Returns:
            格式列表(如 ['html', 'markdown', 'pdf'])
        """
        pass

    def render_to_template(self, data: dict[str, Any], template_path: str) -> str:
        """使用模板渲染报告内容(子类可覆盖以支持不同模板引擎)."""
        from jinja2 import Environment, FileSystemLoader, select_autoescape

        template_dir = str(Path(template_path).parent)
        template_name = Path(template_path).name
        env = Environment(
            loader=FileSystemLoader(template_dir),
            autoescape=select_autoescape(['html', 'xml']),
        )
        template = env.get_template(template_name)
        return template.render(**data)

    def export_pdf(self, content: str, output_path: str) -> Path:
        """导出为PDF(需要weasyprint)."""
        try:
            from weasyprint import HTML

            HTML(string=content).write_pdf(output_path)
            return Path(output_path)
        except ImportError as exc:
            raise RuntimeError("weasyprint 未安装,请运行: pip install weasyprint") from exc

    def export_docx(self, content: str, output_path: str) -> Path:
        """导出为Word(需要python-docx)."""
        raise NotImplementedError("Word导出将在阶段三实现")

    def export_pptx(self, content: str, output_path: str) -> Path:
        """导出为PPT(需要python-pptx)."""
        raise NotImplementedError("PPT导出将在阶段三实现")

    def get_supported_formats(self) -> list[str]:
        """获取支持的导出格式."""
        formats = list(self.get_formats())
        try:
            import weasyprint  # noqa: F401

            formats.append("pdf")
        except ImportError:
            # weasyprint 未安装,不支持 PDF 格式
            pass
        return formats

generate abstractmethod

generate(
    fund_code: str,
    metrics: dict[str, Any],
    nav_data: DataFrame | None = None,
    benchmark_data: DataFrame | None = None,
    **kwargs: Any,
) -> str

生成报告.

参数:

名称 类型 描述 默认
fund_code str

基金代码

必需
metrics dict[str, Any]

分析指标字典

必需
nav_data DataFrame | None

净值数据

None
benchmark_data DataFrame | None

基准数据

None
**kwargs Any

额外参数

{}

返回:

类型 描述
str

报告内容字符串

源代码位于: src/fund_cli/core/reporter.py
@abstractmethod
def generate(
    self,
    fund_code: str,
    metrics: dict[str, Any],
    nav_data: pd.DataFrame | None = None,
    benchmark_data: pd.DataFrame | None = None,
    **kwargs: Any,
) -> str:
    """
    生成报告.

    Args:
        fund_code: 基金代码
        metrics: 分析指标字典
        nav_data: 净值数据
        benchmark_data: 基准数据
        **kwargs: 额外参数

    Returns:
        报告内容字符串
    """
    pass

save abstractmethod

save(content: str, output_path: str) -> None

保存报告到文件.

参数:

名称 类型 描述 默认
content str

报告内容

必需
output_path str

输出文件路径

必需
源代码位于: src/fund_cli/core/reporter.py
@abstractmethod
def save(
    self,
    content: str,
    output_path: str,
) -> None:
    """
    保存报告到文件.

    Args:
        content: 报告内容
        output_path: 输出文件路径
    """
    pass

get_formats abstractmethod

get_formats() -> list

获取支持的报告格式.

返回:

类型 描述
list

格式列表(如 ['html', 'markdown', 'pdf'])

源代码位于: src/fund_cli/core/reporter.py
@abstractmethod
def get_formats(self) -> list:
    """
    获取支持的报告格式.

    Returns:
        格式列表(如 ['html', 'markdown', 'pdf'])
    """
    pass

render_to_template

render_to_template(
    data: dict[str, Any], template_path: str
) -> str

使用模板渲染报告内容(子类可覆盖以支持不同模板引擎).

源代码位于: src/fund_cli/core/reporter.py
def render_to_template(self, data: dict[str, Any], template_path: str) -> str:
    """使用模板渲染报告内容(子类可覆盖以支持不同模板引擎)."""
    from jinja2 import Environment, FileSystemLoader, select_autoescape

    template_dir = str(Path(template_path).parent)
    template_name = Path(template_path).name
    env = Environment(
        loader=FileSystemLoader(template_dir),
        autoescape=select_autoescape(['html', 'xml']),
    )
    template = env.get_template(template_name)
    return template.render(**data)

export_pdf

export_pdf(content: str, output_path: str) -> Path

导出为PDF(需要weasyprint).

源代码位于: src/fund_cli/core/reporter.py
def export_pdf(self, content: str, output_path: str) -> Path:
    """导出为PDF(需要weasyprint)."""
    try:
        from weasyprint import HTML

        HTML(string=content).write_pdf(output_path)
        return Path(output_path)
    except ImportError as exc:
        raise RuntimeError("weasyprint 未安装,请运行: pip install weasyprint") from exc

export_docx

export_docx(content: str, output_path: str) -> Path

导出为Word(需要python-docx).

源代码位于: src/fund_cli/core/reporter.py
def export_docx(self, content: str, output_path: str) -> Path:
    """导出为Word(需要python-docx)."""
    raise NotImplementedError("Word导出将在阶段三实现")

export_pptx

export_pptx(content: str, output_path: str) -> Path

导出为PPT(需要python-pptx).

源代码位于: src/fund_cli/core/reporter.py
def export_pptx(self, content: str, output_path: str) -> Path:
    """导出为PPT(需要python-pptx)."""
    raise NotImplementedError("PPT导出将在阶段三实现")

get_supported_formats

get_supported_formats() -> list[str]

获取支持的导出格式.

源代码位于: src/fund_cli/core/reporter.py
def get_supported_formats(self) -> list[str]:
    """获取支持的导出格式."""
    formats = list(self.get_formats())
    try:
        import weasyprint  # noqa: F401

        formats.append("pdf")
    except ImportError:
        # weasyprint 未安装,不支持 PDF 格式
        pass
    return formats