Coverage for agentos/tests/test_hitl.py: 0%

95 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Tests for HITL (Human-in-the-Loop) approval module. 

3""" 

4 

5import pytest 

6from agentos.hitl.approver import ( 

7 HumanInTheLoop, 

8 ApprovalRequest, 

9 ApprovalDecision, 

10 ApprovalStatus, 

11 RiskLevel, 

12 ApprovalPolicy, 

13) 

14from agentos.hitl.presets import ( 

15 default_approval_policy, 

16 permissive_approval_policy, 

17 strict_approval_policy, 

18) 

19 

20 

class TestApprovalRequest:
    """Checks that an ApprovalRequest exposes the values it was built with."""

    def test_create_request(self):
        """Construct a high-risk request and read back three of its fields."""
        fields = {
            "request_id": "abc-123",
            "action": "delete_file",
            "description": "Delete /tmp/test.txt",
            "risk_level": RiskLevel.HIGH,
            "tool_name": "file.delete",
        }
        request = ApprovalRequest(**fields)

        assert request.request_id == "abc-123"
        assert request.risk_level == RiskLevel.HIGH
        assert request.tool_name == "file.delete"

33 

34 

class TestApprovalDecision:
    """Checks the is_approved / is_rejected flags for several statuses."""

    def test_approved(self):
        """An APPROVED decision reports approved and not rejected."""
        decision = ApprovalDecision(
            status=ApprovalStatus.APPROVED,
            request_id="x",
        )
        assert decision.is_approved
        assert not decision.is_rejected

    def test_modified_is_approved(self):
        """A MODIFIED decision carrying modified_args still counts as approved."""
        decision = ApprovalDecision(
            status=ApprovalStatus.MODIFIED,
            request_id="x",
            modified_args={"force": True},
        )
        assert decision.is_approved

    def test_rejected(self):
        """A REJECTED decision reports rejected and not approved."""
        decision = ApprovalDecision(
            status=ApprovalStatus.REJECTED,
            request_id="x",
            reason="too risky",
        )
        assert decision.is_rejected
        assert not decision.is_approved

49 

50 

class TestHumanInTheLoop:
    """Exercises HumanInTheLoop under preset and hand-built approval policies."""

    @staticmethod
    def _answering(status, **extra):
        """Return a callback that answers every request with ``status``.

        Any ``extra`` keyword arguments (for example ``reason``) are passed
        straight through to the ApprovalDecision constructor.
        """

        def _callback(request):
            return ApprovalDecision(
                request_id=request.request_id,
                status=status,
                **extra,
            )

        return _callback

    def test_low_risk_auto_skipped(self):
        """A LOW risk request under the default policy ends up SKIPPED."""
        gate = HumanInTheLoop(policy=default_approval_policy())
        outcome = gate.request_and_decide(
            action="read_file",
            description="Read config.yaml",
            risk_level=RiskLevel.LOW,
            tool_name="file.read",
        )
        decision = outcome[1]
        assert decision.status == ApprovalStatus.SKIPPED

    def test_high_risk_needs_approval(self):
        """A HIGH risk request is approved when the callback approves it."""
        gate = HumanInTheLoop(policy=default_approval_policy())
        gate.callback = self._answering(ApprovalStatus.APPROVED, reason="OK")
        _, decision = gate.request_and_decide(
            action="delete_all",
            description="Delete production database",
            risk_level=RiskLevel.HIGH,
            tool_name="db.drop",
        )
        assert decision.is_approved

    def test_auto_approve_domain(self):
        """The permissive preset approves a MEDIUM risk web search, no callback set."""
        gate = HumanInTheLoop(policy=permissive_approval_policy())
        _, decision = gate.request_and_decide(
            action="search",
            description="Search web",
            risk_level=RiskLevel.MEDIUM,
            tool_name="read.web_search",
        )
        assert decision.status == ApprovalStatus.APPROVED

    def test_blocked_domain(self):
        """The strict preset rejects a CRITICAL "delete.format_disk" request."""
        gate = HumanInTheLoop(policy=strict_approval_policy())
        _, decision = gate.request_and_decide(
            action="format",
            description="Format disk",
            risk_level=RiskLevel.CRITICAL,
            tool_name="delete.format_disk",
        )
        assert decision.status == ApprovalStatus.REJECTED

    def test_rejected_decision(self):
        """A rejecting callback produces a rejected decision."""
        gate = HumanInTheLoop()
        gate.callback = self._answering(
            ApprovalStatus.REJECTED,
            reason="User said no",
        )
        outcome = gate.request_and_decide(
            action="delete", risk_level=RiskLevel.CRITICAL
        )
        assert outcome[1].is_rejected

    def test_history(self):
        """Two request_and_decide calls leave two entries in the history."""
        gate = HumanInTheLoop()
        gate.callback = self._answering(ApprovalStatus.APPROVED)
        for action, risk in (("a1", RiskLevel.HIGH), ("a2", RiskLevel.LOW)):
            gate.request_and_decide(action=action, risk_level=risk)
        assert len(gate.get_history()) == 2

    def test_pending_queue(self):
        """A request stays pending until decide() is called for it."""
        every_level = {
            RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL
        }
        gate = HumanInTheLoop(
            policy=ApprovalPolicy(require_approval_for_risk=every_level)
        )
        pending = gate.request_approval(action="x", risk_level=RiskLevel.LOW)
        assert len(gate.get_pending()) == 1

        verdict = ApprovalDecision(
            request_id=pending.request_id,
            status=ApprovalStatus.APPROVED,
        )
        gate.decide(pending.request_id, verdict)
        assert len(gate.get_pending()) == 0
        assert gate.get_decision(pending.request_id).is_approved

    def test_approval_cache(self):
        """Two identical requests with cache_approval_seconds=60 are both approved."""
        gate = HumanInTheLoop(policy=ApprovalPolicy(cache_approval_seconds=60))
        gate.callback = self._answering(ApprovalStatus.APPROVED)
        call_args = {
            "action": "read",
            "tool_name": "file.read",
            "risk_level": RiskLevel.HIGH,
        }

        # First call triggers callback
        _, first = gate.request_and_decide(**call_args)
        assert first.is_approved

        # Second call should use cache (same tool+action).
        # NOTE(review): only the approval and the history length are checked
        # here; nothing asserts that the callback was skipped the second time.
        _, second = gate.request_and_decide(**call_args)
        assert second.is_approved
        assert len(gate.get_history()) == 2

    def test_critical_blocked_automatically(self):
        """The strict preset rejects the request even though no callback is set."""
        gate = HumanInTheLoop(policy=strict_approval_policy())
        # No callback set, critical risk with blocked domain
        _, decision = gate.request_and_decide(
            action="format",
            risk_level=RiskLevel.CRITICAL,
            tool_name="delete.format_disk",
        )
        assert decision.status == ApprovalStatus.REJECTED

    def test_max_pending(self):
        """With max_pending_requests=2, the third request is rejected."""
        limited = ApprovalPolicy(
            require_approval_for_risk={RiskLevel.LOW},
            max_pending_requests=2,
        )
        gate = HumanInTheLoop(policy=limited)
        gate.request_approval(action="a1", risk_level=RiskLevel.LOW)
        gate.request_approval(action="a2", risk_level=RiskLevel.LOW)
        overflow = gate.request_approval(action="a3", risk_level=RiskLevel.LOW)

        verdict = gate.get_decision(overflow.request_id)
        assert verdict.status == ApprovalStatus.REJECTED
        assert "Max pending" in verdict.reason

174 

175 

class TestApprovalPresets:
    """Spot-checks the risk levels and blocked domains of each preset policy."""

    def test_default(self):
        """The default preset requires approval for HIGH but not for LOW."""
        gated = default_approval_policy().require_approval_for_risk
        assert RiskLevel.HIGH in gated
        assert RiskLevel.LOW not in gated

    def test_permissive(self):
        """The permissive preset requires approval for CRITICAL but not for HIGH."""
        gated = permissive_approval_policy().require_approval_for_risk
        assert RiskLevel.CRITICAL in gated
        assert RiskLevel.HIGH not in gated

    def test_strict(self):
        """The strict preset gates MEDIUM risk and blocks the "delete" domain."""
        policy = strict_approval_policy()
        assert RiskLevel.MEDIUM in policy.require_approval_for_risk
        assert "delete" in policy.block_domains