Metadata-Version: 2.1
Name: dwspark
Version: 2025.0.1
Summary: DataWhale用于星火杯2025的适配sdk
Author: datawhale
Author-email: datawhale@example.com
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: spark-ai-python
Requires-Dist: loguru
Requires-Dist: cffi
Requires-Dist: gevent
Requires-Dist: greenlet
Requires-Dist: pycparser
Requires-Dist: six
Requires-Dist: websockets
Requires-Dist: websocket-client
Requires-Dist: chardet
Requires-Dist: requests-toolbelt
Requires-Dist: numpy

# 项目介绍
本项目的目标是简化讯飞关于web socket的调用，并封装成python对象。我们希望学习者使用sdk后，可以从官方demo的几十行代码化简成几行代码。
目前基本简化适配了星火杯中提供的关于web socket的模型,其中包括：
1. 星火多语种大模型
2. 中文识别大模型
3. 超拟人语音合成
4. 一句话复刻
5. 图片理解
6. 图片生成
7. 角色模拟
8. 星火知识库
9. 文本向量化
10. 智能PPT生成
11. 智能简历生成
# SDK调用方式
## 1. 安装SDK
```bash
pip install dwspark==2025.0.1 -i https://pypi.tuna.tsinghua.edu.cn/simple --default-timeout=600
```

## 2. 加载配置
```python
from dwspark.config import Config
# 加载系统环境变量：SPARKAI_UID、SPARKAI_APP_ID、SPARKAI_API_KEY、SPARKAI_API_SECRET
config = Config()
# 自定义key写入
config = Config('14****93', 'eb28b****b82', 'MWM1MzBkOD****Mzk0')
```

## 3.调用模型
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File  : test.py
# @Author: Richard Chiming Xu
# @Date  : 2025/5/15
# @Desc  : 一次性用dwspark的方式完成所有api调用
import os
import time

from loguru import logger
from dwspark.config import Config
# 加载配置
config = Config()

'''
多语种大模型
'''
from dwspark.models import MultiLang
# 模拟问题
message = [{"role": "system", "content": "あなたはとても専門的な日本語の国語の先生で、文才が華麗です。"},{"role": "user", "content": '100字の作文を書いてください。'}]
logger.info('----------批式调用对话----------')
model = MultiLang(config, stream=False)
logger.info(model.generate(message))
logger.info('----------流式调用对话----------')
model = MultiLang(config, stream=True)
for r in model.generate_stream(message):
    logger.info(r)
logger.info('done.')

'''
    语音合成模型
'''
from dwspark.models import Text2Audio
text2audio = Text2Audio(config)
result = text2audio.generate("你好，我的名字叫Amy", "./demo.mp3")
logger.info(f'-------语音合成结果：{result}-------')

'''
    中英语音识别模型
'''
from dwspark.models import Audio2Text
model = Audio2Text(config)
result = model.recognize('./demo.mp3')
logger.info(f'识别结果：{result}')

'''
    一句话语音克隆
'''
from dwspark.models import VoiceSynthesis
logger.info('----------批式调用----------')
model = VoiceSynthesis(config, stream=False)
params_with_training = {
    'text': "欢迎使用讯飞星火认知大模型。",
    'model_name': "new_model",
    'audio_path': './一句话克隆声音/test.wav',
    # 音频需要宝子们根据要求录制  参见“./dwspark/voice/语音文本.txt”
    # 'force_train': True,  # Force retraining
    'output_path': './new_output.mp3',
    'voice_model_config': 'voice_models.json'  # Add the voice_model_config to params
}

# Uncomment to run training and synthesis
result = model.generate(params_with_training)
logger.info(f"Training and synthesis result: {result}")

'''
    图片生成大模型
'''
from dwspark.models import Text2Picture
model = Text2Picture(config)
img_path = model.generate("请你生成一幅鲸鱼水墨画", "./")
logger.info(f'生成图片位置：{img_path}')


'''
    图片理解
'''
from dwspark.models import PictureUnderstanding
model = PictureUnderstanding(config)
result = model.understanding("请描述以下这幅画的内容", img_path)
logger.info(f'理解结果：{result}')

'''
    角色模拟
'''
from dwspark.models import CharacterSimulator
simulator = CharacterSimulator(config)
player_id = simulator.create_player("张三", "张三是一名学生")
agent_id = simulator.create_agent(player_id, "AI助手")

'''
    知识库服务
'''
from dwspark.models import KnowledgeBase


'''
星火知识库示例
'''
# 创建星火知识库实例
model = KnowledgeBase(config)

# 示例1：上传文档
logger.info('----------上传文档----------')
# 准备测试文件路径
test_file_path = os.path.join(os.path.dirname(__file__), './星火知识库/test_document.txt')

# 如果测试文件不存在，创建一个简单的测试文件
if not os.path.exists(test_file_path):
    with open(test_file_path, 'w', encoding='utf-8') as f:
        f.write("这是一个测试文档，用于演示星火知识库API的使用。\n星火大模型是讯飞推出的大语言模型，具有强大的自然语言处理能力。")
    logger.info(f"已创建测试文件: {test_file_path}")

# 上传文档
file_id = model.upload_document(test_file_path)
logger.info(f"上传文档结果，文件ID: {file_id}")

# 示例2：获取文档详情
if file_id:
    logger.info('----------获取文档详情----------')
    doc_info = model.get_document_info(file_id)
    logger.info(f"文档详情: {doc_info}")

# 示例3：获取文档分块内容
if file_id:
    logger.info('----------获取文档分块内容----------')
    # 等待2秒，确保文档已经处理完成
    time.sleep(2)
    chunks = model.get_document_chunks(file_id)
    if chunks:
        logger.info(f"文档分块数量: {len(chunks)}")
        for i, chunk in enumerate(chunks[:2]):  # 只显示前两个分块
            logger.info(f"分块 {i+1}: {chunk.get('content', '')[:50]}...")

# 示例4：创建知识库
logger.info('----------创建知识库----------')
# 添加时间戳避免重名
repo_name = f"测试知识库_{int(time.time())}"
repo_id = model.create_repository(repo_name, "这是一个测试用的知识库", "测试,知识库")
logger.info(f"创建知识库结果，知识库ID: {repo_id}")

# 示例5：将文档添加到知识库
if repo_id and file_id:
    logger.info('----------添加文档到知识库----------')
    # 注意：这里需要调用知识库的添加文件方法，这是一个示例实现
    # 实际使用时请参考API文档或SDK实现
    url = f"{model.REPO_API_URL}/repo/file/add"
    headers = model._get_headers()
    body = {
        "repoId": repo_id,
        "fileIds": [file_id]
    }

    import requests
    response = requests.post(url, json=body, headers=headers)
    result = model._handle_response(response, "添加文档到知识库")
    logger.info(f"添加文档到知识库结果: {result}")

# 示例6：获取知识库列表
logger.info('----------获取知识库列表----------')
# 注意：这里需要调用知识库的列表方法，这是一个示例实现
# 实际使用时请参考API文档或SDK实现
url = f"{model.REPO_API_URL}/repo/list"
headers = model._get_headers()
body = {
    "currentPage": 1,
    "pageSize": 10
}

response = requests.post(url, json=body, headers=headers)
result = model._handle_response(response, "获取知识库列表")
if result and "data" in result:
    repos = result["data"].get("rows", [])  # 修正字段名从list改为rows
    logger.info(f"知识库数量: {len(repos)}")
    for repo in repos[:2]:  # 只显示前两个知识库
        logger.info(f"知识库: {repo.get('repoName')} (ID: {repo.get('repoId')})")

# 示例7：与知识库对话
if repo_id:
    logger.info('----------与知识库对话----------')
    # 定义回调函数
    def on_message(data):
        payload = data.get("payload", {})
        content = payload.get("content", "")
        logger.info(f"收到回复: {content}")

    def on_error(error):
        logger.error(f"对话错误: {error}")

    def on_close():
        logger.info("对话结束")

    # 发起对话
    query = "星火大模型有什么特点？"
    logger.info(f"提问: {query}")
    model.chat(
        query=query,
        repo_id=repo_id,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )

    # 注意：由于chat方法是异步的，这里需要等待一段时间让回调函数有机会执行
    import time
    time.sleep(5)

logger.info('示例执行完成。')

'''
    Embedding
'''
from dwspark.models import LLMEmbedding
model = LLMEmbedding(config)
vectors = model.get_embeddings(["你好呀", "我叫amy"])
logger.info(f'Embedding vectors: {vectors}')

'''
    PPT生成
'''

from dwspark.models import PPTGenerator
logger.info("智能PPT生成器初始化成功.")
ppt_generator = PPTGenerator(config=config)


# 根据文本生成PPT
logger.info("\n--- 演示: 直接根据文本创建PPT ---")
direct_ppt_query = "请帮我制作一份关于可再生能源的科普PPT，包含太阳能、风能和水能"  # ppt主题
logger.info(f"PPT主题: \"{direct_ppt_query}\"")
sid_direct = ppt_generator.create_ppt_from_text(
    query_text=direct_ppt_query,    # ppt主题
    template_id="20240718489569D",  # 模板ID
    author="Datawhale团队", # ppt作者
    is_card_note=True,
    ai_image_type="normal" # 使用 "normal" 或 "advanced"
)
if sid_direct:
    logger.info(f"直接文本PPT创建任务已启动，SID: {sid_direct}")
    ppt_url_direct = ppt_generator.poll_for_result(sid_direct)
    if ppt_url_direct:
        logger.info(f"直接文本PPT已生成！下载链接: {ppt_url_direct}")

logger.info("\n--- 智能PPT生成器演示流程结束 ---")

'''
    简历生成
'''
# SDK引入模型
from dwspark.models import ResumeGenerator
import json


resume_gen = ResumeGenerator(config=config)
logger.info("智能简历生成器初始化成功.")

# --- 演示: 生成简历 ---
logger.info("\n--- 演示: 根据文本描述生成智能简历 ---")
# 从原始demo获取的示例描述
description = """姓名：张三，年龄：28岁，教育经历：2018年本科毕业于合肥工业大学；工作经历：java开发工程师..."""
logger.info(f"输入的简历描述: \n{description}")

generated_resume_bytes = resume_gen.generate(resume_description_text=description)

if generated_resume_bytes:
    logger.info(f"智能简历API调用成功，接收到 {len(generated_resume_bytes)} bytes 的数据。")
    response_str = generated_resume_bytes.decode('utf-8')
    response_json = json.loads(response_str)
    logger.info("API响应内容 (格式化):")
    # 使用logger.info打印格式化的JSON，Loguru能很好地处理多行消息
    logger.info(f"\n{json.dumps(response_json, indent=2, ensure_ascii=False)}")
    # 您可以根据需要取消注释以下代码，以提取并单独记录特定字段：
    if "links" in response_json and isinstance(response_json.get("links"), list):
        logger.info("提取到的链接详情:")
        for i, link_info in enumerate(response_json["links"]):
            img_url = link_info.get("img_url")
            word_url = link_info.get("word_url")
            logger.info(f"  链接组 {i+1}:")
            logger.info(f"    图片URL: {img_url}")
            logger.info(f"    文档URL: {word_url}")

logger.info("\n--- 智能简历生成器演示流程结束 ---")
```
