Metadata-Version: 2.4
Name: stock-storage-frame
Version: 1.0.2
Summary: A configuration-driven stock data storage framework
Author-email: Pan Huachao <tzzjchao@126.com>
License: MIT
Project-URL: Homepage, https://gitee.com/panhuachao/stock-storage-frame
Project-URL: Repository, https://gitee.com/panhuachao/stock-storage-frame.git
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Financial and Insurance Industry
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Topic :: Office/Business :: Financial :: Investment
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pandas>=1.5.0
Requires-Dist: numpy>=1.24.0
Requires-Dist: sqlalchemy>=2.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: pyyaml>=6.0
Requires-Dist: loguru>=0.7.0
Requires-Dist: python-dateutil>=2.8.0
Requires-Dist: typing-extensions>=4.5.0
Requires-Dist: croniter>=1.3.0
Requires-Dist: aiomysql>=0.2.0
Requires-Dist: akshare>=1.10.0
Requires-Dist: tushare>=1.2.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: isort>=5.12.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Provides-Extra: mysql
Requires-Dist: mysql-connector-python>=8.0.0; extra == "mysql"
Requires-Dist: pymysql>=1.0.0; extra == "mysql"
Requires-Dist: aiomysql>=0.2.0; extra == "mysql"
Provides-Extra: postgresql
Requires-Dist: asyncpg>=0.29.0; extra == "postgresql"
Provides-Extra: akshare
Requires-Dist: akshare>=1.10.0; extra == "akshare"
Provides-Extra: tushare
Requires-Dist: tushare>=1.2.0; extra == "tushare"
Provides-Extra: extra
Requires-Dist: chinese_calendar>=1.11.0; extra == "extra"
Requires-Dist: selenium>=4.0.0; extra == "extra"
Requires-Dist: webdriver-manager>=4.0.0; extra == "extra"
Provides-Extra: all
Requires-Dist: mysql-connector-python>=8.0.0; extra == "all"
Requires-Dist: pymysql>=1.0.0; extra == "all"
Requires-Dist: aiomysql>=0.2.0; extra == "all"
Requires-Dist: psycopg2-binary>=2.9.0; extra == "all"
Requires-Dist: akshare>=1.10.0; extra == "all"
Requires-Dist: tushare>=1.2.0; extra == "all"
Requires-Dist: chinese_calendar>=1.11.0; extra == "all"
Requires-Dist: selenium>=4.0.0; extra == "all"
Requires-Dist: webdriver-manager>=4.0.0; extra == "all"
Dynamic: license-file

# Stock Storage Frame

一个配置驱动的股票数据存储框架，专注于后端数据处理流程。

## 特性

- **配置驱动**：<b>完全通过YAML配置文件定义</b>数据处理流程
- **模块化设计**：采集器、处理器、存储器分离，易于扩展
- **灵活的数据处理**：支持自定义Python脚本进行数据转换
- **多存储支持**：支持SQLite、MySQL、PostgreSQL、CSV等多种存储后端
- **简单易用**：无需编写复杂代码，通过配置即可完成数据流程

## 架构

框架采用分层架构设计，各层职责清晰，便于扩展和维护。整体架构如下：

```mermaid
graph TB
    subgraph "配置层"
        A1[主配置文件 config.yaml]
        A2[Workflow配置文件]
        A3[环境变量配置]
    end
    
    subgraph "调度层"
        B1[定时调度器]
        B2[条件检查器]
        B3[任务执行器]
    end
    
    subgraph "采集层"
        C1[akshare采集器]
        C2[tushare采集器]
        C3[自定义脚本采集器]
    end
    
    subgraph "处理层"
        D1[数据清洗]
        D2[数据转换]
        D3[自定义脚本处理]
    end
    
    subgraph "存储层"
        E1[SQLite存储]
        E2[MySQL存储]
        E3[CSV存储]
        E4[PostgreSQL存储]
    end
    
    subgraph "数据层"
        F1[原始数据源]
        F2[处理后的数据]
        F3[持久化存储]
    end
    
    A1 --> B1
    A2 --> B1
    A3 --> B1
    B1 --> B2
    B2 --> B3
    B3 --> C1
    B3 --> C2
    B3 --> C3
    C1 --> D1
    C2 --> D1
    C3 --> D1
    D1 --> D2
    D2 --> D3
    D3 --> E1
    D3 --> E2
    D3 --> E3
    D3 --> E4
    F1 --> C1
    F1 --> C2
    F1 --> C3
    E1 --> F3
    E2 --> F3
    E3 --> F3
    E4 --> F3
    D3 --> F2
```

**架构说明**：

1. **配置层**：
   - **主配置文件** (`config.yaml`)：定义全局的采集器、处理器和存储器配置。
   - **Workflow配置文件**：定义具体的数据处理流程，包括调度计划、采集器选择、处理器配置和存储目标。
   - **环境变量配置**：通过 `.env` 文件管理敏感信息，支持 `${ENV_VAR}` 变量替换。

2. **调度层**：
   - **定时调度器**：根据 Workflow 中的 `schedule` 字段（cron表达式）自动触发任务执行。
   - **条件检查器**：执行条件脚本，验证是否满足执行条件（如是否为交易日）。
   - **任务执行器**：协调整个数据处理流程，按顺序调用采集层、处理层和存储层。

3. **采集层**：
   - **akshare采集器**：通过 akshare 库获取股票数据，支持多种数据接口。
   - **tushare采集器**：通过 tushare 库获取专业金融数据，需要 token 认证。
   - **自定义脚本采集器**：执行用户自定义的 Python 脚本，实现灵活的数据采集逻辑。

4. **处理层**：
   - **数据清洗**：对原始数据进行去重、缺失值处理、格式标准化等操作。
   - **数据转换**：计算技术指标、数据聚合、字段映射等转换操作。
   - **自定义脚本处理**：通过用户自定义的 Python 脚本实现复杂的数据处理逻辑。

5. **存储层**：
   - **SQLite存储**：轻量级本地数据库，适合单机部署和小规模数据。
   - **MySQL存储**：关系型数据库，支持多用户并发访问和大规模数据存储。
   - **CSV存储**：文件存储格式，便于数据导出和外部工具处理。
   - **PostgreSQL存储**：高级关系型数据库，支持复杂查询和地理空间数据。

6. **数据层**：
   - **原始数据源**：外部数据源，如股票交易所、金融数据 API 等。
   - **处理后的数据**：经过清洗和转换的标准化数据，准备存储。
   - **持久化存储**：最终存储位置，数据可供后续分析和应用使用。

**数据处理流程**：
1. 调度层根据配置触发任务执行。
2. 采集层从原始数据源获取数据。
3. 处理层对数据进行清洗、转换和自定义处理。
4. 存储层将处理后的数据持久化到指定存储后端。
5. 整个流程完全由配置文件驱动，无需编写复杂代码。

## 安装
### 从源码安装
```bash
## 从源码安装
git clone https://gitee.com/panhuachao/stock-storage-frame.git
cd stock-storage-frame
python3 -m venv venv
source venv/bin/activate

## 国内源
pip install -i https://mirrors.aliyun.com/pypi/simple -r requirements.txt
## 官方源
pip install -i https://pypi.org/simple/ -r requirements.txt

```
### 从PyPI安装
```bash
# 使用pip安装
pip install stock-storage-frame
# 如使用postgresql数据库，pg数据库并不默认安装，需要额外安装
pip install stock-storage-frame[postgresql]
```

## 快速开始

### 1. 创建配置文件
创建 `config.yaml`：
```yaml
app:
  name: "stock-data-pipeline"
  version: "1.0.0"
  log_level: "INFO"
  log_dir: "./logs"

# 数据采集器配置
collectors:
  akshare1:
    type: "akshare"
    # method: "stock_zh_a_hist"  # 默认方法，可在workflow中设置
    config:
      timeout: 30
      retry_times: 3
  
  tushare1:
    type: "tushare"
    config:
      token: "${TUSHARE_TOKEN}" # 环境变量，.env文件中定义，参考.env.example，下述相同
      timeout: 30

  gainiancollector:
    type: "custom"
    # script: "./scripts/collector_demo.py" #默认配置脚本，可在workflow中设置
    config:
      host: "${MYSQL_HOST}" # 环境变量，.env文件中定义，参考.env.example，下述相同
      port: "${MYSQL_PORT}" # 环境变量，.env文件中定义，参考.env.example，下述相同
      database: "${MYSQL_DB}" # 环境变量，.env文件中定义，参考.env.example，下述相同
      username: "${MYSQL_USER}" # 环境变量，.env文件中定义，参考.env.example，下述相同
      password: "${MYSQL_PASSWORD}" # 环境变量，.env文件中定义，参考.env.example，下述相同

  customcollector:
    type: "custom"
    # script: "./scripts/collector_demo.py" #默认配置脚本，可在workflow中设置
    config:
      api_url: "http://api_url"
      api_key: "http_api_key"

# 数据存储配置，根据自身需要进行配置
storages:
  sqlite1:
    type: "sqlite"
    config:
      database: "./data/stock_data.db"
  
  csv1:
    type: "csv"
    config:
      directory: "./data/csv"
  
  mysql1:
    type: "mysql"
    config:
      host: "${MYSQL_HOST}"
      port: "${MYSQL_PORT}"
      database: "${MYSQL_DB}"
      username: "${MYSQL_USER}"
      password: "${MYSQL_PASSWORD}"
  
  postgresql1:
    type: "postgresql"
    config:
      host: "${POSTGRES_HOST}"
      port: "${POSTGRES_PORT}"
      database: "${POSTGRES_DB}"
      username: "${POSTGRES_USER}"
      password: "${POSTGRES_PASSWORD}"
      schema: "${POSTGRES_SCHEMA}"
```

### 2. 创建Workflow配置

创建 `workflows/daily_stock_data.yaml`：

```yaml
# workflow: stock_index_data.yaml
name: "stock_index_data"
description: "新浪财经-当天中国股票指数数据"
schedule: "40 20 * * *"  # 每天20:40执行，命令--schedule开启定时任务

# 条件配置（可选），如判定是否是交易日，只有定时任务才判断
condition:
  script: "./scripts/condition_trade_day.py"

# 数据采集配置 - 使用不同的akshare方法
collector:
  name: "akshare1" #对应config.yaml中定义的akshare1名称，用于支持不同yaml不同数据源
  type: "akshare"
  method: "stock_zh_index_spot_sina"  # 股票指数日线数据

# 数据处理配置（可选）
processor:
  # 使用Python脚本处理数据
  script: "./scripts/process_add_date.py"

# 数据存储配置
storage:
  name: "sqlite1" #对应config.yaml中定义的sqlite1名称，用于支持不同yaml使用相同类型但位置不同的数据存储
  type: "sqlite"
  config:
    table_name: "stock_index_data"
```

### 3. 创建自定义处理脚本

创建 `scripts/process_add_date.py`：

```python
import pandas as pd
from datetime import datetime

def process(df: pd.DataFrame) -> pd.DataFrame:
    """自定义数据处理逻辑"""
    # 计算技术指标，根据需要自身添加
    # df['ma5'] = df['close'].rolling(5).mean()
    # df['ma10'] = df['close'].rolling(10).mean()
    
    # 添加自定义字段，如日期，akshare中基本的数据都需要增加日期
    df['日期'] = datetime.now().strftime("%Y-%m-%d") # 添加日期
    
    return df
```

### 4. 执行Workflow
#### a. 库使用（通过pip安装）
如通过pip安装，则可以使用命令stock-storage-frame，如下：
```bash
# 执行单个workflow
stock-storage-frame --workflow workflows/stock_daily.yaml

# 对特定workflow开启定时任务
stock-storage-frame --workflow workflows/stock_daily.yaml --schedule

# 执行特定目录下所有workflow
stock-storage-frame --workflows-dir workflows

# 对目录下所有workflow执行定时任务
stock-storage-frame --workflows-dir workflows --schedule

# 执行所有workflow（默认目录）
stock-storage-frame --all

# 测试所有组件
stock-storage-frame --test

# 验证workflow配置
stock-storage-frame --validate workflows/stock_daily.yaml

# 查看调度器状态
stock-storage-frame --scheduler-status
```

#### b. 源码使用（从源码项目）
如直接下载本项目源码进行使用，则可以使用命令python -m src.stock_storage.main：
```bash
# 执行单个workflow
python -m src.stock_storage.main --workflow workflows/stock_daily.yaml

# 对特定workflow开启定时任务
python -m src.stock_storage.main --workflow workflows/stock_daily.yaml --schedule

# 执行特定目录下所有workflow
python -m src.stock_storage.main --workflows-dir workflows

# 对目录下所有workflow执行定时任务
python -m src.stock_storage.main --workflows-dir workflows --schedule

# 执行所有workflow（默认目录）
python -m src.stock_storage.main --all

# 测试所有组件
python -m src.stock_storage.main --test

# 验证workflow配置
python -m src.stock_storage.main --validate workflows/stock_daily.yaml

# 查看调度器状态
python -m src.stock_storage.main --scheduler-status
```

### 5. 定时器调度

框架提供了内置的定时器调度功能，可以根据workflow配置中的schedule字段自动执行任务。

```bash
# 启动调度器（后台运行）- 调度所有有schedule的workflow
python -m src.stock_storage.main --schedule

# 对特定workflow开启定时任务
python -m src.stock_storage.main --workflow workflows/stock_daily.yaml --schedule

# 对目录下所有workflow执行定时任务
python -m src.stock_storage.main --workflows-dir workflows --schedule

# 查看调度器状态
python -m src.stock_storage.main --scheduler-status

# 使用自定义配置和workflow目录
python -m src.stock_storage.main --schedule --config custom_config.yaml --workflows-dir custom_workflows
```

调度器功能：
- 自动加载所有包含schedule字段的workflow配置
- 根据cron表达式计算下一次执行时间
- 支持优雅关闭（Ctrl+C）
- 实时日志记录执行结果
- 支持多workflow并发调度
- 支持调度特定workflow或整个目录

## 项目结构

```
stock-storage-frame/
├── README.md
├── pyproject.toml
├── config.yaml                    # 主配置文件
├── workflows/                     # workflow配置目录
│   ├── daily_stock_data.yaml
│   ├── weekly_report.yaml
│   └── realtime_data.yaml
├── scripts/                       # 自定义处理脚本
│   ├── process_daily_data.py
│   └── calculate_indicators.py
├── src/
│   └── stock_storage/
│       ├── __init__.py
│       ├── main.py                # 主程序入口
│       ├── engine.py              # Workflow引擎
│       ├── models.py              # 数据模型
│       ├── factories.py           # 组件工厂
│       ├── collectors/            # 采集器实现
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── akshare.py
│       │   └── tushare.py
│       ├── processors/            # 处理器实现
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── pandas.py
│       │   └── custom.py
│       └── storages/              # 存储器实现
│           ├── __init__.py
│           ├── base.py
│           ├── sqlite.py
│           ├── mysql.py
│           └── csv.py
└── data/                          # 数据存储目录
    ├── stock_data.db              # SQLite数据库
    └── csv/                       # CSV文件
```

## 配置说明

### 主配置文件 (config.yaml)

主配置文件定义了全局的采集器、处理器和存储器配置。支持环境变量替换 `${ENV_VAR}`。

### Workflow配置文件

每个workflow配置文件定义了一个完整的数据处理流程，包括：
- **name**: workflow名称
- **description**: 描述信息
- **schedule**: 执行计划（cron表达式）
- **collector**: 数据采集配置
- **processor**: 数据处理配置
- **storage**: 数据存储配置

### 模板变量

workflow配置支持以下模板变量：
- `{ today:YYYYMMDD }}或{{ today:YYYY-MM-DD }}`: 今天日期
- `{{ yesterday:YYYYMMDD }}或{{ yesterday:YYYY-MM-DD }}`: 昨天日期
- `{{ now }}`: 当前时间

## 扩展开发

### 添加新的采集器

1. 在 `src/stock_storage/collectors/` 目录下创建新的采集器类
2. 继承 `BaseCollector` 类并实现必要的方法
3. 在 `factories.py` 中注册新的采集器

### 添加新的存储器

1. 在 `src/stock_storage/storages/` 目录下创建新的存储器类
2. 继承 `BaseStorage` 类并实现必要的方法
3. 在 `factories.py` 中注册新的存储器

### 添加新的处理器

1. 在 `src/stock_storage/processors/` 目录下创建新的处理器类
2. 继承 `BaseProcessor` 类并实现必要的方法
3. 在 `factories.py` 中注册新的处理器

## 许可证

MIT License

## 贡献

欢迎提交Issue和Pull Request！

## 联系方式

- 项目地址: https://gitee.com/panhuachao/stock-storage-frame
- 如有量化交易系统相关需求，可联系作者（ email: tzzjchao@126.com ）
- 作者: Pan Huachao
