Coverage for memory / compressor.py: 18%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-29 02:55 +0800

1""" 

2上下文压缩模块。 

3 

4压缩策略:递归摘要 + 目标范围控制 

5- 目标范围:压缩后(摘要 + 短期记忆)占上下文窗口的 20%~25% 

6- 保留窗口:从最新消息往前选,只限制 tokens(不限制条数) 

7- 递归摘要:检测旧摘要,有则合并一起压缩,保证信息不丢失 

8 

9Token 计算:使用 tiktoken 精确计算,支持 GPT-4o 等模型 

10""" 

11import tiktoken 

12from qrclaw.config import ( 

13 OPENAI_MODEL, 

14 COMPRESS_SUMMARY_MAX_TOKENS, COMPRESS_SUMMARY_TARGET_TOKENS, 

15 COMPRESS_RECENT_MAX_TOKENS, 

16 COMPRESS_TARGET_MIN_RATIO, COMPRESS_TARGET_MAX_RATIO, 

17 _MODEL_MAX_TOKENS, 

18) 

19from qrclaw.logger import get_logger 

20 

21logger = get_logger("qrclaw.memory.compressor") 

22 

23# 初始化 tiktoken encoder 

24# 对于未知模型,fallback 到 cl100k_base(GPT-4/4o 使用) 

25try: 

26 _encoding = tiktoken.encoding_for_model(OPENAI_MODEL) 

27except KeyError: 

28 _encoding = tiktoken.get_encoding("cl100k_base") 

29 logger.debug(f"模型 {OPENAI_MODEL} 无对应 encoder,使用 cl100k_base") 

30 

31 

32def count_tokens(messages: list[dict]) -> int: 

33 """ 

34 精确计算消息列表的 token 数。 

35 

36 Args: 

37 messages: OpenAI 格式的消息列表 

38 

39 Returns: 

40 int: token 总数 

41 """ 

42 tokens = 0 

43 for msg in messages: 

44 # 每条消息有固定开销 

45 tokens += 4 # {"role": "...", "content": "..."} 格式开销 

46 for key, value in msg.items(): 

47 if value is not None: 

48 tokens += len(_encoding.encode(str(value))) 

49 tokens += 2 # 对话开销 

50 return tokens 

51 

52 

53def count_text_tokens(text: str) -> int: 

54 """ 

55 精确计算文本的 token 数。 

56 

57 Args: 

58 text: 文本内容 

59 

60 Returns: 

61 int: token 数 

62 """ 

63 return len(_encoding.encode(text)) 

64 

65 

66SUMMARIZE_PROMPT = """请把下面的对话内容整理成结构化摘要,要求: 

671. 按以下分类输出,没有内容的分类可以省略 

682. 保留所有关键信息,不要遗漏重要细节 

693. 语言简洁,不要废话 

704. 控制摘要长度,目标约 {target_tokens} tokens 

71 

72格式: 

73【用户信息】用户的基本信息、身份、偏好等 

74【已完成任务】已经完成的操作和结果 

75【关键结论】重要的决策、发现、约定 

76【待处理事项】还没完成的任务 

77 

78对话内容: 

79{history} 

80""" 

81 

82 

83def _pick_recent(messages: list[dict], max_tokens: int = None) -> tuple[list[dict], list[dict]]: 

84 """ 

85 从最新消息往前选,只限制 tokens(不限制条数)。 

86 

87 返回 (recent, old):recent 是保留的,old 是要压缩的。 

88 """ 

89 if max_tokens is None: 

90 max_tokens = COMPRESS_RECENT_MAX_TOKENS 

91 

92 recent = [] 

93 token_count = 0 

94 

95 for msg in reversed(messages): 

96 t = count_tokens([msg]) 

97 if token_count + t > max_tokens: 

98 break 

99 recent.insert(0, msg) 

100 token_count += t 

101 

102 old = messages[:len(messages) - len(recent)] 

103 logger.debug(f"保留窗口: {len(recent)} 条, {token_count} tokens; 待压缩: {len(old)}") 

104 return recent, old 

105 

106 

107def summarize(session) -> None: 

108 """ 

109 递归摘要压缩: 

110 1. 从最新消息往前选,只限制 tokens 

111 2. 检测是否有旧摘要,有则合并进待压缩内容(递归摘要) 

112 3. 调用 LLM 生成新摘要 

113 4. 检查总长度是否在 20%~25% 范围内,必要时调整 

114 """ 

115 logger.info("开始压缩历史消息") 

116 

117 # 计算目标范围 

118 target_min = int(_MODEL_MAX_TOKENS * COMPRESS_TARGET_MIN_RATIO) 

119 target_max = int(_MODEL_MAX_TOKENS * COMPRESS_TARGET_MAX_RATIO) 

120 logger.debug(f"目标范围: {target_min} ~ {target_max} tokens ({COMPRESS_TARGET_MIN_RATIO*100:.0f}% ~ {COMPRESS_TARGET_MAX_RATIO*100:.0f}%)") 

121 

122 recent, old = _pick_recent(session.messages) 

123 

124 if not old: 

125 logger.warning("没有旧消息可压缩,跳过") 

126 return 

127 

128 # 检测 old 里是否有旧摘要(递归摘要:旧摘要一起参与本次压缩) 

129 has_old_summary = any( 

130 msg.get("role") == "assistant" and str(msg.get("content", "")).startswith("[SUMMARY]") 

131 for msg in old 

132 ) 

133 if has_old_summary: 

134 logger.info("检测到旧摘要,合并进行递归压缩") 

135 

136 history_text = "\n\n".join( 

137 f"[{msg.get('role', 'unknown')}]: {msg.get('content', '')}" 

138 for msg in old 

139 if msg.get("content") 

140 ) 

141 

142 try: 

143 from qrclaw.providers import provider 

144 logger.debug(f"调用 LLM 生成摘要,目标: {COMPRESS_SUMMARY_TARGET_TOKENS} tokens") 

145 resp = provider.chat([ 

146 {"role": "user", "content": SUMMARIZE_PROMPT.format( 

147 history=history_text, 

148 target_tokens=COMPRESS_SUMMARY_TARGET_TOKENS, 

149 )} 

150 ]) 

151 summary = resp.content 

152 summary_tokens = count_text_tokens(summary) 

153 

154 logger.info(f"摘要生成成功,长度: {len(summary)} 字符,{summary_tokens} tokens") 

155 logger.debug(f"摘要内容预览: {summary[:200]}...") 

156 

157 # 计算短期记忆的 token 数 

158 recent_tokens = count_tokens(recent) 

159 total_tokens = summary_tokens + recent_tokens 

160 

161 logger.info(f"压缩后总长度: {total_tokens} tokens (摘要 {summary_tokens} + 短期记忆 {recent_tokens})") 

162 

163 # 检查是否在目标范围内 

164 if total_tokens > target_max: 

165 logger.warning(f"压缩后 {total_tokens} tokens 超过上限 {target_max},减少短期记忆") 

166 # 减少短期记忆的 token 预算 

167 available_for_recent = target_max - summary_tokens 

168 if available_for_recent > 0: 

169 recent, old = _pick_recent(session.messages, max_tokens=available_for_recent) 

170 recent_tokens = count_tokens(recent) 

171 total_tokens = summary_tokens + recent_tokens 

172 logger.info(f"调整后: {total_tokens} tokens (摘要 {summary_tokens} + 短期记忆 {recent_tokens})") 

173 else: 

174 logger.warning("摘要本身已超过上限,保持现状") 

175 

176 elif total_tokens < target_min: 

177 logger.info(f"压缩后 {total_tokens} tokens 低于下限 {target_min},但这是好事,保持现状") 

178 

179 # 重建 session.messages 

180 session.messages = [ 

181 {"role": "assistant", "content": f"[SUMMARY] 以下是之前对话的结构化摘要:\n{summary}"}, 

182 *recent, 

183 ] 

184 session._save() 

185 

186 ratio = total_tokens / _MODEL_MAX_TOKENS * 100 

187 logger.info(f"压缩完成,消息数: {len(session.messages)},占比: {ratio:.1f}%") 

188 print(f" [系统] 上下文压缩完成,占比 {ratio:.1f}%") 

189 

190 except Exception as e: 

191 logger.error(f"压缩失败: {e}", exc_info=True) 

192 raise 

193 

194 

195def truncate(session, keep: int = 20) -> None: 

196 """ 

197 滚动截断策略:直接丢弃旧消息,只保留最近 keep 条。 

198 最简单,但会丢失早期信息。 

199 """ 

200 if len(session.messages) > keep: 

201 old_count = len(session.messages) 

202 session.messages = session.messages[-keep:] 

203 session._save() 

204 logger.info(f"滚动截断完成,保留最近 {keep} 条消息 (丢弃 {old_count - keep} 条)") 

205 else: 

206 logger.debug(f"消息数未超过阈值 ({len(session.messages)}/{keep}),无需截断")