Coverage for heartbeat.py: 0%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-29 02:55 +0800

1""" 

2心跳机制 

3 

4定期触发 Agent 执行维护任务,如: 

5- 审查并清理中期记忆 

6- 归档旧会话 

7- 自我反思与学习 

8 

9Heartbeat.md 格式示例: 

10 

11# Heartbeat 任务 

12 

13每隔一段时间(默认 1 小时),Agent 会自动执行以下任务: 

14 

15## 记忆维护 

16- 审查 MEMORY.md,清理过时条目 

17- 合并重复信息 

18- 保留用户偏好 

19 

20## 自我反思 

21- 回顾最近的对话 

22- 记录学到的新知识 

23- 总结常见的错误和解决方案 

24""" 

25import threading 

26import time 

27from datetime import datetime 

28from pathlib import Path 

29from typing import Callable, Optional 

30from qrclaw.logger import get_logger 

31 

32logger = get_logger("qrclaw.heartbeat") 

33 

34# 默认心跳间隔(秒) 

35DEFAULT_INTERVAL = 3600 # 1 小时 

36 

37 

38class Heartbeat: 

39 """心跳管理器""" 

40 

41 def __init__(self, interval: int = DEFAULT_INTERVAL): 

42 self.interval = interval 

43 self.running = False 

44 self.thread: Optional[threading.Thread] = None 

45 self.last_heartbeat: Optional[datetime] = None 

46 self.on_trigger: Optional[Callable] = None # 触发时的回调函数 

47 self._stop_event = threading.Event() 

48 

49 def start(self, on_trigger: Callable = None): 

50 """ 

51 启动心跳线程。 

52  

53 Args: 

54 on_trigger: 触发时的回调函数 

55 """ 

56 if self.running: 

57 logger.warning("Heartbeat 已在运行中") 

58 return 

59 

60 self.on_trigger = on_trigger 

61 self.running = True 

62 self._stop_event.clear() 

63 

64 self.thread = threading.Thread( 

65 target=self._run_loop, 

66 name="heartbeat", 

67 daemon=True 

68 ) 

69 self.thread.start() 

70 logger.info(f"Heartbeat 已启动,间隔 {self.interval}") 

71 

72 def stop(self): 

73 """停止心跳线程""" 

74 if not self.running: 

75 return 

76 

77 self.running = False 

78 self._stop_event.set() 

79 

80 if self.thread: 

81 self.thread.join(timeout=5) 

82 logger.info("Heartbeat 已停止") 

83 

84 def _run_loop(self): 

85 """心跳主循环""" 

86 while self.running and not self._stop_event.wait(self.interval): 

87 try: 

88 self._do_heartbeat() 

89 except Exception as e: 

90 logger.error(f"Heartbeat 执行失败: {e}", exc_info=True) 

91 

92 def _do_heartbeat(self): 

93 """执行心跳任务""" 

94 self.last_heartbeat = datetime.now() 

95 logger.info(f"Heartbeat 触发: {self.last_heartbeat}") 

96 

97 if self.on_trigger: 

98 try: 

99 self.on_trigger() 

100 except Exception as e: 

101 logger.error(f"Heartbeat 回调执行失败: {e}", exc_info=True) 

102 

103 

104# 全局心跳实例 

105_heartbeat: Optional[Heartbeat] = None 

106 

107 

108def get_heartbeat() -> Optional[Heartbeat]: 

109 """获取全局心跳实例""" 

110 return _heartbeat 

111 

112 

113def start_heartbeat( 

114 interval: int = DEFAULT_INTERVAL, 

115 on_trigger: Callable = None 

116) -> Heartbeat: 

117 """ 

118 启动全局心跳。 

119  

120 Args: 

121 interval: 心跳间隔(秒) 

122 on_trigger: 触发时的回调函数 

123  

124 Returns: 

125 Heartbeat 实例 

126 """ 

127 global _heartbeat 

128 

129 if _heartbeat and _heartbeat.running: 

130 logger.warning("Heartbeat 已在运行,跳过启动") 

131 return _heartbeat 

132 

133 _heartbeat = Heartbeat(interval=interval) 

134 _heartbeat.start(on_trigger=on_trigger) 

135 return _heartbeat 

136 

137 

138def stop_heartbeat(): 

139 """停止全局心跳""" 

140 global _heartbeat 

141 

142 if _heartbeat: 

143 _heartbeat.stop() 

144 _heartbeat = None 

145 

146 

147def execute_heartbeat_tasks(workspace) -> str: 

148 """ 

149 执行心跳任务(后台子 agent)。 

150  

151 Args: 

152 workspace: 当前工作空间 

153 Returns: 

154 str: 执行结果摘要 

155 """ 

156 from qrclaw.agent import run_sub_agent 

157 from qrclaw.workspace import Workspace 

158 

159 heartbeat_file = workspace.heartbeat_file 

160 if not heartbeat_file.exists(): 

161 logger.warning("HEARTBEAT.md 不存在,跳过心跳任务") 

162 return "HEARTBEAT.md 不存在" 

163 

164 # 读取心跳任务 

165 heartbeat_content = heartbeat_file.read_text(encoding="utf-8") 

166 

167 # 构造任务 prompt 

168 task = f"""请执行以下心跳任务。静默执行,完成后简要汇报结果。 

169 

170{heartbeat_content} 

171 

172注意: 

1731. 不要打扰用户,静默执行 

1742. 如果使用 review_memory,先 analyze 再决定是否 cleanup 

1753. 完成后简要汇报:做了什么、结果如何 

176""" 

177 

178 logger.info("心跳任务开始执行") 

179 

180 try: 

181 # 创建心跳专用子 agent 

182 sub_workspace = workspace.sub_agent("heartbeat") 

183 result = run_sub_agent(task, sub_workspace) 

184 logger.info(f"心跳任务执行完成: {result[:100]}...") 

185 return result 

186 except Exception as e: 

187 logger.error(f"心跳任务执行失败: {e}", exc_info=True) 

188 return f"执行失败: {e}" 

189 

190 

191def get_default_heartbeat_content() -> str: 

192 """生成默认的 HEARTBEAT.md 内容""" 

193 return """# Heartbeat 任务 

194 

195每隔一段时间,Agent 会自动执行以下维护任务。 

196 

197--- 

198 

199## 记忆维护 

200 

201- 审查 `MEMORY.md`,识别过时、重复内容 

202- 如有过时内容,调用 `review_memory(action='cleanup')` 清理 

203- 保留用户偏好和重要配置 

204 

205--- 

206 

207## 自我反思(可选) 

208 

209- 回顾最近的对话,是否有值得记录的知识 

210- 如有,调用 `write_memory()` 记录 

211 

212--- 

213 

214**注意**:静默执行,完成后简要汇报即可。 

215"""