Metadata-Version: 2.4
Name: cmdwall
Version: 0.1.3
Summary: A lightweight shell execution firewall for LLM agents. Intercepts, classifies, and gatekeeps shell commands before execution.
License: MIT
Keywords: ai,anthropic,claude,command,langchain,llm,openai,security,shell
Requires-Python: >=3.9
Requires-Dist: aiofiles>=23.0.0
Requires-Dist: httpx>=0.25.0
Requires-Dist: pydantic-settings>=2.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: pyyaml>=6.0
Requires-Dist: structlog>=23.1.0
Requires-Dist: tenacity>=8.2.0
Requires-Dist: typing-extensions>=4.5.0
Provides-Extra: anthropic
Requires-Dist: anthropic>=0.18.0; extra == 'anthropic'
Provides-Extra: dev
Requires-Dist: black; extra == 'dev'
Requires-Dist: isort; extra == 'dev'
Requires-Dist: mypy; extra == 'dev'
Requires-Dist: ruff; extra == 'dev'
Provides-Extra: google
Requires-Dist: google-generativeai>=0.3.0; extra == 'google'
Provides-Extra: langchain
Requires-Dist: langchain-core>=0.1.0; extra == 'langchain'
Provides-Extra: mcp
Requires-Dist: mcp>=0.1.0; extra == 'mcp'
Provides-Extra: openai
Requires-Dist: openai>=1.0.0; extra == 'openai'
Provides-Extra: test
Requires-Dist: pytest-asyncio>=0.21.0; extra == 'test'
Requires-Dist: pytest-mock>=3.10.0; extra == 'test'
Requires-Dist: pytest>=7.0.0; extra == 'test'
Description-Content-Type: text/markdown

# cmdwall

**A lightweight shell execution firewall for LLM agents.**

`cmdwall` intercepts, classifies, and gatekeeps shell commands before they execute — sitting between your agent loop and the OS. Every command passes through a multi-stage security pipeline: path sandboxing, pattern-based risk classification, config-level allowlists/blocklists, and optional LLM-backed reasoning review. Commands that pass all stages are executed asynchronously with output capping and timeout enforcement. Every decision — allow or deny — is asynchronously logged to a rotating JSONL audit trail.

---

## Table of Contents

- [Installation](#installation)
- [Quick Start](#quick-start)
- [Architecture Overview](#architecture-overview)
- [Security Pipeline](#security-pipeline)
  - [Stage 1 — Sandbox Validation](#stage-1--sandbox-validation)
  - [Stage 2 — Blocklist / Allowlist](#stage-2--blocklist--allowlist)
  - [Stage 3 — Risk Classification (Safety Tiers)](#stage-3--risk-classification-safety-tiers)
  - [Stage 4 — Gatekeeper LLM](#stage-4--gatekeeper-llm)
  - [Stage 5 — Execution](#stage-5--execution)
- [Configuration](#configuration)
  - [Environment Variables](#environment-variables)
  - [Allowlists and Blocklists](#allowlists-and-blocklists)
- [Custom Risk Rules](#custom-risk-rules)
- [Implementing a Provider](#implementing-a-provider)
- [OpenAI Tool Call Integration](#openai-tool-call-integration)
- [Data Models](#data-models)
- [Audit Logging](#audit-logging)
- [Graceful Shutdown](#graceful-shutdown)
- [Testing](#testing)
- [Optional Dependencies](#optional-dependencies)
- [Project Layout](#project-layout)

---

## Installation

**Core package (no LLM provider):**

```bash
pip install cmdwall
```

**With a specific LLM provider:**
The core package has no LLM SDK dependency — you supply the provider.

```bash
pip install "cmdwall[openai]"
pip install "cmdwall[anthropic]"
pip install "cmdwall[google]"
pip install "cmdwall[langchain]"
pip install "cmdwall[mcp]"
```


## Quick Start

```python
import asyncio
from cmdwall import CmdWall

async def main():
    wall = CmdWall(allowed_paths=["/home/user/project"])

    result = await wall.execute(
        command="ls -la",
        reasoning="Listing the project directory to understand its structure."
    )

    if result.success:
        print(result.stdout)
    else:
        print("Blocked:", result.denial_reason)

    await wall.shutdown()

asyncio.run(main())
```

If you want an LLM to review ambiguous commands (tiers 2 and 3), pass a `provider` — any object implementing `BaseLLM`:

```python
from cmdwall import CmdWall
from my_providers import MyOpenAIProvider

wall = CmdWall(
    provider=MyOpenAIProvider(),
    allowed_paths=["/home/user/project"],
    blocked_paths=["/home/user/project/.secrets"]
)
```

Without a provider, tier 2+ commands that reach the gatekeeper stage are blocked with a `Configuration Error` denial rather than failing silently.

---

## Architecture Overview

```
execute(command, reasoning)
        │
        ▼
┌─────────────────────┐
│  SandboxValidator   │  ← path containment check
└─────────┬───────────┘
          │ pass
          ▼
┌─────────────────────┐
│  Config Blocklist   │  ← exact command-type match
└─────────┬───────────┘
          │ not blocked
          ▼
┌─────────────────────┐
│  Config Allowlist   │  ← exact command-type match → fast-allow (skip gatekeeper)
└─────────┬───────────┘
          │ not allowlisted
          ▼
┌─────────────────────────┐
│  SafetyLevelClassifier  │  ← regex-based tier assignment (1–4)
└─────────┬───────────────┘
          │
          ├─ tier 1 ──► execute directly
          ├─ tier 4 ──► block immediately
          │
          ▼ tier 2 or 3
┌─────────────────────┐
│   GatekeeperLLM     │  ← async LLM review
└─────────┬───────────┘
          │
          ├─ ALLOW   ──► execute
          ├─ DENY    ──► block
          └─ CHALLENGE ► block + request clarification
                │
                ▼
          AuditLogger (every path, async JSONL)
```

---

## Security Pipeline

### Stage 1 — Sandbox Validation

`SandboxValidator` inspects every token in the command string for path-shaped arguments (containing `/` or `\`). For each candidate path it:

1. Strips `=`-prefixed values (handles `--output=/some/path` patterns).
2. Resolves the path to an absolute canonical form using `Path.resolve()`.
3. Checks the resolved path against `blocked_paths` first, then `allowed_paths`.
4. Flags that begin with `-` are skipped entirely as non-path tokens.

A command is blocked if any resolved path falls outside `allowed_paths` or inside `blocked_paths`.

```python
wall = CmdWall(
    allowed_paths=["/workspace"],
    blocked_paths=["/workspace/.env", "/workspace/secrets"]
)
```

Paths outside `allowed_paths` that have no `/` or `\` (plain command names) pass through to later stages — the sandbox is a path-containment layer, not a command whitelist.

---

### Stage 2 — Blocklist / Allowlist

After the sandbox, cmdwall consults two lists configured via `CMDWallConfig`. Matching is done against the **first token** of the parsed command (the command type), extracted via `shlex.split`.

- **Blocklist:** If the command type matches any entry, the command is denied immediately with a `Config Blocked` denial — no gatekeeper consulted.
- **Allowlist:** If the command type matches any entry, the command is executed immediately — bypassing risk classification and the gatekeeper entirely.

These lists are evaluated in the order: sandbox → blocklist → allowlist → classifier.

---

### Stage 3 — Risk Classification (Safety Tiers)

`SafetyLevelClassifier` applies a ranked set of regex rules to the full command string. Every rule carries a tier (1–4); the classifier returns the **highest matching tier**. If no rule matches, the default is tier 2.

| Tier | Meaning | Default behaviour |
|------|---------|-------------------|
| 1 | Read-only, harmless | Execute immediately, no gatekeeper |
| 2 | Potentially impactful (network, package install, simple `rm`) | Sent to gatekeeper |
| 3 | Destructive or privilege-escalating | Sent to gatekeeper |
| 4 | Critically dangerous | Blocked immediately, no gatekeeper |

**Default tier 4 patterns (hard-blocked):**

| Pattern | Description |
|---------|-------------|
| `:\(\)\{.*:\|:.*&.*\}.*;` | Fork bomb |
| `mkfs` | Filesystem formatting |
| `dd\s+if=` | Low-level disk write |
| `> /dev/sd[a-z]` | Raw device write |

**Default tier 3 patterns (gatekeeper required):**

`rm -r`, `rm -f`, `rm --recursive`, `rm --force`, `chmod`, `chown`, `sudo`, `git push --force` / `git push -f`, `systemctl stop|disable|mask`

**Default tier 2 patterns (gatekeeper required):**

`rm` (without `-r`/`-f`), `curl`, `wget`, `git push` (without `--force`), `ssh`, `npm install|publish`, `pip install`

**Default tier 1 patterns (auto-allow):**

`ls`, `pwd`, `echo`, `cat`, `git status`, `git log`, `grep`, `find`

Classification is case-insensitive. Patterns are matched with `re.search` against the full command string (not just the first token), so `sudo apt install vim` matches both the `sudo` tier-3 rule and would return tier 3 as the highest hit.

---

### Stage 4 — Gatekeeper LLM

For tier 2 and 3 commands that pass earlier stages, `GatekeeperLLM` calls the configured `BaseLLM` provider's `evaluate_command` method. The provider receives:

- `command` — the raw command string
- `reasoning` — the agent's stated justification
- `risk_tier` — the integer tier from the classifier
- `context` — a dict containing at minimum `{"os": "<platform info>"}`, plus anything passed via `execute(context=...)`

The provider must return a `GatekeeperResponse` with one of three decisions:

| Decision | Effect |
|----------|--------|
| `ALLOW` | Command proceeds to execution |
| `DENY` | Command blocked; `denial_reason` is set to the explanation |
| `CHALLENGE` | Command blocked; `denial_reason` includes `"Clarification needed: "` + `required_clarification` (or `explanation`) |

If no `reasoning` is supplied for a tier 2 or 3 command, the gatekeeper returns `DENY` without contacting the LLM provider.

If the LLM provider raises an exception, the gatekeeper fails closed — returning `DENY` with the error message.

---

### Stage 5 — Execution

Commands that clear all stages are executed via `asyncio.create_subprocess_shell`. Two safety measures apply at runtime:

**Timeout:** Controlled by `CMDWallConfig.default_timeout_seconds` (default: 300s). On timeout, the process receives `SIGTERM`, then `SIGKILL` after a 2-second grace period. The returned `ExecutionResult` has `success=False` and a `denial_reason` describing the timeout.

**Output capping:** `stdout` and `stderr` are read in 4096-byte chunks. If either stream exceeds `CMDWallConfig.max_output_bytes` (default: 1 MB), reading stops and `\n... [TRUNCATED]` is appended. Both streams are decoded as UTF-8 with `errors='replace'`.

The `ExecutionResult.success` flag mirrors the process exit code: `returncode == 0` → `True`, anything else → `False`.

---

## Configuration

`CMDWallConfig` is a `pydantic-settings` `BaseSettings` model. It resolves values in priority order: explicit constructor kwargs → environment variables → `.env` file → field defaults.

### Environment Variables

All variables use the `CMDWALL_` prefix.

| Variable | Type | Default | Description |
|---|---|---|---|
| `CMDWALL_DEBUG_MODE` | `bool` | `False` | Print a formatted debug summary for every `execute()` call to stdout |
| `CMDWALL_PROVIDER` | `str` | `"openai"` | Informational default provider name |
| `CMDWALL_OPENAI_API_KEY` | `str` | `None` | OpenAI API key (used by your provider implementation) |
| `CMDWALL_ANTHROPIC_API_KEY` | `str` | `None` | Anthropic API key |
| `CMDWALL_DEFAULT_TIMEOUT_SECONDS` | `int` | `300` | Maximum shell execution time in seconds |
| `CMDWALL_MAX_OUTPUT_BYTES` | `int` | `1000000` | Max combined stdout/stderr capture (1 MB) |
| `CMDWALL_AUDIT_LOG_PATH` | `str` | `"cmdwall_audit.jsonl"` | Path to write audit log entries |
| `CMDWALL_AUDIT_QUEUE_SIZE` | `int` | `1000` | Max in-memory audit queue depth before drops |
| `CMDWALL_VERIFY_SSL` | `bool` | `True` | SSL verification flag (available for provider use) |

**Using a `.env` file:**

```ini
CMDWALL_DEBUG_MODE=true
CMDWALL_DEFAULT_TIMEOUT_SECONDS=60
CMDWALL_ANTHROPIC_API_KEY=sk-ant-...
```

**Passing config programmatically:**

```python
from cmdwall import CMDWallConfig, CmdWall

config = CMDWallConfig(
    debug_mode=True,
    default_timeout_seconds=30,
    blocklist=["rm", "sudo"],
    allowlist=["ls", "echo", "git"]
)
wall = CmdWall(config=config, allowed_paths=["/workspace"])
```

---

## Custom Risk Rules

You can extend or override the default tier ruleset via the `custom_rules` constructor argument on `SafetyLevelClassifier`, or by calling `_add_rule` directly after instantiation. Custom rules are additive — they append to the existing rules list. The classifier always returns the highest matching tier.

```python
from cmdwall.security.safety_level import SafetyLevelClassifier

# Via constructor — dict of {tier: [regex_patterns]}
classifier = SafetyLevelClassifier(custom_rules={
    4: [r"^shred", r"^wipefs"],
    3: [r"^docker\s+run", r"^kubectl\s+delete"],
    1: [r"^docker\s+ps", r"^kubectl\s+get"]
})

# Or after construction
classifier._add_rule(r"^terraform\s+destroy", level=3, description="Terraform destroy")
```

Patterns are compiled with `re.IGNORECASE` and matched with `re.search` against the full command string.

To supply a custom classifier to `CmdWall`, subclass or monkey-patch `self.risk_classifier` after construction:

```python
wall = CmdWall(allowed_paths=["/workspace"])
wall.risk_classifier = SafetyLevelClassifier(custom_rules={4: [r"^shred"]})
```

---

## Implementing a Provider

Create a class that inherits from `BaseLLM` and implements `evaluate_command` and `provider_name`:

```python
from cmdwall.base.llm import BaseLLM
from cmdwall.models.models import GatekeeperResponse
from typing import Dict, Any

class MyOpenAIProvider(BaseLLM):

    @property
    def provider_name(self) -> str:
        return "openai"

    async def evaluate_command(
        self,
        command: str,
        reasoning: str,
        risk_tier: int,
        context: Dict[str, Any]
    ) -> GatekeeperResponse:
        # Call your LLM here. The prompt should ask for a structured
        # ALLOW / DENY / CHALLENGE decision with an explanation.
        # Must return a GatekeeperResponse.

        # Minimal example (stub):
        return GatekeeperResponse(
            decision="ALLOW",
            explanation="Approved by provider."
        )
```

`evaluate_command` must be an `async` method. If it raises, the gatekeeper catches the exception and returns `DENY`.

The `GatekeeperResponse` model is:

```python
class GatekeeperResponse(BaseModel):
    decision: Literal["ALLOW", "DENY", "CHALLENGE"]
    explanation: str
    suggested_alternative: Optional[str] = None  # provider may offer a safer alternative
    required_clarification: Optional[str] = None  # used with CHALLENGE decisions
```

---

## OpenAI Tool Call Integration

`CmdWall` includes a convenience method `handle_tool_call` for direct integration into OpenAI-compatible agent loops. It accepts an OpenAI `tool_call` object (anything with `.id` and `.function.arguments`), deserialises the JSON arguments, calls `execute()`, and returns a tool-role message dict ready to append to the conversation history.

```python
# Inside your agent loop:
for tool_call in response.choices[0].message.tool_calls:
    tool_message = await wall.handle_tool_call(tool_call)
    messages.append(tool_message)
```

The tool arguments must include `"command"` and optionally `"reasoning"`. Invalid JSON arguments return a `{"success": False, "error": "Failed to parse arguments: ..."}` response without raising.

**Expected tool schema** to send to the model:

```json
{
  "type": "function",
  "function": {
    "name": "bash",
    "description": "Execute a shell command.",
    "parameters": {
      "type": "object",
      "properties": {
        "command": {
          "type": "string",
          "description": "The shell command to execute."
        },
        "reasoning": {
          "type": "string",
          "description": "Why this command is needed. Required for risk assessment."
        }
      },
      "required": ["command", "reasoning"]
    }
  }
}
```

---

## Data Models

### `ExecutionResult`

Returned by every `execute()` call.

| Field | Type | Description |
|---|---|---|
| `success` | `bool` | `True` if the command ran and exited with code 0 |
| `stdout` | `str` | Captured stdout (empty string if blocked or on error) |
| `stderr` | `str` | Captured stderr |
| `denial_reason` | `Optional[str]` | Set for every blocked result; `None` for allowed commands |
| `risk_tier` | `Optional[int]` | The tier assigned by the classifier (1–4); `None` if blocked at sandbox/config stage |
| `gatekeeper_response` | `Optional[GatekeeperResponse]` | Full gatekeeper response when the LLM was consulted |

### `GatekeeperResponse`

| Field | Type | Description |
|---|---|---|
| `decision` | `Literal["ALLOW", "DENY", "CHALLENGE"]` | Gatekeeper verdict |
| `explanation` | `str` | Human-readable reason |
| `suggested_alternative` | `Optional[str]` | A safer command the gatekeeper suggests |
| `required_clarification` | `Optional[str]` | Question to ask the user on `CHALLENGE` |

---

## Audit Logging

Every call to `execute()` whether allowed or blocked, results in an audit entry written to a JSONL file. 

**Log entry structure (one JSON object per line):**

```json
{
  "timestamp": "2024-01-15T12:34:56.789012",
  "command": "curl https://api.example.com/data",
  "reasoning": "Fetching API response for analysis.",
  "context": {"os": "CmdWall Node (Linux)"},
  "result": {
    "success": false,
    "stdout": "",
    "stderr": "CMDWall Blocked: ...",
    "denial_reason": "Gatekeeper DENY: Network access not permitted.",
    "risk_tier": 2,
    "gatekeeper_response": {
      "decision": "DENY",
      "explanation": "Network access not permitted.",
      "suggested_alternative": null,
      "required_clarification": null
    }
  }
}
```


---

## Graceful Shutdown

Call `await wall.shutdown()` before your process exits. This signals the audit worker to stop, drains the remaining queue, and cancels the background task cleanly.

```python
try:
    result = await wall.execute(command="ls", reasoning="listing")
finally:
    await wall.shutdown()
```

Without calling `shutdown()`, in-flight audit entries may not be flushed to disk.

---

## Testing

The test suite uses `pytest` with `pytest-asyncio` in auto mode.

**Install test dependencies:**

```bash
pip install "cmdwall[test]"
```

**Run all tests:**

```bash
pytest
```

**Test modules:**

| File | Coverage |
|---|---|
| `tests/test_cmdwall.py` | End-to-end `CmdWall.execute()` and `handle_tool_call()` paths including gatekeeper mock providers, blocklist/allowlist config, timeout behaviour, and sandbox violations |
| `tests/test_classifier.py` | `SafetyLevelClassifier` — all four tiers, unknown-command defaults, custom rules, and highest-tier-wins logic |
| `tests/test_sandbox.py` | `SandboxValidator` — allowed/blocked paths, nested paths, flag token handling, `=`-prefixed paths, `$ENV`-style tokens, and symlink-adjacent cases |

### Test Results

![Test Results](assets/test_result.png)

---

## Project Layout

```
cmdwall/
├── cmdwall/
│   ├── __init__.py              # Exports: CmdWall, CMDWallConfig
│   ├── main.py                  # CmdWall class — orchestrates the full pipeline
│   ├── base/
│   │   └── llm.py               # BaseLLM abstract class
│   ├── core/
│   │   └── config.py            # CMDWallConfig (pydantic-settings)
│   ├── logging/
│   │   └── audit.py             # AuditLogger — async queue + JSONL writer
│   ├── models/
│   │   └── models.py            # GatekeeperResponse, ExecutionResult
│   └── security/
│       ├── gatekeeper.py        # GatekeeperLLM — wraps BaseLLM for command review
│       ├── safety_level.py      # SafetyLevelClassifier — regex tier engine
│       └── sandbox_validator.py # SandboxValidator — path containment
├── tests/
│   ├── test_cmdwall.py
│   ├── test_classifier.py
│   └── test_sandbox.py
└── pyproject.toml
```

---

## License

MIT

## Author

Made by **anky** — [codebyanika@gmail.com](mailto:codebyanika@gmail.com)

