# Source code for jeevesagent.tools.builtin
"""Built-in tools for filesystem + shell agents.
Four tools that any :class:`~jeevesagent.Agent` can register to
gain the canonical "Claude-Code-shaped" capability set:
* :func:`read_tool` — read a text file
* :func:`write_tool` — create or overwrite a file
* :func:`edit_tool` — find-and-replace inside an existing file
* :func:`bash_tool` — run a shell command, capture output
All four are **factory functions**: they take a ``workdir`` (and
in the case of ``bash_tool`` a few safety knobs) and return a
ready-to-register :class:`Tool` instance. The closure captures the
workdir, so the resulting tool is workdir-scoped — the model can't
escape it via ``../`` traversal or absolute paths.
Usage::
from jeevesagent import (
Agent, read_tool, write_tool, edit_tool, bash_tool,
)
agent = Agent(
"You are a research agent.",
model="gpt-4.1-mini",
tools=[
read_tool(workdir="/tmp/agent_work"),
write_tool(workdir="/tmp/agent_work"),
edit_tool(workdir="/tmp/agent_work"),
bash_tool(workdir="/tmp/agent_work", timeout=30.0),
],
)
Or as a bundle::
from jeevesagent import filesystem_tools, bash_tool
agent = Agent(
"...",
model="...",
tools=filesystem_tools("/tmp/agent_work") + [bash_tool("/tmp/agent_work")],
)
Safety
------
* **Workdir-scoped by default.** Read / write / edit refuse paths
that resolve outside the workdir. ``bash_tool`` runs commands
with the workdir as ``cwd``.
* **Timeout on bash.** Default 30 seconds; override via the
``timeout`` kwarg. Commands that exceed the timeout are killed.
* **Destructive-command denylist.** ``bash_tool`` rejects a small
set of obviously-dangerous patterns (``rm -rf /``, ``sudo``,
``mkfs``, etc.) by default. Override via the ``allow_pattern``
callable for advanced use.
* **Edit requires unique match.** ``edit_tool``'s ``old_string``
must appear EXACTLY once in the file (unless ``replace_all=True``
is passed in the call) — forces the model to provide enough
context for unambiguous edits, the same approach Claude Code
takes.
These will be the foundation of the upcoming Deep Agent architecture
(planner + filesystem state + subagent registry).
"""
from __future__ import annotations
import re
import shutil
import tempfile
from collections.abc import Callable
from pathlib import Path
import anyio
from .registry import Tool
# ---------------------------------------------------------------------------
# Default workdir — lazily created on first use, shared across all
# built-in tool factories that don't get an explicit workdir. So
# ``read_tool()`` + ``write_tool()`` + ``edit_tool()`` + ``bash_tool()``
# all see the same tempdir without the caller wiring it up.
# ---------------------------------------------------------------------------
_DEFAULT_WORKDIR: Path | None = None
[docs]
def default_workdir() -> Path:
"""Return the framework's default workdir for built-in tools,
creating it lazily on first call.
The directory is a fresh tempdir under ``$TMPDIR/jeeves_agent_*``,
created once per process. All built-in tool factories share it
when called without an explicit ``workdir`` argument, so an
Agent that registers ``read_tool()`` and ``write_tool()`` (no
args) sees the same place.
The directory is NOT auto-cleaned at process exit — leave that
to the OS's tempdir cleanup so debug data survives a crash.
"""
global _DEFAULT_WORKDIR
if _DEFAULT_WORKDIR is None:
_DEFAULT_WORKDIR = Path(
tempfile.mkdtemp(prefix="jeeves_agent_")
).resolve()
return _DEFAULT_WORKDIR
def _resolve_workdir(workdir: Path | str | None) -> Path:
    """Normalise ``workdir``, falling back to the shared default.

    ``None`` means "use the process-wide tempdir from
    :func:`default_workdir`"; anything else is coerced to a
    fully-resolved :class:`Path`.
    """
    return default_workdir() if workdir is None else Path(workdir).resolve()
# ---------------------------------------------------------------------------
# Path safety
# ---------------------------------------------------------------------------
def _resolve_within(workdir: Path, rel: str) -> Path:
    """Join ``rel`` onto ``workdir`` and fully resolve it, rejecting escapes.

    Raises :class:`PathEscapeError` when the resolved result falls
    outside the (resolved) workdir — i.e. ``rel`` used ``..`` segments
    or an absolute path.
    """
    base = workdir.resolve()
    candidate = (workdir / rel).resolve()
    try:
        candidate.relative_to(base)
    except ValueError as exc:
        raise PathEscapeError(
            f"path {rel!r} escapes workdir {str(base)!r}"
        ) from exc
    return candidate
class PathEscapeError(ValueError):
    """A tool argument resolved to a path outside its workdir.

    Subclasses :class:`ValueError` so callers catching the broader
    type keep working.
    """
# ---------------------------------------------------------------------------
# read_tool
# ---------------------------------------------------------------------------
# Default cap on lines returned per read() call; callers page through
# longer files via the tool's ``offset`` / ``limit`` arguments.
_DEFAULT_READ_LINE_LIMIT = 2000
def read_tool(
    workdir: Path | str | None = None,
    *,
    name: str = "read",
    line_limit: int = _DEFAULT_READ_LINE_LIMIT,
) -> Tool:
    """Build a :class:`Tool` that reads a text file under ``workdir``.

    The tool's signature seen by the model:

        ``read(path: str, offset: int = 0, limit: int | None = None)``

    Returns the file's text with 1-indexed line numbers prefixed (one
    line per output line), in the same format Claude Code's Read tool
    uses — that lets the ``edit`` tool work without ambiguity later.
    Long files are truncated to ``line_limit`` lines per call; pass
    ``offset`` / ``limit`` to read further chunks.

    Errors (file-not-found, path-escape, undecodable content, offset
    past end-of-file) are returned as a string starting with
    ``"ERROR: "`` rather than raising — the model sees them as a tool
    result and can adjust.

    ``workdir`` is optional; ``None`` uses the framework's default
    tempdir (shared with the other built-in tools called without a
    workdir).
    """
    workdir_path = _resolve_workdir(workdir)

    async def _read(
        path: str,
        offset: int = 0,
        limit: int | None = None,
    ) -> str:
        try:
            target = _resolve_within(workdir_path, path)
        except PathEscapeError as exc:
            return f"ERROR: {exc}"
        if not target.exists():
            return f"ERROR: file not found: {path}"
        if not target.is_file():
            return f"ERROR: not a regular file: {path}"
        try:
            text = await anyio.to_thread.run_sync(target.read_text)
        except (OSError, UnicodeDecodeError) as exc:
            # Binary / unreadable content becomes a tool-visible error
            # instead of an exception that aborts the agent loop.
            return f"ERROR: could not read {path}: {exc}"
        lines = text.splitlines()
        # Tolerate a model passing a negative offset.
        offset = max(offset, 0)
        if lines and offset >= len(lines):
            # Without this check the empty slice below would misreport
            # a non-empty file as "(empty file)".
            return (
                f"ERROR: offset {offset} is beyond the end of {path} "
                f"({len(lines)} line(s) total)"
            )
        end = min(
            offset + (limit if limit is not None else line_limit),
            len(lines),
        )
        chunk = lines[offset:end]
        numbered = "\n".join(
            f"{offset + i + 1:6d}\t{line}"
            for i, line in enumerate(chunk)
        )
        if end < len(lines):
            # Tell the model how to fetch the next page.
            numbered += (
                f"\n... ({len(lines) - end} more line(s); "
                f"call read() again with offset={end})"
            )
        return numbered or "(empty file)"

    return Tool(
        name=name,
        description=(
            f"Read a text file under {str(workdir_path)}. Returns "
            "the contents with 1-indexed line numbers prefixed "
            "(format: ' N\\tline content'). Use these line "
            "numbers when planning edits. For long files, pass "
            "offset (default 0) and limit (default "
            f"{_DEFAULT_READ_LINE_LIMIT}) to page through."
        ),
        fn=_read,
        input_schema={
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": (
                        "Path relative to the workdir. Cannot "
                        "escape the workdir via '..' or absolute "
                        "paths."
                    ),
                },
                "offset": {
                    "type": "integer",
                    "description": (
                        "Zero-indexed line offset to start reading "
                        "from. Default 0."
                    ),
                },
                "limit": {
                    "type": "integer",
                    "description": (
                        "Max lines to return. Default "
                        f"{_DEFAULT_READ_LINE_LIMIT}."
                    ),
                },
            },
            "required": ["path"],
        },
    )
# ---------------------------------------------------------------------------
# write_tool
# ---------------------------------------------------------------------------
def write_tool(
    workdir: Path | str | None = None,
    *,
    name: str = "write",
    create_parents: bool = True,
) -> Tool:
    """Build a :class:`Tool` that writes / overwrites a text file
    under ``workdir``.

    The tool's signature seen by the model:

        ``write(path: str, content: str)``

    Overwrites existing files. With ``create_parents=True`` (the
    default), missing parent directories are created automatically.
    Files are written as UTF-8 so the reported byte count is exact and
    output is independent of the host locale. Returns a confirmation
    string with the byte count, or an ``"ERROR: "``-prefixed message
    on failure.

    ``workdir`` is optional; ``None`` uses the framework's default
    tempdir (shared with the other built-in tools).
    """
    workdir_path = _resolve_workdir(workdir)

    async def _write(path: str, content: str) -> str:
        try:
            target = _resolve_within(workdir_path, path)
        except PathEscapeError as exc:
            return f"ERROR: {exc}"
        if create_parents:
            target.parent.mkdir(parents=True, exist_ok=True)
        elif not target.parent.exists():
            return (
                f"ERROR: parent directory does not exist: "
                f"{target.parent.relative_to(workdir_path)}"
            )
        # run_sync passes positional args only; "utf-8" is write_text's
        # encoding parameter. Explicit UTF-8 keeps output deterministic.
        await anyio.to_thread.run_sync(target.write_text, content, "utf-8")
        # Count actual encoded bytes (len(content) would count chars)
        # and real lines (empty content is 0 lines, not 1).
        n_bytes = len(content.encode("utf-8"))
        n_lines = len(content.splitlines())
        return f"wrote {n_bytes} bytes to {path} ({n_lines} line(s))"

    return Tool(
        name=name,
        description=(
            f"Create or OVERWRITE a text file under {str(workdir_path)}. "
            "Use this for new files or full rewrites; use the edit "
            "tool for in-place modifications. Parent directories "
            "are created automatically. Returns byte count + line "
            "count on success."
        ),
        fn=_write,
        input_schema={
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": (
                        "Path relative to the workdir. Cannot "
                        "escape the workdir."
                    ),
                },
                "content": {
                    "type": "string",
                    "description": "Full text content to write.",
                },
            },
            "required": ["path", "content"],
        },
        destructive=True,  # overwrites — flag for permission policies
    )
# ---------------------------------------------------------------------------
# edit_tool
# ---------------------------------------------------------------------------
def edit_tool(
    workdir: Path | str | None = None,
    *,
    name: str = "edit",
) -> Tool:
    """Build a :class:`Tool` for exact find-and-replace in an existing
    file under ``workdir``.

    The tool's signature seen by the model:

        ``edit(path: str, old_string: str, new_string: str,
        replace_all: bool = False)``

    Contract (mirrors Claude Code's Edit tool):

    * ``old_string`` must appear verbatim in the file — any whitespace
      or line-break mismatch is an error.
    * Without ``replace_all``, ``old_string`` must be unique in the
      file, which forces the model to supply enough surrounding
      context for an unambiguous match.
    * ``new_string`` replaces the single match, or every match when
      ``replace_all=True``.

    ``workdir`` is optional; ``None`` uses the framework's default
    tempdir (shared with the other built-in tools).
    """
    workdir_path = _resolve_workdir(workdir)

    async def _edit(
        path: str,
        old_string: str,
        new_string: str,
        replace_all: bool = False,
    ) -> str:
        try:
            target = _resolve_within(workdir_path, path)
        except PathEscapeError as exc:
            return f"ERROR: {exc}"
        if not target.exists():
            return f"ERROR: file not found: {path}"
        if not target.is_file():
            return f"ERROR: not a regular file: {path}"
        original = await anyio.to_thread.run_sync(target.read_text)
        occurrences = original.count(old_string)
        if occurrences == 0:
            return (
                f"ERROR: old_string not found in {path}. "
                "It must match EXACTLY (whitespace, indentation, "
                "line breaks)."
            )
        if occurrences > 1 and not replace_all:
            return (
                f"ERROR: old_string appears {occurrences} times in "
                f"{path}; pass replace_all=True or provide more "
                "surrounding context to make the match unique."
            )
        # str.replace with an explicit count covers both modes.
        replaced = occurrences if replace_all else 1
        updated = original.replace(old_string, new_string, replaced)
        await anyio.to_thread.run_sync(target.write_text, updated)
        return (
            f"edited {path}: replaced {replaced} occurrence(s) "
            f"({len(original)} → {len(updated)} bytes)"
        )

    return Tool(
        name=name,
        description=(
            f"Modify an existing text file under {str(workdir_path)} "
            "by replacing an exact string. ``old_string`` must match "
            "the file's contents EXACTLY (including whitespace) and "
            "must be unique in the file (unless ``replace_all=True``). "
            "If you don't have enough context for a unique match, "
            "read the file first to grab surrounding lines. Returns "
            "the number of replacements + new byte count on success."
        ),
        fn=_edit,
        input_schema={
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": (
                        "Path relative to the workdir. File must exist."
                    ),
                },
                "old_string": {
                    "type": "string",
                    "description": (
                        "The exact string to replace, including "
                        "any surrounding whitespace needed for "
                        "uniqueness."
                    ),
                },
                "new_string": {
                    "type": "string",
                    "description": (
                        "The replacement string. Can be empty to "
                        "delete the matched region."
                    ),
                },
                "replace_all": {
                    "type": "boolean",
                    "description": (
                        "If true, replace every occurrence of "
                        "old_string. Default false (require "
                        "uniqueness)."
                    ),
                },
            },
            "required": ["path", "old_string", "new_string"],
        },
        destructive=True,  # mutates files — flag for permission policies
    )
# ---------------------------------------------------------------------------
# bash_tool
# ---------------------------------------------------------------------------
# Patterns that flag a clearly-dangerous shell command. The default
# deny list is conservative — users can replace it wholesale via the
# ``allow_pattern`` callable on :func:`bash_tool`. The ``rm`` pattern
# matches only when the trailing slash is directly followed by
# whitespace or end-of-line, so ``rm -rf /tmp/foo`` (a perfectly
# normal cleanup) is NOT flagged; flag order is irrelevant, so
# ``-rf``, ``-fr`` and plain ``-r`` all match.
_DEFAULT_DENY_PATTERNS: tuple[re.Pattern[str], ...] = (
    re.compile(r"\brm\s+-[rf]*r[rf]*\s+/(?:\s|$)"),  # rm -rf|-fr|-r / (root only)
    re.compile(r"\bsudo\b"),
    re.compile(r"\bmkfs\b"),
    re.compile(r"\bdd\s+if=.*of=/dev/"),
    re.compile(r":\(\)\{.*\}\s*;\s*:"),  # fork bomb
    re.compile(r">\s*/dev/sd"),  # write to raw disk
    re.compile(r"\bchmod\s+(?:777|-R\s+777)\s+/(?:\s|$)"),  # chmod 777 /
)
def bash_tool(
    workdir: Path | str | None = None,
    *,
    name: str = "bash",
    timeout: float = 30.0,
    allow_pattern: Callable[[str], bool] | None = None,
    extra_env: dict[str, str] | None = None,
) -> Tool:
    """Build a :class:`Tool` that runs a shell command with the
    workdir as the current working directory.

    Default safety:

    * Commands matching the built-in destructive patterns
      (``rm -rf /``, ``sudo``, ``mkfs``, fork bombs, ...) are
      rejected before being executed.
    * Commands run with a default ``timeout`` of 30 seconds; the
      subprocess is killed on timeout.
    * The shell is invoked via ``/bin/sh -c <command>``, so
      pipelines + redirections work the way you'd expect.

    Knobs:

    * ``allow_pattern`` — a callable that takes the command string
      and returns True if the command should run. When provided, it
      OVERRIDES the default deny list — you take full responsibility.
    * ``extra_env`` — extra environment variables merged into the
      subprocess env.
    * ``timeout`` — seconds before the command is killed.

    ``workdir`` is optional; ``None`` uses the framework's default
    tempdir (shared with the other built-in tools).
    """
    workdir_path = _resolve_workdir(workdir)

    def _is_allowed(command: str) -> tuple[bool, str | None]:
        # Returns (allowed, reason-if-blocked). A user-supplied
        # allow_pattern fully REPLACES the built-in deny list — the
        # default patterns are consulted only when it is absent.
        if allow_pattern is not None:
            if allow_pattern(command):
                return True, None
            return False, "blocked by user-supplied allow_pattern"
        for pat in _DEFAULT_DENY_PATTERNS:
            if pat.search(command):
                return (
                    False,
                    f"blocked: matches denylist pattern {pat.pattern!r}",
                )
        return True, None

    async def _bash(command: str, timeout_sec: float | None = None) -> str:
        # Safety gate first: refuse denylisted commands before spawning
        # anything.
        ok, reason = _is_allowed(command)
        if not ok:
            return f"ERROR: {reason}"
        # Per-call override (timeout_sec) takes precedence over the
        # factory-level default.
        effective_timeout = (
            float(timeout_sec) if timeout_sec is not None else timeout
        )
        # Build env: parent env + any extras the user passed in.
        import os as _os

        env = dict(_os.environ)
        if extra_env:
            env.update(extra_env)
        # Use anyio's subprocess support so we don't block the loop.
        # fail_after cancels the scope on expiry; anyio is expected to
        # terminate the child on cancellation — NOTE(review): confirm
        # against the pinned anyio version's run_process behaviour.
        try:
            with anyio.fail_after(effective_timeout):
                process = await anyio.run_process(
                    ["/bin/sh", "-c", command],
                    cwd=str(workdir_path),
                    env=env,
                    check=False,  # non-zero exit is reported, not raised
                )
        except TimeoutError:
            return (
                f"ERROR: command timed out after "
                f"{effective_timeout:.1f}s: {command!r}"
            )
        stdout = (
            process.stdout.decode("utf-8", errors="replace")
            if process.stdout
            else ""
        )
        stderr = (
            process.stderr.decode("utf-8", errors="replace")
            if process.stderr
            else ""
        )
        rc = process.returncode
        # Truncate huge outputs so the model isn't blown up.
        # NOTE(review): lengths are measured after decoding, so the
        # "bytes" wording in the truncation notice is approximate for
        # non-ASCII output.
        max_len = 10_000
        if len(stdout) > max_len:
            stdout = (
                stdout[:max_len]
                + f"\n... (truncated, {len(stdout) - max_len} more bytes)"
            )
        if len(stderr) > max_len:
            stderr = (
                stderr[:max_len]
                + f"\n... (truncated, {len(stderr) - max_len} more bytes)"
            )
        # Structured result block: command + exit status first, then
        # stdout / stderr sections only when non-empty.
        sections = [f"$ {command}\n[exit={rc}]"]
        if stdout:
            sections.append("--- stdout ---\n" + stdout.rstrip())
        if stderr:
            sections.append("--- stderr ---\n" + stderr.rstrip())
        if not stdout and not stderr:
            sections.append("(no output)")
        return "\n".join(sections)

    return Tool(
        name=name,
        description=(
            f"Run a shell command with cwd={str(workdir_path)}. "
            f"Default timeout {timeout:.0f}s. Returns stdout + "
            "stderr + exit code in a structured block. The default "
            "deny list rejects obviously-dangerous patterns "
            "(rm -rf /, sudo, mkfs, fork bombs, ...). For "
            "long-running commands, pass timeout_sec to override."
        ),
        fn=_bash,
        input_schema={
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": (
                        "Shell command to run via /bin/sh -c. "
                        "Pipelines and redirections work as expected."
                    ),
                },
                "timeout_sec": {
                    "type": "number",
                    "description": (
                        "Override the default timeout for this "
                        "call (seconds). Optional."
                    ),
                },
            },
            "required": ["command"],
        },
        destructive=True,  # arbitrary command execution
    )
# ---------------------------------------------------------------------------
# Bundle helper
# ---------------------------------------------------------------------------
def filesystem_tools(
    workdir: Path | str | None = None,
) -> list[Tool]:
    """Bundle the three filesystem tools (read, write, edit) on one workdir.

    ``bash_tool`` is deliberately excluded — pair it in only when you
    want shell access too.

    ``workdir`` is optional; ``None`` uses the framework's default
    tempdir (shared with bash_tool() called the same way).
    """
    root = _resolve_workdir(workdir)
    return [factory(root) for factory in (read_tool, write_tool, edit_tool)]
# Explicit public API of this module — keeps ``import *`` and docs
# tooling in sync with the factories defined above.
__all__ = [
    "PathEscapeError",
    "bash_tool",
    "default_workdir",
    "edit_tool",
    "filesystem_tools",
    "read_tool",
    "write_tool",
]
# Re-exports kept here so the module is independently importable
# (some users might prefer ``from jeevesagent.tools.builtin import
# read_tool``).
_ = shutil  # noqa: F401 — reserved for potential future use (cp helper)