Dependency Graph
▶ agent.py 425 LOC
Core agent loop: neutral message format, multi-provider streaming.
Classes (5)
class AgentState
Mutable session state. messages use the neutral provider-independent format.
class ToolStart
class ToolEnd
class TurnDone
class PermissionRequest
Functions (4)
def _interruptible_stream(gen)
Run a generator in a daemon thread, yield events via Queue.
Ctrl+C (KeyboardInterrupt) is always deliverable because the main
thread only blocks on queue.get(timeout=0.1) — never on a raw socket.
def run(user_message: str, state: AgentState, config: dict, system_prompt: str, depth: int = 0, cancel_check = None)
Multi-turn agent loop (generator).
Yields: TextChunk | ThinkingChunk | ToolStart | ToolEnd |
PermissionRequest | TurnDone
Args:
depth: sub-agent nesting depth, 0 for top-level
cancel_check: callable returning True to abort the loop early
def _check_permission(tc: dict, config: dict)
Return True if operation is auto-approved (no need to ask user).
def _permission_desc(tc: dict)
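The thread-plus-queue pattern described for `_interruptible_stream` can be sketched roughly as follows. This is a minimal illustration, not the module's code: the name `interruptible_stream`, the tuple tags, and the error forwarding are assumptions. Only the daemon thread and the `queue.get(timeout=0.1)` polling come from the docstring.

```python
import queue
import threading


def interruptible_stream(gen):
    """Drain `gen` in a daemon thread and re-yield its items on the caller's thread."""
    q = queue.Queue()

    def worker():
        try:
            for item in gen:
                q.put(("item", item))
        except Exception as exc:  # forward producer errors to the consumer
            q.put(("error", exc))
        else:
            q.put(("done", None))

    threading.Thread(target=worker, daemon=True).start()

    while True:
        try:
            # Short timeout: the main thread wakes up regularly, so a
            # KeyboardInterrupt can be delivered between polls.
            kind, payload = q.get(timeout=0.1)
        except queue.Empty:
            continue
        if kind == "error":
            raise payload
        if kind == "done":
            return
        yield payload
```

Because the consumer never blocks on the underlying socket, a slow provider response only delays the next item; it does not delay interrupt handling.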
Imports
__future__, compaction, dataclasses, governance, os, pathlib, providers, queue, threading, time, tool_registry, tools, typing, uuid
▶ backend/__init__.py 63 LOC
Dulus — Backend + Smart Context + Plugins + Personas + MemPalace.
Imports
backend.compressor, backend.context, backend.marketplace, backend.mempalace_bridge, backend.personas, backend.plugins, backend.tasks
▶ backend/agents_bridge.py 80 LOC
Agent info bridge — transforms personas / sub-agent tasks into AgentInfo format.
Functions (1)
def build_agent_info_list()
Return agents in AgentInfo format for the sandbox AgentMonitor.
Tries real SubAgentManager tasks first, then falls back to personas
so the UI always has something to show.
Imports
__future__
▶ backend/compressor.py 261 LOC
Hybrid Context Compressor (#29) — qwen2.5:3b via Ollama + rule-based fallback.
Zero mandatory dependencies. Uses urllib (stdlib) to probe Ollama.
If Ollama is unavailable, falls back to intelligent rule-based compression.
Functions (12)
def _ollama_available(timeout: float = 2.0)
Probe Ollama /api/tags to see if the server is up.
def _qwen_loaded(timeout: float = 3.0)
Check if qwen2.5:3b is available in Ollama.
def summarize_with_qwen(text: str, max_tokens: int = 100)
Call Ollama qwen2.5:3b to summarize a memory or text block.
def _remove_redundant_whitespace(text: str)
def _collapse_lists(text: str)
Turn bullet lists into comma-separated when possible.
def _strip_stopwords(text: str)
Aggressively remove common stopwords from sentences.
def _abbreviate_status(text: str)
Shorten common status words inside brackets only — avoid damaging names.
def _deduplicate_lines(text: str)
Remove exact duplicate lines.
def compress_with_rules(text: str, target_tokens: int = 200)
Intelligent rule-based compression — no LLM required.
Strategy: preserve all IDs, names, and statuses. Only remove fluff.
def compress(text: str, max_tokens: int = 200)
Compress context using rule-based method.
qwen2.5:3b is reserved for memory summarization (summarize_with_qwen)
because full-context compression is too destructive.
Returns dict with:
- compressed: str
- method: "rules"
- before_tokens: int
- after_tokens: int
- saved_tokens: i
def compress_compact_context(text: str, max_tokens: int = 200)
One-liner: returns just the compressed string.
def summarize_memory(name: str, body: str)
Use qwen2.5:3b to summarize a single memory body if Ollama is available.
Falls back to truncating to 120 chars.
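The probe-then-fall-back flow described for `_ollama_available` and `summarize_memory` might look like the sketch below. The base URL (Ollama's default local port), the helper names, and the `summarizer` callback are assumptions for illustration. The `/api/tags` probe and the 120-character fallback come from the docstrings.

```python
import urllib.request

OLLAMA_URL = "http://localhost:11434"  # assumption: Ollama's default local address


def ollama_available(timeout: float = 2.0) -> bool:
    """Return True if GET /api/tags answers with HTTP 200 within the timeout."""
    try:
        with urllib.request.urlopen(f"{OLLAMA_URL}/api/tags", timeout=timeout) as resp:
            return resp.status == 200
    except OSError:  # URLError, timeouts, and refused connections are OSError subclasses
        return False


def truncate_summary(body: str, limit: int = 120) -> str:
    """Collapse whitespace and cut to at most `limit` characters."""
    flat = " ".join(body.split())
    if len(flat) <= limit:
        return flat
    return flat[: limit - 3].rstrip() + "..."


def summarize_memory(body: str, summarizer=None, limit: int = 120) -> str:
    """Use `summarizer` (hypothetical LLM callback) if given, else truncate."""
    if summarizer is not None:
        try:
            return summarizer(body)
        except Exception:
            pass  # any summarizer failure falls through to truncation
    return truncate_summary(body, limit)
```

A caller would typically pass a summarizer only when `ollama_available()` returns True, so the offline path never pays the network timeout twice.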
Imports
json, re, textwrap, typing, urllib.request
▶ backend/context.py 329 LOC
Smart Context Manager (#23 + #28) — generates optimized context for LLM sessions.
Functions (14)
def run_git(args: list[str])
def get_recent_commits(n: int = 5)
def get_changed_files()
def get_repo_stats()
def get_active_tasks_summary()
def build_context()
Build comprehensive session context with real MemPalace memories.
def load_context()
def get_user_max_tokens()
def estimate_tokens(text: str)
Rough token estimation: ~4 chars per token for English/code.
def get_context_mode(token_pct: float)
def record_compaction(reason: str, before_tokens: int, after_tokens: int)
def build_smart_context()
Build context with token estimation and mode detection.
When mode is compact or emergency, applies rule-based compression
to keep context under budget. qwen2.5:3b is used for memory
summarization via mempalace_bridge, not for full-context compression.
def force_compaction()
Manually force compression of the context.
def get_compact_context(max_tokens_estimate: int = 800)
Generate ultra-dense text context for LLM prompt injection.
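The token estimate and mode detection described above can be illustrated with a short sketch. The 4-characters-per-token rule and the `compact` / `emergency` mode names come from the docstrings. The `normal` mode name and both percentage thresholds are assumptions chosen only for the example.

```python
def estimate_tokens(text: str) -> int:
    """Rough estimate: about 4 characters per token for English text and code."""
    return len(text) // 4


def get_context_mode(token_pct: float,
                     compact_at: float = 70.0,      # hypothetical threshold
                     emergency_at: float = 90.0) -> str:  # hypothetical threshold
    """Map the percentage of the token budget in use to a mode name."""
    if token_pct >= emergency_at:
        return "emergency"
    if token_pct >= compact_at:
        return "compact"
    return "normal"


def usage_pct(text: str, max_tokens: int) -> float:
    """Percentage of `max_tokens` consumed by `text` under the rough estimate."""
    return 100.0 * estimate_tokens(text) / max_tokens
```

For instance, `get_context_mode(usage_pct(context_text, 8000))` would select the mode for an 8000-token budget under these assumed thresholds.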
Imports
backend.compressor, backend.mempalace_bridge, backend.personas, backend.tasks, json, os, pathlib, subprocess, time, typing
▶ backend/githook.py 166 LOC
Git hook management for Dulus.
Functions (5)
def _hook_path()
def is_dulus_hook(path: Path)
def install()
def uninstall()
def status()
Imports
os, pathlib, subprocess, sys
▶ backend/marketplace.py 141 LOC
Plugin Marketplace: skeleton and registry of available plugins. (#20)
This module handles:
- Local registry of known plugins
- Metadata for marketplace plugins
- Simulated/remote plugin installation
Functions (7)
def load_registry()
def save_registry(registry: list[dict[str, Any]])
def get_plugin_by_id(plugin_id: str)
def install_plugin(plugin_id: str)
def uninstall_plugin(plugin_id: str)
def search_plugins(query: str = '', tag: str = '')
def get_stats()
Imports
json, pathlib, typing
▶ backend/mempalace_bridge.py 182 LOC
MemPalace Bridge (#28) — connects Dulus Context Manager with real MemPalace memories.
Design: The bridge reads from a JSON cache maintained by the AI runtime.
When the AI has tool access, it refreshes the cache with real memories.
When running standalone (server.py, dulus.py), it reads the cached data.
This avoids requiring tool-injected globals inside Python subprocesses.
Functions (7)
def _parse_memory_document(doc: str)
Parse a memory markdown document with YAML frontmatter.
def refresh_cache(raw_memories: list[dict[str, Any]], wings: list[str] | None = None)
Called by the AI runtime when tools are available to refresh memory cache.
Args:
raw_memories: List of memory items from wakeup_context/search_memory tools.
wings: Optional list of wing names discovered.
def load_cache()
Load memory cache from disk. Returns empty-safe dict.
def get_memories(max_items: int = 10)
Get deduplicated, ranked memories for context injection.
def _get_summary(name: str, body: str)
Get summary for a memory body — uses qwen if available, else truncates.
def get_mempalace_compact_text(max_memories: int = 6)
Generate ultra-dense MemPalace context for prompt injection.
Uses qwen2.5:3b via Ollama to summarize memory bodies when available.
Falls back to truncation if Ollama is offline.
def get_mempalace_context_block()
Structured block for JSON context (used by build_context).
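As an illustration of what `_parse_memory_document` is described as doing, here is a minimal frontmatter splitter. The module's import list shows no YAML library, so this sketch assumes flat `key: value` frontmatter only. The function name, the return shape, and that restriction are assumptions, not the module's confirmed behavior.

```python
def parse_memory_document(doc: str) -> dict:
    """Split a markdown memory into frontmatter metadata and body text.

    Handles only flat `key: value` pairs between two `---` delimiters.
    """
    meta = {}
    body = doc
    if doc.startswith("---"):
        parts = doc.split("---", 2)
        if len(parts) == 3:
            _, header, body = parts
            for line in header.strip().splitlines():
                key, sep, value = line.partition(":")
                if sep:  # skip lines without a colon
                    meta[key.strip()] = value.strip()
    return {"meta": meta, "body": body.strip()}
```

Documents without frontmatter pass through unchanged as the body, which keeps the parser safe to call on arbitrary cached entries.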
Imports
json, os, pathlib, time, typing
▶ backend/personas.py 297 LOC
Persona System (#19 + #22): agent profiles with visual identity and behavior.
Each persona defines:
- Identity: name, avatar, color, role
- Behavior: response style, tone, system prompt fragment
- Metadata: creator, version, tags
Usage:
from backend.personas import get_persona, get_all_personas, set_active_persona
persona = get_persona("kimi-code3")
print(persona.avatar) # 🦅
Functions (14)
def _ensure_defaults()
Seed personas if none exist.
def load_personas()
def save_personas(personas: list[dict[str, Any]])
def get_persona(pid: str)
def get_all_personas()
def create_persona(data: dict[str, Any])
def update_persona(pid: str, data: dict[str, Any])
def delete_persona(pid: str)
def get_active_persona()
Return the currently active persona, defaulting to Dulus.
def set_active_persona(pid: str)
Set active persona by ID, ensuring only one is active.
def get_personas_summary()
Lightweight list for context injection and dashboards.
def get_persona_context_block()
Structured block for JSON context (used by build_context).
def get_personas_for_context()
Return persona list for context.py compatibility.
def get_persona_compact_text(max_chars: int = 200)
Ultra-dense active persona text for prompt injection.
Imports
backend.mempalace_bridge, json, pathlib, time, typing
▶ backend/plugins.py 222 LOC
Hot-loadable plugin system for Dulus.
Functions (17)
def register_hook(name: str, fn: Callable)
def unregister_plugin_hooks(name: str)
Remove all hooks registered by a given plugin name.
def trigger_hook(name: str, *args, **kwargs)
def discover_plugins()
def load_plugin(path: Path)
def unload_plugin(name: str)
Unload a plugin by name, removing hooks and registry entry.
def reload_plugin(path: Path)
def load_all_plugins()
def get_plugin_info()
Return serializable plugin metadata (no module objects).
def get_plugin_registry()
Return raw registry (includes module objects; not JSON-safe).
def _take_snapshot()
def _scan_changes()
Return (added, modified, removed) plugin names.
def _watcher_loop(broadcast_fn: Callable | None = None)
Daemon thread loop: poll plugins/ dir for changes.
def start_watcher(broadcast_fn: Callable | None = None)
Start the plugins directory watcher. Returns False if already running.
def stop_watcher()
Stop the plugins directory watcher.
def watcher_status()
def create_example_plugin()
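The polling watcher (`_take_snapshot`, `_scan_changes`) can be pictured as a diff between two directory snapshots. The sketch below assumes plugins are single `.py` files identified by file stem and compared by modification time. Those details and the function names are illustrative assumptions; only the `(added, modified, removed)` result shape comes from the docstring.

```python
from pathlib import Path


def take_snapshot(plugins_dir: Path) -> dict:
    """Map each plugin name (file stem) to its last-modified time."""
    if not plugins_dir.is_dir():
        return {}
    return {p.stem: p.stat().st_mtime for p in plugins_dir.glob("*.py")}


def scan_changes(old: dict, new: dict):
    """Return (added, modified, removed) plugin names between two snapshots."""
    added = sorted(new.keys() - old.keys())
    removed = sorted(old.keys() - new.keys())
    modified = sorted(
        name for name in old.keys() & new.keys() if old[name] != new[name]
    )
    return added, modified, removed
```

A watcher loop would take a snapshot on each poll, diff it against the previous one, reload or unload the affected plugins, and then keep the new snapshot as the baseline.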
Imports
importlib.util, json, pathlib, sys, threading, time, typing
▶ backend/server.py 595 LOC
Zero-dependency HTTP server for Dulus Dashboard + API + SSE Live Updates.
Classes (1)
class DulusHandler
↳ log_message(self, fmt, *args)
↳ _safe_handle(self, handler_fn)
Wrap request handlers so unhandled exceptions return 500 instead of killing the server thread.
↳ _json_response(self, data, status = 200)
↳ _text_response(self, text, status = 200, content_type = 'text/plain; charset=utf-8')
↳ _error(self, msg, status = 400)
↳ _parse_query(self)
↳ _sse_stream(self, client_q: queue.Queue)
Send SSE headers and stream from queue until client disconnects.
↳ _do_GET(self)
↳ _do_POST(self)
↳ do_GET(self)
↳ do_POST(self)
↳ do_OPTIONS(self)
Functions (5)
def _add_sse_client(q: queue.Queue)
def _remove_sse_client(q: queue.Queue)
def broadcast_event(event_type: str, payload: dict)
Broadcast JSON event to all connected SSE clients.
def _sse_heartbeat()
Send periodic ping to keep connections alive.
def run_server(port: int = 8000)
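The SSE fan-out performed by `broadcast_event` can be sketched with one queue per connected client. The module-level set, the lock, the drop-on-full policy, and the return value are assumptions for illustration. The `event:` / `data:` framing follows the standard Server-Sent Events wire format.

```python
import json
import queue
import threading

_clients = set()            # one queue.Queue per connected SSE client
_clients_lock = threading.Lock()


def add_sse_client(q: queue.Queue) -> None:
    with _clients_lock:
        _clients.add(q)


def remove_sse_client(q: queue.Queue) -> None:
    with _clients_lock:
        _clients.discard(q)


def format_sse(event_type: str, payload: dict) -> str:
    """Frame one event; the blank line terminates it."""
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"


def broadcast_event(event_type: str, payload: dict) -> int:
    """Queue the event for every client; return how many received it."""
    message = format_sse(event_type, payload)
    with _clients_lock:
        clients = list(_clients)  # copy so delivery happens outside the lock
    delivered = 0
    for q in clients:
        try:
            q.put_nowait(message)
            delivered += 1
        except queue.Full:
            remove_sse_client(q)  # assumed policy: drop clients that fall behind
    return delivered
```

Each request handler thread then only reads from its own queue and writes to its socket, so a slow client cannot block the broadcaster.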
Imports
backend.agents_bridge, backend.context, backend.personas, backend.plugins, backend.tasks, http.server, json, os, pathlib, queue, threading, time, urllib.parse
▶ backend/tasks.py 213 LOC
Task storage with JSON persistence.
Functions (5)
def load_tasks()
def save_tasks(tasks: list[dict[str, Any]])
def get_task(tid: str)
def update_task(tid: str, data: dict[str, Any])
def create_task(data: dict[str, Any])
Imports
json, pathlib, time, typing
▶ batch_api.py 472 LOC
Dulus Batch API — provider-agnostic OpenAI-compatible batch processing.
Works with any provider that supports the OpenAI Batch API format:
- OpenAI (api.openai.com)
- Kimi/Moonshot (api.moonshot.ai)
- Any OpenAI-compatible endpoint
Usage:
mgr = BatchManager(api_key="sk-...", base_url="https://api.openai.com")
jsonl = mgr.prepare_jsonl(["prompt1", "prompt2"], model="gpt-4o-mini")
file_id = mgr.upload_file(jsonl)
batch_id = mgr.create_batch(file_id)
Classes (2)
class BatchManager
Provider-agnostic manager for the OpenAI-compatible Batch API.
↳ __init__(self, api_key: str, base_url: str = OPENAI_BASE_URL)
↳ _headers(self, content_type: str = 'application/json')
↳ prepare_jsonl(self, prompts: List[str], model: str = 'gpt-4o-mini', system_prompt: str = None, endpoint: str = '/v1/chat/completions')
Convert a list of prompts into JSONL content for the Batch API.
Args:
prompts: List of user prompts.
model: Model name (provider-specific).
system_prompt: Defaults to BATCH_
↳ upload_file(self, jsonl_content: str, filename: str = 'batch_input.jsonl')
Upload JSONL content and return the file_id.
↳ create_batch(self, file_id: str, endpoint: str = '/v1/chat/completions', completion_window: str = '24h')
Create a batch from an uploaded file. Returns batch_id.
↳ retrieve_batch(self, batch_id: str)
Get batch status/info.
↳ cancel_batch(self, batch_id: str)
Cancel a running batch.
↳ get_file_content(self, file_id: str)
Download file content (e.g. batch results).
class AnthropicBatchManager
Manager for Anthropic's native batch API (claude messages).
Same surface as BatchManager (prepare / create / retrieve / results /
cancel) so the rest of dulus can treat it interchangeably.
↳ __init__(self, api_key: str)
↳ prepare_requests(self, prompts: List[str], model: str = None, system_prompt: str = None, max_tokens: int = 1024)
Build the requests array for batches.create().
↳ create_batch(self, requests: List[Dict[str, Any]])
Create a batch inline. Returns batch_id.
↳ retrieve_batch(self, batch_id: str)
Get batch status. Normalizes field names to match BatchManager.
↳ cancel_batch(self, batch_id: str)
Cancel a running batch.
↳ results(self, batch_id: str)
Fetch all results for a completed batch.
Returns: [{custom_id, type, text, error?, usage}]
where type in {succeeded, errored, canceled, expired}
Functions (4)
def save_batch_job(batch_id: str, description: str = '', file_id: str = '', provider: str = 'unknown')
Save a batch job record locally in ~/.dulus/jobs/.
def list_batch_jobs(include_pollers: bool = True, **_kw)
List saved batch jobs from ~/.dulus/jobs/.
def update_batch_job_status(batch_id: str, status_info: Dict[str, Any])
Update a batch job's status in its local file.
def get_batch_job_by_id(batch_id: str)
Get a batch job by ID (checks both batch and poller files).
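To make the `prepare_jsonl` step concrete, here is a standalone sketch of the input format used by the OpenAI Batch API: one JSON object per line with `custom_id`, `method`, `url`, and `body`. The `req-<index>` naming for `custom_id` is a hypothetical choice, and this free function is not the `BatchManager` method itself.

```python
import json


def prepare_jsonl(prompts, model="gpt-4o-mini", system_prompt=None,
                  endpoint="/v1/chat/completions"):
    """Build Batch API input: one request object per line."""
    lines = []
    for index, prompt in enumerate(prompts):
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})
        request = {
            "custom_id": f"req-{index}",  # hypothetical ID scheme
            "method": "POST",
            "url": endpoint,
            "body": {"model": model, "messages": messages},
        }
        lines.append(json.dumps(request))
    return "\n".join(lines)
```

The `custom_id` is what lets results be matched back to prompts later, since batch outputs are not guaranteed to arrive in input order.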
Imports
json, os, time, typing, urllib.request
▶ checkpoint/__init__.py 27 LOC
Checkpoint system: automatic file snapshots with rewind support.
Imports
hooks, store, types
▶ checkpoint/hooks.py 90 LOC
Checkpoint hooks: intercept Write/Edit/NotebookEdit to back up files before modification.
Import this module after tools are registered to install the hooks.
Functions (5)
def set_session(session_id: str)
def get_tracked_edits()
Return the current interval's tracked edits (for make_snapshot).
def reset_tracked()
Clear tracked edits after a snapshot is created.
def _backup_before_write(file_path: str)
Back up a file before it is modified (first-write-wins per snapshot interval).
def install_hooks()
Wrap Write/Edit/NotebookEdit tool functions to call backup before execution.
Imports
__future__, pathlib
▶ checkpoint/store.py 314 LOC
Checkpoint store: file-level backup + snapshot persistence.
Directory layout:
~/.dulus/checkpoints/<session_id>/
    snapshots.json      # list of Snapshot metadata
    backups/
        <hash>@v<N>     # actual file copies
Functions (17)
def _checkpoints_root()
def _session_dir(session_id: str)
def _backups_dir(session_id: str)
def _snapshots_file(session_id: str)
def _path_hash(file_path: str)
Deterministic short hash from file path (not content).
def _next_version(file_path: str)
def _load_snapshots(session_id: str)
def _save_snapshots(session_id: str, snapshots: list[Snapshot])
def track_file_edit(session_id: str, file_path: str)
Back up a file before it is edited (first-write-wins per snapshot interval).
Returns the backup filename, or None if the file doesn't exist yet.
def make_snapshot(session_id: str, state: Any, config: dict, user_prompt: str, tracked_edits: dict[str, str | None] | None = None)
Create a snapshot after a user prompt has been processed.
tracked_edits: dict mapping file_path → backup_filename (or None if new file).
Populated by hooks.py during the turn.
def list_snapshots(session_id: str)
Return lightweight summaries of all snapshots.
def get_snapshot(session_id: str, snapshot_id: int)
def rewind_files(session_id: str, snapshot_id: int)
Restore files to their state at the given snapshot.
Returns list of restored/deleted file paths.
def files_changed_since(session_id: str, snapshot_id: int)
List files that have been changed in snapshots after the given one.
def delete_session_checkpoints(session_id: str)
Delete all checkpoints for a session.
def cleanup_old_sessions(max_age_days: int = 30)
Remove checkpoint sessions older than max_age_days. Returns count removed.
def reset_file_versions()
Reset per-file version counters (for testing).
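The `<hash>@v<N>` backup naming can be illustrated with a path-based hash and a per-file version counter. The hash algorithm (SHA-256), the 12-character length, and the function names are assumptions. The docstrings only say the hash is short, deterministic, and derived from the path rather than the content.

```python
import hashlib

_file_versions = {}  # per-file version counters, keyed by path


def path_hash(file_path: str, length: int = 12) -> str:
    """Deterministic short hash of the path string (not the file content)."""
    return hashlib.sha256(file_path.encode("utf-8")).hexdigest()[:length]


def next_backup_name(file_path: str) -> str:
    """Return the next '<hash>@v<N>' name, incrementing the file's counter."""
    version = _file_versions.get(file_path, 0) + 1
    _file_versions[file_path] = version
    return f"{path_hash(file_path)}@v{version}"


def reset_file_versions() -> None:
    """Clear all counters, e.g. between tests."""
    _file_versions.clear()
```

Hashing the path keeps every version of one file under the same prefix, so the version suffix alone distinguishes successive backups.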
Imports
__future__, datetime, hashlib, json, os, pathlib, shutil, time, types, typing
▶ checkpoint/types.py 80 LOC
Checkpoint system types: FileBackup and Snapshot dataclasses.
Classes (2)
class FileBackup
A single file's backup reference within a snapshot.
backup_filename: hash@vN name in the backups/ dir, or None if the file
did not exist before (meaning restore = delete).
version: monotonically increasing per-file version counter.
backup_time: ISO timestamp of when the backup was
↳ to_dict(self)
↳ from_dict(cls, data: dict)
class Snapshot
A checkpoint snapshot — metadata about conversation + file state.
↳ to_dict(self)
↳ from_dict(cls, data: dict)
Imports
__future__, dataclasses, typing
▶ claude_code_watcher.py 215 LOC
claude_code_watcher.py
Watches a Claude Code session JSONL file and extracts assistant responses
in real time. Can print to stdout or POST to a Dulus/webhook endpoint.
v2: Groups multi-part assistant turns (text + tool_use + text) into one
complete message before sending. Fixes the bug where text after a
tool call was sent as a separate/missing message.
Usage:
python claude_code_watcher.py
python claude_code_watcher.py --session <path_to.jsonl>
python claude_code_wat
Functions (7)
def find_latest_session()
Find the most recently modified JSONL session file.
def extract_text_blocks(entry: dict)
Return all text strings from an assistant entry's content blocks.
def has_tool_use(entry: dict)
True if this entry contains a tool_use block (mid-turn, more may follow).
def is_assistant(entry: dict)
def post_message(text: str, post_url: str)
def watch(session_path: Path, post_url: str | None = None, poll_interval: float = 0.5)
Tail the JSONL file and emit complete assistant turns.
def main()
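The v2 grouping behavior (text + tool_use + text merged into one message) could be approximated as below. The entry layout used here, `{"type": "assistant", "message": {"content": [...]}}` with `text` and `tool_use` blocks, is an assumption about the session JSONL format, and `group_turns` is a hypothetical helper rather than a function from the file.

```python
def extract_text_blocks(entry: dict) -> list:
    """Collect the text of every 'text' content block in an entry."""
    content = entry.get("message", {}).get("content", [])
    return [
        block["text"]
        for block in content
        if isinstance(block, dict) and block.get("type") == "text"
    ]


def has_tool_use(entry: dict) -> bool:
    """True if the entry holds a tool_use block, meaning the turn continues."""
    content = entry.get("message", {}).get("content", [])
    return any(
        isinstance(block, dict) and block.get("type") == "tool_use"
        for block in content
    )


def group_turns(entries: list) -> list:
    """Merge assistant entries into complete turns, one string per turn."""
    turns, pending = [], []
    for entry in entries:
        if entry.get("type") != "assistant":
            continue  # tool results and user entries do not end a turn here
        pending.extend(extract_text_blocks(entry))
        if not has_tool_use(entry):  # no pending tool call: the turn is complete
            if pending:
                turns.append("\n".join(pending))
            pending = []
    return turns
```

Buffering until an assistant entry without a tool call arrives is what prevents text after a tool call from being emitted as a separate message.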
Imports
argparse, json, os, pathlib, sys, time
▶ clipboard_utils.py 246 LOC
Classes (1)
class ClipboardResult
Result of reading media from the clipboard.
Both fields may be non-empty when the clipboard contains a mix of
image files and non-image files (videos, PDFs, etc.).
Functions (6)
def is_clipboard_available()
Check if the pyperclip text clipboard is available.
def is_media_clipboard_available()
Check if the media clipboard (xclip/wl-paste) is available.
On headless Linux (e.g. SSH remote), pyperclip may fail because
DISPLAY is not set, but images can still be read through xclip or
wl-paste (e.g. via clipboard bridging tools like cc-clip that shim
xclip over an SSH tunnel).
def grab_media_from_clipboard()
Read media from the clipboard.
Inspects the clipboard once and returns all detected media.
Image files are returned as loaded PIL images; non-image files
(videos, PDFs, etc.) are returned as file paths.
On macOS the native pasteboard API is tried first to avoid
misidentifying a file's thumbnail as
def _grab_image_linux()
Read image from Linux clipboard with session-aware tool fallback.
Tries the backend matching the current session type first to avoid
reading stale data from the wrong clipboard (e.g. XWayland vs
Wayland). On headless systems with no session type, xclip is tried
first since clipboard bridges (e.g. c
def _classify_file_paths(paths: Iterable[os.PathLike[str] | str])
Classify clipboard file paths into images and non-image files.
Returns ``(images, non_image_paths)`` where *images* contains loaded
PIL images and *non_image_paths* contains paths to videos, documents,
and other non-image files.
def _read_clipboard_file_paths_macos_native()
Imports
PIL, __future__, collections.abc, dataclasses, importlib, io, os, pathlib, pyperclip, shutil, subprocess, sys, typing
▶ cloudsave.py 159 LOC
Cloud sync for dulus sessions via GitHub Gist.
Supported provider: GitHub Gist
- No extra cloud account needed beyond a GitHub Personal Access Token
- Sessions stored as private Gists (JSON), browsable in GitHub UI
- Zero extra dependencies (uses urllib from stdlib)
Config keys (stored in ~/.dulus/config.json):
gist_token — GitHub Personal Access Token (needs 'gist' scope)
cloudsave_auto — bool: auto-upload on /exit
cloudsave_last_gist_id — last uploaded gist ID (for in-place
Functions (6)
def _request(method: str, path: str, token: str, body: dict | None = None)
def _request_safe(method: str, path: str, token: str, body: dict | None = None)
Like _request but returns (result, error_str).
def validate_token(token: str)
Check token is valid and has gist scope. Returns (ok, message).
def upload_session(session_data: dict, token: str, description: str = '', gist_id: str | None = None)
Create or update a Gist with the session JSON.
Returns (gist_id, error). On success gist_id is the Gist ID.
def list_sessions(token: str, max_results: int = 20)
List Gists tagged as dulus sessions.
Returns (list of {id, description, updated_at, url}, error).
def download_session(token: str, gist_id: str)
Fetch a Gist and return the parsed session JSON.
Returns (session_data, error).
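The create-or-update choice in `upload_session` maps onto two GitHub REST calls: `POST /gists` for a new Gist and `PATCH /gists/{gist_id}` for an in-place update. The sketch below only builds the request description. The helper name and the default filename are hypothetical, and sending the request is left out.

```python
import json


def build_gist_request(session_data: dict, description: str = "",
                       gist_id=None, filename: str = "dulus_session.json"):
    """Return (method, path, body) for creating or updating a session Gist."""
    body = {
        "description": description,
        "files": {filename: {"content": json.dumps(session_data, indent=2)}},
    }
    if gist_id:
        # Updating an existing Gist keeps its ID and visibility.
        return "PATCH", f"/gists/{gist_id}", body
    body["public"] = False  # sessions are stored as private Gists
    return "POST", "/gists", body
```

Keeping the last Gist ID in config is what allows later uploads to take the `PATCH` branch instead of creating a new Gist each time.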
Imports
__future__, datetime, json, urllib.error, urllib.request
▶ common.py 210 LOC
Functions (11)
def _rgb(hex_str: str)
Convert '#rrggbb' → ANSI 24-bit foreground escape.
def apply_theme(name: str)
Mutate the global ANSI color map in-place to a named theme.
Themes carry 4 semantic roles (accent / ok / warn / err) that map onto
Dulus's ANSI key set. `ok` is intentionally distinct from `accent` so
info() (cyan-keyed) and ok() (green-keyed) stay visually separable.
A theme with `disable_color: T
def clr(text: str, *keys)
def info(msg: str)
def ok(msg: str)
def warn(msg: str)
def err(msg: str)
def stream_thinking(chunk: str, verbose: bool)
def print_tool_start(name: str, inputs: dict)
def print_tool_end(name: str, result: str, success: bool = True, verbose: bool = False, auto_show: bool = True)
def sanitize_text(text: str)
Remove invalid UTF-16 surrogates and ensure valid UTF-8.
On Windows consoles (cp1252) pasted emojis often become stray surrogates
(e.g. \ud83d\udcec) which later explode with:
'utf-8' codec can't encode characters: surrogates not allowed
This helper cleans them *once* at the boundary before the
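Two of the helpers above lend themselves to a short sketch: the `'#rrggbb'` to ANSI 24-bit conversion in `_rgb`, and surrogate cleanup in `sanitize_text`. The UTF-16 round trip shown here is one possible cleanup technique and an assumption, not necessarily what the module does. It rejoins valid surrogate pairs and substitutes U+FFFD for lone surrogates.

```python
def rgb(hex_str: str) -> str:
    """Convert '#rrggbb' to an ANSI 24-bit (truecolor) foreground escape."""
    h = hex_str.lstrip("#")
    r, g, b = (int(h[i:i + 2], 16) for i in (0, 2, 4))
    return f"\x1b[38;2;{r};{g};{b}m"


def sanitize_text(text: str) -> str:
    """Make text safe to encode as UTF-8.

    Encoding with 'surrogatepass' writes surrogates as raw UTF-16 code
    units; decoding then merges valid pairs into real characters and
    replaces unpaired ones with U+FFFD.
    """
    raw = text.encode("utf-16-le", errors="surrogatepass")
    return raw.decode("utf-16-le", errors="replace")
```

Running the cleanup once at the input boundary means later `str.encode("utf-8")` calls no longer need their own error handling.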
Imports
json, sys
▶ compaction.py 378 LOC
Context window management: two-layer compression for long conversations.
Functions (10)
def estimate_tokens(messages: list, model: str = '', config: dict | None = None)
Estimate token count.
For Kimi/Moonshot models, uses the native Kimi API token estimation endpoint
if API key is available. Otherwise falls back to character-based estimation.
Args:
messages: list of message dicts with "content" field (str or list of dicts)
model: model string (optional, e
def get_context_limit(model: str)
Look up context window size for a model.
Args:
model: model string (e.g. "claude-opus-4-6", "ollama/llama3.3")
Returns:
context limit in tokens
def snip_old_tool_results(messages: list, max_chars: int = 2000, preserve_last_n_turns: int = 6)
Truncate tool-role messages older than preserve_last_n_turns from end.
For old tool messages whose content exceeds max_chars, keep the first half
and last quarter, inserting '[... N chars snipped ...]' in between.
Mutates in place and returns the same list.
Args:
messages: list of message dict
def _score_message_priority(message: dict)
Score a message by importance (higher = more important to preserve).
Returns an integer priority score. Messages with score >= 3 are
considered 'high priority' and should be preserved during compaction.
def _is_safe_split(messages: list, idx: int)
A split is safe only if messages[idx] is not a `tool` message
(which would be orphaned from its assistant tool_calls partner).
def find_split_point(messages: list, keep_ratio: float = 0.3, model: str = '', config: dict | None = None)
Find index that splits messages so ~keep_ratio of tokens are in the recent portion.
Walks backwards from end, accumulating token estimates, and returns the
index where the recent portion reaches ~keep_ratio of total tokens.
Args:
messages: list of message dicts
keep_ratio: fraction of toke
def compact_messages(messages: list, config: dict, focus: str = '')
Compress old messages into a summary via LLM call.
Splits at find_split_point, summarizes old portion, returns
[summary_msg, ack_msg, *recent_messages].
Smart behavior: messages with high priority score (errors, decisions,
file references) are preserved verbatim instead of being summarized away.
def maybe_compact(state, config: dict)
Check if context window is getting full and compress if needed.
Runs snip_old_tool_results first, then auto-compact if still over threshold.
Args:
state: AgentState with .messages list
config: agent config dict (must contain "model")
Returns:
True if compaction was performed
def _restore_plan_context(config: dict)
If in plan mode, return messages that restore plan file context.
def manual_compact(state, config: dict, focus: str = '')
User-triggered compaction via /compact. Not gated by threshold.
Returns (success, info_message).
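The snipping rule in `snip_old_tool_results` and the safety check in `_is_safe_split` can be sketched on plain values. This reads "first half and last quarter" as fractions of `max_chars`, which is an interpretation of the docstring, and the helper names are hypothetical.

```python
def snip_content(content: str, max_chars: int = 2000) -> str:
    """Keep the head and tail of an oversized tool result, marking the gap."""
    if len(content) <= max_chars:
        return content
    head = content[: max_chars // 2]      # first half of the budget
    tail = content[-(max_chars // 4):]    # last quarter of the budget
    snipped = len(content) - len(head) - len(tail)
    return f"{head}\n[... {snipped} chars snipped ...]\n{tail}"


def is_safe_split(messages: list, idx: int) -> bool:
    """A split is unsafe if it would start the recent part on a tool message."""
    if idx >= len(messages):
        return True
    return messages[idx].get("role") != "tool"
```

Keeping both ends preserves the command echo at the top of a tool result and the final status or error at the bottom, which are usually the parts later turns refer to.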
Imports
__future__, providers
▶ config.py 186 LOC
Configuration management for Dulus (multi-provider).
Functions (9)
def _encrypt(value: str)
Encrypt a string with XOR + base64.
def _decrypt(value: str)
Decrypt a string encrypted with _encrypt.
def _secure_keys(cfg: dict)
Encrypt all *_api_key values before saving.
def _unsecure_keys(cfg: dict)
Decrypt all *_api_key values after loading.
def load_config()
def save_config(cfg: dict)
def current_provider(cfg: dict)
def has_api_key(cfg: dict)
Check whether the active provider has an API key configured.
def calc_cost(model: str, in_tokens: int, out_tokens: int)
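A minimal sketch of the XOR + base64 scheme named in `_encrypt` / `_decrypt`, together with the `*_api_key` pass done by `_secure_keys`. The key value and function names are placeholders, and how the real module derives its key is not stated in the index. Note that XOR with a fixed key obfuscates values on disk; it is not strong encryption.

```python
import base64

_XOR_KEY = b"example-key"  # hypothetical placeholder, not the module's key


def _xor(data: bytes) -> bytes:
    """XOR each byte with the repeating key; applying it twice is a no-op."""
    return bytes(b ^ _XOR_KEY[i % len(_XOR_KEY)] for i, b in enumerate(data))


def encrypt(value: str) -> str:
    return base64.b64encode(_xor(value.encode("utf-8"))).decode("ascii")


def decrypt(value: str) -> str:
    return _xor(base64.b64decode(value)).decode("utf-8")


def secure_keys(cfg: dict) -> dict:
    """Return a copy of cfg with every non-empty *_api_key value encrypted."""
    return {
        key: encrypt(val) if key.endswith("_api_key") and isinstance(val, str) and val else val
        for key, val in cfg.items()
    }
```

Base64 keeps the XOR output printable, so the result can be stored as an ordinary JSON string.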
Imports
json, os, pathlib
▶ context.py 360 LOC
System context: DULUS.md, git info, cwd injection.
NOTE on prompt caching: this module is the source of the system prompt sent
to every provider call. To get prefix caching (Anthropic explicit + OpenAI-
compat automatic), the rendered prompt MUST be byte-stable across turns of
the same session. Anything that changes per turn (date with sub-day grain,
`git status` modified-file counts, `datetime.now()`, etc.) belongs OUTSIDE
this prompt. Disk reads (DULUS.md, MEMORY.md) are cached by mtime so a
Functions (11)
def get_git_info(config: dict | None = None)
Return ONLY the branch name — stable across turns within a session.
Previous versions also embedded `git status --short` modified-file count
and the last commit hash; both change as the user works, which trashed
prefix caching on every turn. The agent can call `git status` itself
when it actually n
def _resolve_dulus_md_paths()
def get_dulus_md()
def _resolve_memory_index_path()
def get_project_memory_index()
Auto-load project-scope memories from .dulus-context/memory/MEMORY.md.
Looks in cwd and parents (first match wins). Returns the index so the model
knows what memories exist and can Read individual files on demand. Cached
by mtime so unchanged indexes don't bust the prompt cache.
def _detect_shell_type(config: dict | None = None)
Resolve which shell family to advertise: 'bash', 'powershell', or 'cmd'.
def get_platform_hints(config: dict | None = None)
def _build_ollama_system_prompt(config: dict | None = None)
def _normalize_thinking_level(config: dict | None)
def _resolve_reply_language(config: dict | None)
def build_system_prompt(config: dict | None = None)
Imports
os, pathlib, subprocess
▶ data/__init__.py 1 LOC
▶ data/plugins/__init__.py 1 LOC
▶ data/plugins/composio/__init__.py 1 LOC
▶ data/plugins/composio/composio_plugin/__init__.py 5 LOC
Composio plugin helpers for Falcon.
Imports
session_manager, tool_generator
▶ data/plugins/composio/composio_plugin/session_manager.py 71 LOC
Session manager for Composio integration.
Functions (4)
def _load_api_key()
Load Composio API key from Dulus config (with Falcon fallback) or env.
def get_client()
Get or create Composio client.
def get_or_create_session(user_id: str, toolkits: List[str], connected_accounts: Optional[Dict[str, str]] = None)
Create a Composio session with given toolkits.
def list_accounts()
List all connected accounts.
Imports
json, os, pathlib, typing
▶ data/plugins/composio/composio_plugin/tool_generator.py 156 LOC
Tool generator - creates native Falcon .py files from Composio tool schemas.
Functions (4)
def _slug_to_func_name(slug: str)
Convert a Composio tool slug to a valid Python function name.
def _build_param_signature(params: Dict[str, Any])
Build Python function parameter signature from JSON schema.
def generate_tool_py(tool_slug: str, schema: Dict[str, Any], output_dir: Path, session_id: Optional[str] = None, user_id: Optional[str] = None)
Generate a standalone Falcon tool .py file for a Composio tool.
def generate_plugin_tool_py(tool_defs: list, output_path: Path)
Generate a plugin_tool.py exporting multiple ToolDefs.
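One way `_slug_to_func_name` might turn a tool slug into a valid Python identifier is shown below. The exact rules here (lowercasing, a `tool_` prefix for empty or digit-leading names, a trailing underscore for keywords) are assumptions made for the example.

```python
import keyword
import re


def slug_to_func_name(slug: str) -> str:
    """Convert a tool slug into a valid, lowercase Python function name."""
    # Replace every run of non-alphanumeric characters with one underscore.
    name = re.sub(r"[^0-9a-zA-Z]+", "_", slug).strip("_").lower()
    if not name or name[0].isdigit():
        name = f"tool_{name}"  # identifiers cannot be empty or start with a digit
    if keyword.iskeyword(name):
        name += "_"            # avoid shadowing reserved words such as 'class'
    return name
```

Since the generated name ends up in a `def` statement written to disk, checking the result with `str.isidentifier()` before generating the file is a cheap extra guard.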
Imports
json, pathlib, re, typing
▶ data/plugins/composio/plugin_tool.py 434 LOC
Composio plugin for Falcon - native ToolDefs.
Connects to Composio Tool Router and exposes tools natively.
Functions (10)
def _serialize_result(result)
Serialize Composio result to JSON string.
def _get_session(params: dict)
Get or create session from params.
def composio_create_session(params: dict, config: dict)
Create a new Composio Tool Router session.
def composio_search_tools(params: dict, config: dict)
Search for available Composio tools by use case.
def composio_manage_connections(params: dict, config: dict)
Manage connections to apps (initiate OAuth/API key auth).
def composio_execute_tool(params: dict, config: dict)
Execute a Composio tool by slug with given arguments.
def composio_list_accounts(params: dict, config: dict)
List all connected Composio accounts and their status.
def composio_get_tool_schemas(params: dict, config: dict)
Get input schemas for Composio tools by slug.
def composio_generate_tool_py(params: dict, config: dict)
Generate a standalone .py file for a Composio tool.
Creates a native Falcon tool file that wraps a Composio tool.
def composio_generate_plugin_tool_py(params: dict, config: dict)
Generate a full plugin_tool.py exporting multiple Composio tools as native Falcon tools.
Imports
composio_plugin.session_manager, composio_plugin.tool_generator, json, pathlib, sys, tool_registry, typing
▶ demos/make_brainstorm_demo.py 367 LOC
Generate animated GIF demo of cheetahclaws /brainstorm command using PIL.
Simulates the full brainstorm session: agent count prompt → persona generation
→ multi-agent debate → synthesis.
Functions (19)
def make_font(size = FONT_SIZE, bold = False)
def seg(t, c = TEXT, b = False)
def render_line(draw, y, segments, x_start = PAD_X)
def blank_frame()
def draw_frame(lines_segments)
def prompt_line(text = '', cursor = False)
def ok_line(msg)
def info_line(msg)
def agent_thinking(icon, role)
def agent_done()
def claude_header()
def claude_sep()
def text_line(t, indent = 2)
def dim_line(t, indent = 2)
def tool_line(icon, name, arg)
def tool_ok(msg)
def build_scenes()
def _build_palette()
def render_gif(output_path)
Imports
PIL, os
▶ demos/make_demo.py 416 LOC
Generate animated GIF demo of cheetahclaws using PIL.
Simulates a realistic terminal session with tool calls.
Functions (18)
def make_font(size = FONT_SIZE, bold = False)
def seg(t, c = TEXT, b = False)
def segs(*args)
def render_line(draw, y, segments, x_start = PAD_X)
def blank_frame()
def draw_frame(lines_segments)
lines_segments: list of either
- list[Seg] → rendered as a line
- None → blank line
Returns PIL Image.
def prompt_line(text = '', cursor = False)
def claude_header()
def claude_sep()
def tool_line(icon, name, arg, color = CYAN)
def tool_ok(msg)
def tool_err(msg)
def text_line(t, indent = 2)
def dim_line(t, indent = 4)
def build_scenes()
Return list of (frame_content, duration_ms).
def _build_explicit_palette()
Build a 256-entry palette from our exact theme colors.
Returns flat list of 768 ints (R,G,B, R,G,B, ...) suitable for putpalette().
def render_gif(output_path = 'demo.gif')
def render_screenshot(output_path = 'screenshot.png')
Single high-quality screenshot showing a complete session.
Imports
PIL, os, textwrap
▶ demos/make_proactive_demo.py 359 LOC
Generate animated GIF demo of cheetahclaws proactive / background-event feature.
Shows: timer reminder set → idle at prompt → [Background Event Triggered] →
Claude fires reminder → user asks again → second reminder fires.
Functions (16)
def make_font(size = FONT_SIZE, bold = False)
def seg(t, c = TEXT, b = False)
def render_line(draw, y, segments, x_start = PAD_X)
def blank_frame()
def draw_frame(lines_segments)
def prompt_line(text = '', cursor = False)
def ok_line(msg)
def claude_header()
def claude_sep()
def text_line(t, indent = 2)
def tool_line(icon, name, arg)
def tool_ok(msg)
def bg_event_line()
def build_scenes()
def _build_palette()
def render_gif(output_path)
Imports
PIL, os
▶demos/make_ssj_demo.py 491 LOC
Generate animated GIF demo of cheetahclaws SSJ Developer Mode.
Shows: /ssj menu → Brainstorm → TODO viewer → Worker → Exit
Functions (18)
def make_font(size = FONT_SIZE, bold = False)
def seg(t, c = TEXT, b = False)
def render_line(draw, y, segments, x_start = PAD_X)
def draw_frame(lines_segments)
def prompt_line(text = '', cursor = False)
def ssj_prompt(text = '', cursor = False)
def claude_header()
def claude_sep()
def tool_line(icon, name, arg, color = CYAN)
def tool_ok(msg)
def text_line(t, indent = 2)
def dim_line(t, indent = 4)
def ok_line(t)
def info_line(t)
def err_line(t)
def build_scenes()
def _build_palette()
def render_gif(output_path = 'ssj_demo.gif')
Imports
PIL, os
▶demos/make_telegram_demo.py 531 LOC
Generate animated GIF demo of cheetahclaws Telegram Bridge.
Shows: setup → auto-start → incoming messages → tool calls → response → stop
Functions (20)
def make_font(size = FONT_SIZE, bold = False)
def seg(t, c = TEXT, b = False)
def render_line(draw, y, segments, x_start = PAD_X)
def draw_phone(img, chat_messages)
Draw a minimal phone-style Telegram chat panel on the right.
chat_messages: list of (sender, text, color)
sender = "user" | "bot"
def draw_frame(lines_segments, chat_messages = None)
def prompt_line(text = '', cursor = False)
def ok_line(t)
def info_line(t)
def warn_line(t)
def dim_line(t, indent = 4)
def claude_header()
def claude_sep()
def tool_line(icon, name, arg, color = CYAN)
def tool_ok(msg)
def text_line(t, indent = 2)
def tg_incoming(text)
Telegram incoming message line shown in terminal.
def tg_sent(preview)
def build_scenes()
def _build_palette()
def render_gif(output_path = 'telegram_demo.gif')
Imports
PIL, os
▶display_blocks.py 529 LOC
Display Blocks System - Visual representations of agent actions.
Provides a structured way to represent agent actions visually across multiple
frontends: CLI (dulus.py), WebChat (webchat_server.py), and Telegram.
Display blocks are dict-based and can be rendered in various formats.
Classes (1)
class DisplayBlockRenderer
Render display blocks in various formats for different frontends.
Supports CLI (terminal), HTML (WebChat), and Telegram (Markdown) output formats.
Each render method takes a display block dict and returns a formatted string.
↳ render_cli(block: dict)
Render a display block for CLI (terminal) output.
Args:
block: Display block dict with at least a 'type' key.
Returns:
Formatted string suitable for terminal display.
↳ render_html(block: dict)
Render a display block as HTML for WebChat.
Args:
block: Display block dict with at least a 'type' key.
Returns:
HTML string suitable for web display.
↳ render_telegram(block: dict)
Render a display block for Telegram (Markdown V2).
Args:
block: Display block dict with at least a 'type' key.
Returns:
Formatted string using Telegram Markdown V2 syntax.
↳ render(block: dict, format: Literal['cli', 'html', 'telegram'] = 'cli')
Render a display block in the specified format.
Args:
block: Display block dict with at least a 'type' key.
format: Target format - "cli", "html", or "telegram".
Returns:
Formatted strin
Functions (27)
def _render_diff_cli(block: dict)
Render a diff display block for CLI.
Shows a unified diff between old and new text.
def _render_todo_cli(block: dict)
Render a todo display block for CLI.
def _render_shell_cli(block: dict)
Render a shell command display block for CLI.
def _render_bg_task_cli(block: dict)
Render a background task display block for CLI.
def _render_think_cli(block: dict)
Render a think display block for CLI.
def _render_code_cli(block: dict)
Render a code display block for CLI.
def _render_table_cli(block: dict)
Render a table display block for CLI.
def _render_error_cli(block: dict)
Render an error display block for CLI.
def _render_unknown_cli(block: dict)
Render an unknown display block type for CLI.
def _render_diff_html(block: dict)
Render a diff display block as HTML.
def _render_todo_html(block: dict)
Render a todo display block as HTML.
def _render_shell_html(block: dict)
Render a shell command display block as HTML.
def _render_bg_task_html(block: dict)
Render a background task display block as HTML.
def _render_think_html(block: dict)
Render a think display block as HTML.
def _render_code_html(block: dict)
Render a code display block as HTML.
def _render_table_html(block: dict)
Render a table display block as HTML.
def _render_error_html(block: dict)
Render an error display block as HTML.
def _render_unknown_html(block: dict)
Render an unknown display block type as HTML.
def _render_diff_telegram(block: dict)
Render a diff display block for Telegram (Markdown V2).
Note: Telegram has limited formatting for diffs, so we keep it simple.
def _render_todo_telegram(block: dict)
Render a todo display block for Telegram (Markdown V2).
def _render_shell_telegram(block: dict)
Render a shell command display block for Telegram.
def _render_bg_task_telegram(block: dict)
Render a background task display block for Telegram.
def _render_think_telegram(block: dict)
Render a think display block for Telegram.
def _render_code_telegram(block: dict)
Render a code display block for Telegram.
def _render_table_telegram(block: dict)
Render a table display block for Telegram.
def _render_error_telegram(block: dict)
Render an error display block for Telegram.
def _render_unknown_telegram(block: dict)
Render an unknown display block type for Telegram.
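The `_render_<type>_<format>` naming above suggests a per-format lookup keyed on the block's 'type', with the `_render_unknown_*` function as fallback. A minimal sketch of that dispatch for the CLI format, assuming a table-driven design: the function bodies and the 'message' key are placeholders, not the module's actual output.

```python
def _render_error_cli(block: dict) -> str:
    # Placeholder body; the 'message' key is an assumption.
    return f"[error] {block.get('message', '')}"


def _render_unknown_cli(block: dict) -> str:
    # Fallback for block types without a dedicated renderer.
    return f"[{block.get('type', '?')}] (unsupported block)"


# Hypothetical lookup table: block type -> CLI renderer.
_CLI_RENDERERS = {"error": _render_error_cli}


def render_cli(block: dict) -> str:
    """Pick the renderer by block['type'], falling back to the unknown renderer."""
    renderer = _CLI_RENDERERS.get(block.get("type"), _render_unknown_cli)
    return renderer(block)
```

The same shape would repeat for the HTML and Telegram tables, which keeps each frontend's formatting isolated.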
Imports
__future__, difflib, html, typing
▶docs/__init__.py 1 LOC
▶dulus.py 11833 LOC
Dulus — Next-gen Python Autonomous Agent.
Usage:
python dulus.py [options] [prompt]
dulus [options] [prompt] (if dulus.bat is in PATH)
Options:
-p, --print Non-interactive: run prompt and exit (also --print-output)
-m, --model MODEL Override model (e.g., -m kimi/kimi-k2.5, -m gpt-4o)
--accept-all Never ask permission (dangerous)
--verbose Show thinking + token counts
--version Print version and exit
-h, --help Show t
Functions (141)
def _eager_extract_sandbox()
def _rl_safe(prompt: str)
Wrap ANSI escape sequences with \001/\002 so readline ignores them
when calculating visible prompt width. Fixes duplicate-on-scroll and
cursor-misalignment bugs in terminals that use readline.
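A minimal sketch of the wrapping described above, assuming only SGR color sequences (`ESC[...m`) need handling; the name `rl_safe` and the regex are illustrative, not the function's actual body.

```python
import re

# Assumption: only SGR sequences such as "\x1b[32m" appear in prompts.
_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")


def rl_safe(prompt: str) -> str:
    """Bracket each ANSI sequence with readline's ignore markers (0x01 and 0x02)."""
    return _ANSI_RE.sub(lambda m: "\001" + m.group(0) + "\002", prompt)


wrapped = rl_safe("\x1b[32m>\x1b[0m ")
```

Text between the two markers is excluded from readline's width calculation, so only the visible `> ` counts toward the prompt length.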
def render_diff(text: str)
Print diff text with ANSI colors: red for removals, green for additions.
def _has_diff(text: str)
Check if text contains a unified diff.
def _make_renderable(text: str)
Return a Rich renderable: Markdown if text contains markup, else plain.
def _use_bubbles()
Whether to use bubblewrap chat-bubble mode (requires NerdFont + Rich).
def _wrap_in_bubble(renderable, raw_text: str = '')
Wrap a Rich renderable in a rounded Panel for chat-bubble effect.
Calculates a snug width from the raw text to prevent the Panel from
taking up 100% of the screen width when rendering Markdown rules/tables.
def _start_live()
Start a Rich Live block for in-place Markdown streaming (no-op if not Rich).
def stream_text(chunk: str)
Buffer chunk; update Live in-place when Rich available, else print directly.
Safety: if accumulated text exceeds _LIVE_LINE_LIMIT lines, auto-switch
from Rich Live to plain streaming to prevent terminal re-render duplication
on terminals that can't handle large Live areas (Windows Terminal, etc.).
def _count_visual_lines(text: str, width: int)
How many terminal rows did `text` occupy when streamed plain?
Counts wraps for long logical lines, ignores ANSI for length math.
Approximate (doesn't track double-width emoji exactly) but good
enough for the bubble re-render erase trick.
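The row count described above can be approximated like this. It is a sketch under the same stated limitation (every character is treated as one column wide); the regex and the name `count_visual_lines` are assumptions.

```python
import math
import re

# Matches CSI sequences such as colors and cursor moves.
_ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")


def count_visual_lines(text: str, width: int) -> int:
    """Count terminal rows, wrapping long logical lines at `width` columns."""
    width = max(1, width)
    rows = 0
    for line in text.split("\n"):
        visible = len(_ANSI_RE.sub("", line))  # ANSI codes take no columns
        rows += max(1, math.ceil(visible / width))  # an empty line still uses a row
    return rows
```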
def flush_response()
Commit buffered text to screen: stop Live (freezes rendered Markdown in place).
def _run_tool_spinner()
Background spinner on a single line using carriage return.
In split-input mode stdout is redirected to _OutputRedirector (which
line-buffers and strips \r), so each spinner frame would eventually
accumulate into the output area. Skip writes in that case — the split
layout has its own visual afforda
def _start_tool_spinner(phrase: str | None = None)
def _change_spinner_phrase()
Change the spinner phrase without stopping it.
def _stop_tool_spinner()
def print_tool_start(name: str, inputs: dict, verbose: bool)
Show tool invocation.
def print_tool_end(name: str, result: str, verbose: bool, config: dict = None)
def _tool_desc(name: str, inputs: dict)
def ask_permission_interactive(desc: str, config: dict)
def _proactive_watcher_loop(config)
Background daemon that fires a wake-up prompt after a period of inactivity.
def _render_toggle_footer(config)
Print the toggle status block. Called at the bottom of every /help page
so the user always sees current state without scrolling.
def _render_help_page_telegram(config)
Telegram-friendly rendering: full categorized dump, no pagination.
Telegram users can scroll the message; pagination would need extra UX
wiring through the bot. Toggles are appended at the end once.
def cmd_help(_args: str, _state, config)
def cmd_model(args: str, _state, config)
def _generate_personas(topic: str, curr_model: str, config: dict, count: int = 5)
Ask the LLM to generate `count` topic-appropriate expert personas as a dict.
def _interactive_ollama_picker(config: dict)
Prompt the user to select from locally available Ollama models.
def cmd_brainstorm(args: str, state, config)
Run a multi-persona iterative brainstorming session on the project.
Usage: /brainstorm [topic]
def _save_synthesis(state, out_file: str)
Append the last assistant response as the synthesis section of the brainstorm file.
def _print_dulus_banner(config: dict, with_logo: bool = True)
Reprint the Dulus logo + info box (used by startup and /clear).
def cmd_clear(_args: str, state, config)
def _redact_secret(value)
Mask all but last 4 chars of a secret value.
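A minimal sketch of this masking rule. Fully masking values of four characters or fewer is an assumption; the docstring does not say how short values are treated.

```python
def redact_secret(value) -> str:
    """Replace everything except the last 4 characters with asterisks."""
    text = str(value)
    if len(text) <= 4:
        # Assumption: very short secrets are masked entirely.
        return "*" * len(text)
    return "*" * (len(text) - 4) + text[-4:]
```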
def _is_secret_key(key: str)
def cmd_config(args: str, _state, config)
def _atomic_write_json(path: Path, data)
Write JSON atomically: write to .tmp sibling, then rename. Prevents
half-written files when the process is killed mid-save.
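The write-then-rename pattern described above can be sketched as follows. The `.tmp` suffix handling, the `fsync` call, and the name `atomic_write_json` are assumptions about one reasonable implementation.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, data) -> None:
    """Write JSON to a .tmp sibling, then rename it over the target."""
    tmp = path.with_name(path.name + ".tmp")
    with open(tmp, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2)
        fh.flush()
        os.fsync(fh.fileno())  # make sure the bytes reach disk before the rename
    os.replace(tmp, path)  # replaces the target in one step


with tempfile.TemporaryDirectory() as tmp_dir:
    target = Path(tmp_dir) / "session.json"
    atomic_write_json(target, {"messages": []})
    saved = json.loads(target.read_text(encoding="utf-8"))
    leftover = (Path(tmp_dir) / "session.json.tmp").exists()
```

Because the rename is the last step, a process killed mid-write leaves at worst a stale `.tmp` file next to an intact original.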
def _save_roundtable_session(log: list, save_path = None)
Save the full roundtable session log to a JSON file.
Sessions go under config.SESSIONS_DIR (~/.dulus/sessions/),
consistent with /save and other session artifacts. Pass an explicit
save_path to override (used to keep all turns of one debate in one file).
def cmd_save(args: str, state, config)
def save_latest(args: str, state, config = None, mode: str = 'full')
Save session on exit.
mode="full" → session_latest.json + daily/ copy + append to history.json (REPL default)
mode="daemon"→ only overwrite SESSIONS_DIR/session_<sid>.json, skip latest/history/daily.
In ``full`` mode we skip persistence entirely when the session has fewer
than ``MIN_AUTO_SA
def cmd_load(args: str, state, config)
def cmd_resume(args: str, state, config)
def cmd_history(_args: str, state, config)
def cmd_context(_args: str, state, config)
def cmd_cost(_args: str, state, config)
def cmd_verbose(_args: str, _state, config)
def cmd_brave(_args: str, _state, config)
def cmd_bocha(_args: str, _state, config)
def cmd_rtk(args: str, _state, config)
Toggle RTK transparent shell command rewriting (token-optimized output).
def cmd_git(_args: str, _state, config)
def cmd_daemon(args: str, _state, config)
def cmd_bg(args: str, _state, config)
Background Dulus via tmux session.
/bg start [--web-port PORT] — create detached tmux session running daemon
/bg stop — kill tmux session
/bg kill — alias of stop
/bg status — tmux session alive? IPC responding?
/bg attach
def cmd_webchat(args: str, state, config)
Start the in-process webchat mirror. /webchat stop kills it.
def cmd_sandbox(args: str, state, config)
Open the Dulus Sandbox OS in the browser.
/sandbox — Ensure webchat is running, open /sandbox in browser
/sandbox stop — Alias for /webchat stop
def cmd_gui(_args: str, _state, config)
Launch the desktop GUI from the REPL.
def cmd_max_fix(args: str, _state, config)
def cmd_thinking(_args: str, _state, config)
Set or toggle extended thinking.
/thinking — toggle between OFF and the last non-zero level (default 2)
/thinking 0|off — disable thinking entirely
/thinking 1|min — minimal: low budget + "think briefly" prompt hint
/thinking 2|med|medium — mod
def _normalize_thinking_level(value)
Coerce legacy bool/int/str thinking config into an int 0-4.
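A sketch of such a coercion, using only the level names visible in the /thinking help text above (off, min, med, medium). Mapping `True` to 2 follows the "default 2" note there, and falling back to 0 for unrecognized input is an assumption.

```python
# Only the names shown in the /thinking usage text; other aliases are not listed here.
_LEVEL_NAMES = {"off": 0, "min": 1, "med": 2, "medium": 2}


def normalize_thinking_level(value, default_on: int = 2) -> int:
    """Coerce a bool, int, or str setting into an int between 0 and 4."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return default_on if value else 0
    if isinstance(value, int):
        return min(4, max(0, value))
    if isinstance(value, str):
        text = value.strip().lower()
        if text in _LEVEL_NAMES:
            return _LEVEL_NAMES[text]
        if text.isdigit():
            return min(4, int(text))
    return 0  # assumption: anything unrecognized disables thinking
```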
def cmd_lang(args: str, state, config)
Switch the language Dulus replies in.
Usage:
/lang — show current language and a curated picker
/lang <iso> — switch by ISO-639 code (en, es, zh, pt, ja, …)
/lang <free description> — any natural-language descriptor passed verbatim
def cmd_soul(args: str, state, config)
List available souls or switch the active one mid-session.
/soul — list souls + show active
/soul <name> — switch to <name> (e.g. chill, forensic) by injecting it
as an assistant message (same mechanism as startup load)
def cmd_schema(args: str, _state, _config)
Inspect tool schemas (human-facing; model doesn't see this command).
/schema — list all registered tools, grouped
/schema <tool> — show full input_schema + description for one tool
/schema --json <t> — raw JSON dump of the tool's schema
Useful for telling the agent
def cmd_deep_override(_args: str, _state, config)
def cmd_deep_tools(_args: str, _state, config)
def cmd_autojob(_args: str, _state, config)
def cmd_auto_show(_args: str, _state, config)
def cmd_schema_autoload(_args: str, _state, config)
Toggle auto-injection of the full tool schema inventory at startup.
ON → at boot, the agent sees a system message listing every registered
tool (name + description, grouped). Helps the model pick the right
tool instead of reinventing via Bash. Costs ~3-5k chars per session.
OFF → no in
def cmd_mem_palace(args: str, _state, config)
Toggle MemPalace per-turn memory injection.
/mem_palace → toggle the injection ON/OFF
/mem_palace print → toggle visibility: print to console what's being
injected into the model (debug: see what's going on)
/mem_palace reset → clear the per-session dedup cache (allows
def cmd_harvest(_args: str, _state, config)
Harvest fresh cookies from claude.ai using Playwright.
Opens a visible Chrome window with a persistent profile.
If already logged in, cookies are collected automatically.
If not, log in manually then press ENTER in the terminal.
Cookies are saved to ~/.dulus/claude_cookies.json and any
active claud
def cmd_harvest_kimi(_args: str, _state, config)
Harvest fresh gRPC tokens from kimi.com (Consumer) using Playwright.
Opens a visible Chrome window and navigates to kimi.com.
You must send a single message in the browser chat for the script
to intercept the necessary gRPC-Web (Connect) headers and payloads.
Data is saved to ~/.dulus/kimi_consumer
def cmd_harvest_gemini(_args: str, _state, config)
Harvest fresh session data from gemini.google.com using Playwright.
Opens a visible Chrome window and navigates to gemini.google.com.
You must send a single message in the browser chat for the script
to intercept the necessary internal API headers/cookies.
Data is saved to ~/.dulus/gemini_web.json
def cmd_harvest_deepseek(_args: str, _state, config)
Harvest fresh session data from chat.deepseek.com using Playwright.
Opens a visible Chrome window and navigates to chat.deepseek.com.
The script intercepts the Authorization Bearer token and cookies
automatically on the first chat response.
Data is saved to ~/.dulus/deepseek_web.json for use by dee
def cmd_harvest_qwen(_args: str, _state, config)
Harvest fresh session data from chat.qwen.ai using Playwright.
Opens a visible Chrome window and navigates to chat.qwen.ai. The
script intercepts the JWT `token` cookie and POST headers/cookies the
first time you send a message in the chat. Data is saved to
~/.dulus/qwen_web.json for the qwen-web p
def cmd_gemini_chats(args: str, _state, config)
Manage Gemini Web conversations.
/gemini_chats — show current conversation IDs
/gemini_chats new — start a fresh conversation
def cmd_kimi_chats(args: str, _state, config)
List and select Kimi.com chats.
/kimi_chats — show last 20 chats (numbered)
/kimi_chats all — show up to 200 chats
/kimi_chats use <N> — switch to chat #N from the list
/kimi_chats use <id> — switch to chat by id prefix
/kimi_chats new — clear current chat
def cmd_claude_chats(args: str, _state, config)
List and select Claude.ai conversations.
/claude_chats — show last 20 conversations (numbered)
/claude_chats all — show all conversations
/claude_chats use <N> — switch to conversation #N from the list
/claude_chats use <uuid> — switch to conversation by UUID prefix
def cmd_hide_sender(_args: str, _state, config)
Toggle echoing your typed message above the sticky input bar.
ON → message disappears on send; output area shows only Dulus's responses
(use /history to recall what you typed).
OFF → your message stays visible above as `» <msg>`.
def cmd_history(args: str, state, _config)
Show previous user messages from this session.
/history → last 20 user messages
/history N → last N user messages
/history all → all user messages
def cmd_sticky_input(_args: str, _state, config)
Toggle the prompt_toolkit anchored input bar.
ON → input line stays pinned at the bottom; background notifications
flow above it (can jitter on Windows consoles).
OFF → plain input() — native terminal behavior, zero redraws.
Background notifications land where they land.
def cmd_theme(args: str, _state, config)
Switch the Dulus color palette. `/theme` lists, `/theme <name>` applies.
def cmd_ultra_search(_args: str, _state, config)
def cmd_permissions(args: str, _state, config)
def _import_dulus_module(mod_name: str)
Import a module from the dulus_tools/ package, handling dulus.py name shadow.
def cmd_afk(_args: str, _state, config)
Toggle AFK mode - auto-dismiss AskUserQuestion and auto-approve tool calls.
def cmd_yolo(_args: str, _state, config)
Toggle YOLO mode - auto-approve ALL actions without prompts.
def cmd_cwd(args: str, _state, config)
def _build_session_data(state, session_id: str | None = None)
Serialize current conversation state to a JSON-serializable dict.
def cmd_cloudsave(args: str, state, config)
Sync sessions to GitHub Gist.
/cloudsave setup <token> — configure GitHub Personal Access Token
/cloudsave — upload current session to Gist
/cloudsave push [desc] — same as above with optional description
/cloudsave auto on|off — toggle auto-upload on /exit
/cloudsav
def cmd_exit(_args: str, _state, config)
def cmd_memory(args: str, _state, config)
def cmd_agents(_args: str, _state, config)
def _print_background_notifications(state = None)
Print notifications and inject completions into state messages.
Returns True if any NEW completion/failure was handled.
def _ipc_server_loop(config, state)
Tiny TCP server: accepts one JSON request per connection, runs it on
the live session, and writes the assistant reply back as JSON.
Robust to port-already-in-use (we just exit silently — another instance
is the listener and that's fine).
def _try_ipc_dispatch(prompt: str, timeout: float = 0.4)
Client side: probe the IPC server, send a prompt, print the response,
return True if it succeeded. Returns False if no server is listening,
so callers can fall back to the in-process --print path.
def _job_sentinel_loop(config, state)
Background daemon that triggers run_query as soon as a job finishes.
SAFETY: Only fires if the chat has been idle for at least 10 seconds.
This prevents background notifications from colliding with active
conversation turns (user typing, model streaming, Telegram messages).
If a job finishes during
def cmd_skills(_args: str, _state, config)
def _pager(header: str, lines: list, page_size: int = 30)
Simple terminal pager: shows page_size lines, waits for n/q.
def cmd_skill(args: str, state, config)
Browse and install skills from Anthropic marketplace or ClawHub.
/skill — list installed skills + show help
/skill list — list installed skills
/skill list local [q] — browse/search Anthropic skills on disk
/skill list dump [path] — dump ALL skills (awesom
def cmd_mcp(args: str, _state, config)
Show MCP server status, or manage servers.
/mcp — list all configured servers and their tools
/mcp reload — reconnect all servers and refresh tools
/mcp reload <name> — reconnect a single server
/mcp add <name> <command> [args...] — add a stdio server to user
def cmd_plugin(args: str, _state, config)
Manage plugins.
/plugin — list installed plugins
/plugin install name@url [--main-agent] — install a plugin; with --main-agent, hand off to the main agent after install
/plugin uninstall name — uninstall a plugin
/plugin enable name
def cmd_tasks(args: str, _state, config)
Show and manage tasks.
/tasks — list all tasks
/tasks create <subject> — quick-create a task
/tasks done <id> — mark task completed
/tasks start <id> — mark task in_progress
/tasks cancel <id> — mark task cancelled
/tasks delete <id>
def cmd_ssj(args: str, state, config)
SSJ Developer Mode — Interactive power menu for project workflows.
Usage: /ssj
def cmd_kill_tmux(_args: str, _state, config)
Kill all tmux and psmux sessions.
Usage: /kill_tmux
Useful when tmux/psmux sessions are stuck or causing problems.
def cmd_worker(args: str, state, config)
Auto-implement pending tasks from a todo_list.txt file.
Usage:
/worker — all pending tasks, default path
/worker 1,4,6 — specific task numbers, default path
/worker --path /some/todo.txt — all tasks from custom path
/worker --path /
def _tg_api(token: str, method: str, params: dict = None)
Call Telegram Bot API. Returns parsed JSON or None on error.
def _tg_register_commands(token: str)
Register slash commands with Telegram so the native UI suggests them as
the user types '/'. Called once when the bridge starts.
Telegram rules: command name must be 1-32 chars, lowercase letters/digits/
underscores; description up to 256 chars; max 100 commands per bot.
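The limits listed above can be enforced before calling the API. A minimal sketch, assuming commands arrive as a name-to-description dict; `valid_commands` is a hypothetical helper, and the output uses the `command`/`description` keys of Telegram's BotCommand objects.

```python
import re

# 1-32 chars, lowercase letters, digits, underscores.
_NAME_RE = re.compile(r"[a-z0-9_]{1,32}")


def valid_commands(commands: dict) -> list:
    """Keep only commands that fit the stated limits; cap the list at 100."""
    out = []
    for name, description in commands.items():
        if not _NAME_RE.fullmatch(name):
            continue  # e.g. "add-dir" is rejected because of the hyphen
        out.append({"command": name, "description": description[:256]})
    return out[:100]
```

A command such as `/add-dir` would need a renamed alias to appear in Telegram's suggestion list under these rules.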
def _tg_send(token: str, chat_id: int, text: str)
Send a message to a Telegram chat, splitting if too long.
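Telegram caps a single message at 4096 characters, so long replies have to be chunked. A minimal sketch of one way to split, assuming a preference for breaking at newlines; `split_message` is a hypothetical helper, not the function's actual logic.

```python
TELEGRAM_MAX_CHARS = 4096  # Bot API limit for a single message's text


def split_message(text: str, limit: int = TELEGRAM_MAX_CHARS) -> list:
    """Split text into chunks of at most `limit` chars, breaking at newlines when possible."""
    chunks = []
    while len(text) > limit:
        cut = text.rfind("\n", 0, limit)
        if cut <= 0:
            cut = limit  # no usable newline: hard cut at the limit
        chunks.append(text[:cut])
        text = text[cut:].lstrip("\n")
    if text:
        chunks.append(text)
    return chunks
```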
def _tg_typing_loop(token: str, chat_id: int, stop_event: threading.Event, config: dict = None)
Send 'typing...' indicator every 4 seconds until stop_event is set.
def _parse_chat_ids(value)
Accept int, list, or comma-separated string ('123,456,,') → list[int].
Empty parts (from trailing commas) are dropped.
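A sketch of the accepted input shapes. Returning an empty list for `None` is an assumption; `parse_chat_ids` here is an illustration rather than the function's actual body.

```python
def parse_chat_ids(value) -> list:
    """Accept an int, a list, or a comma-separated string and return a list of ints."""
    if value is None:
        return []  # assumption: a missing setting means no authorized chats
    if isinstance(value, int):
        return [value]
    parts = value.split(",") if isinstance(value, str) else list(value)
    ids = []
    for part in parts:
        text = str(part).strip()
        if text:  # drop empty parts left by trailing commas
            ids.append(int(text))
    return ids
```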
def _tg_get_chat_ids(config: dict)
Read configured chat ids from config. Supports legacy single int and
new comma-separated string / list.
def _tg_poll_loop(token: str, chat_ids, config: dict)
Long-polling loop. chat_ids: int (legacy) or list[int].
All listed users are authorized; replies go back to whoever sent the msg.
Authorization is cached for the fast-path (in-set lookup). On a cache
MISS, i.e. when someone the bot has never seen sends it a message, we re-read
`_tg_get_chat_ids(config)` once
def _run_daemon(config: dict)
Daemon mode — keep Dulus alive in the background for Telegram bridges.
No REPL, no GUI. Just a persistent state + callback loop so external
triggers (Telegram) can wake the agent at any time.
def cmd_telegram(args: str, _state, config)
Telegram bot bridge — receive and respond to messages via Telegram.
Usage: /telegram <bot_token> <chat_id> — start bridge
/telegram stop — stop bridge
/telegram status — show current status
/telegram add_id <chat_
def cmd_proactive(args: str, state, config)
Manage proactive background polling.
/proactive — show current status
/proactive 5m — enable, trigger after 5 min of inactivity
/proactive 30s / 1h — enable with custom interval
/proactive off — disable
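The interval arguments shown above (`30s`, `5m`, `1h`) can be parsed as follows. This is a minimal sketch; `parse_interval` is a hypothetical helper, and returning `None` for `off` or malformed input is an assumption.

```python
import re

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_interval(arg: str):
    """Return the interval in seconds, or None for 'off' and unrecognized input."""
    match = re.fullmatch(r"(\d+)([smh])", arg.strip().lower())
    if match is None:
        return None
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]
```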
def cmd_lite(args: str, state, config)
Toggle LITE mode - reduces system prompt from ~10K to ~500 tokens.
/lite — toggle ON/OFF
/lite on — force ON (minimal rules)
/lite off — force OFF (full rules with all examples)
LITE mode keeps only essential rules:
- TmuxOffload for >5 seconds
- SearchLastOutput for truncated
def cmd_tts(args: str, state, config)
TTS: toggle automatic voice output, or set language / provider / auto-listen.
/tts — toggle TTS ON/OFF
/tts lang <code> — set language (es, en, fr, pt, ja…)
/tts lang — show current language
/tts provider — show current TTS provider
/t
def cmd_say(args: str, state, config)
TTS: speak the provided text immediately.
/say <text> — speak the given text using the best available backend
def cmd_voice(args: str, state, config)
Voice input: record → STT → auto-submit as user message.
/voice — record once, transcribe, submit
/voice status — show backend availability
/voice lang <code> — set STT language (e.g. zh, en, ja; 'auto' to reset)
/voice device — list and select input microphone
def cmd_wake(args: str, state, config)
Wake-word (hotword) detection — hands-free voice activation.
/wake on — start listening for "Hey Dulus" in background
/wake off — stop the background listener
/wake status — show whether listener is active
/wake phrases — list recognised wake phrases
/wake threshold <0.01-0.20>
def cmd_image(args: str, state, config)
Grab image from clipboard and send to vision model with optional prompt.
def cmd_video(args: str, state, config)
Attach a video and send it to a Kimi K2.5 / K2.6 vision model.
/video <path> — a local video file (mp4/webm/mov/mkv/...)
/video <url> — an http(s) URL to a video
/video <src> :: ask — attach <src> and send "ask" as the prompt
The video is base64-
def cmd_budget(args: str, state, config)
Show or set the per-session resource budget (governance ledger).
/budget — show current usage vs limits
/budget tokens 200000 — set the token limit (enables the budget)
/budget tool_calls 300 — set the tool-call limit
/budget cost_micro 5000000 — set a cost ceili
def cmd_ocr(args: str, state, config)
Extract text from an image WITHOUT calling a vision model.
Input order:
/ocr <path> — OCR a file on disk (jpg/png/webp/bmp/tiff)
/ocr — OCR the clipboard image
/ocr <prompt> — OCR clipboard, then submit `<prompt>\n\n<text>`
def cmd_checkpoint(args: str, state, config)
List or restore checkpoints.
/checkpoint — list all checkpoints
/checkpoint <id> — restore to checkpoint #id
/checkpoint clear — delete all checkpoints for this session
def cmd_plan(args: str, state, config)
Enter/exit plan mode or show current plan.
/plan <description> — enter plan mode and start planning
/plan — show current plan file contents
/plan done — exit plan mode, restore permissions
/plan status — show plan mode status
def cmd_compact(args: str, state, config)
Manually compact conversation history.
/compact — compact with default summarization
/compact <focus> — compact with focus instructions
def cmd_news(args: str, state, config)
Show the latest news from docs/news.md.
def cmd_init(args: str, state, config)
Initialize a DULUS.md file in the current directory.
/init — create DULUS.md with a starter template
def cmd_export(args: str, state, config)
Export conversation history to a file.
/export — export as markdown to .dulus/exports/
/export <filename> — export to a specific file (.md or .json)
def cmd_fork(args: str, state, config)
Fork the current session at a given turn.
/fork - list turns and prompt for which to fork at
/fork <turn_index> - fork at the specified turn (0-based)
def cmd_undo(_args: str, state, config)
Undo the last turn by forking at the second-to-last turn.
/undo - remove the most recent turn
def cmd_add_dir(args: str, _state, config)
Manage additional workspace directories.
/add-dir <path> - add a directory to the workspace
/add-dir list - list added directories
/add-dir remove <path> - remove a directory
def cmd_import(args: str, state, config)
Import conversation data from a file or another session.
/import <file_path> - import from .json, .md, or .txt
/import <session_id> - import from another Dulus session
def cmd_copy(args: str, state, config)
Copy the last assistant response or file content to clipboard.
/copy - copy last assistant message
/copy <file> - copy file contents
def cmd_shell(args: str, state, config)
Toggle or use shell mode for direct command execution.
/shell - toggle shell mode
/shell on|off - activate/deactivate
/shell <command> - execute directly
def cmd_status(args: str, state, config)
Show current session status.
/status — model, provider, permissions, session info
def cmd_doctor(args: str, state, config)
Diagnose installation health and connectivity.
/doctor — run all health checks
def cmd_roundtable(args: str, _state, config)
Start a roundtable discussion among different models.
/roundtable - Enter setup mode to define models
/roundtable stop - Exit roundtable mode
/roundtable proactive 3m - Auto-send 'ok ok' every 3m to keep the table alive
/roundtable proactive off - Disable roundtable proacti
def cmd_batch(args: str, _state, config)
Manage Kimi Batch tasks.
/batch status [id] — check progress
/batch list — list recent batch jobs
/batch fetch [id] — download results when completed
def cmd_claude_batch(args: str, _state, config)
Manage Anthropic (Claude) Batch tasks — parallel to /batch (Kimi).
Uses Anthropic's native batch endpoint (50% discount, <24h SLA).
Submits requests INLINE in one API call; no JSONL upload step.
/claude_batch create <prompt1> | <prompt2> | ... — create a batch
[--mod
def cmd_webbridge(args: str, state, config)
Control the Dulus WebBridge browser automation.
def handle_slash(line: str, state, config)
Handle /command [args]. Returns True if handled, tuple (skill, args) for skill match.
def setup_readline(history_file: Path)
def repl(config: dict, initial_prompt: str = None)
def main()
Imports
__future__, argparse, atexit, common, datetime, input, json, license_manager, os, pathlib, re, spinner, sys, textwrap, threading, time, tools, traceback, typing, uuid, warnings
▶dulus_gui.py 387 LOC
Dulus GUI Entry Point — professional desktop interface.
Usage:
python dulus_gui.py
python dulus.py --gui
Classes (1)
class _PermissionDialog
Modal permission request dialog centered on the parent.
↳ __init__(self, parent: ctk.CTk, description: str, on_resolve: Callable[[bool], None])
↳ _create_ui(self, description: str)
↳ _setup_window(self, parent: ctk.CTk)
↳ _allow(self)
↳ _deny(self)
Functions (3)
def _center_on_parent(dialog: ctk.CTkToplevel, parent: ctk.CTk)
Center a Toplevel over its parent window.
def launch_gui(config: dict | None = None, initial_prompt: str | None = None)
Launch the Dulus desktop GUI.
Args:
config: Dulus configuration dict (loaded from disk if None).
initial_prompt: Optional initial user message to send on startup.
def main()
CLI entry point.
Imports
__future__, config, datetime, gui, gui.session_utils, gui.themes, json, pathlib, queue, sys, threading, traceback, typing
▶dulus_mcp/__init__.py 43 LOC
mcp package — Model Context Protocol client for dulus.
Usage
-----
MCP servers are configured in one of two JSON files:
~/.dulus/mcp.json (user-level, all projects)
.mcp.json (project-level, current dir, overrides user)
Format:
{
"mcpServers": {
"my-git-server": {
"type": "stdio",
"command": "uvx",
"args": ["mcp-server-git"]
},
"my-remote": {
"type": "sse",
"url": "http://localh
Imports
client, config, tools, types
▶dulus_mcp/client.py 546 LOC
MCP client: stdio and HTTP/SSE transports, JSON-RPC 2.0 protocol.
Classes (4)
class StdioTransport
Bidirectional JSON-RPC over a subprocess's stdin/stdout.
Messages are newline-delimited JSON objects (one per line).
Responses are matched to requests by 'id'.
↳ __init__(self, config: MCPServerConfig)
↳ start(self)
↳ _read_loop(self)
↳ _stderr_loop(self)
↳ _send_raw(self, msg: dict)
↳ request(self, method: str, params: Optional[dict] = None, timeout: Optional[int] = None)
Send a JSON-RPC request and wait for the response.
↳ notify(self, method: str, params: Optional[dict] = None)
Send a JSON-RPC notification (no response expected).
↳ stop(self)
↳ alive(self)
↳ stderr_output(self)
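The framing used by StdioTransport (one JSON object per line, responses matched to requests by 'id') can be sketched without a subprocess. The helpers `encode_request` and `match_response` and the `pending` dict are hypothetical names used only for illustration.

```python
import itertools
import json

_next_id = itertools.count(1)  # monotonically increasing request ids


def encode_request(method: str, params=None) -> tuple:
    """Return (id, line), where line is one newline-terminated JSON-RPC 2.0 request."""
    req_id = next(_next_id)
    msg = {"jsonrpc": "2.0", "id": req_id, "method": method}
    if params is not None:
        msg["params"] = params
    return req_id, json.dumps(msg) + "\n"


def match_response(line: str, pending: dict) -> bool:
    """Store a decoded message under its id if a request is waiting for it."""
    msg = json.loads(line)
    req_id = msg.get("id")
    if req_id in pending:
        pending[req_id] = msg
        return True
    return False  # notifications carry no id and match nothing


req_id, wire = encode_request("tools/list")
pending = {req_id: None}
reply = json.dumps({"jsonrpc": "2.0", "id": req_id, "result": {"tools": []}}) + "\n"
matched = match_response(reply, pending)
```

In a real transport the read loop would run in a background thread and wake the caller waiting on that id; the sketch shows only the matching step.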
class HttpTransport
HTTP-based MCP transport (POST-based streamable HTTP or SSE endpoint).
For SSE servers: sends messages via POST to the SSE session endpoint.
For HTTP servers: sends messages via POST and reads response directly.
↳ __init__(self, config: MCPServerConfig)
↳ _get_client(self)
↳ start(self)
For SSE transport: connect to the /sse endpoint and get session URL.
↳ _start_sse(self)
Open SSE stream to get session endpoint, then start background reader.
↳ request(self, method: str, params: Optional[dict] = None, timeout: Optional[int] = None)
↳ notify(self, method: str, params: Optional[dict] = None)
↳ stop(self)
↳ alive(self)
class MCPClient
Manages the lifecycle of one MCP server connection.
Protocol flow:
connect() → initialize handshake → notifications/initialized
list_tools() → tools/list
call_tool() → tools/call
disconnect() → cleanup
↳ __init__(self, config: MCPServerConfig)
↳ connect(self)
↳ _make_transport(self)
↳ _handshake(self)
↳ disconnect(self)
↳ reconnect(self)
↳ alive(self)
↳ list_tools(self)
Fetch tool list from server and cache as MCPTool objects.
↳ _parse_tool(self, raw: dict)
↳ call_tool(self, tool_name: str, arguments: dict)
Call a tool by its original (non-qualified) name.
Returns the text content from the response, or an error string.
↳ status_line(self)
class MCPManager
Singleton that manages all configured MCP server connections.
↳ __init__(self)
↳ add_server(self, config: MCPServerConfig)
Register a server. Replaces any existing client with the same name.
↳ connect_all(self)
Connect to all registered servers. Returns {name: error_or_None}.
↳ connect_server(self, name: str)
Connect (or reconnect) a single server by name.
↳ all_tools(self)
Return all tools from all connected servers.
↳ call_tool(self, qualified_name: str, arguments: dict)
Dispatch a tool call by qualified name (mcp__server__tool).
↳ list_servers(self)
↳ disconnect_all(self)
↳ reload_server(self, name: str)
Functions (1)
def get_mcp_manager()
Imports
__future__, json, os, subprocess, threading, time, types, typing
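The StdioTransport notes above (newline-delimited JSON objects, responses matched to requests by 'id') can be illustrated with a small sketch. The field names follow JSON-RPC 2.0; the function signatures `make_request` and `make_notification` mirror those listed for dulus_mcp/types.py, but the bodies here are assumptions, and `encode_line` and `match_response` are hypothetical helpers that do not appear in the outline.

```python
import json
from typing import Optional, Tuple


def make_request(method: str, params: Optional[dict], req_id: int) -> dict:
    """Build a JSON-RPC 2.0 request; the 'id' means a response is expected."""
    msg = {"jsonrpc": "2.0", "id": req_id, "method": method}
    if params is not None:
        msg["params"] = params
    return msg


def make_notification(method: str, params: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 notification; no 'id', so no response is expected."""
    msg = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        msg["params"] = params
    return msg


def encode_line(msg: dict) -> bytes:
    """Serialize one message as a single line (hypothetical helper).

    json.dumps escapes embedded newlines, so the payload stays on one line.
    """
    return (json.dumps(msg, separators=(",", ":")) + "\n").encode("utf-8")


def match_response(line: bytes, pending: dict) -> Optional[Tuple[object, dict]]:
    """Decode one line; if it answers a pending request, pop and return it.

    `pending` maps request id -> whatever the caller stored for that request.
    Lines that carry a 'method' are server-side requests or notifications
    and are not treated as responses.
    """
    msg = json.loads(line)
    req_id = msg.get("id")
    if "method" not in msg and req_id in pending:
        return pending.pop(req_id), msg
    return None
```

A reader loop in the spirit of `_read_loop` would call `match_response` for each line read from the subprocess's stdout and hand the decoded message to whichever caller is waiting on that id.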
▶ dulus_mcp/config.py (133 LOC)
Load MCP server configs from .mcp.json files (project + user level).
Config search order (project-level overrides user-level by server name):
1. ~/.dulus/mcp.json — user-level, lowest priority
2. <cwd>/.mcp.json — project-level, highest priority
File format (matches Claude Code's .mcp.json format):
{
"mcpServers": {
"my-server": {
"type": "stdio",
"command": "uvx",
"args": ["mcp-server-git", "--repository", "."
Functions (6)
def _load_file(path: Path)
Read a single mcp.json file and return the mcpServers dict.
def load_mcp_configs()
Return all MCP server configs, project-level overriding user-level.
def save_user_mcp_config(servers: Dict[str, dict])
Write (or update) the user-level MCP config file.
def add_server_to_user_config(name: str, raw: dict)
Append or update one server entry in the user MCP config.
def remove_server_from_user_config(name: str)
Remove a server from the user MCP config. Returns True if found.
def list_config_files()
Return paths of all mcp.json config files that exist.
Imports
__future__, json, pathlib, types, typing
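The search order described for dulus_mcp/config.py (user-level file first, project-level file overriding it by server name) can be sketched as follows. This is a minimal illustration, not the module's code: `load_servers` and `merge_configs` are hypothetical names, and the sketch assumes that a project entry replaces the whole user entry of the same name rather than being deep-merged into it.

```python
import json
from pathlib import Path
from typing import Dict


def load_servers(path: Path) -> Dict[str, dict]:
    """Return the 'mcpServers' mapping from one file, or {} if missing or invalid."""
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return {}
    servers = data.get("mcpServers", {}) if isinstance(data, dict) else {}
    return servers if isinstance(servers, dict) else {}


def merge_configs(user_file: Path, project_file: Path) -> Dict[str, dict]:
    """Merge two config files; entries in project_file win on name clashes."""
    merged = dict(load_servers(user_file))      # lowest priority first
    merged.update(load_servers(project_file))   # same-name entries are replaced
    return merged
```

Under these assumptions, calling `merge_configs(Path.home() / ".dulus" / "mcp.json", Path.cwd() / ".mcp.json")` would yield one dict keyed by server name.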
▶ dulus_mcp/tools.py (131 LOC)
Register MCP tools into the central tool_registry.
Importing this module:
1. Loads .mcp.json config files
2. Connects to each configured MCP server
3. Discovers tools from each server
4. Registers each tool into tool_registry so Claude can use them
MCP tool qualified names follow the pattern:
mcp__<server_name>__<tool_name>
This matches the Claude Code convention (mcp__serverName__toolName).
Functions (7)
def _make_mcp_func(qualified_name: str)
Return a tool func that calls the MCP server for a given qualified name.
def _register_tool(tool: MCPTool)
def initialize_mcp(verbose: bool = False)
Load configs, connect servers, register tools. Idempotent.
Returns a dict of {server_name: error_message_or_None}.
def reload_mcp()
Force a full reload: re-read configs, reconnect, re-register all tools.
def refresh_server(server_name: str)
Reconnect a single server and re-register its tools. Returns error or None.
def get_connect_errors()
def _background_init()
Imports
__future__, client, config, threading, tool_registry, types, typing
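The qualified-name pattern `mcp__<server_name>__<tool_name>` described above can be sketched with two small helpers. Both `qualify` and `split_qualified` are hypothetical names used only for illustration. The sketch assumes that server names never contain a double underscore, so the first `__` after the prefix separates server from tool.

```python
from typing import Tuple

PREFIX = "mcp"
SEP = "__"


def qualify(server_name: str, tool_name: str) -> str:
    """Build a qualified tool name of the form mcp__<server>__<tool>."""
    return f"{PREFIX}{SEP}{server_name}{SEP}{tool_name}"


def split_qualified(qualified_name: str) -> Tuple[str, str]:
    """Split a qualified name back into (server_name, tool_name).

    Raises ValueError if the name does not follow the pattern.
    """
    prefix, sep, rest = qualified_name.partition(SEP)
    if prefix != PREFIX or not sep:
        raise ValueError(f"not an MCP tool name: {qualified_name!r}")
    server, sep, tool = rest.partition(SEP)
    if not sep or not server or not tool:
        raise ValueError(f"malformed MCP tool name: {qualified_name!r}")
    return server, tool
```

Splitting on the first separator keeps tool names that themselves contain `__` intact, which is why the assumption is placed on server names rather than tool names.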
▶ dulus_mcp/types.py (124 LOC)
MCP type definitions: server configs, tool descriptors, connection state.
Classes (4)
class MCPTransport
class MCPServerConfig
Configuration for a single MCP server.
Mirrors the Claude Code schema (types.ts) for the two most useful transports.
Stdio example:
{"type": "stdio", "command": "uvx", "args": ["mcp-server-git"]}
SSE/HTTP example:
{"type": "sse", "url": "http://localhost:8080/sse",
"headers": {"Autho
↳ from_dict(cls, name: str, d: dict)
class MCPServerState
class MCPTool
A tool provided by an MCP server, ready to register in tool_registry.
↳ to_tool_schema(self)
Convert to the schema format expected by the Claude API.
Functions (2)
def make_request(method: str, params: Optional[dict], req_id: int)
def make_notification(method: str, params: Optional[dict] = None)
Imports
__future__, dataclasses, enum, typing
▶ dulus_tools/__init__.py (1 LOC)
Dulus Tools — Utility modules for Dulus CLI.
▶ dulus_tools/add_dir_manager.py (109 LOC)
AddDirManager - Manage additional workspace directories.
Allows users to add directories *outside* the current working directory
into the workspace so that tools (file search, git, etc.) can see them.
This is the backend for the ``/add-dir`` slash command.
Classes (1)
class AddDirManager
Manages additional directories in the workspace.
Each added directory is stored as an absolute path and validated so
that:
* it actually exists and is readable,
* it is not already inside the working directory,
* it is not a duplicate or nested inside another added directory.
↳ __init__(self, work_dir: str)
↳ add(self, path: str)
Add a directory to the workspace.
Args:
path: Absolute or relative path. ``~`` is expanded.
Returns:
``(success, message)`` tuple.
↳ list(self)
Return a shallow copy of the added directory paths.
↳ remove(self, path: str)
Remove a directory from the workspace.
Args:
path: The path to remove (will be normalised).
Returns:
``True`` if the path was found and removed.
↳ _is_within_directory(path: str, directory: str)
Return ``True`` when *path* is the same as, or inside, *directory*.
↳ get_combined_file_listing(self)
Human-readable listing of working dir + additional dirs.
Imports
os, pathlib, typing
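The containment rule behind `_is_within_directory` (True when *path* is the same as, or inside, *directory*) is easy to get wrong with plain string prefixes, since `/a/bc` starts with `/a/b`. The following is one possible sketch using `os.path.commonpath`; it is an assumption about a reasonable approach, not the module's actual implementation.

```python
import os


def is_within_directory(path: str, directory: str) -> bool:
    """Return True when `path` is the same as, or nested inside, `directory`."""
    # Expand "~" and resolve symlinks so both sides are compared in the same form.
    path_abs = os.path.realpath(os.path.expanduser(path))
    dir_abs = os.path.realpath(os.path.expanduser(directory))
    try:
        return os.path.commonpath([path_abs, dir_abs]) == dir_abs
    except ValueError:
        # Raised on Windows when the two paths are on different drives.
        return False
```

The same check can serve both validation rules listed for AddDirManager: rejecting a directory already inside the working directory, and rejecting one nested inside a previously added directory.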
▶ dulus_tools/afk_mode.py (34 LOC)
AFK Mode - Auto-dismiss AskUserQuestion and auto-approve tool calls.
Classes (1)
class AFKMode
AFK mode state manager.
When enabled, auto-dismisses AskUserQuestion prompts and auto-approves
tool calls, allowing the agent to run unattended.
↳ enabled(self)
Return whether AFK mode is currently enabled.
↳ toggle(self)
Toggle AFK mode on/off. Returns the new state.
↳ enable(self)
Enable AFK mode.
↳ disable(self)
Disable AFK mode.
↳ __repr__(self)
Imports
dataclasses
▶ dulus_tools/approval_runtime.py (285 LOC)
ApprovalRuntime - Full approval system with events, subscribers, and timeout support.
Classes (3)
class ApprovalRequest
Represents a request for user approval.
Attributes:
id: Unique identifier for this request.
tool_call_id: The ID of the tool call being approved.
sender: Who/what is requesting approval.
action: The action being requested.
description: Human-readable description of the action.
class ApprovalResponse
Represents a response to an approval request.
Attributes:
request_id: The ID of the request being responded to.
response: The response type ("approve" or "reject").
feedback: Optional feedback text from the approver.
resolved_at: Unix timestamp when the response was given.
class ApprovalRuntime
Runtime for managing approval requests with events and subscribers.
Supports:
- Creating and tracking approval requests
- Waiting for responses with optional timeout
- Event subscribers for "request_created" and "request_resolved"
- Cancelling requests by source
- Listing pending requests
↳ __init__(self)
↳ create_request(self)
Create a new approval request and notify subscribers.
Args:
tool_call_id: The ID of the tool call being approved.
sender: Who/what is requesting approval.
action: The action being request
↳ resolve(self, request_id: str, response: str, feedback: str = '')
Resolve an approval request with a response.
Args:
request_id: The ID of the request to resolve.
response: "approve" or "reject".
feedback: Optional feedback text.
Returns:
True if t
↳ subscribe(self, callback: Callable[[str, ApprovalRequest | ApprovalResponse], None])
Subscribe to approval events.
The callback receives (event_type, data) where event_type is
"request_created" or "request_resolved".
Args:
callback: Function to call when events occur.
Returns:
↳ unsubscribe(self, token: str)
Unsubscribe from approval events.
Args:
token: The subscription token returned by subscribe().
↳ _notify(self, event_type: str, data: ApprovalRequest | ApprovalResponse)
Notify all subscribers of an event.
↳ list_pending(self)
Return all pending (unresolved) approval requests.
Returns:
List of pending ApprovalRequest objects.
↳ cancel_by_source(self, source_kind: str, source_id: str)
Cancel all pending requests from a given source.
Args:
source_kind: The source kind to match (e.g., "background_agent").
source_id: The source ID to match.
Returns:
Number of requests ca
↳ get_request(self, request_id: str)
Get a request by ID.
Args:
request_id: The request ID.
Returns:
The ApprovalRequest or None if not found.
↳ clear_resolved(self)
Remove all resolved requests and responses from memory.
Returns:
Number of items cleared.
Imports
__future__, asyncio, dataclasses, time, typing, uuid
▶ dulus_tools/background_tasks.py (847 LOC)
BackgroundTaskManager - Manage background tasks with persistence and recovery.
Features 14-15: Background Tasks (BackgroundTaskManager, BackgroundAgentRunner)
This module provides:
- BackgroundTaskStore: Persistent on-disk storage for task specs, runtime state,
and output logs.
- BackgroundTaskManager: Create, monitor, kill, and recover background bash/agent
tasks with configurable limits and heartbeats.
- BackgroundAgentRunner: Async runner for sub-agent tasks in the b
Classes (7)
class TaskSpec
Static specification for a background task.
Attributes:
id: Unique task identifier (e.g. ``bash-a1b2c3d4``).
kind: Task kind — ``"bash"`` or ``"agent"``.
session_id: Session directory name this task belongs to.
description: Human-readable description of the task.
command: Shell
class TaskRuntime
Mutable runtime state for a background task.
Attributes:
status: Current lifecycle status.
worker_pid: OS PID of the worker process.
child_pid: OS PID of the child process (if different from worker).
child_pgid: Process group ID of the child.
started_at: Unix timestamp when exec
class TaskView
Combined read-only view of a task's spec and runtime.
class TaskOutputChunk
A chunk of task output read from the log file.
Attributes:
text: The output text content.
offset: Byte offset where this chunk starts.
next_offset: Byte offset where the next chunk should start.
class BackgroundTaskStore
Persistent store for background tasks.
Each task gets its own directory under *root* containing:
- ``spec.json`` – frozen :class:`TaskSpec`
- ``runtime.json`` – mutable :class:`TaskRuntime` (rewritten on every state change)
- ``output.log`` – combined stdout/stderr from the worker
Parameters:
↳ __init__(self, root: Path)
↳ task_dir(self, task_id: str)
Return the directory path for *task_id*.
↳ create_task(self, spec: TaskSpec)
Create a new task directory with initial spec, runtime, and empty log.
↳ read_spec(self, task_id: str)
Load the :class:`TaskSpec` for *task_id*.
↳ read_runtime(self, task_id: str)
Load the :class:`TaskRuntime` for *task_id*.
↳ write_runtime(self, task_id: str, runtime: TaskRuntime)
Persist updated :class:`TaskRuntime` for *task_id*.
↳ read_output(self, task_id: str, offset: int = 0, max_bytes: int = 32 * 1024)
Read up to *max_bytes* of output starting at *offset*.
Returns:
A :class:`TaskOutputChunk` with the text and next offset.
↳ tail_output(self, task_id: str, max_bytes: int = 32 * 1024, max_lines: int = 50)
Return the last *max_lines* lines of output, bounded by *max_bytes*.
This is useful for notifications where only the tail is relevant.
↳ output_path(self, task_id: str)
Return the :class:`Path` to the output log for *task_id*.
↳ list_task_ids(self)
Return all task IDs stored on disk.
↳ list_views(self)
Return a list of :class:`TaskView` for all tasks, sorted newest first.
↳ merged_view(self, task_id: str)
Return a :class:`TaskView` for *task_id*.
Raises:
FileNotFoundError: If the task does not exist.
class BackgroundTaskManager
Manages background tasks with persistence and recovery.
Parameters:
session_dir: Root directory for the current session. Task data is
stored in ``<session_dir>/tasks/``.
config: Optional override for default configuration values.
notifications: Optional callback/queue for
↳ __init__(self, session_dir: str, config: dict[str, int] | None = None, notifications: Any | None = None)
↳ store(self)
The underlying :class:`BackgroundTaskStore`.
↳ completion_event(self)
Event that is set whenever a task reaches a terminal state.
↳ _active_task_count(self)
↳ has_active_tasks(self)
Return ``True`` if any non-terminal tasks exist.
↳ create_bash_task(self, command: str, description: str)
Create and launch a background bash task.
Parameters:
command: Shell command to execute.
description: Human-readable description.
timeout_s: Maximum execution time in seconds.
tool_ca
↳ _launch_worker(self, task_dir: Path, command: str, cwd: str, timeout_s: int)
Launch a worker subprocess and return its PID.
The worker runs the command via the current Python interpreter so that
environment variables (``DULUS_BG_TASK``, ``DULUS_BG_TIMEOUT``) are
injected and
↳ list_tasks(self)
List task views.
Parameters:
active_only: If ``True``, filter out terminal-status tasks.
limit: Maximum number of results.
Returns:
Sorted list of :class:`TaskView` (newest first).
↳ get_task(self, task_id: str)
Return the :class:`TaskView` for *task_id*, or ``None`` if not found.
↳ kill(self, task_id: str)
Kill a running task and mark it as ``killed``.
Parameters:
task_id: The task to kill.
reason: Human-readable reason for the kill.
Returns:
Updated :class:`TaskView` after the kill operat
↳ kill_all_active(self)
Kill all tasks that are not in a terminal state.
Parameters:
reason: Human-readable reason for the mass kill.
Returns:
List of task IDs that were killed.
↳ read_output(self, task_id: str, offset: int = 0, max_bytes: int | None = None)
Read task output starting at *offset*.
↳ tail_output(self, task_id: str, max_bytes: int | None = None, max_lines: int | None = None)
Return the tail of the task output log.
↳ resolve_output_path(self, task_id: str)
Return the filesystem path to the output log for *task_id*.
↳ recover(self)
Mark stale non-terminal tasks as ``lost``.
A task is considered stale when no heartbeat (or start/update) has
been seen for longer than ``worker_stale_after_ms``.
↳ reconcile(self)
Run recovery and publish terminal notifications.
Returns:
List of task IDs that were published as terminal.
↳ _publish_terminal_notifications(self)
Signal the completion event for all terminal tasks.
class BackgroundAgentRunner
Runs a sub-agent task in the background.
This class encapsulates the lifecycle of an agent-type background task:
marking it as running, executing the agent, and recording the result.
The :meth:`_execute_agent` method is a placeholder that should be
overridden or replaced with a real agent invocati
↳ __init__(self, task_id: str, prompt: str, model: str | None = None, timeout_s: int = 300, session_dir: str | None = None)
Functions (5)
def is_terminal_status(status: str)
Return ``True`` if *status* is a terminal (finished) state.
def generate_task_id(kind: str)
Generate a short unique task ID like ``bash-a1b2c3d4``.
def format_task(view: TaskView)
Format a single task view as a human-readable string.
Parameters:
view: The task view to format.
include_command: Whether to include the shell command line.
def format_task_list(views: list[TaskView])
Format a list of task views as a human-readable string.
Parameters:
views: List of task views to format.
active_only: Whether the list contains only active tasks (affects header).
def list_task_views(manager: BackgroundTaskManager)
Convenience wrapper to list task views from a manager.
Parameters:
manager: The :class:`BackgroundTaskManager` instance.
active_only: If ``True``, only return non-terminal tasks.
limit: Maximum number of tasks to return.
Imports
__future__, asyncio, dataclasses, json, os, pathlib, signal, subprocess, sys, time, typing, uuid
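The offset-based reading described for `read_output` and `TaskOutputChunk` (text, offset, next_offset) lets a caller poll a growing log without re-reading earlier bytes. A minimal sketch under stated assumptions: `OutputChunk` and `read_chunk` are hypothetical stand-ins, the log is assumed to be UTF-8, and undecodable bytes are replaced rather than raising.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class OutputChunk:
    text: str          # decoded output for this chunk
    offset: int        # byte offset where this chunk starts
    next_offset: int   # byte offset to pass in on the next read


def read_chunk(log_path: Path, offset: int = 0, max_bytes: int = 32 * 1024) -> OutputChunk:
    """Read up to max_bytes from log_path starting at byte `offset`."""
    with log_path.open("rb") as fh:
        fh.seek(offset)
        data = fh.read(max_bytes)
    # A chunk boundary may split a multi-byte character; errors="replace"
    # keeps the sketch simple at the cost of a replacement character there.
    text = data.decode("utf-8", errors="replace")
    return OutputChunk(text=text, offset=offset, next_offset=offset + len(data))
```

A poller would keep the last `next_offset` and pass it back in; when a read returns empty text and an unchanged offset, no new output has arrived.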
▶ dulus_tools/clipboard_utils.py (166 LOC)
ClipboardUtils - Advanced clipboard operations for Dulus.
Classes (1)
class ClipboardUtils
Advanced clipboard operations for cross-platform text handling.
Provides methods to copy text to and paste text from the system
clipboard, copy file contents, and detect image data in clipboard.
Automatically detects the correct clipboard tool for the platform.
Example:
ClipboardUtils.copy_tex
↳ _get_clipboard_command()
Get the appropriate clipboard command for the current platform.
Returns:
Tuple of (copy_command, paste_command) or (None, None) if
no suitable clipboard tool is found.
↳ copy_text(text: str)
Copy text to the clipboard. Cross-platform.
Args:
text: The text to copy.
Returns:
True if the copy succeeded, False otherwise.
↳ paste_text()
Paste text from the clipboard.
Returns:
The clipboard text, or empty string if unavailable.
↳ copy_file_content(file_path: str, line_start: int = 1, line_end: Optional[int] = None)
Copy a range of lines from a file to the clipboard.
Args:
file_path: Path to the file to read.
line_start: First line to copy (1-indexed).
line_end: Last line to copy (1-indexed), or None
↳ is_image_in_clipboard()
Check if the clipboard contains image data.
Performs a basic signature check on clipboard content.
Returns:
True if image data signatures are detected.
Functions (2)
def copy_to_clipboard(text: str)
Copy text to the clipboard. Convenience wrapper.
Args:
text: The text to copy.
Returns:
True if the copy succeeded.
def paste_from_clipboard()
Paste text from the clipboard. Convenience wrapper.
Returns:
The clipboard text, or empty string.
Imports
__future__, os, pathlib, subprocess, sys, tempfile, typing
▶ dulus_tools/diff_visualizer.py (147 LOC)
DiffVisualizer - Render diffs in various formats.
Classes (1)
class DiffVisualizer
Render display blocks in CLI, HTML, and Telegram formats.
Provides multiple renderers for code diffs, supporting ANSI-colored
terminal output, styled HTML for web clients, and compact summaries
for Telegram. Also generates standard unified diff text.
Example:
block = {"path": "test.py", "old_t
↳ render_cli(diff_block: Dict[str, str])
Render diff for CLI output with ANSI colors.
Args:
diff_block: Dict with keys 'path', 'old_text', 'new_text'.
Returns:
ANSI-colored diff string for terminal display.
↳ render_html(diff_block: Dict[str, str])
Render diff as HTML for WebChat.
Args:
diff_block: Dict with keys 'path', 'old_text', 'new_text'.
Returns:
HTML string with styled diff spans.
↳ render_telegram(diff_block: Dict[str, str])
Render diff summary for Telegram.
Args:
diff_block: Dict with keys 'path', 'old_text', 'new_text'.
Returns:
Compact diff summary with emoji indicators.
↳ generate_unified_diff(old_text: str, new_text: str, path: str)
Generate unified diff format text.
Args:
old_text: Original file content.
new_text: Modified file content.
path: File path to display in the diff header.
Returns:
Unified diff as a m
Imports
__future__, difflib, html, typing
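Since the module imports `difflib`, the unified-diff generation described for `generate_unified_diff` can plausibly be built on `difflib.unified_diff`. The sketch below shows that approach; the function name `unified_diff_text` and the `a/` and `b/` header prefixes are assumptions for illustration, not details confirmed by the outline.

```python
import difflib


def unified_diff_text(old_text: str, new_text: str, path: str) -> str:
    """Return a unified diff of two strings, or "" when they are identical."""
    lines = difflib.unified_diff(
        old_text.splitlines(keepends=True),
        new_text.splitlines(keepends=True),
        fromfile=f"a/{path}",   # assumed git-style header prefixes
        tofile=f"b/{path}",
    )
    return "".join(lines)
```

Keeping line endings with `keepends=True` means the joined result needs no extra newline handling, provided the inputs end with a newline.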
▶ dulus_tools/export_import.py (206 LOC)
Import/Export System - Session import and export.
Provides exporters that convert Dulus conversation history into various
external formats (Markdown, JSON, plain text) and importers that can
read those formats back, or pull in another session's wire log.
Classes (2)
class SessionExporter
Export session conversation history to various file formats.
↳ export_markdown(self, history: List[Dict[str, Any]], output_path: Path, session_id: str = '', token_count: int = 0)
Export conversation history to a Markdown file.
Args:
history: List of message dicts, each with at least ``role``
and ``content`` keys.
output_path: Destination file path. Parent dir
↳ export_json(self, history: List[Dict[str, Any]], output_path: Path, session_id: str = '', token_count: int = 0)
Export conversation history to a JSON file.
The JSON structure contains metadata (session_id, export time,
message count, token count) plus the raw messages array.
Returns:
``(output_path, messa
↳ export_text(self, history: List[Dict[str, Any]], output_path: Path)
Export conversation history to a plain-text file.
Each line is formatted as ``[role] content``.
Returns:
``(output_path, message_count)`` tuple.
class SessionImporter
Import session data from various external sources.
↳ import_from_file(self, file_path: str, max_context_size: Optional[int] = None)
Import from a file on disk.
Supports ``.json`` (Dulus export format), ``.md`` / ``.markdown``
and plain text files.
Args:
file_path: Absolute or relative path to the file.
max_context_size:
↳ import_from_session_id(self, session_id: str, sessions_root: Optional[str] = None)
Import from another Dulus session's wire log.
Args:
session_id: The session directory name.
sessions_root: Parent directory that contains session dirs.
Defaults to ``~/.dulus/sessions
Imports
datetime, json, os, pathlib, re, typing
▶ dulus_tools/hook_engine.py (379 LOC)
HookEngine - Configurable hook engine with event matching and shell execution.
Classes (3)
class HookDef
Hook definition that matches events and executes commands.
Attributes:
event: Event type to match (e.g., "tool_call", "approval_request").
matcher: Regex pattern for matching against the target.
command: Shell command to execute (supports {target} and {event} placeholders).
timeout:
↳ compiled_matcher(self)
Return the compiled regex pattern, or None if empty.
class HookResult
Result of executing a hook.
Attributes:
hook: The hook definition that was executed.
matched: Whether the hook matched the event/target.
returncode: Exit code of the command (None if not executed).
stdout: Standard output from the command.
stderr: Standard error from the command
class HookEngine
Hook engine that loads and executes matching hooks.
Supports:
- Loading hooks from TOML config file (~/.dulus/hooks.toml)
- Event matching with regex patterns
- Shell command execution with timeout
- Template substitution ({target}, {event})
↳ __init__(self, hooks: list[HookDef] | None = None, cwd: str | None = None)
Initialize the hook engine.
Args:
hooks: Optional list of hook definitions.
cwd: Working directory for command execution.
↳ add_hook(self, hook: HookDef)
Add a hook definition.
↳ remove_hook(self, event: str)
Remove all hooks matching an event type.
Args:
event: The event type to remove hooks for.
Returns:
Number of hooks removed.
↳ list_hooks(self)
Return all registered hook definitions.
↳ from_config(cls, config_path: str | Path | None = None)
Load hooks from a TOML config file.
Args:
config_path: Path to the hooks.toml file.
Defaults to ~/.dulus/hooks.toml
Returns:
A HookEngine with loaded hooks.
↳ _parse_hooks_toml_simple(cls, text: str)
Simple TOML parser for [[hooks]] tables (fallback).
Handles basic TOML syntax for hooks definitions without
requiring external dependencies.
↳ _make_hook_from_dict(cls, data: dict[str, Any])
Create a HookDef from a parsed dict, with safe defaults.
↳ ensure_default_config(self)
Create a default hooks.toml config if it doesn't exist.
Imports
__future__, asyncio, dataclasses, os, pathlib, re, shutil, time, typing
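The two mechanisms listed for HookEngine, regex event matching and `{target}` / `{event}` template substitution, can be sketched independently of shell execution. Everything here is illustrative: `Hook`, `hook_matches`, and `render_command` are hypothetical names, an empty matcher is assumed to match every target, and quoting the substituted values with `shlex.quote` is a safety choice of this sketch rather than documented behaviour.

```python
import re
import shlex
from dataclasses import dataclass
from typing import Optional


@dataclass
class Hook:
    event: str     # event type to match, e.g. "tool_call"
    matcher: str   # regex applied to the target; "" means "match anything" here
    command: str   # shell command template with {target} and {event} placeholders

    def compiled(self) -> "Optional[re.Pattern[str]]":
        """Return the compiled regex, or None when the matcher is empty."""
        return re.compile(self.matcher) if self.matcher else None


def hook_matches(hook: Hook, event: str, target: str) -> bool:
    """True when the event type is equal and the regex (if any) finds a match."""
    if hook.event != event:
        return False
    pattern = hook.compiled()
    return pattern is None or pattern.search(target) is not None


def render_command(hook: Hook, event: str, target: str) -> str:
    """Fill in the placeholders, shell-quoting each substituted value."""
    return (
        hook.command
        .replace("{event}", shlex.quote(event))
        .replace("{target}", shlex.quote(target))
    )
```

Plain `str.replace` is used instead of `str.format` so that other braces in a shell command (for example in `awk` scripts) are left untouched.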
▶ dulus_tools/notification_manager.py (344 LOC)
NotificationManager - Notifications with deduplication, claim/ack flow, and persistence.
Classes (2)
class NotificationEvent
Represents a notification event.
Attributes:
id: Unique identifier for this notification.
category: Category of the notification (e.g., "task", "approval").
type: Specific type within the category.
source_kind: Source type (e.g., "background_task", "agent").
source_id: Identifie
↳ to_dict(self)
Serialize to a plain dictionary.
↳ from_dict(cls, data: dict[str, Any])
Deserialize from a plain dictionary.
class NotificationManager
Manages notifications with deduplication and multi-sink delivery.
Features:
- Publish notifications with optional deduplication
- Claim notifications for specific sinks
- Acknowledge claimed notifications
- Persist notifications to JSON files
- Recover stale claimed notifications
↳ __init__(self, root: Path, config: dict | None = None)
Initialize the notification manager.
Args:
root: Base directory for notification storage.
config: Optional configuration dict.
↳ new_id(self)
Generate a new unique notification ID.
↳ publish(self, event: NotificationEvent)
Publish a notification event.
If the event has a dedupe_key and a matching notification already
exists, the existing notification is returned instead.
Args:
event: The notification event to publ
↳ claim_for_sink(self, sink: str, limit: int = 8)
Claim pending notifications for a sink.
Notifications are claimed in order of creation (oldest first).
Args:
sink: The sink name claiming notifications.
limit: Maximum number of notification
↳ ack(self, sink: str, notification_id: str)
Acknowledge a claimed notification.
Args:
sink: The sink name acknowledging.
notification_id: The notification ID to ack.
Returns:
A dict with "id" and "status" fields.
↳ has_pending_for_sink(self, sink: str)
Check if there are pending notifications for a sink.
Args:
sink: The sink name to check.
Returns:
True if there are pending notifications for this sink.
↳ recover(self, stale_seconds: float = 300.0)
Recover stale claimed notifications.
Notifications that were claimed but never acked within the stale
threshold are released back to the pending pool.
Args:
stale_seconds: Time in seconds after
↳ list_all(self)
Return all notifications as dicts.
↳ list_pending(self)
Return all pending (unclaimed, unacked) notifications.
↳ get(self, notification_id: str)
Get a notification by ID.
↳ delete(self, notification_id: str)
Delete a notification by ID.
Returns:
True if the notification was found and deleted.
↳ _file_path(self, notification_id: str)
Get the file path for a notification.
↳ _persist(self, notification: NotificationEvent)
Save a notification to disk.
↳ _load_all(self)
Load all persisted notifications from disk.
Imports
__future__, dataclasses, json, pathlib, time, typing, uuid
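The publish / claim / ack flow described above can be illustrated with an in-memory sketch. It is deliberately simplified and rests on several assumptions: `Note` and `MiniNotifications` are hypothetical classes, persistence and stale-claim recovery are omitted, each notification can be claimed by only one sink, and `ack` returns a bool where the outline says the real method returns a dict with "id" and "status" fields.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Note:
    title: str
    dedupe_key: Optional[str] = None
    id: str = field(default_factory=lambda: uuid.uuid4().hex)
    claimed_by: Optional[str] = None
    acked: bool = False


class MiniNotifications:
    def __init__(self) -> None:
        # Dicts preserve insertion order, which doubles as creation order.
        self._items: Dict[str, Note] = {}

    def publish(self, note: Note) -> Note:
        """Store the note, unless one with the same dedupe_key already exists."""
        if note.dedupe_key is not None:
            for existing in self._items.values():
                if existing.dedupe_key == note.dedupe_key:
                    return existing
        self._items[note.id] = note
        return note

    def claim_for_sink(self, sink: str, limit: int = 8) -> List[Note]:
        """Claim up to `limit` unclaimed, unacked notes, oldest first."""
        pending = [n for n in self._items.values()
                   if n.claimed_by is None and not n.acked]
        claimed = pending[:limit]
        for note in claimed:
            note.claimed_by = sink
        return claimed

    def ack(self, sink: str, notification_id: str) -> bool:
        """Mark a note as handled; only the sink that claimed it may ack."""
        note = self._items.get(notification_id)
        if note is None or note.claimed_by != sink:
            return False
        note.acked = True
        return True
```

Separating claim from ack is what makes recovery possible: a note that was claimed but never acked can later be released back to the pending pool.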
▶ dulus_tools/session_fork.py (230 LOC)
SessionFork - Fork and undo session functionality.
Provides the ability to fork a session at any given turn (creating a new
session branched from that point) and to undo the last turn (forking at
the second-to-last turn).
Inspired by kimi-cli's checkpoint / undo / fork workflow.
Classes (2)
class TurnInfo
Lightweight descriptor for a single conversation turn.
class SessionFork
Session forking and undo functionality.
Scans the ``wire.jsonl`` file that Dulus uses to persist the raw
wire-format conversation stream, enumerates turns (defined by
``TurnBegin`` / ``TurnEnd`` boundaries), and can create a new session
directory containing a truncated copy of the wire file.
↳ __init__(self, session_dir: str)
↳ enumerate_turns(self, wire_path: Optional[Path] = None)
Scan session history and return all turns.
A *turn* begins when a ``TurnBegin`` message is encountered on
the wire. The user text is extracted from the ``user_input``
payload (first line, max 80 cha
↳ _extract_user_text(self, user_input)
Extract a short preview of the user input.
* ``str`` – take the first line, max 80 chars.
* ``list`` – concatenate text parts, max 80 chars.
* other – empty string.
↳ truncate_at_turn(self, wire_path: Path, turn_index: int)
Return every wire line up to and including the given turn.
The walk is stateful: metadata lines are always included,
``TurnBegin`` increments the turn counter, and we stop as
soon as we have passed t
↳ _read_all_lines(self, path: Path)
Return every non-empty stripped line in *path*.
Imports
asyncio, dataclasses, json, os, pathlib, re, shutil, time, typing
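The three preview rules listed for `_extract_user_text` (first line of a string, concatenated text parts of a list, empty string otherwise, each capped at 80 characters) translate into a short function. This is a sketch with assumptions: the outline does not show the shape of list items, so text parts are assumed to be dicts carrying a string under a "text" key, joined with single spaces.

```python
from typing import Any

MAX_PREVIEW = 80


def extract_user_text(user_input: Any) -> str:
    """Return a short preview of the user input, at most MAX_PREVIEW chars."""
    if isinstance(user_input, str):
        lines = user_input.splitlines()
        first_line = lines[0] if lines else ""
        return first_line[:MAX_PREVIEW]
    if isinstance(user_input, list):
        # Assumed part shape: {"type": "text", "text": "..."}; others are skipped.
        parts = [
            part["text"]
            for part in user_input
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        ]
        return " ".join(parts)[:MAX_PREVIEW]
    return ""
```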
▶ dulus_tools/shell_mode.py (130 LOC)
ShellMode - Toggle between shell commands and agent mode.
Classes (1)
class ShellMode
Shell mode for Dulus - execute commands directly.
Provides a toggleable shell execution mode that allows the agent
to run shell commands directly via subprocess. Tracks command
history and supports configurable shell executable and timeouts.
Example:
shell = ShellMode()
shell.activate()
↳ active(self)
Whether shell mode is currently active.
↳ toggle(self)
Toggle shell mode on/off. Returns the new state.
↳ activate(self)
Activate shell mode.
↳ deactivate(self)
Deactivate shell mode.
↳ get_history(self)
Return a copy of the command history.
Functions (2)
def _default_shell()
def _default_shell_name()
Imports
__future__, dataclasses, os, subprocess, typing
▶ dulus_tools/todo_visualizer.py (131 LOC)
TodoVisualizer - Render todo lists in various formats.
Classes (1)
class TodoVisualizer
Render todo lists in CLI, HTML, and Telegram formats.
Provides rich formatting for todo items across different output
targets: ANSI-colored terminal output, interactive HTML checkboxes
for web clients, and emoji-based checklists for Telegram.
Example:
block = {
"items": [
{
↳ render_cli(todo_block: Dict[str, Any])
Render todos with rich formatting for CLI.
Args:
todo_block: Dict with 'items' key, each item having
'status' and 'title' keys.
Returns:
ANSI-formatted todo list string.
↳ render_html(todo_block: Dict[str, Any])
Render todos as interactive HTML checkboxes for WebChat.
Args:
todo_block: Dict with 'items' key, each item having
'status' and 'title' keys.
Returns:
HTML string with checkbo
↳ render_telegram(todo_block: Dict[str, Any])
Render todos as emoji checklist for Telegram.
Args:
todo_block: Dict with 'items' key, each item having
'status' and 'title' keys.
Returns:
Markdown-formatted todo list with s
Imports
__future__, html, typing
▶ dulus_tools/wire_events.py (208 LOC)
Wire Protocol Events - Event system for multi-client sync.
Classes (9)
class WireEvent
Base wire event for the multi-client sync protocol.
All wire events inherit from this base class and share common
attributes like event_type, timestamp, payload, and event_id.
class TurnBeginEvent
Signals the start of a new agent turn.
↳ __post_init__(self)
class TurnEndEvent
Signals the end of the current agent turn.
↳ __post_init__(self)
class StepBeginEvent
Signals the start of a new agent step.
↳ __post_init__(self)
class StepEndEvent
Signals the end of the current agent step.
↳ __post_init__(self)
class ToolCallEvent
Signals that a tool call was executed.
↳ __post_init__(self)
class CompactionBeginEvent
Signals that context compaction has started.
↳ __post_init__(self)
class CompactionEndEvent
Signals that context compaction has completed.
↳ __post_init__(self)
class WireEventBus
Event bus for wire protocol events.
Manages event publication, subscription, and persistent logging
for the multi-client wire protocol. Supports async subscribers,
wildcard subscriptions, and JSONL-based event persistence.
Example:
bus = WireEventBus(session_dir="/tmp/session")
def on_tur
↳ __init__(self, session_dir: Optional[str] = None)
↳ subscribe(self, event_type: str, callback: Callable)
Subscribe to events of a given type.
Args:
event_type: The event type to subscribe to, or "*" for all events.
callback: Function or coroutine to call when events are published.
Returns:
↳ unsubscribe(self, token: str)
Unsubscribe using the token returned from subscribe().
↳ get_recent_events(self, event_type: Optional[str] = None, limit: int = 50)
Get recent events, optionally filtered by type.
Args:
event_type: Filter to this event type, or None for all.
limit: Maximum number of events to return.
Returns:
List of WireEvent object
↳ get_event_count(self)
Return the total number of events processed.
Imports
__future__, asyncio, dataclasses, json, pathlib, time, typing, uuid
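The subscription model described for WireEventBus (token-returning `subscribe`, `unsubscribe` by token, and `"*"` as a wildcard for all events) can be sketched in a few lines. This is a synchronous toy, not the real bus: `MiniEventBus` and its `publish` method are hypothetical, the event type names in the usage are made up, and async subscribers and JSONL persistence are left out.

```python
import uuid
from collections import defaultdict
from typing import Any, Callable, Dict

Callback = Callable[[str, Any], None]


class MiniEventBus:
    def __init__(self) -> None:
        # event_type -> {token: callback}
        self._subs: Dict[str, Dict[str, Callback]] = defaultdict(dict)

    def subscribe(self, event_type: str, callback: Callback) -> str:
        """Register a callback for one event type, or "*" for all; returns a token."""
        token = uuid.uuid4().hex
        self._subs[event_type][token] = callback
        return token

    def unsubscribe(self, token: str) -> bool:
        """Remove the subscription with this token; True if it existed."""
        for callbacks in self._subs.values():
            if callbacks.pop(token, None) is not None:
                return True
        return False

    def publish(self, event_type: str, payload: Any) -> int:
        """Call typed subscribers first, then wildcard ones; return the count."""
        targets = list(self._subs.get(event_type, {}).values())
        if event_type != "*":
            targets += list(self._subs.get("*", {}).values())
        for callback in targets:
            callback(event_type, payload)
        return len(targets)
```

Returning an opaque token from `subscribe` lets callers register lambdas or bound methods without having to keep a reference to the exact callable for later removal.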
▶ dulus_tools/yolo_mode.py (35 LOC)
YOLO Mode - Auto-approve ALL actions without prompts.
Classes (1)
class YOLOMode
YOLO mode state manager.
When enabled, auto-approves ALL actions without any prompts.
This is more aggressive than --accept-all and is independent of AFK mode.
Use with extreme caution.
↳ enabled(self)
Return whether YOLO mode is currently enabled.
↳ toggle(self)
Toggle YOLO mode on/off. Returns the new state.
↳ enable(self)
Enable YOLO mode.
↳ disable(self)
Disable YOLO mode.
↳ __repr__(self)
Imports
dataclasses
▶file_filter.py91 LOC
Lightweight file-listing utilities for @-mention path completion.
Provides two listing strategies and a detection helper:
- ``list_files_git`` — fast, uses ``git ls-files`` when inside a repo.
- ``list_files_walk`` — fallback, walks the directory tree manually.
- ``detect_git`` — returns True if *root* is inside a git work-tree.
Functions (3)
def detect_git(root: Path | str)
Return True if *root* is inside a Git work-tree.
def list_files_git(root: Path | str, scope: Optional[str] = None)
List tracked files via ``git ls-files``.
*scope* optionally restricts to a sub-directory (relative to *root*).
def list_files_walk(root: Path | str, scope: Optional[str] = None)
Walk the directory tree and return relative paths (up to *limit*).
Imports
__future__ospathlibsubprocesstyping
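One way the git-first, walk-fallback strategy could look, using only `git rev-parse` and `git ls-files` plus `os.walk`. This is a sketch, not the module's code: the combined `list_files` helper, the `limit` keyword, its default of 5000, and the decision to skip `.git` during the walk are all assumptions.

```python
import os
import subprocess
from pathlib import Path


def detect_git(root):
    """Return True if root is inside a Git work-tree (False if git is missing)."""
    try:
        out = subprocess.run(
            ["git", "-C", str(root), "rev-parse", "--is-inside-work-tree"],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return out.returncode == 0 and out.stdout.strip() == "true"


def list_files_walk(root, scope=None, limit=5000):
    """Walk the tree and return POSIX-style paths relative to root (assumed cap)."""
    root = Path(root)
    start = root / scope if scope else root
    found = []
    for dirpath, dirnames, filenames in os.walk(start):
        # Prune .git in place so os.walk never descends into it.
        dirnames[:] = sorted(d for d in dirnames if d != ".git")
        for name in sorted(filenames):
            found.append(Path(dirpath, name).relative_to(root).as_posix())
            if len(found) >= limit:
                return found
    return found


def list_files(root, scope=None):
    """Hypothetical dispatcher: prefer git ls-files, fall back to walking."""
    if detect_git(root):
        cmd = ["git", "-C", str(root), "ls-files"]
        if scope:
            cmd += ["--", scope]
        out = subprocess.run(cmd, capture_output=True, text=True)
        if out.returncode == 0:
            return out.stdout.splitlines()
    return list_files_walk(root, scope)
```

The git path only returns tracked files, so the two strategies can disagree on untracked or ignored files; a completion UI may or may not want that.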
▶governance.py277 LOC
governance.py — Dulus governance layer: budgets, capabilities, hooks.
A lightweight, opt-in per-session governance layer for safe, auditable,
cost-capped agent runs. Three independent pieces you can use together or
on their own:
Ledger Per-session resource budgets across dimensions
(tokens, cost, tool_calls, ...). Atomic charge that
reports over-limit and first-breach so a supervisor can act.
Capabilities Least-privilege grants — which tools
Classes (6)
class ChargeResult
↳ remaining(self)
class Ledger
Per-session resource budget. Thread-safe; atomic charges.
limits: {"tokens": 200_000, "cost_micro": 5_000_000, "tool_calls": 300}
A dimension absent from `limits` (or set to a negative value) is
treated as UNLIMITED. warn_at is the fraction of the limit at which
a one-time w
↳ __init__(self, limits: Optional[dict] = None, warn_at: float = 0.8)
↳ granted(self, dim: str)
↳ used(self, dim: str)
↳ remaining(self, dim: str)
↳ would_exceed(self, dim: str, amount: int)
↳ charge(self, dim: str, amount: int)
Record usage atomically. Always succeeds (never blocks); the caller
inspects `over_limit` / `first_breach` and decides what to do.
↳ set_limit(self, dim: str, amount: Optional[int])
Change a limit live (preserves already-charged usage). A negative /
None amount removes the limit (unlimited). Re-arms warn/breach flags.
↳ snapshot(self)
class Capabilities
Least-privilege grants for an agent. Empty/None lists with allow_all=True
means "everything"; otherwise grants are explicit allow-lists with optional
deny-lists that win. Patterns use fnmatch globbing.
A sub-agent gets `parent.derive_child(...)` which can only NARROW the parent
(children ⊆ parent)
↳ allows_tool(self, name: str)
↳ allows_path(self, path: str)
↳ allows_net(self, host: str)
↳ derive_child(self, tools = None, fs_paths = None, net_hosts = None)
Return a child capability set that is a SUBSET of self. The child may
only request things the parent already allows; anything broader is
silently clamped to the parent's grant.
class HookVeto
Raised/returned by a pre_tool hook to block an operation.
↳ __init__(self, reason: str = 'blocked by hook')
class Hooks
↳ __init__(self)
↳ register(self, event: str, fn: Callable)
↳ fire(self, event: str, **ctx)
Fire all callbacks for `event`. For pre_tool, returns (allowed, reason):
any callback returning False / raising HookVeto blocks the op. Other
events always return (True, ""). Callback exceptions never
class Governance
Bundles the three pieces for one agent session. Any may be None.
↳ child(self, **caps)
Governance for a spawned sub-agent: SHARES the ledger (one budget for
the whole tree), NARROWS capabilities (child ⊆ parent), shares hooks.
Functions (1)
def from_config(config: dict)
Build a Governance from a config dict, or None if not enabled.
Expected (all optional) under config["governance"]:
{
"limits": {"tokens": 200000, "cost_micro": 5000000, "tool_calls": 300},
"warn_at": 0.8,
"tools": ["Read", "Grep", "Glob", "Bash", "Web*"], # allow-list
Imports
__future__collectionsdataclassesfnmatchosthreadingtimetyping
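A rough sketch of the Ledger behaviour described above: charges always succeed, and the result tells the caller about `over_limit` and `first_breach`. `MiniLedger` is a hypothetical name, the `ChargeResult` fields other than `remaining`, `over_limit` and `first_breach` are guesses, and treating "over" as strictly greater than the limit is an assumption.

```python
import threading
from dataclasses import dataclass
from typing import Optional


@dataclass
class ChargeResult:
    dim: str
    used: int
    limit: Optional[int]  # None means unlimited (assumed representation)
    over_limit: bool
    first_breach: bool
    warn: bool

    @property
    def remaining(self):
        return None if self.limit is None else max(0, self.limit - self.used)


class MiniLedger:
    """Hypothetical stand-in for Ledger; one lock makes each charge atomic."""

    def __init__(self, limits=None, warn_at=0.8):
        # Absent or negative limits are treated as unlimited.
        self._limits = {k: v for k, v in (limits or {}).items()
                        if v is not None and v >= 0}
        self._used = {}
        self._warned = set()
        self._breached = set()
        self._warn_at = warn_at
        self._lock = threading.Lock()

    def charge(self, dim, amount):
        """Record usage; never blocks or raises on overspend."""
        with self._lock:
            used = self._used.get(dim, 0) + amount
            self._used[dim] = used
            limit = self._limits.get(dim)
            if limit is None:
                return ChargeResult(dim, used, None, False, False, False)
            over = used > limit
            first = over and dim not in self._breached
            if over:
                self._breached.add(dim)
            # One-time warning once usage reaches warn_at * limit.
            warn = (not over and used >= limit * self._warn_at
                    and dim not in self._warned)
            if warn:
                self._warned.add(dim)
            return ChargeResult(dim, used, limit, over, first, warn)
```

Because `charge` never refuses, enforcement stays with the supervisor: it can stop the run on `first_breach` and ignore later `over_limit` results for the same dimension.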
▶gui/__init__.py45 LOC
Dulus GUI package — professional desktop interface.
GUI-heavy modules are loaded LAZILY. Headless environments (server,
Docker without X11, WSL without python3-tk, Termux) need to be able to
do `from gui.session_utils import ...` without crashing on the
tkinter/customtkinter import chain. We wrap the lazy attribute lookup
in try/except so importing `gui` itself is always safe; only touching
an attribute triggers the heavy imports, and they raise a clearer error
when tkinter is missing.
Functions (1)
def __getattr__(name: str)
Imports
__future__importlibtyping
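The lazy-loading approach described above relies on a module-level `__getattr__` (PEP 562). The sketch below shows the idea on a throwaway module object so it runs anywhere; the `_LAZY` table, the attribute names and the error wording are all hypothetical, and stdlib targets stand in for the real tkinter/customtkinter modules.

```python
import importlib
import types

# Hypothetical mapping: public name -> (module to import, attribute inside it).
_LAZY = {
    "JSONDecoder": ("json", "JSONDecoder"),
    "MissingWidget": ("definitely_not_installed_toolkit", "Widget"),
}


def lazy_getattr(name):
    """Resolve `name` on first access; import the heavy module only then."""
    try:
        module_name, attr = _LAZY[name]
    except KeyError:
        raise AttributeError(f"module has no attribute {name!r}") from None
    try:
        module = importlib.import_module(module_name)
    except ImportError as exc:
        # Re-raise with a clearer message than the raw import failure.
        raise ImportError(
            f"{name} needs the optional dependency {module_name!r}, "
            "which is not installed"
        ) from exc
    return getattr(module, attr)


# In a real package this function would simply be named __getattr__ inside
# __init__.py. Here it is attached to a demo module to show the lookup path.
demo = types.ModuleType("demo_gui")
demo.__getattr__ = lazy_getattr
```

Importing `demo` (or the real package) costs nothing; only `demo.MissingWidget` triggers the failing import, and the error names the missing dependency.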
▶gui/agent_bridge.py289 LOC
Bridge between the GUI and Dulus's core agent engine.
Handles AgentState, config, threaded execution, MemPalace injection,
skill injection, and permission requests. Based on Nayeli's design.
Classes (1)
class DulusBridge
Thread-safe bridge between GUI and Dulus core.
Runs the agent loop in a background thread and streams events
back to the UI via an internal event queue (poll from GUI thread).
↳ __init__(self, config: dict | None = None)
↳ start(self)
Start the background worker thread.
↳ stop(self)
Clean shutdown of the bridge worker thread.
↳ send_message(self, text: str)
Enqueue a user message. Pre-loads pending history if needed.
↳ stop_generation(self)
Signal the current generation to stop as soon as possible.
↳ grant_permission(self, granted: bool)
Respond to a pending permission request.
↳ get_context_usage(self)
Return (tokens_used, token_limit).
↳ save_current_session(self)
Manually save the current active state to disk. Returns session_id.
↳ clear_session(self)
Reset the agent state (new conversation).
↳ load_session(self, messages: list[dict], session_id: str | None = None)
Load a previous session's messages into the current state.
↳ inject_skill(self, skill_body: str)
Inject skill context into the next user message (one-shot).
↳ set_model(self, model: str)
Change the active model.
↳ _worker_loop(self)
↳ _process_turn(self, user_message: str)
↳ _apply_mempalace(self, user_input: str)
Copy of dulus.py MemPalace injection logic.
↳ _emit(self, event_type: str, **kwargs)
Put an event into the public event queue.
Imports
__future__agentcommonconfigcontextdulus_mcp.toolsgui.session_utilsmemory.toolsmulti_agent.toolspathlibqueueskill.toolstask.toolsthreadingtools
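The threading model described for DulusBridge (background worker, events polled from the GUI thread) can be sketched with two queues. `MiniBridge`, its `handler` argument, the `poll` method and the event type names are hypothetical; the real bridge drives the agent loop and handles permissions, sessions and MemPalace.

```python
import queue
import threading


class MiniBridge:
    """Hypothetical stand-in: `handler(text)` yields text chunks for one turn."""

    def __init__(self, handler):
        self._handler = handler
        self._inbox = queue.Queue()   # user messages, GUI thread -> worker
        self.events = queue.Queue()   # UI events, worker -> GUI thread
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._worker_loop, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._inbox.put(None)  # sentinel wakes the worker if it is blocked
        if self._thread is not None:
            self._thread.join(timeout=2)

    def send_message(self, text):
        self._inbox.put(text)

    def _emit(self, event_type, **kwargs):
        self.events.put({"type": event_type, **kwargs})

    def _worker_loop(self):
        while not self._stop.is_set():
            message = self._inbox.get()
            if message is None:
                break
            try:
                for chunk in self._handler(message):
                    self._emit("text", text=chunk)
                self._emit("turn_done")
            except Exception as exc:  # keep the worker alive after a bad turn
                self._emit("error", message=str(exc))

    def poll(self):
        """Drain pending events without blocking; call from the GUI thread."""
        drained = []
        while True:
            try:
                drained.append(self.events.get_nowait())
            except queue.Empty:
                return drained
```

In a tkinter app, `poll()` would typically be called from a repeating `after(...)` callback so that widgets are only touched on the GUI thread.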
▶gui/chat_widget.py448 LOC
Chat display widget for Dulus GUI.
Provides a scrollable chat view with message bubbles, markdown-like rendering,
code blocks with copy buttons, tool execution pills, and a typing indicator.
Classes (1)
class ChatWidget
Scrollable chat widget with message bubbles and rich formatting.
↳ __init__(self, master, on_copy_callback: Callable | None = None, **kwargs)
↳ add_user_message(self, text: str)
Add a user message bubble on the right.
↳ add_assistant_message(self, text: str)
Start a new assistant message bubble on the left.
↳ append_to_last_message(self, text: str)
Append text to the current assistant bubble (streaming).
↳ add_tool_indicator(self, name: str, status: str = 'running')
Add a small inline pill showing a tool execution.
↳ show_thinking(self)
Show the 'thinking' indicator at the bottom.
↳ hide_thinking(self)
Hide the thinking indicator.
↳ clear_chat(self)
Remove all messages and reset state.
↳ load_messages(self, messages: list[dict])
Bulk load messages into the chat view without repetitive scrolling.
↳ apply_theme(self)
Re-apply current theme colors to existing widgets.
↳ _hide_thinking(self)
↳ _finish_current_stream(self)
Lock the current bubble so future appends start a new one.
↳ _scroll_to_bottom(self)
Auto-scroll to the latest message.
↳ _create_bubble(self, text: str, is_user: bool, timestamp: str)
Create a message bubble frame with formatted text widget inside.
↳ _adjust_text_height(self, txt: ctk.CTkTextbox)
Dynamic height based on content lines.
↳ _render_formatted(self, txt: ctk.CTkTextbox, text: str)
Parse and insert markdown-like formatting into a CTkTextbox.
NOTE: CTkTextbox forbids 'font' in tag_config, so we use colors only.
↳ _insert_code_block(self, txt: ctk.CTkTextbox, code: str, lang: str = '')
Insert a code block with a dark background and copy button.
↳ _insert_inline_formatted(self, txt: ctk.CTkTextbox, text: str)
Process inline bold, italic, and inline code within a text segment.
Functions (1)
def _sanitize_markdown(text: str)
Escape HTML-like chars so tkinter Text widget stays safe.
Imports
__future__datetimegui.themesretkintertyping
▶gui/main_window.py1529 LOC
Dulus Main Window — customtkinter desktop GUI.
Provides a professional dark-themed interface with sidebar, chat area,
input bar, and top controls. Designed to be wired to a backend bridge
by another agent.
Classes (4)
class QualityMonitor
Monitors backend health and computes an overall quality score (0-100).
↳ __init__(self)
↳ set_connected(self, connected: bool)
↳ set_model_status(self, loaded: bool, name: str = '')
↳ record_response_time(self, ms: float)
Record a model response time in milliseconds.
↳ record_error(self)
↳ set_feature(self, name: str, available: bool)
↳ compute_score(self)
Compute overall quality score and return breakdown dict.
class CircularProgress
A circular progress indicator showing a 0-100 score with color coding.
↳ __init__(self, master, size: int = 36, **kwargs)
↳ _get_parent_bg(self)
Get the parent's background color for the canvas.
tk.Canvas is a native tkinter widget and cannot use customtkinter's
"transparent" fg_color, so we map it to a real hex color.
↳ _draw_progress(self)
Draw the circular progress arc and score text.
↳ set_score(self, score: int, color: str | None = None)
Update the displayed score and optionally the color.
↳ set_pulse(self, active: bool, color: str = '#4caf50')
Set the pulsing animation state for the connection dot.
↳ _animate_pulse(self)
Animate the pulse effect.
↳ apply_theme(self)
Update canvas background to match current theme.
class WebappLoader
A frame that loads and displays a web dashboard with navigation controls.
↳ __init__(self, master, **kwargs)
↳ _nav_back(self)
↳ _nav_forward(self)
↳ _nav_refresh(self)
↳ _nav_go(self)
↳ _on_url_enter(self, event = None)
↳ _push_history(self, url: str)
Add URL to history, trimming forward history.
↳ _load_url(self, url: str)
Render `url` INSIDE the content frame via embedded pywebview.
On Windows the only reliable native way to embed a real Chromium
rendering surface inside a tkinter frame is to (1) spawn pywebview
in a
↳ _try_embed_pywebview(self, url: str)
Return True if the embedded view at least STARTED.
Architecture (forced by pywebview 5.x):
pywebview refuses to start on anything but the main thread of
its host process, on every platform. That
↳ _teardown_embedded_hwnd(self)
Destroy a previous embedded window AND its subprocess so re-
clicks don't pile up zombie pywebview processes.
↳ load_dashboard(self, endpoint: str = 'http://127.0.0.1:5000/dashboard')
Load the Dulus dashboard endpoint (default = local webchat on :5000).
↳ set_sandbox_server(self, srv: SandboxServer)
Hand off the GUI-owned SandboxServer so the launcher can reach it.
↳ _launch_sandbox(self)
Open the Dulus Sandbox UI embedded inside the content frame.
URL selection (in order):
1. http://127.0.0.1:5000/sandbox/ — the existing /webchat server
if it's already running. This is the pre
↳ apply_theme(self)
Re-apply current theme colors.
class DulusMainWindow
Main Dulus application window.
↳ __init__(self)
↳ _build_sidebar(self)
↳ _build_main_area(self)
↳ _schedule_quality_update(self)
Schedule the next quality score update (every 5 seconds).
↳ _update_quality_display(self)
Read quality metrics and update the circular indicator.
↳ _start_pulse(self)
Start the pulsing animation on the status dot.
↳ _stop_pulse(self)
Stop the pulsing animation.
↳ _animate_pulse(self)
Animate the status dot with a pulse effect.
↳ _show_quality_tooltip(self)
Show a tooltip window with quality score breakdown.
↳ _on_send_click(self)
↳ _on_enter_key(self, event = None)
↳ _on_shift_enter(self, event = None)
↳ _on_new_chat_click(self)
↳ _on_settings_click(self)
↳ _on_model_change(self, model: str)
↳ _on_voice_click(self)
↳ _on_attach_click(self)
↳ _toggle_tasks_view(self)
↳ _show_tasks_view(self)
↳ _show_chat_view(self)
↳ _toggle_webapp_view(self)
↳ _show_webapp_view(self)
Show the webapp loader, hiding chat and input.
We deliberately do NOT auto-load a URL here. The previous version
called `load_dashboard()` on every show which forced a browser
open to a possibly-dead
↳ _hide_all_views(self)
Hide all main content views (chat, tasks, webapp).
↳ set_status(self, text: str, color: str = TEXT_DIM)
Update the status label and dot color.
↳ set_model(self, model: str)
Set the model selector value.
↳ _on_sidebar_session_select(self, sid: str)
↳ set_sessions(self, sessions: list[dict])
Populate the sidebar session list.
↳ set_active_session(self, session_id: str | None)
Mark a session as active in the sidebar.
↳ show_thinking(self)
Show assistant thinking indicator.
↳ hide_thinking(self)
Hide thinking indicator.
↳ add_assistant_chunk(self, text: str)
Append streaming text to the current assistant message.
↳ add_tool_call(self, name: str, status: str = 'running')
Show a tool execution pill.
↳ focus_input(self)
Move focus to the input box.
↳ apply_theme(self, theme_name: str)
Apply a color theme to the main window widgets.
↳ run(self)
Start the main loop.
Imports
__future__gui.chat_widgetgui.sandbox_servergui.sidebargui.tasks_viewgui.themesmaththreadingtimetkintertypingwebbrowser
▶gui/personas.py230 LOC
Persona system for Dulus GUI.
Loads the canonical persona definitions from .dulus-context/personas.json
and provides helpers for retrieving persona data and rendering cards in
customtkinter interfaces.
Classes (1)
class PersonaCard
A small card widget that displays a single persona's identity.
Usage::
card = PersonaCard(parent, persona=get_persona("kimi-code"))
card.pack(padx=10, pady=10, fill="both", expand=True)
↳ __init__(self, master: Any, persona: dict[str, Any], width: int = 340, height: int = 280, **kwargs)
↳ _build(self)
Functions (6)
def _load_json(path: Path | str | None = None)
Load and cache personas.json. Raises FileNotFoundError if missing.
def reload()
Force reload personas.json from disk and return the raw data.
def get_all_personas(path: Path | str | None = None)
Return all persona definitions as a list of dicts.
def get_persona(persona_id: str, path: Path | str | None = None)
Return a single persona by its ``id`` (e.g. ``'kevrojo'``).
def get_color_for_agent(agent_name: str, path: Path | str | None = None)
Return the hex color for an agent name/id (case-insensitive).
Falls back to the default Dulus accent ``#ff6b1f`` if unknown.
def get_display_name(agent_name: str, path: Path | str | None = None)
Return the pretty display name for an agent, or the raw name as fallback.
Imports
__future__jsonospathlibtyping
▶gui/sandbox_server.py152 LOC
Local HTTP server that serves the bundled sandbox/dist tree.
Why a dedicated server (vs file:// URLs):
The sandbox's built index.html references its bundled JS as
`/sandbox/assets/...` — absolute paths under a `/sandbox/` prefix.
file:// URLs would either need rewriting (fragile) or would 404
every asset. A trivial localhost HTTP server serving the right
base path is the cheapest correct fix.
Why an ephemeral port:
Avoids colliding with whatever the user already has on :5000 /
:8
Classes (2)
class _SandboxRequestHandler
Serves the sandbox/dist tree under a /sandbox/ URL prefix.
Strips a leading /sandbox/ from the path so the built index.html's
`/sandbox/assets/...` references resolve relative to dist root.
Falls back to serving / as index.html for convenience.
↳ translate_path(self, path: str)
↳ log_message(self, format, *args)
class SandboxServer
Singleton-ish local HTTP server for the sandbox UI.
Usage:
srv = SandboxServer()
if srv.available:
srv.start() # idempotent
url = srv.url # "http://127.0.0.1:<port>/sandbox/"
...
srv.stop() # on GUI shutdown
↳ __init__(self)
↳ available(self)
True if the sandbox bundle was found on disk.
↳ url(self)
Full URL to the sandbox entry — empty string if not started.
↳ start(self)
Start the server (idempotent). Returns the URL.
↳ stop(self)
Stop the server and wait for the worker thread to exit.
Functions (1)
def _resolve_sandbox_dist()
Find sandbox/dist whether running from source tree or installed wheel.
Mirrors the resolution in sandbox_bootstrap.py so source-tree dev runs
and pip-installed runs both work.
Imports
__future__http.serverpathlibsocketserverthreadingtyping
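A compact sketch of the design described above: a localhost server on an ephemeral port that strips a leading `/sandbox/` before mapping the URL to the dist directory. `PrefixHandler`, `MiniSandboxServer` and the explicit `dist_dir` argument are hypothetical; the real class locates the bundle itself and exposes `available`.

```python
import http.server
import socketserver
import threading
from functools import partial


class PrefixHandler(http.server.SimpleHTTPRequestHandler):
    PREFIX = "/sandbox/"

    def translate_path(self, path):
        # Map /sandbox/assets/x.js to <dist>/assets/x.js.
        if path.startswith(self.PREFIX):
            path = "/" + path[len(self.PREFIX):]
        return super().translate_path(path)

    def log_message(self, format, *args):
        pass  # keep the GUI's console quiet


class _Server(socketserver.ThreadingTCPServer):
    daemon_threads = True


class MiniSandboxServer:
    def __init__(self, dist_dir):
        self._dist = str(dist_dir)
        self._httpd = None
        self._thread = None

    @property
    def url(self):
        if self._httpd is None:
            return ""
        port = self._httpd.server_address[1]
        return f"http://127.0.0.1:{port}/sandbox/"

    def start(self):
        """Start once; later calls return the same URL."""
        if self._httpd is None:
            handler = partial(PrefixHandler, directory=self._dist)
            # Port 0 asks the OS for any free port.
            self._httpd = _Server(("127.0.0.1", 0), handler)
            self._thread = threading.Thread(
                target=self._httpd.serve_forever, daemon=True)
            self._thread.start()
        return self.url

    def stop(self):
        if self._httpd is not None:
            self._httpd.shutdown()
            self._httpd.server_close()
            self._thread.join(timeout=2)
            self._httpd = None
```

Binding to `127.0.0.1` keeps the bundle off the network, and reading the port back from `server_address` is what makes the ephemeral-port choice usable by the caller.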
▶gui/session_utils.py204 LOC
Utility functions for managing Dulus GUI sessions.
Functions (5)
def build_title(messages: list[dict])
Generate a descriptive title from the first user message.
def _read_session_meta(path: Path)
Read session metadata with mtime caching.
def scan_sessions()
Scan daily session directories and return sorted list of metadata.
Single source of truth for listing: only daily/ folder is scanned.
Other locations (root sessions/, checkpoints) continue to exist for
internal use but are not listed to avoid duplicates.
def save_session(state, config: dict, session_id: str | None = None)
Save AgentState to disk in standard Dulus format. Returns the session_id.
def delete_session(session_id: str)
Delete all session files related to the given ID. Returns True if any deleted.
Imports
configdatetimejsonpathlibuuid
▶gui/settings_dialog.py146 LOC
Settings popup for Dulus GUI.
Classes (1)
class SettingsDialog
Floating settings window.
↳ __init__(self, master, config: dict)
↳ _save(self)
Functions (1)
def _build_model_list()
Build list of provider/model strings from PROVIDERS registry.
Imports
__future__configcustomtkintergui.themesostyping
▶gui/sidebar.py541 LOC
Left sidebar panel for Dulus GUI.
Provides session history, model selector, context-usage bar,
quick-command buttons, available-tools list, and version info.
Classes (1)
class DulusSidebar
Left sidebar with session history, model selector, context bar, tools, and quick commands.
↳ __init__(self, master, bridge = None, on_new_chat: Callable[[], None] | None = None, on_command: Callable[[str], None] | None = None, on_model_change: Callable[[str], None] | None = None, on_session_select: Callable[[str], None] | None = None, **kwargs)
↳ _build_ui(self)
↳ _create_session_row(self, sid: str, title: str)
Create a single session row (button + delete) in the sidebar.
↳ set_sessions(self, sessions: list[dict])
Update the session history list in the sidebar (incremental diff).
↳ _on_delete_click(self, session_id: str)
Handle session deletion from the sidebar.
↳ set_active_session(self, session_id: str | None)
Mark a session as active in the sidebar.
↳ _highlight_active_session(self)
↳ refresh_sessions(self)
Load session history from all session directories (auto-scan).
↳ _on_settings_click(self)
↳ _on_session_click(self, sid: str)
↳ _refresh_tools(self)
Populate the tools list from the registry.
↳ _refresh_model_list(self)
Populate the model dropdown from curated list.
↳ update_context_bar(self)
Refresh the context usage progress bar (call from UI thread).
↳ _on_new_chat_click(self)
↳ _on_command_click(self, cmd: str)
↳ _on_model_change(self, model: str)
↳ _on_session_click(self, path: str)
↳ apply_theme(self, t: dict | None = None)
Re-apply current theme colors to all sidebar widgets.
Imports
__future__configgui.session_utilsgui.themesjsonospathlibproviderstool_registrytyping
▶gui/tasks_view.py499 LOC
Dulus Tasks View — professional Kanban-style task board v2.
Reads tasks from .dulus-context/tasks.json and displays them in a
three-column layout: Pending | In Progress | Completed.
v2 improvements:
- Filter by owner (agent) and phase (week)
- Priority badges (CRITICAL/HIGH/MEDIUM/LOW)
- Agent color coding
- Auto-refresh via file polling
- Phase grouping separators
- Owner summary stats
Classes (2)
class TaskCard
A single task card widget with priority, agent color, and phase.
↳ __init__(self, master, task: dict, **kwargs)
↳ _build(self)
↳ _toggle_expand(self)
class TasksView
Professional Kanban task board for Dulus with filters and auto-refresh.
↳ __init__(self, master, tasks_file: Path | str | None = None, **kwargs)
↳ _build_ui(self)
↳ _create_column(self, parent, col: int, title: str, color: str, status_key: str)
↳ _load_tasks(self)
↳ _matches_filters(self, task: dict)
↳ refresh(self)
↳ _check_file_changed(self)
↳ _start_polling(self)
↳ apply_theme(self)
Re-apply current theme colors to persistent widgets.
↳ destroy(self)
Functions (1)
def _fmt_date(iso: str)
Imports
__future__datetimegui.themesjsonospathlibthreadingtyping
▶gui/themes.py287 LOC
Theme system for Dulus GUI.
Provides multiple color presets that can be switched at runtime.
Functions (5)
def get_theme()
Return the currently active theme colors.
def set_theme(name: str)
Activate a theme by name. Returns the theme dict or None if unknown.
def list_themes()
Return available theme names.
def get_quality_color(score: int)
Return a color for the given quality score (0-100).
0-40: red (poor)
41-70: yellow (fair)
71-100: green (good)
def get_quality_label(score: int)
Return a human-readable label for the quality score.
Imports
__future__
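The score bands documented for get_quality_color (0-40, 41-70, 71-100) map onto a simple threshold check. In the sketch below the function name `classify_quality`, the hex colors and the clamping of out-of-range scores are assumptions; only the band boundaries and the poor/fair/good wording come from the docstring.

```python
def classify_quality(score):
    """Return (color, label) for a 0-100 quality score (colors are placeholders)."""
    score = max(0, min(100, int(score)))  # assumed: clamp instead of raising
    if score <= 40:
        return "#f44336", "poor"
    if score <= 70:
        return "#ffc107", "fair"
    return "#4caf50", "good"
```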
▶gui/tool_panel.py94 LOC
Side panel showing active tool executions.
Classes (1)
class ToolPanel
Panel that displays running and completed tools.
↳ __init__(self, master, **kwargs)
↳ add_tool(self, name: str, status: str = 'running')
↳ update_tool(self, name: str, status: str = 'done', result: str = '')
↳ clear_tools(self)
Imports
__future__customtkinter
▶input.py1153 LOC
prompt_toolkit-based REPL input with typing-time slash-command autosuggest.
Optional dependency: when prompt_toolkit is not installed, HAS_PROMPT_TOOLKIT
is False and callers should fall through to readline-based input.
Dependency-injected: callers register command/meta providers via setup()
before calling read_line(). This module never imports Dulus core — keeping
the dependency one-way and eliminating any circular-import risk.
Classes (1)
class _OutputRedirector
Redirects stdout to the split layout output buffer.
Thread-safe: multiple threads (main REPL, Telegram bg runner, sentinel)
may write concurrently. A lock prevents buffer corruption.
CRITICAL: Strips cursor-movement ANSI sequences (ESC[A, ESC[2K, etc.)
before storing. These sequences come from Rich Li
↳ __init__(self, original)
↳ _strip_cursor_ansi(text: str)
Remove cursor-control ANSI sequences; keep color/style ones.
↳ write(self, text: str)
↳ flush(self)
↳ reset(self)
Clear internal buffer and line-open state.
Call at the start of each turn to prevent residual buffered text
from concatenating with the new turn's output.
↳ isatty(self)
Functions (18)
def setup(commands_provider: Callable[[], dict], meta_provider: Callable[[], dict], toolbar_provider: Optional[Callable[[], str]] = _TOOLBAR_SENTINEL)
Register providers for the live command registry and metadata.
`commands_provider` returns the dispatcher's COMMANDS dict.
`meta_provider` returns the _CMD_META dict (descriptions + subcommands).
`toolbar_provider` returns an ANSI toolbar string (or "" to hide).
Pass None explicitly to clear a prev
def reset_session()
Drop the cached session so the next read_line() rebuilds from scratch.
def _build_session(history_path: Optional[Path])
def read_line(prompt_ansi: str, history_path: Optional[Path] = None)
Read one line of input via prompt_toolkit; caches the session across calls.
The history file passed here MUST NOT be the readline history file — the
two line-editors use incompatible formats. See Dulus REPL for the
dedicated PT_HISTORY_FILE.
def set_hide_sender(enabled: bool)
Toggle whether the typed message gets echoed above the sticky bar.
def _count_deduped_recent()
Count non-consecutive-duplicate entries in _RECENT_USER_MSGS (same key as render).
def add_recent_msg(text: str)
Push a user message into the recent-history strip (sliding window).
def read_line_split(prompt: str = '> ', history_path: Optional[Path] = None)
Read input with split layout - fixed bottom bar, scrollable output above.
Similar to Kimi Code and Claude Code interfaces.
def append_output(text: str)
Append text to the output buffer (for split layout mode).
Use this to display messages without interrupting the input bar.
def clear_split_output()
Clear the split layout output buffer.
def get_original_stdout()
Return the real stdout before patch_stdout/_OutputRedirector wrapping.
def set_stdout_bypass(active: bool)
Temporarily bypass the _OutputRedirector and write directly to the real terminal.
Call with active=True before a background turn, active=False after.
This makes background output look identical to NOTIFICATION SYSTEM NEEDED —
no fragmentation, no ^M/^J, because the real terminal handles \r natively
def set_notification_callback(callback: Callable[[str], None])
Register a callback to handle background notifications.
The callback will be called with the notification text when it's safe
to display (during the next input cycle or when input is not active).
def queue_notification(text: str)
Queue a notification to be displayed safely.
This should be used by background threads (timers, jobs, etc.) to
display messages without corrupting the prompt_toolkit input bar.
def drain_notifications()
Drain all pending notifications from the queue.
Returns a list of notification texts. Should be called when it's
safe to display output (e.g., before showing a new prompt).
def safe_print_notification(text: str, end: str = '\n', flush: bool = False)
Print a notification in a prompt_toolkit-safe way.
If split layout is active, uses run_in_terminal.
Otherwise prints directly (which may cause display issues in sticky mode).
def set_toolbar_status(text: str)
Set a short status string to be shown in the bottom toolbar.
Thread-safe. Automatically invalidates the display if split layout is active.
Pass "" to clear.
def request_exit()
Signal the active prompt session to exit immediately.
Returns True if successfully signaled, False if no prompt is active.
Thread-safe.
Imports
__future__pathlibqueuerethreadingtimetyping
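As an illustration of what `_strip_cursor_ansi` is described as doing (drop cursor-control sequences, keep color/style ones), here is a regex sketch. It treats every CSI sequence whose final byte is not `m` as cursor control, which is an assumption and may be broader than the real filter; `strip_cursor_ansi` is a hypothetical name.

```python
import re

# CSI sequence: ESC [ parameter bytes, intermediate bytes, one final byte.
_CSI = re.compile(r"\x1b\[([0-?]*)([ -/]*)([@-~])")


def strip_cursor_ansi(text):
    """Remove non-SGR CSI sequences; SGR (final byte 'm') carries color/style."""
    def keep_sgr(match):
        return match.group(0) if match.group(3) == "m" else ""
    return _CSI.sub(keep_sgr, text)
```

For example, `strip_cursor_ansi("\x1b[2K\x1b[Ahello")` yields `"hello"`, while `"\x1b[31mred\x1b[0m"` passes through unchanged.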
▶license_manager.py187 LOC
Dulus License Manager — Offline-first key validation + feature gating.
Tiers:
FREE No key required. Limited tool calls, local providers only.
PRO $15/mo. Full features, BYOK, priority support.
ENTERPRISE $50/mo. Team features + admin dashboard + SSO (future).
Key format (offline):
DULUS-<base64(json_payload + ":" + hmac_signature)>
The secret lives in ~/.dulus/.license_secret (never commit this file).
If the secret file is missing we fall back to a hardcoded dev-key s
Classes (2)
class LicenseTier
class LicenseManager
Parse and validate a Dulus license key.
↳ __init__(self, key: Optional[str] = None)
↳ _validate(self)
↳ can_use(self, feature: str)
Check if a feature is allowed by current tier.
↳ max_tool_calls(self)
↳ max_providers(self)
↳ max_subagents(self)
↳ max_plugins(self)
↳ allow_cloudsave(self)
↳ allow_voice(self)
↳ allow_telegram(self)
↳ allow_mcp(self)
↳ status_banner(self)
Functions (1)
def _generate_key(tier: str, days: int, secret: str)
Generate a signed license key (Kev-only tool).
Imports
__future__base64hashlibhmacjsonospathlibsystimetyping
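The key format above, `DULUS-<base64(json_payload + ":" + hmac_signature)>`, can be sketched with the stdlib. Everything beyond that shape is an assumption made for illustration: the payload fields (`tier`, `exp`), HMAC-SHA256 with a hex digest, URL-safe base64, and the `validate_key` helper name.

```python
import base64
import hashlib
import hmac
import json
import time

PREFIX = "DULUS-"


def _sign(payload, secret):
    return hmac.new(secret.encode(), payload.encode(), hashlib.sha256).hexdigest()


def generate_key(tier, days, secret):
    """Build a signed key; payload layout is hypothetical."""
    payload = json.dumps(
        {"tier": tier, "exp": int(time.time()) + days * 86_400},
        separators=(",", ":"), sort_keys=True,
    )
    blob = f"{payload}:{_sign(payload, secret)}".encode()
    return PREFIX + base64.urlsafe_b64encode(blob).decode()


def validate_key(key, secret, now=None):
    """Return the payload dict if the key is authentic and unexpired, else None."""
    if not key.startswith(PREFIX):
        return None
    try:
        decoded = base64.urlsafe_b64decode(key[len(PREFIX):].encode()).decode()
        # The hex signature contains no ':', so split on the last one.
        payload, signature = decoded.rsplit(":", 1)
        expected = _sign(payload, secret)
        if not hmac.compare_digest(signature.encode(), expected.encode()):
            return None
        data = json.loads(payload)
    except ValueError:  # bad base64, bad UTF-8, missing ':', or bad JSON
        return None
    if not isinstance(data, dict):
        return None
    current = time.time() if now is None else now
    return data if data.get("exp", 0) >= current else None
```

Verifying the signature before parsing the JSON, and comparing with `hmac.compare_digest`, keeps untrusted payloads away from the parser and avoids timing leaks.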
▶memory/__init__.py93 LOC
Memory package for dulus.
Provides persistent, file-based memory across conversations.
Storage layout:
user scope : ~/.dulus/memory/<slug>.md (shared across projects)
project scope : .dulus/memory/<slug>.md (local to cwd)
The MEMORY.md index in each directory is auto-maintained and injected
into the system prompt so Claude has an overview of available memories.
Public API (backward-compatible with the old memory.py module):
MemoryEntry — dataclass for a single
Imports
consolidatorcontextpalacescanstoretypes
▶memory/audit.py51 LOC
Audit trail for Dulus RTK — logs all tool operations.
Functions (4)
def _ensure_dir()
def log_operation(tool_name: str, params: Dict[str, Any], result_preview: str = '')
Log a tool operation with timestamp.
def _trim_audit()
Keep audit file under max lines.
def get_recent(n: int = 50)
Return last N audit entries.
Imports
__future__jsonpathlibtimetyping
▶memory/consolidator.py312 LOC
Memory consolidator: extract long-term insights from completed sessions.
Called manually via `/memory consolidate` or programmatically after a session.
Uses a lightweight AI call to identify user preferences, feedback corrections,
and project decisions worth promoting to persistent semantic memory.
Design principles:
- Hard cap of 3 memories per session to avoid noise accumulation
- Auto-extracted memories start at 0.8 confidence (below explicit user saves)
- Won't overwrite a higher-confidenc
Functions (4)
def consolidate_session(messages: list, config: dict)
Analyze a session's messages and extract memories worth keeping long-term.
def mine_files(file_paths: list[str], config: dict, max_files: int = 15, max_bytes: int = 20000)
Read each file and create a 'project' memory for the relevant ones.
Used on session exit when MemPalace is ON to capture context about
files the user worked on. Returns the list of saved memory names.
def snapshot_memory_files()
Return the current set of .md files (absolute paths) in the user
memory directory. Use before consolidate_session, then call
new_memory_files(snapshot) after to get only what was just created.
def new_memory_files(snapshot: set[str])
Return .md files in the user memory directory that weren't in `snapshot`.
Imports
__future__datetime
▶memory/context.py268 LOC
Memory context building for system prompt injection.
Provides:
get_memory_context() — full context string for system prompt
find_relevant_memories() — keyword (+ optional AI) relevance filtering
truncate_index_content() — line + byte truncation with warning
Functions (4)
def truncate_index_content(raw: str)
Truncate MEMORY.md content to line AND byte limits, appending a warning.
Matches Claude Code's truncateEntrypointContent:
- Line-truncates first (natural boundary)
- Then byte-truncates at the last newline before the cap
- Appends which limit fired
def get_memory_context(include_guidance: bool = False)
Return memory context for injection into the system prompt.
Combines user-level and project-level MEMORY.md content (if present).
Returns empty string when no memories exist.
Args:
include_guidance: if True, prepend the full memory system guidance
(MEMORY_SYSTEM_PROMPT).
def find_relevant_memories(query: str, max_results: int = 5, use_ai: bool = False, config: dict | None = None)
Find memories relevant to a query.
Strategy:
1. Always: keyword match on name + description + content
2. If use_ai=True and config has a model: use a small AI call to rank
Returns:
List of dicts with keys: name, description, type, scope, content,
file_path, mtime_s, freshness_text
def _ai_select_memories(query: str, candidates: list, max_results: int, config: dict)
Use a fast AI call to select the most relevant memories from candidates.
Falls back to keyword results on any error.
Imports
__future__pathlibscanstoretypes
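The truncation order described for truncate_index_content (lines first, then bytes at the last newline before the cap, then a note naming the limit that fired) could be sketched as follows. The limit values, the keyword parameters and the warning wording are hypothetical.

```python
MAX_LINES = 200      # assumed value
MAX_BYTES = 25_000   # assumed value


def truncate_index_content(raw, max_lines=MAX_LINES, max_bytes=MAX_BYTES):
    """Line-truncate, then byte-truncate at a newline, then append a warning."""
    fired = []
    lines = raw.splitlines()
    if len(lines) > max_lines:
        lines = lines[:max_lines]
        fired.append(f"{max_lines} lines")
    text = "\n".join(lines)

    data = text.encode("utf-8")
    if len(data) > max_bytes:
        # Cut at the last newline inside the cap so no line is split.
        cut = data.rfind(b"\n", 0, max_bytes)
        data = data[:cut] if cut > 0 else data[:max_bytes]
        # errors="ignore" drops a multi-byte character split by the fallback cut.
        text = data.decode("utf-8", errors="ignore")
        fired.append(f"{max_bytes} bytes")

    if fired:
        text += "\n\n> WARNING: MEMORY.md truncated (limit hit: " + " and ".join(fired) + ")."
    return text
```

Truncating by lines first means the byte pass usually has little left to do, and cutting on a newline boundary keeps each surviving index entry intact.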
▶memory/offload.py166 LOC
Tmux Offload tool implementation for backgrounding heavy tasks.
Functions (2)
def _tmux_offload(params: dict, config: dict)
Implement the TmuxOffload tool.
def register_offload_tool()
Imports
__future__datetimejsonospathlibtmux_toolstool_registryuuid
▶memory/palace.py127 LOC
Memory Palace: Day 1 initialization of essential long-term memory buckets.
Functions (1)
def ensure_memory_palace()
Check if the user memory directory is empty/new and initialize default buckets.
Returns:
True if initialization was performed, False otherwise.
Imports
__future__datetimepathlibstore
▶memory/scan.py146 LOC
Memory file scanning with mtime tracking and freshness/age helpers.
Mirrors the key ideas from Claude Code's memoryScan.ts and memoryAge.ts:
- Scan memory directories, sort newest-first
- Format a manifest for display or AI relevance selection
- Report memory age in human-readable form ("today", "3 days ago")
- Emit a staleness caveat for memories older than 1 day
Classes (1)
class MemoryHeader
Lightweight descriptor loaded from a memory file's frontmatter.
Attributes:
filename: basename of the .md file
file_path: absolute path
mtime_s: modification time (seconds since epoch)
description: value from frontmatter `description:` field
type: value from fron
Functions (6)
def scan_memory_dir(mem_dir: Path, scope: str)
Scan a single memory directory and return headers sorted newest-first.
Reads only the frontmatter (first ~30 lines) for efficiency.
Silently skips unreadable files. Caps at MAX_MEMORY_FILES entries.
def scan_all_memories()
Scan both user and project memory directories, merged newest-first.
def memory_age_days(mtime_s: float)
Days since mtime_s (floor-rounded, clamped to 0 for future times).
def memory_age_str(mtime_s: float)
Human-readable age: 'today', 'yesterday', or 'N days ago'.
def memory_freshness_text(mtime_s: float)
Staleness caveat for memories older than 1 day (empty string if fresh).
Motivated by user reports of stale code-state memories (file:line
citations to code that has since changed) being asserted as fact.
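The three age helpers above can be sketched as follows. This is a minimal illustration, not the module's actual code: the `now_s` parameter is a hypothetical addition that makes the functions deterministic for testing, and the wording of the staleness caveat is invented.

```python
import math
import time

SECONDS_PER_DAY = 86_400


def memory_age_days(mtime_s, now_s=None):
    """Whole days since mtime_s, floor-rounded and clamped to 0 for future times."""
    now = time.time() if now_s is None else now_s  # now_s is a hypothetical test hook
    return max(0, math.floor((now - mtime_s) / SECONDS_PER_DAY))


def memory_age_str(mtime_s, now_s=None):
    """Human-readable age: 'today', 'yesterday', or 'N days ago'."""
    days = memory_age_days(mtime_s, now_s)
    if days == 0:
        return "today"
    if days == 1:
        return "yesterday"
    return f"{days} days ago"


def memory_freshness_text(mtime_s, now_s=None):
    """Empty string for fresh memories, a caveat for those older than 1 day."""
    days = memory_age_days(mtime_s, now_s)
    if days <= 1:
        return ""
    # Caveat wording is an assumption; the real text is not shown in this index.
    return f"This memory is {days} days old; verify it against the current code."
```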
def format_memory_manifest(headers: list[MemoryHeader])
Format a list of MemoryHeader as a text manifest.
Format per line: [type/scope] filename (age): description
Example:
[feedback/user] feedback_testing.md (3 days ago): Don't mock DB in tests
[project/project] project_freeze.md (today): Merge freeze until 2026-04-10
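A small sketch of the per-line manifest format shown above. `Header` is a hypothetical stand-in for `MemoryHeader` that carries a precomputed `age` string; the real class stores `mtime_s` and presumably derives the age from it.

```python
from dataclasses import dataclass


@dataclass
class Header:
    """Hypothetical stand-in for MemoryHeader (field set is an assumption)."""
    filename: str
    type: str
    scope: str
    age: str
    description: str


def format_manifest(headers):
    """Render one '[type/scope] filename (age): description' line per header."""
    return "\n".join(
        f"[{h.type}/{h.scope}] {h.filename} ({h.age}): {h.description}"
        for h in headers
    )
```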
Imports
__future__dataclassesmathpathlibstoretime
▶memory/sessions.py100 LOC
Historical session search utility.
Functions (1)
def search_session_history(query: str, max_results: int = 5)
Search for a query string across historical session logs.
Checks both history.json (master) and daily/ copier directories.
Returns list of hits: {session_id, saved_at, hits: [{role, content_snippet}]}.
Imports
__future__configdatetimejsonpathlib
▶memory/store.py395 LOC
File-based memory storage with user-level and project-level scopes.
Storage layout:
user scope : ~/.dulus/memory/<slug>.md
project scope : .dulus/memory/<slug>.md (relative to cwd)
Search uses token-based fuzzy matching with field weighting
(name 3×, description 2×, content 1×) for better recall than
simple substring matching.
MEMORY.md in each directory is the index file — rebuilt automatically after
every save/delete. It is loaded into the system prompt to give Dulus an
Classes (1)
class MemoryEntry
A single memory entry loaded from a .md file.
Attributes:
name: human-readable name (also the display title in the index)
description: short one-line description (used for relevance decisions)
type: "user" | "feedback" | "project" | "reference"
hall:
Functions (16)
def get_project_memory_dir()
Return the project-local memory directory (relative to cwd).
def get_memory_dir(scope: str = 'user')
Return the memory directory for the given scope.
Args:
scope: "user" (global ~/.dulus/memory) or
"project" (.dulus/memory relative to cwd)
def _slugify(name: str)
Convert name to a filesystem-safe slug (max 60 chars).
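One way such a slug function could look, using the `re` and `unicodedata` modules this file imports. The underscore separator and the `"memory"` fallback for empty results are assumptions; only the 60-character cap comes from the docstring.

```python
import re
import unicodedata


def slugify(name, max_len=60):
    """Lowercase ASCII slug with underscores, truncated to max_len characters."""
    # Decompose accented characters, then drop anything that is not ASCII.
    ascii_text = (
        unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode("ascii")
    )
    # Collapse every run of non-alphanumerics into a single underscore.
    slug = re.sub(r"[^a-z0-9]+", "_", ascii_text.lower()).strip("_")
    # Fallback name is hypothetical; the real behavior for empty slugs is unknown.
    return slug[:max_len].rstrip("_") or "memory"
```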
def parse_frontmatter(text: str)
Parse ---\nkey: value\n---\nbody format.
Returns:
(meta_dict, body_str)
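A minimal sketch of a parser for the `---\nkey: value\n---\nbody` layout described above. It treats every value as a plain string and splits on the first colon only; how the real function handles quoting, lists, or a missing closing delimiter is not shown here, so those choices are assumptions.

```python
def parse_frontmatter(text):
    """Return (meta_dict, body_str); meta is empty when no frontmatter is found."""
    lines = text.split("\n")
    if not lines or lines[0].strip() != "---":
        return {}, text
    # Locate the closing '---' delimiter.
    end = None
    for i in range(1, len(lines)):
        if lines[i].strip() == "---":
            end = i
            break
    if end is None:
        return {}, text  # unterminated frontmatter: treat everything as body
    meta = {}
    for line in lines[1:end]:
        if ":" in line:
            key, _, value = line.partition(":")  # split on the first colon only
            meta[key.strip()] = value.strip()
    return meta, "\n".join(lines[end + 1:])
```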
def _format_entry_md(entry: MemoryEntry)
Render a MemoryEntry as a markdown file with YAML frontmatter.
def save_memory(entry: MemoryEntry, scope: str = 'user')
Write/update a memory file and rebuild the index for that scope.
If a memory with the same name (slug) already exists, it is overwritten.
Args:
entry: MemoryEntry to persist
scope: "user" or "project"
def delete_memory(name: str, scope: str = 'user')
Remove the memory file matching name and rebuild the index.
No error if not found.
def load_entries(scope: str = 'user')
Scan all .md files (except MEMORY.md) in a scope and return entries.
Returns:
List of MemoryEntry sorted alphabetically by name.
def load_index(scope: str = 'all')
Load memory entries from one or both scopes.
Args:
scope: "user", "project", or "all" (both combined)
Returns:
List of MemoryEntry (user entries first, then project).
def _tokenize(text: str)
Split text into lowercase tokens (words).
def _token_score(query_tokens: list[str], text: str)
Score how well query tokens match a text field.
For each query token, find the best match among text tokens using
SequenceMatcher (handles typos, partial matches, synonyms-by-prefix).
Returns average best-match ratio (0.0–1.0).
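The scoring rule described above (best `SequenceMatcher` ratio per query token, averaged) might be sketched like this. The tokenizer regex and the handling of empty inputs are assumptions.

```python
import re
from difflib import SequenceMatcher


def tokenize(text):
    """Split text into lowercase word tokens (regex is an assumption)."""
    return re.findall(r"[a-z0-9]+", text.lower())


def token_score(query_tokens, text):
    """Average, over query tokens, of the best fuzzy match among text tokens."""
    text_tokens = tokenize(text)
    if not query_tokens or not text_tokens:
        return 0.0
    total = 0.0
    for q in query_tokens:
        # ratio() is 1.0 for identical strings and falls toward 0.0 as they diverge.
        total += max(SequenceMatcher(None, q, t).ratio() for t in text_tokens)
    return total / len(query_tokens)
```

Because each query token is compared against every text token, the cost grows with the product of the two token counts, which is acceptable for short memory files but worth keeping in mind for long content fields.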
def search_memory(query: str, scope: str = 'all', hall: str = '', min_score: float = 0.35)
Token-based fuzzy search on name + description + content.
Scores each memory using weighted field matching:
name × 3.0 + description × 2.0 + content × 1.0
Args:
query: search query string
scope: "user", "project", or "all"
hall: optional hall filter ("facts", "events", e
def _rewrite_index(scope: str)
Rebuild MEMORY.md for the given scope from all .md files in that dir.
def get_index_content(scope: str = 'user')
Return raw MEMORY.md content for the given scope, or '' if absent.
def check_conflict(entry: 'MemoryEntry', scope: str = 'user')
Check whether a same-named memory already exists with different content.
Returns a dict with the existing memory's key fields if a conflict is found,
or None if no existing file or if the content is identical.
def touch_last_used(file_path: str)
Update the last_used_at frontmatter field of a memory file to today.
Called by MemorySearch when a memory is returned so staleness/utility
tracking stays current. Silent on any error.
Imports
__future__dataclassesdifflibpathlibreunicodedata
▶memory/tools.py447 LOC
Memory tool registrations: MemorySave, MemoryDelete, MemorySearch.
Importing this module registers the three tools into the central registry.
Functions (7)
def _mempalace_env()
Return environment dict with UTF-8 overrides for Windows safety.
def _is_mempalace_initialized()
Check whether MemPalace has been init'd (config dir exists).
def _ensure_mempalace_initialized(mem_dir: 'Path')
Run ``mempalace init`` if the global palace has never been set up.
This is a no-op for existing installations so it is safe to call on every
startup / save.
def _memory_save(params: dict, config: dict)
Save or update a persistent memory entry, with conflict detection.
def _memory_delete(params: dict, config: dict)
Delete a persistent memory entry by name.
def _memory_search(params: dict, config: dict)
Search memories by keyword query with optional AI relevance filtering.
Results are ranked by: confidence × recency (30-day exponential decay).
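The ranking formula can be illustrated with a short sketch. The docstring does not say whether "30-day" is a half-life or an e-folding time; the version below assumes the latter (weight falls to 1/e after 30 days), and both function names are hypothetical.

```python
import math


def recency_weight(age_days, decay_days=30.0):
    """Exponential decay: 1.0 for a brand-new memory, 1/e after decay_days."""
    # Assumption: 30 days is the time constant, not the half-life.
    return math.exp(-max(0.0, age_days) / decay_days)


def rank_score(confidence, age_days):
    """Combine confidence and recency multiplicatively, as the docstring states."""
    return confidence * recency_weight(age_days)
```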
def _memory_list(params: dict, config: dict)
List all memory entries with type, scope, age, confidence, and description.
Imports
__future__contextdatetimescansessionsstoretool_registry
▶memory/types.py114 LOC
Memory type and hall taxonomy with system-prompt guidance text.
Four types capture context NOT derivable from the current project state.
Code patterns, architecture, git history, and file structure are derivable
(via grep/git/CLAUDE.md) and should NOT be saved as memories.
Halls categorize memories by their nature (orthogonal to type):
facts, events, discoveries, preferences, advice.
▶memory/vector_search.py92 LOC
Vector search for memories using TF-IDF (pure Python, zero deps).
Functions (4)
def _tokenize(text: str)
def _tfidf_vectors(docs: List[str])
def _cosine(a: Counter, b: Counter)
def search_similar_memories(query: str, memories: List[Tuple[str, str]], top_k: int = 5)
Search memories by semantic similarity.
Args:
query: search query text
memories: list of (id, content) tuples
top_k: number of results to return
Returns:
list of (memory_id, score) sorted by relevance
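A compact, dependency-free sketch of TF-IDF plus cosine ranking in the spirit of this module. The smoothed IDF formula, the tokenizer, and the choice to include the query in the document-frequency counts are all assumptions; note that TF-IDF matches on shared terms, so similarity here is lexical.

```python
import math
import re
from collections import Counter


def tokenize(text):
    return re.findall(r"[a-z0-9]+", text.lower())


def tfidf_vectors(docs):
    """Return one sparse term->weight Counter per document."""
    tokenized = [tokenize(d) for d in docs]
    n = len(tokenized)
    df = Counter()
    for toks in tokenized:
        df.update(set(toks))  # document frequency: count each term once per doc
    vectors = []
    for toks in tokenized:
        vec = Counter()
        for term, count in Counter(toks).items():
            idf = math.log((1 + n) / (1 + df[term])) + 1.0  # smoothed IDF (assumed)
            vec[term] = (count / len(toks)) * idf
        vectors.append(vec)
    return vectors


def cosine(a, b):
    dot = sum(w * b.get(t, 0.0) for t, w in a.items())
    na = math.sqrt(sum(w * w for w in a.values()))
    nb = math.sqrt(sum(w * w for w in b.values()))
    return 0.0 if na == 0 or nb == 0 else dot / (na * nb)


def search_similar(query, memories, top_k=5):
    """memories is a list of (id, content); returns (id, score), best first."""
    if not memories:
        return []
    vecs = tfidf_vectors([content for _, content in memories] + [query])
    query_vec = vecs[-1]
    scored = [(mid, cosine(query_vec, v)) for (mid, _), v in zip(memories, vecs)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]
```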
Imports
__future__collectionsmathretyping
▶memory.py11 LOC
Backward-compatibility shim — real implementation is in memory/ package.
Imports
memory.contextmemory.store
▶multi_agent/__init__.py23 LOC
Multi-agent package for dulus.
Provides:
- AgentDefinition — typed agent definition (name, system_prompt, model, tools)
- SubAgentTask — lifecycle-tracked task
- SubAgentManager — thread-pool manager for spawning agents
- load_agent_definitions / get_agent_definition — agent registry
Imports
subagent
▶multi_agent/subagent.py501 LOC
Threaded sub-agent system for spawning nested agent loops.
Classes (3)
class AgentDefinition
Definition for a specialized agent type.
class SubAgentTask
Represents a sub-agent task with lifecycle tracking.
class SubAgentManager
Manages concurrent sub-agent tasks using a thread pool.
↳ __init__(self, max_concurrent: int = 5, max_depth: int = 5)
↳ spawn(self, prompt: str, config: dict, system_prompt: str, depth: int = 0, agent_def: Optional[AgentDefinition] = None, isolation: str = '', name: str = '')
Spawn a new sub-agent task.
Args:
prompt: user message for the sub-agent
config: agent configuration dict (copied before modification)
system_prompt: base system prompt
de
↳ wait(self, task_id: str, timeout: float = None)
Block until a task completes or timeout expires.
Returns:
The task, or None if task_id is unknown.
↳ get_result(self, task_id: str)
Return the result string for a completed task, or None.
↳ list_tasks(self)
Return all tracked tasks.
↳ send_message(self, task_id_or_name: str, message: str)
Send a message to a running background agent.
If the agent is currently blocked on an AskMainAgentQuestion call, the
message is delivered immediately as the answer and the agent resumes
the SAME turn
↳ cancel(self, task_id: str)
Request cancellation of a running task.
Returns:
True if the cancel flag was set, False if task not found or not running.
↳ shutdown(self)
Cancel all running tasks and shut down the thread pool.
Functions (8)
def _parse_agent_md(path: Path, source: str = 'user')
Parse a .md file with optional YAML frontmatter into an AgentDefinition.
File format:
---
description: "Short description"
model: claude-haiku-4-5-20251001
tools: [Read, Write, Edit, Bash]
---
System prompt body goes here...
def load_agent_definitions()
Load all agent definitions: built-ins → user-level → project-level.
Search paths:
~/.dulus/agents/*.md (user-level)
.dulus/agents/*.md (project-level, overrides user)
def get_agent_definition(name: str)
Look up an agent definition by name. Returns None if not found.
def _git_root(cwd: str)
Return the git root directory for cwd, or None if not in a git repo.
def _create_worktree(base_dir: str)
Create a temporary git worktree.
Returns:
(worktree_path, branch_name)
Raises:
subprocess.CalledProcessError or OSError on failure.
def _remove_worktree(wt_path: str, branch: str, base_dir: str)
Remove a git worktree and delete its branch (best-effort).
def _agent_run(prompt, state, config, system_prompt, depth = 0, cancel_check = None)
Lazy-import wrapper to avoid circular dependency with agent module.
Uses absolute import so this works whether called from inside or outside
the multi_agent package (sys.path includes the project root).
def _extract_final_text(messages)
Walk backwards through messages, return first assistant content string.
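The backward walk could look roughly like this. The message shape (dicts with `role` and `content`), the skipping of empty strings, and the `None` return when nothing matches are assumptions.

```python
def extract_final_text(messages):
    """Return the most recent assistant message whose content is a non-empty string."""
    for msg in reversed(messages):
        content = msg.get("content")
        if msg.get("role") == "assistant" and isinstance(content, str) and content:
            return content
    return None  # assumed fallback; the real return value is not documented here
```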
Imports
__future__concurrent.futuresdataclassesospathlibqueuesubprocesstempfiletypinguuid
▶multi_agent/tools.py393 LOC
Multi-agent tool registrations.
Registers the following tools into the central tool_registry:
Agent — spawn a sub-agent (always background)
SendMessage — send a message to a named background agent
CheckAgentResult — check status/result of a background agent
ListAgentTasks — list all active/finished agent tasks
ListAgentTypes — list available agent type definitions
Functions (7)
def get_agent_manager()
Return (and lazily create) the process-wide SubAgentManager.
def _agent_tool(params: dict, config: dict)
Spawn a sub-agent.
Reads from config:
_system_prompt — injected by agent.py run(), used as base system prompt
_depth — current nesting depth (prevents infinite recursion)
def _send_message(params: dict, config: dict)
def _check_agent_result(params: dict, config: dict)
def _list_agent_tasks(params: dict, config: dict)
def _ask_main_agent_question(params: dict, config: dict)
Pause a sub-agent and ask the main agent a question.
The sub-agent blocks on a threading.Event in its current turn (preserving
full context). The main agent receives a system message naming the
sub-agent and the question; it replies using SendMessage(to=<name>, ...).
def _list_agent_types(params: dict, config: dict)
Imports
__future__subagenttool_registry
▶offload_helper.py228 LOC
Offload Helper - Replacement for TmuxOffload
Works with the tmux tools that actually work
Classes (1)
class TmuxJob
Represents a job executed in tmux
↳ __init__(self, command: str)
↳ start(self)
Starts the job in a detached tmux session. Returns the session ID.
Defense-in-depth against leaked sessions:
1. The command line itself ends in `tmux kill-session -t <id>`
so the session self-destructs
↳ is_running(self)
Checks whether the job is still running
↳ capture(self, lines: int = 1000)
Captures the job's output
↳ kill(self)
Kills the job and the tmux session
↳ wait(self, timeout: Optional[float] = None, poll_interval: float = 0.5)
Waits for the job to finish.
Returns True if it finished, False on timeout.
Functions (3)
def offload(command: str)
Runs a command in a detached tmux session (fire-and-forget).
Returns the session ID so the output can be captured later.
Usage:
session = offload("sleep 10 && echo listo")
# ... later ...
tmux capture-pane -t <session>:0.0 -p
def offload_and_wait(command: str, timeout: Optional[float] = None)
Runs a command and waits for it to finish.
Usage:
result = offload_and_wait("sleep 5 && date", timeout=10)
print(result['output']) # the command's stdout
Note: TmuxJob.start() appends a `tmux kill-session` self-destruct to
avoid leaked sessions on misconfigured hosts. That means the pane
def list_offloaded()
Lists all active dulus sessions
Imports
subprocesstimetypinguuid
▶paste_placeholders.py49 LOC
Paste placeholder support — fold large pastes into a compact token.
When a user pastes a large block of text (>10 lines or >2000 chars), it gets
stored in a dictionary and replaced with a short ``<<PASTE:xxxx>>`` token in
the input buffer. Before the message reaches the agent, ``expand()`` swaps
the tokens back to full text.
Functions (3)
def maybe_placeholderize(text: str)
Return *text* unchanged if small, or a ``<<PASTE:…>>`` token if large.
def expand(text: str)
Replace any ``<<PASTE:…>>`` tokens in *text* with the original content.
def clear()
Drop all stored pastes (call between sessions if desired).
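The placeholder round trip described for this module might be sketched as below. The thresholds (more than 10 lines or more than 2000 characters) come from the module description; the SHA-1 based 8-character key, the way lines are counted, and leaving unknown tokens untouched in `expand()` are assumptions.

```python
import hashlib
import re

_STORE = {}  # token key -> original pasted text
_TOKEN_RE = re.compile(r"<<PASTE:([0-9a-f]+)>>")


def maybe_placeholderize(text):
    """Return text unchanged if small, else store it and return a short token."""
    line_count = text.count("\n") + 1  # line-counting rule is an assumption
    if line_count <= 10 and len(text) <= 2000:
        return text
    key = hashlib.sha1(text.encode("utf-8")).hexdigest()[:8]  # key scheme assumed
    _STORE[key] = text
    return f"<<PASTE:{key}>>"


def expand(text):
    """Swap every known token back to its stored content; leave unknown ones as-is."""
    return _TOKEN_RE.sub(lambda m: _STORE.get(m.group(1), m.group(0)), text)


def clear():
    """Drop all stored pastes."""
    _STORE.clear()
```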
Imports
__future__hashlibtyping
▶plugin/__init__.py22 LOC
Plugin system for dulus.
Imports
loaderrecommendstoretypes
▶plugin/autoadapter.py1755 LOC
Auto-Adapter: Static analysis + AI to generate manifests for external repos.
Functions (20)
def _sanitize_python_code(code: str)
Fix common JSON-to-Python spills like true/false/null.
def _analyze_repository(plugin_dir: Path | str, verbose: bool = False)
Scan the repository for structure, functions, and dependencies (no execution).
def _extract_exports(code: str)
Extract public functions and classes from Python code using AST.
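A minimal sketch of AST-based export extraction. It only inspects top-level definitions and treats a leading underscore as "private"; the returned dict shape and the empty result on a syntax error are assumptions.

```python
import ast


def extract_exports(code):
    """List public top-level function and class names without executing the code."""
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return {"functions": [], "classes": []}  # assumed behavior on bad input
    functions, classes = [], []
    for node in tree.body:  # top-level statements only
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            if not node.name.startswith("_"):
                functions.append(node.name)
        elif isinstance(node, ast.ClassDef) and not node.name.startswith("_"):
            classes.append(node.name)
    return {"functions": functions, "classes": classes}
```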
def generate_plugin_files(plugin_dir: Path, safe_name: str, config: dict)
Use AI to generate plugin_tool.py and plugin.json based on analysis.
def _compile_check(plugin_dir: Path)
Hard syntax check on plugin_tool.py.
def _load_plugin_module(plugin_dir: Path, safe_name: str)
Import plugin_tool.py and return (module_or_None, error_or_empty).
def _get_bloat_cap()
Read the smoke-test output-bloat cap from config (user-tunable).
Returns the cap in chars, or None if the user hasn't configured one
(caller then falls back to the in-code default of 8000). The previous
hardcoded 2500 was too aggressive — most legitimate API responses
(yfinance info, sherlock multi
def _smoke_test_tool(td: Any)
Run a single tool with minimal valid params, mirroring execute_tool()'s
stdout/stderr capture. Many plugin tools `print()` their output instead of
returning it, so we MUST capture stdout or we will wrongly report "empty".
def _build_todo_items(plugin_dir: Path, safe_name: str)
Derive a structured todo list directly from the generated tools.
Each item: {title, verify, status}
verify is one of: 'compile' | 'import' | 'exports' | ('smoke', tool_name)
def _write_todo_file(plugin_dir: Path, safe_name: str, items: list[dict])
def _mark_task(todo_path: Path, title: str, status: str)
status: 'done' (x) or 'fail' (still [ ] but with FAILED tag)
def _run_verification(plugin_dir: Path, safe_name: str, verify: Any)
Dispatch to the right verification routine.
def _read_relevant_sources(plugin_dir: Path, error_msg: str, max_chars: int = 6000)
Read actual source files from the plugin repo to give the fix AI real API context.
Prioritizes files whose names appear in the error message, then __init__.py files.
def _attempt_fresh_start(plugin_dir: Path, safe_name: str, accumulated_errors: list[str], analysis: dict, config: dict)
Full rewrite of plugin_tool.py from scratch after repeated fix failures.
Feeds all accumulated error history so the agent doesn't repeat the same mistakes.
def _attempt_fix(plugin_dir: Path, safe_name: str, task_title: str, error_msg: str, analysis: dict, config: dict, original_goal: str | None = None, state = None, generation_context: str = '')
Run a full tool-enabled agent turn to fix a failing task.
The agent has Read/Write/Edit/Bash/Grep/WebSearch — same as normal Dulus.
Reuses existing state if provided (for multi-attempt fixes), otherwise creates new state.
Returns (success, state) so state can be reused for next attempt.
Args:
g
def _run_adapter_worker(plugin_dir: Path, safe_name: str, analysis: dict, config: dict, generator_context: str = '')
Worker loop: derive todo from generated tools, verify each, fix failures.
Returns True only if every required task passes.
Args:
generator_context: Context from generation phase (reasoning text) to help fix agent understand the library
def _remove_failed_tools(plugin_dir: Path, safe_name: str, failed_tool_names: list[str], verbose: bool = False)
Update plugin_tool.py to only include working tools in TOOL_DEFS and TOOL_SCHEMAS.
Keeps all the original code, just updates the export lists.
def _update_plugin_json_tools(plugin_dir: Path, safe_name: str, working_tool_names: list[str])
Update plugin.json to reflect only the working tools.
def _validate_generated_tools(plugin_dir: Path, safe_name: str)
Backward-compat shim — runs the worker without fix attempts (no AI).
def autoadapt_if_needed(plugin_dir: Path, name: str, config: dict)
Main entry point: check if manifest is missing and try to generate it.
Imports
__future__astcommonjsonmemory.contextmemory.sessionsospathlibproviderssystoolstypestyping
▶plugin/loader.py156 LOC
Plugin loader: discover and load tools/skills/mcp from installed plugins.
Functions (8)
def scrub_any_type(obj: Any)
Recursively remove 'type': 'any' from schema dictionaries as it's not valid JSON Schema.
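The recursive scrub can be sketched as follows. This version returns a cleaned copy; whether the real function mutates its argument in place, or matches `'any'` case-insensitively, is not stated here.

```python
def scrub_any_type(obj):
    """Return a copy of obj with every {'type': 'any'} pair removed, at any depth."""
    if isinstance(obj, dict):
        return {
            key: scrub_any_type(value)
            for key, value in obj.items()
            if not (key == "type" and value == "any")
        }
    if isinstance(obj, list):
        return [scrub_any_type(item) for item in obj]
    return obj  # scalars pass through unchanged
```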
def load_all_plugins(scope: PluginScope | None = None)
Return enabled plugins (optionally filtered by scope).
def load_plugin_tools(scope: PluginScope | None = None)
Import tool modules from all enabled plugins and collect their TOOL_SCHEMAS.
Returns combined list of tool schema dicts.
def reload_plugins(scope: PluginScope | None = None)
Reload all plugins and register their tools.
Returns a dict with counts of what was reloaded.
def register_plugin_tools(scope: PluginScope | None = None)
Import tool modules from enabled plugins and register them into tool_registry.
Returns number of tools registered.
def load_plugin_skills(scope: PluginScope | None = None)
Return paths to skill markdown files from enabled plugins.
def load_plugin_mcp_configs(scope: PluginScope | None = None)
Return mcp server configs contributed by enabled plugins.
def _import_plugin_module(entry: PluginEntry, module_name: str)
Dynamically import a module from a plugin directory.
Imports
__future__importlib.utilpathlibstoresystypestyping
▶plugin/recommend.py211 LOC
Plugin recommendation engine: match installed + marketplace plugins to context.
Classes (1)
class PluginRecommendation
Functions (5)
def _tokenize(text: str)
Lower-case word tokens from text.
def _score_against_context(entry: dict, context_tokens: set[str])
Return (score, reasons) for a marketplace entry vs context tokens.
def recommend_plugins(context: str, top_n: int = 5, include_installed: bool = False)
Given a natural-language context string (e.g. current task description or
user message), return up to top_n plugin recommendations sorted by relevance.
Args:
context: Free-text description of the current task / need.
top_n: Maximum number of recommendations.
include_installed: If True,
def recommend_from_files(paths: list[Path], top_n: int = 5)
Recommend plugins based on the types of files in the current project.
def format_recommendations(recs: list[PluginRecommendation])
Imports
__future__dataclassespathlibrestoretypestyping
▶plugin/store.py387 LOC
Plugin store: install/uninstall/enable/disable/update + config persistence.
Functions (21)
def _project_plugin_dir()
def _project_plugin_cfg()
def _read_cfg(cfg_path: Path)
def _write_cfg(cfg_path: Path, data: dict)
def _plugin_dir_for(scope: PluginScope)
def _plugin_cfg_for(scope: PluginScope)
def list_plugins(scope: PluginScope | None = None)
Return all installed plugins (optionally filtered by scope).
def get_plugin(name: str, scope: PluginScope | None = None)
def install_plugin(identifier: str, scope: PluginScope = PluginScope.USER, force: bool = False)
Install a plugin. identifier = 'name' | 'name@git_url' | 'name@local_path'.
Returns (success, message).
def _is_git_url(source: str)
def _clone_plugin(url: str, dest: Path)
def _install_dependencies(deps: list[str], cwd: Path | None = None)
def _update_plugin_list_memory(scope: PluginScope)
def _save_entry(entry: PluginEntry)
def _remove_entry(name: str, scope: PluginScope)
def uninstall_plugin(name: str, scope: PluginScope | None = None, keep_data: bool = False)
def _set_enabled(name: str, scope: PluginScope | None, enabled: bool)
def enable_plugin(name: str, scope: PluginScope | None = None)
def disable_plugin(name: str, scope: PluginScope | None = None)
def disable_all_plugins(scope: PluginScope | None = None)
def update_plugin(name: str, scope: PluginScope | None = None)
Imports
__future__jsonospathlibshutilstatsubprocesssystypestyping
▶plugin/types.py147 LOC
Plugin system types: manifest, entry, scope.
Classes (3)
class PluginScope
class PluginManifest
Parsed from PLUGIN.md YAML frontmatter or plugin.json.
↳ from_dict(cls, data: dict)
↳ from_plugin_dir(cls, plugin_dir: Path)
Load manifest from a plugin directory (plugin.json or PLUGIN.md frontmatter).
↳ _from_md(cls, md_file: Path)
class PluginEntry
A plugin registered in the config store.
↳ qualified_name(self)
↳ to_dict(self)
↳ from_dict(cls, data: dict)
Functions (2)
def parse_plugin_identifier(identifier: str)
Parse 'name' or 'name@source'. Returns (name, source_or_None).
def sanitize_plugin_name(name: str)
Ensure plugin name is safe for use as directory name (alphanumeric + underscore).
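The two helpers above could be sketched like this. Splitting on the first `@` keeps SSH-style git URLs (which contain their own `@`) intact in the source part; replacing disallowed characters with an underscore, rather than dropping them, is an assumption.

```python
import re


def parse_plugin_identifier(identifier):
    """Parse 'name' or 'name@source' into (name, source_or_None)."""
    name, sep, source = identifier.partition("@")  # split on the first '@' only
    source = source.strip()
    if not sep or not source:
        return name.strip(), None
    return name.strip(), source


def sanitize_plugin_name(name):
    """Keep alphanumerics and underscores; map everything else to '_' (assumed)."""
    return re.sub(r"[^A-Za-z0-9_]", "_", name)
```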
Imports
__future__dataclassesenumpathlibretyping
▶providers.py4482 LOC
Multi-provider support for Dulus.
Supported providers:
anthropic — Claude (claude-opus-4-6, claude-sonnet-4-6, ...)
openai — GPT (gpt-4o, o3-mini, ...)
gemini — Google Gemini (gemini-2.0-flash, gemini-1.5-pro, ...)
kimi — Moonshot AI (kimi-k2.5, moonshot-v1-8k/32k/128k)
kimi-code — Kimi Code (kimi-for-coding, membership API from kimi.com/code)
qwen — Alibaba DashScope (qwen-max, qwen-plus, ...)
zhipu — Zhipu GLM (glm-4, glm-4-plus, ...)
deepseek — De
Classes (6)
class _ProviderRetry
Lightweight retry wrapper for provider streaming calls.
Retries on: timeout, connection errors, 429 (rate limit), 5xx.
Does NOT retry on: 4xx (client errors), auth failures.
↳ is_retryable(cls, exc: Exception)
Return True if the exception is worth retrying.
↳ sleep_for_attempt(cls, attempt: int)
Exponential backoff with full jitter.
↳ wrap_generator(cls, fn: Callable, *args, **kwargs)
Wrap a generator function with retry logic.
Yields through the generator; if it raises a retryable exception,
waits and retries up to MAX_RETRIES times.
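A sketch of generator retry with exponential backoff and full jitter. `RetryableError`, the constant values, and the injectable `sleep` parameter are hypothetical; the real class decides retryability via `is_retryable()` on arbitrary exceptions.

```python
import random
import time

MAX_RETRIES = 3     # assumed value
BASE_DELAY = 1.0    # seconds, assumed
MAX_DELAY = 30.0    # seconds, assumed


class RetryableError(Exception):
    """Hypothetical stand-in for timeouts, connection errors, 429 and 5xx."""


def backoff_delay(attempt):
    """Full jitter: a uniform random delay between 0 and the exponential cap."""
    cap = min(MAX_DELAY, BASE_DELAY * (2 ** attempt))
    return random.uniform(0.0, cap)


def wrap_generator(fn, *args, sleep=time.sleep, **kwargs):
    """Yield through fn(*args, **kwargs), restarting it on retryable failures."""
    attempt = 0
    while True:
        try:
            yield from fn(*args, **kwargs)
            return
        except RetryableError:
            if attempt >= MAX_RETRIES:
                raise  # retries exhausted: surface the last error
            sleep(backoff_delay(attempt))
            attempt += 1
```

One design caveat of this simple form: if the wrapped generator fails after it has already yielded some items, the restart yields them again, so a consumer that cannot tolerate duplicates needs its own guard.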
class WebToolParser
Shared parser for prompt-based tool calls in XML format.
Also supports auto-wrapping raw JSON tool calls if auto_wrap_json=True.
↳ __init__(self, auto_wrap_json: bool = False)
↳ parse_chunk(self, chunk: str)
Parse chunk, return display text and accumulate tool calls.
↳ flush(self)
Return any remaining text in the buffer.
class _DeepSeekPoWSolver
Lazy-initialized WASM PoW solver for DeepSeek web (sha3_wasm_bg).
↳ get(cls)
↳ __init__(self)
↳ _get_mem_array(self)
↳ _alloc_string(self, s: str)
↳ solve(self, challenge: str, salt: str, expire_at: int, difficulty: int)
class TextChunk
↳ __init__(self, text)
class ThinkingChunk
↳ __init__(self, text)
class AssistantTurn
Completed assistant turn with text + tool_calls + thinking.
↳ __init__(self, text, tool_calls, in_tokens, out_tokens, thinking = '', error = False, cache_creation_tokens = 0, cache_read_tokens = 0)
Functions (47)
def _parse_tool_call_payload(payload: str)
Best-effort extraction of (tool_name, input_dict) from the body of a
`<tool_call>...</tool_call>` block.
Why this exists: models sometimes leak Anthropic's `<function_calls>` /
`<parameter>` syntax INSIDE our `<tool_call>` block, which corrupts the
JSON. Without a reco
def _format_web_tool_manifest(tool_schemas: list, config: dict, messages: list)
Format tools as a prompt hint for web models.
First turn → full manifest with strong instructions + tool list.
Continuation turns → short format reminder (always injected, cheap).
Disable entirely with config["no_tools"] = True.
def _consolidate_web_history(messages: list, manifest: str = '')
Consolidate history since last assistant turn into one prompt string.
This ensures tool results and system notifications are correctly perceived
by web-based models that take a single prompt string.
def detect_provider(model: str)
Return provider name for a model string.
Supports 'provider/model' explicit format, or auto-detect by prefix.
def _claude_web_cookies_path(config: dict)
Return path to claude.ai cookies JSON file.
def _kimi_web_auth_path(config: dict)
Return path to kimi.com consumer auth JSON file.
def _kimi_web_list_chats(auth_data: dict, page_size: int = 50, page_token: str = '', query: str = '')
List recent chats from kimi.com using harvested cookies/headers.
Reuses the auth blob saved by /harvest (cookies + x-msh-* + Bearer).
Endpoint is kimi.chat.v1.ChatService/ListChats (NOT the gateway /Chat one).
Returns the parsed JSON from the API or raises on HTTP error.
def _gemini_web_auth_path(config: dict)
Return path to gemini.google.com consumer auth JSON file.
def _deepseek_web_auth_path(config: dict)
Return path to chat.deepseek.com consumer auth JSON file.
def _qwen_web_auth_path(config: dict)
Return path to chat.qwen.ai consumer auth JSON file.
def _claude_web_org_id(cookies_data: dict, config: dict)
Extract org ID: try cookies → try API → fallback from config → hardcoded.
def _claude_web_headers(cookies_data: dict, referer: str = 'https://claude.ai/new')
Build HTTP headers for claude.ai requests.
def _claude_web_fetch_org_id(cookies_data: dict)
Call /api/organizations using requests.Session with harvested cookies.
def _claude_web_create_conversation(cookies_data: dict, org_id: str)
Create a new claude.ai chat conversation using requests.Session.
def stream_claude_web(cookies_file: str, model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream from claude.ai web using harvested browser cookies.
Tool calling is prompt-based: tool manifest injected into the user
message; <tool_call>...</tool_call> tags parsed from the response.
Conversation context is maintained server-side via conversation_id.
def stream_claude_code(cookies_file: str, model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream from claude.ai/code remote-control session using harvested cookies.
Endpoint: POST https://claude.ai/v1/sessions/{session_id}/events
Payload: {"events": [{"type":"user","uuid":"...","session_id":"...","parent_tool_use_id":null,"message":{"role":"user","content":"..."}}]}
Auth: same clau
def stream_kimi_web(auth_file: str, model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream from kimi.com consumer web using harvested gRPC-Web tokens.
def stream_gemini_web(auth_file: str, model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream from gemini.google.com using the fast REST API with user-provided headers.
Uses the 'requests' library with the exact cookies and headers captured from
the user's browser. The harvester requires the user to type 'DULUS' as the
message so we can locate and replace it in the f.req payload.
def stream_deepseek_web(auth_file: str, model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream from chat.deepseek.com web using harvested browser session.
DeepSeek's web UI uses a simple SSE (text/event-stream) API:
POST https://chat.deepseek.com/api/v0/chat/completion
Headers: Authorization: Bearer <token>
Body: { model, messages, stream: true, chat_session_id? }
The har
def stream_qwen_web(auth_file: str, model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream from chat.qwen.ai web using harvested browser session.
Qwen web uses a JSON-stream API:
POST https://chat.qwen.ai/api/v2/chat/completions?chat_id=<uuid>
Cookies: token=<JWT>, plus anti-bot cookies (cna/isg/tfstk/...)
Body: {stream:true, version:"2.1", incremental_output:tru
def bare_model(model: str)
Strip 'provider/' prefix if present.
def get_api_key(provider_name: str, config: dict)
def calc_cost(model: str, in_tok: int, out_tok: int)
def _find_native_tool_marker(text: str)
def _extract_native_tool_calls(buf: str)
Parse buffered native-format tool calls. Returns [] on any failure.
def estimate_tokens_kimi(api_key: str, model: str, messages: list)
Estimate token count using Kimi's native API endpoint.
Args:
api_key: Moonshot API key
model: Model name (e.g., "kimi-k2.5")
messages: List of message dicts with "role" and "content"
Returns:
Estimated token count, or None if the request fails
def scrub_any_type(obj: Any)
Recursively remove 'type': 'any' from schema dictionaries as it's not valid JSON Schema.
def tools_to_openai(tool_schemas: list)
Convert Anthropic-style tool schemas to OpenAI function-calling format.
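The conversion is essentially a re-wrapping of each schema, sketched below. It assumes each input has the Anthropic shape (`name`, `description`, `input_schema`); the defaults used for missing fields are assumptions.

```python
def tools_to_openai(tool_schemas):
    """Wrap Anthropic-style tool schemas in OpenAI's function-calling envelope."""
    converted = []
    for tool in tool_schemas:
        converted.append({
            "type": "function",
            "function": {
                "name": tool["name"],
                "description": tool.get("description", ""),
                # Anthropic's 'input_schema' becomes OpenAI's 'parameters'.
                "parameters": tool.get(
                    "input_schema", {"type": "object", "properties": {}}
                ),
            },
        })
    return converted
```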
def messages_to_anthropic(messages: list)
Convert neutral messages → Anthropic API format.
def messages_to_openai(messages: list, ollama_native_images: bool = False, model: str = '')
Convert neutral messages → OpenAI API format.
Also sanitizes orphan tool_calls — if an assistant message has tool_calls
but the matching tool responses are missing (e.g. user interrupted mid-call),
the tool_calls are stripped to avoid API rejection.
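The orphan cleanup might look like the sketch below, written against OpenAI-format messages (`tool_calls` on assistant messages, `tool_call_id` on tool messages). The function name is hypothetical, and dropping only the unanswered calls while keeping answered ones is one possible policy; the real function may strip the whole list.

```python
def strip_orphan_tool_calls(messages):
    """Drop assistant tool_calls that have no matching role:tool response."""
    answered = {
        m.get("tool_call_id") for m in messages if m.get("role") == "tool"
    }
    cleaned = []
    for msg in messages:
        calls = msg.get("tool_calls") if msg.get("role") == "assistant" else None
        if calls:
            kept = [c for c in calls if c.get("id") in answered]
            if len(kept) != len(calls):
                msg = dict(msg)  # copy so the caller's history is not mutated
                if kept:
                    msg["tool_calls"] = kept
                else:
                    del msg["tool_calls"]
                    # With no tool calls left, make sure content is a string.
                    msg["content"] = msg.get("content") or ""
        cleaned.append(msg)
    return cleaned
```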
def friendly_api_error(exc: Exception)
Map common API exceptions to short, actionable hints for the user.
Returns a single-line string suitable for streaming back to the REPL.
Falls back to the raw exception message when no pattern matches.
def _thinking_level_from(value)
Coerce legacy bool/int thinking config into an int 0-4.
def _sanitize_tool_for_anthropic(t: dict)
Coerce any tool schema (OpenAI / mixed / Anthropic) into Anthropic shape.
Handles three common mis-formats seen from plugins/MCP/auto-adapter:
(a) OpenAI wrapped: {"type":"function","function":{"name","description","parameters"}}
(b) OpenAI custom: {"type":"custom","custom":{"name","
def stream_anthropic(api_key: str, model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream from Anthropic API. Yields TextChunk/ThinkingChunk, then AssistantTurn.
Prompt caching: marks up to 3 cache breakpoints — system prompt, tools
block, and the latest user message. Anthropic caches everything BEFORE
each breakpoint, so the conversation history up to the latest user turn
rides
def stream_kimi(api_key: str, model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream from Kimi API using native HTTP requests. Yields TextChunk, then AssistantTurn.
This is a native implementation using urllib.request instead of the OpenAI SDK,
allowing direct comparison with the OpenAI-compatible version.
Token estimation:
1. Input tokens: Estimated BEFORE using estimate_t
def stream_litellm(model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream via LiteLLM — one entry point routing to 100+ backends.
`model` arrives WITHOUT the leading "litellm/" prefix (bare_model strips
one level). So for `litellm/openrouter/anthropic/claude-3-5-sonnet`,
`model` here is `openrouter/anthropic/claude-3-5-sonnet` — exactly the
format LiteLLM's `compl
def _get_nvidia_fallback_chain(config: dict)
def stream_openai_compat(api_key: str, base_url: str, model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream from any OpenAI-compatible API. Yields TextChunk, then AssistantTurn.
def _flatten_tool_messages(messages: list)
Convert tool-call history to plain text for models without native tool support.
Transforms:
- assistant messages with tool_calls → text + inline <tool_call> representation
- role:tool messages → role:user with [Tool Result] prefix
This lets the model see the full conversation without need
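The two transformations listed above might be sketched as follows. The exact inline `<tool_call>` payload is not shown in this excerpt, so the JSON body used here is an assumption, as is the helper name `flatten_tool_messages`.

```python
import json


def flatten_tool_messages(messages: list) -> list:
    """Rewrite tool-call history as plain assistant/user text messages."""
    flat = []
    for msg in messages:
        role = msg.get("role")
        if role == "assistant" and msg.get("tool_calls"):
            parts = [msg.get("content") or ""]
            for call in msg["tool_calls"]:
                fn = call.get("function", {})
                payload = {
                    "name": fn.get("name"),
                    "arguments": fn.get("arguments"),
                }
                parts.append(f"<tool_call>{json.dumps(payload)}</tool_call>")
            text = "\n".join(p for p in parts if p)
            flat.append({"role": "assistant", "content": text})
        elif role == "tool":
            # Tool output is replayed to the model as a user turn.
            body = msg.get("content", "")
            flat.append({"role": "user", "content": f"[Tool Result] {body}"})
        else:
            flat.append(msg)
    return flat
```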
def _build_prompt_tool_manifest(tool_schemas: list)
Build the text block injected into the system prompt for prompt-based tool calling.
def _get_gcloud_token()
Obtain OAuth2 access token from gcloud CLI.
def _openai_messages_to_vertex_contents(messages: list)
Convert OpenAI-format messages to Vertex AI generateContent 'contents'.
def _openai_tools_to_vertex_tools(tool_schemas: list)
Convert OpenAI-format tools to Vertex AI functionDeclarations.
def stream_gcloud(model: str, system: str, messages: list, tool_schemas: list, config: dict)
Stream from Google Cloud Vertex AI using gcloud OAuth2 authentication.
Uses the generateContent REST API directly with Bearer tokens from
`gcloud auth print-access-token`. Supports native function calling.
def stream_ollama(base_url: str, model: str, system: str, messages: list, tool_schemas: list, config: dict)
def stream(model: str, system: str, messages: list, tool_schemas: list, config: dict)
Unified streaming entry point.
Auto-detects provider from model string.
Yields: TextChunk | ThinkingChunk | AssistantTurn
All provider calls are wrapped with automatic retry on transient
failures (timeouts, 429 rate-limit, 5xx server errors).
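A rough sketch of what a retry wrapper around a streaming generator can look like. Everything here is an assumption made for illustration: the names `is_transient` and `retry_stream`, the `status_code` attribute used to classify errors, the backoff constants, and the choice to retry only when no event has been yielded yet.

```python
import random
import time


def is_transient(exc: Exception) -> bool:
    """Treat timeouts, HTTP 429 and HTTP 5xx as retryable."""
    if isinstance(exc, TimeoutError):
        return True
    status = getattr(exc, "status_code", None)  # hypothetical attribute
    return isinstance(status, int) and (status == 429 or 500 <= status < 600)


def retry_stream(make_stream, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Yield from make_stream(), retrying transient failures with backoff."""
    for attempt in range(1, max_attempts + 1):
        emitted = False
        try:
            for event in make_stream():
                emitted = True
                yield event
            return
        except Exception as exc:
            # Never retry once output has reached the caller: replaying the
            # stream would duplicate chunks.
            if emitted or attempt == max_attempts or not is_transient(exc):
                raise
            sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.1))
```

Passing a factory (`make_stream`) instead of a generator matters because a generator that has raised cannot be restarted.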
def list_ollama_models(base_url: str)
Fetch locally available model tags from Ollama server.
Imports
__future__, functools, json, platform, random, re, requests, subprocess, time, typing, urllib.parse, urllib.request
▶sandbox/__init__.py1 LOC
Dulus Sandbox OS — static web assets.
▶sandbox_bootstrap.py54 LOC
Resolve the path to the Dulus Sandbox web UI.
The sandbox (the desktop-OS-in-a-browser at `/sandbox/`) ships inside the
wheel as plain files at `sandbox/dist/` — same layout as Dulus 0.2.81.
No tarball, no extract-on-first-run; pip installs it and webchat serves
it directly from site-packages.
GitHub Linguist's "this repo is 49% TypeScript" complaint is handled by
`.gitattributes` (`sandbox/** linguist-vendored=true`), not by hiding
the files from the wheel.
Functions (2)
def ensure_sandbox()
Return the directory the webchat server should serve `/sandbox/` from.
Resolution order:
1. Installed wheel: `<site-packages>/sandbox/dist/`
2. Editable / source checkout: `<repo>/sandbox/dist/`
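The resolution order above amounts to "first candidate directory that exists wins". A minimal sketch, with the hypothetical helper `first_existing_dir` standing in for whatever `ensure_sandbox()` does internally:

```python
from pathlib import Path
from typing import Iterable, Optional


def first_existing_dir(candidates: Iterable[Path]) -> Optional[Path]:
    """Return the first candidate that is an existing directory, else None."""
    for candidate in candidates:
        if candidate.is_dir():
            return candidate
    return None
```

A caller would pass the installed-wheel path first and the source-checkout path second, matching the order listed in the docstring.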
def __getattr__(name: str)
Imports
__future__, pathlib
▶skill/__init__.py14 LOC
skill package — reusable prompt templates (skills).
Imports
executor, loader
▶skill/builtin.py100 LOC
Built-in skills that ship with dulus.
Functions (1)
def _register_builtins()
Imports
__future__, loader
▶skill/clawhub.py630 LOC
ClawHub + local Anthropic skill importer for Dulus.
Sources:
- LOCAL : ~/.claude/plugins/marketplaces/claude-plugins-official/ (Anthropic, on-disk)
- AWESOME : ~/.claude/plugins/marketplaces/alireza-claude-skills/ (alirezarezvani/claude-skills, ~235 skills across 9 domains)
- CLAWHUB : https://clawhub.ai (community, 52k+ skills, via API)
Functions (16)
def list_local(query: Optional[str] = None)
Return all SKILL.md entries from local marketplaces (Anthropic + Awesome Skills).
def get_local(slug: str)
Find a local skill by its id (plugin/skill or external/plugin/skill).
def install_local(slug: str)
Copy a local Anthropic skill (SKILL.md + all support files) into ~/.dulus/skills/<name>/
def _fetch_awesome_remote(with_descriptions: bool = False)
Hit the GitHub tree API to list awesome skills.
Default (with_descriptions=False): ONE API call, instant, no descriptions.
Returns 235 entries with name + url ready in <1s.
with_descriptions=True: also pulls each SKILL.md's frontmatter via
raw.githubusercontent.com — done with a thread pool so
def list_awesome_remote(query: Optional[str] = None, force_refresh: bool = False, with_descriptions: bool = False)
Return the awesome-skills catalog (cached).
Default: one GitHub tree call (~1s, no descriptions), cached 24h.
with_descriptions=True: also fetches each SKILL.md frontmatter in parallel.
def get_awesome_remote(slug: str)
Find an awesome-remote skill by its id (awesome/<dir>/<skill>) or skill name.
def install_awesome_remote(slug: str)
Download a skill from the awesome GitHub repo and install into ~/.dulus/skills/.
def _load_composio_api_key()
Load API key from env, ~/.dulus/config.json, or ~/.falcon/config.json.
def list_composio_toolkits(query: Optional[str] = None, force_refresh: bool = False)
Return Composio toolkits as skill-like dicts. Cached 24h.
Authenticated path (API key set): hit the live `/api/v3/toolkits` endpoint.
Unauthenticated path: return the curated _COMPOSIO_FALLBACK list so the
/skill list composio UI still shows something useful.
def search_clawhub(query: str, limit: int = 10)
Search ClawHub for skills matching query.
TODO: fill in real Convex endpoint once reversed.
def install_clawhub(slug: str)
Download a skill from ClawHub by slug and save to ~/.dulus/skills/.
TODO: fill in real endpoint.
def list_installed(query: Optional[str] = None)
Return skills already saved in ~/.dulus/skills/.
def read_skill(name: str)
Return the body (no frontmatter) of an installed skill.
def _parse_frontmatter(text: str)
def _strip_frontmatter(text: str)
def _dulus_frontmatter(entry: dict)
Imports
__future__, json, pathlib, re, typing, urllib.parse, urllib.request
▶skill/executor.py66 LOC
Skill execution: inline (current conversation) or forked (sub-agent).
Functions (3)
def execute_skill(skill: SkillDef, args: str, state, config: dict, system_prompt: str)
Execute a skill.
If skill.context == "fork", runs as an isolated sub-agent and yields its events.
Otherwise (inline), injects the rendered prompt into the current agent loop.
Args:
skill: SkillDef to execute
args: raw argument string from user (after the trigger word)
state: AgentState
def _execute_inline(message: str, state, config: dict, system_prompt: str)
Run skill prompt inline in the current conversation.
def _execute_forked(skill: SkillDef, message: str, config: dict, system_prompt: str)
Run skill as an isolated sub-agent (separate conversation context).
Imports
__future__, loader, typing
▶skill/loader.py229 LOC
Skill loading: parse markdown files with YAML frontmatter into SkillDef objects.
Classes (1)
class SkillDef
Functions (7)
def _get_skill_paths()
Skill search roots in PRIORITY order (first wins after dedup).
Order: project > user > bundled. The loader iterates `reversed()`
of this list and overwrites by name as it goes, so anything listed
first here ends up as the surviving entry on collisions.
The `bundled` directory ships inside th
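The "iterate reversed, overwrite by name" rule can be illustrated with plain dicts. The helper name `merge_by_priority` and the `(source, skills)` tuple layout are assumptions for this sketch only.

```python
def merge_by_priority(roots: list) -> dict:
    """Merge skills so that earlier roots win on name collisions.

    `roots` is a list of (source, {name: skill}) pairs in priority order,
    highest priority first.
    """
    merged = {}
    # Walk lowest priority first; later (higher-priority) writes overwrite.
    for source, skills in reversed(roots):
        for name, skill in skills.items():
            merged[name] = (source, skill)
    return merged
```

With roots ordered project, user, bundled, a skill named `review` defined in all three resolves to the project copy.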
def _parse_list_field(value: str)
Parse YAML-like list: ``[a, b, c]`` or ``"a, b, c"``.
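One way to accept both spellings, shown as a sketch rather than the loader's actual code. The name `parse_list_field` and the quote-stripping behavior are assumptions.

```python
def parse_list_field(value: str) -> list:
    """Parse ``[a, b, c]`` or ``a, b, c`` into a list of stripped strings."""
    text = value.strip()
    if text.startswith("[") and text.endswith("]"):
        text = text[1:-1]
    # Strip whitespace and any surrounding quotes from each item.
    items = (part.strip().strip("'\"").strip() for part in text.split(","))
    return [item for item in items if item]
```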
def _parse_skill_file(path: Path, source: str = 'user')
Parse a markdown file with ``---`` frontmatter into a SkillDef.
Frontmatter fields:
name, description, triggers, tools / allowed-tools,
when_to_use, argument-hint, arguments, model,
user-invocable, context
def register_builtin_skill(skill: SkillDef)
def load_skills(include_builtins: bool = True)
Return skills from disk + builtins, deduplicated (project > user > builtin).
def find_skill(query: str)
Find a skill whose trigger matches the first word (or whole string) of query.
def substitute_arguments(prompt: str, args: str, arg_names: list[str])
Replace $ARGUMENTS (whole args string) and $ARG_NAME placeholders.
Named args are positional: first word → first name, etc.
Values are substituted literally; placeholder *names* are validated to
avoid pathological replace() chains, but values are NOT shell-escaped —
callers using the result in shel
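A hedged sketch of positional placeholder substitution. It deliberately swaps in a single `re.sub` pass instead of the `replace()` chain the docstring mentions, so that a name which is a prefix of another (`$A` vs `$AB`) cannot be substituted by mistake. It also assumes a placeholder is `$` followed by the argument name exactly as declared; the real function may normalize names differently.

```python
import re


def substitute(prompt: str, args: str, arg_names: list) -> str:
    """Replace $ARGUMENTS and positional $NAME placeholders in `prompt`."""
    words = args.split()
    values = {"ARGUMENTS": args}
    for index, name in enumerate(arg_names):
        # Missing positional values become empty strings.
        values[name] = words[index] if index < len(words) else ""

    def lookup(match):
        key = match.group(1)
        # Unknown placeholders are left untouched.
        return values[key] if key in values else match.group(0)

    # A function replacement inserts values literally (no escape handling).
    return re.sub(r"\$([A-Za-z_][A-Za-z0-9_]*)", lookup, prompt)
```

As the docstring warns, the substituted values are not shell-escaped, so the result should not be passed to a shell without quoting.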
Imports
__future__, dataclasses, pathlib, re, typing
▶skill/tools.py110 LOC
Skill tool: lets the model invoke skills by name via tool call.
Functions (3)
def _skill_tool(params: dict, config: dict)
Execute a skill by name and return its output.
def _skill_list_tool(params: dict, config: dict)
def _register()
Imports
__future__, loader, tool_registry
▶skills.py14 LOC
Backward-compatibility shim — real implementation is in skill/ package.
Imports
skill.executor, skill.loader
▶soul.py119 LOC
Baked-in default soul for Dulus.
The soul lives as a constant in source — that's the immutable core identity.
On first run we copy it to ``~/.dulus/memory/soul.md`` so the existing soul
loader (dulus.py @ ~8317) can pick it up. Users can edit that MD freely; the
code constant remains the canonical fallback if they ever ``/memory purge-soul``.
We intentionally do NOT auto-overwrite an existing soul.md — that would erase
custom personalities. ``seed_soul_file()`` is a no-op when the file is pres
Functions (4)
def _default_memory_dir()
Cross-OS resolution of the Dulus memory dir.
Prefer the canonical constant from ``memory.store`` so we honor any
override done there; fall back to ``~/.dulus/memory`` if the module
isn't importable yet (e.g. during early bootstrap).
def get_soul_path(memory_dir: Path | None = None)
Resolve ``<dulus_memory_dir>/soul.md``.
def compose_soul(user_name: str = 'amigo')
Render the full soul text: BAKED_SOUL templated with the user's name,
then the immutable CREATOR_BLOCK appended at the end so it's always
present even if the user edits their copy of soul.md.
def seed_soul_file(user_name: str = 'amigo', memory_dir: Path | None = None, force: bool = False)
Write a composed soul (BAKED_SOUL + CREATOR_BLOCK) to ``soul.md``.
Returns the path that was written, or ``None`` if the file already existed
and ``force=False``. Creates the memory directory if needed.
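The "write once unless forced" contract described above can be sketched as below. `seed_file` is a hypothetical stand-in that takes the already-composed text directly; the real `seed_soul_file()` composes the soul text itself.

```python
from pathlib import Path
from typing import Optional


def seed_file(path: Path, text: str, force: bool = False) -> Optional[Path]:
    """Write `text` to `path` unless it already exists and force is False."""
    if path.exists() and not force:
        return None  # never clobber a user-edited file by default
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding="utf-8")
    return path
```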
Imports
__future__, pathlib
▶spinner.py42 LOC
Shared spinner phrases for Dulus's tool/debate spinners.
Centralized so dulus.py and ui/render.py stay in sync.
▶steer_input.py148 LOC
SteerInput - Allow steering agent execution with follow-up input.
Provides a mechanism for users to inject follow-up input ("steer" commands)
during agent execution. This enables real-time course correction without
waiting for the current turn to complete.
The SteerInput class uses an asyncio.Queue to buffer steer inputs, which can
be consumed by the agent loop at strategic checkpoints.
Classes (1)
class SteerInput
Manages steer inputs during agent execution.
Steer inputs are user messages injected mid-execution to redirect
the agent's course. They are stored in an asyncio.Queue and consumed
by the agent loop at decision points.
Attributes:
_enabled: Whether steer input collection is active.
↳ get_next_steer_sync(self, timeout: float = 0.1)
Synchronous version of get_next_steer.
Args:
timeout: Maximum seconds to wait for an input.
Returns:
The next steer input string, or None if timeout expires.
↳ has_pending(self)
Check if there are steer inputs waiting to be processed.
Returns:
True if the queue is not empty.
↳ enable(self)
Enable steer input collection.
↳ disable(self)
Disable steer input collection.
↳ enabled(self)
Whether steer input collection is currently enabled.
↳ clear(self)
Clear all pending steer inputs.
Returns:
Number of items cleared.
↳ pending_count(self)
Number of steer inputs currently queued.
Returns:
The queue size.
Functions (2)
def get_steer_input()
Get the default SteerInput singleton.
Returns:
The shared SteerInput instance, creating it if needed.
def reset_steer_input()
Reset the default SteerInput singleton.
Imports
__future__, asyncio, dataclasses, typing
▶string_utils.py42 LOC
String utilities — adapted from kimi-cli.
Functions (3)
def shorten(text: str)
Shorten text to at most *width* characters.
Normalises whitespace, then truncates — preferring a word boundary
when one exists near the cut point, but falling back to a hard cut
so that CJK text without spaces won't collapse to just the placeholder.
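A sketch of the truncation strategy described above. The listing shows only `text` in the signature, so the `width` and `placeholder` parameters, their defaults, and the "boundary must fall in the second half of the kept text" heuristic are all assumptions.

```python
def shorten(text: str, width: int = 100, placeholder: str = "...") -> str:
    """Collapse whitespace, then cut to `width`, preferring a word boundary."""
    text = " ".join(text.split())
    if len(text) <= width:
        return text
    cut = width - len(placeholder)
    if cut <= 0:
        return placeholder[:width]
    head = text[:cut]
    space = head.rfind(" ")
    # Back up to the previous space only when the cut lands mid-word and the
    # space is reasonably close; otherwise keep the hard cut (CJK case).
    if text[cut] != " " and space >= cut // 2:
        head = head[:space]
    return head.rstrip() + placeholder
```

Text without spaces, such as a run of CJK characters, takes the hard-cut path, so the result still carries real content ahead of the placeholder.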
def shorten_middle(text: str, width: int, remove_newline: bool = True)
Shorten the text by inserting ellipsis in the middle.
def random_string(length: int = 8)
Generate a random string of fixed length.
Imports
__future__, random, re, string
▶subagent.py11 LOC
Backward-compatibility shim — real implementation is in multi_agent/subagent.py.
Imports
multi_agent.subagent
▶task/__init__.py12 LOC
Task system for dulus.
Imports
store, types
▶task/store.py209 LOC
Thread-safe task store: in-memory dict persisted to .dulus/tasks.json.
Functions (11)
def _tasks_file()
def _load()
def _save()
def _next_id()
Generate a short sequential numeric ID.
def create_task(subject: str, description: str, status: str = 'pending', owner: str = '', active_form: str = '', metadata: dict[str, Any] | None = None)
def get_task(task_id: str)
def list_tasks()
def update_task(task_id: str, subject: str | None = None, description: str | None = None, status: str | None = None, active_form: str | None = None, owner: str | None = None, add_blocks: list[str] | None = None, add_blocked_by: list[str] | None = None, metadata: dict[str, Any] | None = None)
Update a task. Returns (updated_task, list_of_updated_fields).
def delete_task(task_id: str)
def clear_all_tasks()
Remove all tasks (used in tests).
def reload_from_disk()
Force reload from disk (used in tests).
Imports
__future__, datetime, json, pathlib, threading, types, typing, uuid
▶task/tools.py276 LOC
Task tools: TaskCreate, TaskUpdate, TaskGet, TaskList — registered into tool_registry.
Functions (5)
def _task_create(subject: str, description: str, status: str = 'pending', owner: str = '', active_form: str = '', metadata: dict = None)
def _task_update(task_id: str, subject: str = None, description: str = None, status: str = None, active_form: str = None, owner: str = None, add_blocks: list = None, add_blocked_by: list = None, metadata: dict = None)
def _task_get(task_id: str)
def _task_list()
def _register()
Imports
__future__, store, tool_registry, types
▶task/types.py92 LOC
Task system types: Task dataclass, TaskStatus enum.
Classes (2)
class TaskStatus
class Task
↳ to_dict(self)
↳ from_dict(cls, data: dict)
↳ status_icon(self)
↳ one_line(self, resolved_ids: set[str] | None = None)
Imports
__future__, dataclasses, datetime, enum, typing
▶tests/__init__.py0 LOC
▶tests/e2e_checkpoint.py228 LOC
End-to-end checkpoint test: simulate a real user session.
Classes (1)
class AgentState
Functions (1)
def auto_snapshot(user_input)
Same logic as dulus.py auto-snapshot with throttle.
Imports
checkpoint, checkpoint.hooks, checkpoint.store, dataclasses, datetime, os, pathlib, shutil, sys, tempfile, uuid
▶tests/e2e_commands.py191 LOC
End-to-end test for /init, /export, /copy, /status commands.
Classes (1)
class FakeState
Functions (2)
def test_commands()
def _run_tests(tmpdir)
Imports
__future__, dataclasses, json, os, pathlib, shutil, sys, tempfile, unittest.mock
▶tests/e2e_compact.py193 LOC
Tests for /compact command and compaction enhancements.
Classes (1)
class FakeState
Functions (1)
def test_compact()
Imports
__future__, dataclasses, pathlib, sys, unittest.mock
▶tests/e2e_plan_mode.py182 LOC
End-to-end test for plan mode.
Classes (1)
class FakeState
Functions (1)
def test_plan_mode()
Imports
__future__, dataclasses, os, pathlib, sys, tempfile
▶tests/e2e_plan_tools.py167 LOC
End-to-end test for EnterPlanMode / ExitPlanMode tools.
Functions (2)
def test_plan_tools()
def _run(tmpdir)
Imports
__future__, os, pathlib, shutil, sys, tempfile
▶tests/test_afk_yolo.py178 LOC
Tests for AFK Mode and YOLO Mode.
Classes (3)
class TestAFKMode
Test suite for AFKMode.
↳ test_default_state_is_disabled(self)
AFK mode should be disabled by default.
↳ test_enable_sets_state(self)
enable() should set the state to True.
↳ test_disable_sets_state(self)
disable() should set the state to False.
↳ test_toggle_returns_new_state(self)
toggle() should return the new state.
↳ test_toggle_twice_returns_false(self)
Toggling twice should return False.
↳ test_repr_shows_state(self)
__repr__ should include the enabled state.
↳ test_dataclass_field_is_hidden(self)
The _enabled field should not appear in repr from dataclass.
↳ test_multiple_instances_are_independent(self)
Different instances should have independent state.
class TestYOLOMode
Test suite for YOLOMode.
↳ test_default_state_is_disabled(self)
YOLO mode should be disabled by default.
↳ test_enable_sets_state(self)
enable() should set the state to True.
↳ test_disable_sets_state(self)
disable() should set the state to False.
↳ test_toggle_returns_new_state(self)
toggle() should return the new state.
↳ test_toggle_twice_returns_false(self)
Toggling twice should return False.
↳ test_repr_shows_state(self)
__repr__ should include the enabled state.
↳ test_dataclass_field_is_hidden(self)
The _enabled field should not appear in repr from dataclass.
↳ test_multiple_instances_are_independent(self)
Different instances should have independent state.
class TestAFKYOLOIndependence
Test that AFK and YOLO modes are independent.
↳ test_modes_are_independent(self)
AFK and YOLO can be enabled/disabled independently.
↳ test_all_combinations(self)
Test all four combinations of AFK/YOLO states.
Imports
dulus_tools.afk_mode, dulus_tools.yolo_mode, pathlib, pytest, sys
▶tests/test_approval_runtime.py392 LOC
Tests for ApprovalRuntime.
Classes (4)
class TestApprovalRequest
Test suite for ApprovalRequest dataclass.
↳ test_create_minimal(self)
Create a minimal ApprovalRequest.
↳ test_create_full(self)
Create an ApprovalRequest with all fields.
↳ test_default_created_at(self)
created_at should have a default value.
class TestApprovalResponse
Test suite for ApprovalResponse dataclass.
↳ test_create(self)
Create an ApprovalResponse.
class TestApprovalRuntime
Test suite for ApprovalRuntime.
↳ test_create_request(self)
Create a request and verify it's stored.
↳ test_create_request_with_explicit_id(self)
Create a request with an explicit ID.
↳ test_list_pending_empty(self)
list_pending should return empty list when no requests.
↳ test_list_pending_after_resolve(self)
Resolved requests should not appear in list_pending.
↳ test_resolve_approve(self)
Resolve a request with approve.
↳ test_resolve_reject(self)
Resolve a request with reject.
↳ test_resolve_nonexistent(self)
Resolving a nonexistent request should return False.
↳ test_get_request(self)
Get a request by ID.
↳ test_get_request_nonexistent(self)
Get a nonexistent request should return None.
↳ test_subscribe_and_notify(self)
Subscribe to events and receive notifications.
↳ test_unsubscribe(self)
Unsubscribe should stop receiving events.
↳ test_cancel_by_source(self)
Cancel requests by source.
↳ test_cancel_by_source_no_match(self)
Cancel with no matching source should return 0.
↳ test_clear_resolved(self)
Clear resolved requests from memory.
class TestApprovalRuntimeAsync
Async test suite for ApprovalRuntime.
Imports
__future__, asyncio, dulus_tools.approval_runtime, pathlib, pytest, sys
▶tests/test_background_task_tools.py456 LOC
Tests for the Background Task tools (tools_background.py).
Classes (6)
class TestBgTaskListTool
Test suite for the BgTaskList tool.
↳ test_tool_is_registered(self)
BgTaskList tool must be registered.
↳ test_tool_is_read_only(self)
BgTaskList should be read-only.
↳ test_tool_is_concurrent_safe(self)
BgTaskList should be safe to run concurrently.
↳ test_empty_list(self)
Listing with no tasks should return empty message.
↳ test_list_shows_tasks(self)
Listing should show added tasks.
class TestBgTaskOutputTool
Test suite for the BgTaskOutput tool.
↳ test_tool_is_registered(self)
BgTaskOutput tool must be registered.
↳ test_tool_is_read_only(self)
BgTaskOutput should be read-only.
↳ test_output_for_missing_task(self)
Getting output for missing task should return error.
↳ test_output_with_content(self)
Getting output should include stdout content.
↳ test_output_with_stderr(self)
Getting output should include stderr content.
↳ test_output_no_block(self)
Non-blocking output should not wait.
↳ test_schema_has_task_id_required(self)
The schema should require 'task_id'.
class TestBgTaskStopTool
Test suite for the BgTaskStop tool.
↳ test_tool_is_registered(self)
BgTaskStop tool must be registered.
↳ test_tool_is_not_read_only(self)
BgTaskStop should NOT be read-only (it stops tasks).
↳ test_tool_is_not_concurrent_safe(self)
BgTaskStop should NOT be marked concurrent-safe.
↳ test_stop_missing_task(self)
Stopping a missing task should return error.
↳ test_stop_non_running_task(self)
Stopping a non-running task should report it's not running.
↳ test_stop_running_task(self)
Stopping a running task should succeed.
↳ test_force_stop(self)
Force stopping a task should report forceful stop.
class TestBackgroundTaskStore
Test suite for the BackgroundTaskStore class.
↳ test_empty_store(self)
A new store should have no tasks.
↳ test_add_and_get_task(self)
Adding and retrieving a task should work.
↳ test_list_tasks_order(self)
Tasks should be listed most recent first.
↳ test_remove_task(self)
Removing a task should delete it.
↳ test_update_status(self)
Updating status should work.
↳ test_thread_safety(self)
Concurrent operations should be safe.
class TestBackgroundTask
Test suite for the BackgroundTask dataclass.
↳ test_task_creation(self)
Creating a task should set all fields.
↳ test_task_duration(self)
Duration should be positive after a small delay.
↳ test_task_is_running_without_thread(self)
A task without a thread should not appear running if status is not running.
↳ test_task_append_output(self)
Appending output should add to the correct list.
↳ test_task_to_dict(self)
to_dict should serialize key fields.
↳ test_task_stop(self)
Stopping a task should set the stop event.
↳ test_force_stop_sets_status(self)
Force stopping should set status to stopped when thread is alive.
class TestCreateBackgroundTask
Test suite for the create_background_task helper.
↳ test_create_task(self)
Creating a background task should return a task with an ID.
↳ test_task_actually_runs(self)
The background task should actually execute.
↳ test_task_with_args(self)
The background task should pass arguments.
↳ test_task_error_handling(self)
The background task should handle errors gracefully.
Functions (1)
def _clean_registry()
Reset registry and task store before each test.
Imports
__future__, pytest, threading, time, tool_registry, tools_background
▶tests/test_background_tasks.py846 LOC
Tests for the dulus.background_tasks module (Features 14-15).
Covers BackgroundTaskStore, BackgroundTaskManager, BackgroundAgentRunner,
and all utility functions.
Classes (5)
class TestUtilityFunctions
Tests for module-level helper functions.
↳ test_is_terminal_status_true(self)
↳ test_is_terminal_status_false(self)
↳ test_generate_task_id_format(self)
↳ test_generate_task_id_unique(self)
↳ test_format_task_basic(self)
↳ test_format_task_with_command(self)
↳ test_format_task_list_empty(self)
↳ test_format_task_list_with_items(self)
class TestBackgroundTaskStore
Tests for BackgroundTaskStore persistence layer.
↳ test_init_creates_directory(self, tmp_path: Path)
↳ test_task_dir(self, tmp_store: BackgroundTaskStore)
↳ test_create_task_writes_files(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_create_task_runtime_defaults(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_read_spec_roundtrip(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_write_runtime_and_read_back(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_read_output_empty(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_read_output_with_content(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_read_output_offset_pagination(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_tail_output_basic(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_tail_output_exceeds_max_bytes(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_tail_output_missing_file(self, tmp_store: BackgroundTaskStore)
↳ test_list_task_ids(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_list_views(self, tmp_store: BackgroundTaskStore)
↳ test_list_views_skips_corrupted(self, tmp_store: BackgroundTaskStore)
↳ test_merged_view(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
↳ test_merged_view_not_found(self, tmp_store: BackgroundTaskStore)
↳ test_output_path(self, tmp_store: BackgroundTaskStore)
↳ test_kind_payload_roundtrip(self, tmp_store: BackgroundTaskStore)
class TestBackgroundTaskManager
Tests for BackgroundTaskManager lifecycle and operations.
↳ test_init_creates_store(self, tmp_path: Path)
↳ test_default_config(self, tmp_manager: BackgroundTaskManager)
↳ test_custom_config_override(self, tmp_path: Path)
↳ test_has_active_tasks_empty(self, tmp_manager: BackgroundTaskManager)
↳ test_create_bash_task_returns_view(self, tmp_manager: BackgroundTaskManager)
↳ test_create_bash_task_persists(self, tmp_manager: BackgroundTaskManager)
↳ test_create_bash_task_max_limit(self, tmp_manager: BackgroundTaskManager)
↳ test_create_bash_task_launch_failure(self, tmp_manager: BackgroundTaskManager)
↳ test_get_task_found(self, tmp_manager: BackgroundTaskManager)
↳ test_get_task_not_found(self, tmp_manager: BackgroundTaskManager)
↳ test_list_tasks_active_only(self, tmp_manager: BackgroundTaskManager)
↳ test_list_tasks_all(self, tmp_manager: BackgroundTaskManager)
↳ test_list_tasks_limit(self, tmp_manager: BackgroundTaskManager)
↳ test_kill_existing_task(self, tmp_manager: BackgroundTaskManager)
↳ test_kill_already_terminal(self, tmp_manager: BackgroundTaskManager)
↳ test_kill_missing_process(self, tmp_manager: BackgroundTaskManager)
↳ test_kill_all_active(self, tmp_manager: BackgroundTaskManager)
↳ test_read_output(self, tmp_manager: BackgroundTaskManager)
↳ test_tail_output(self, tmp_manager: BackgroundTaskManager)
↳ test_resolve_output_path(self, tmp_manager: BackgroundTaskManager)
↳ test_launch_worker_creates_subprocess(self, tmp_manager: BackgroundTaskManager)
↳ test_launch_worker_with_cwd(self, tmp_manager: BackgroundTaskManager, tmp_path: Path)
↳ test_recovery_marks_stale_tasks(self, tmp_manager: BackgroundTaskManager)
↳ test_recovery_skips_fresh_tasks(self, tmp_manager: BackgroundTaskManager)
↳ test_recovery_respects_interrupted(self, tmp_manager: BackgroundTaskManager)
↳ test_reconcile_returns_terminal_ids(self, tmp_manager: BackgroundTaskManager)
↳ test_list_task_views_wrapper(self, tmp_manager: BackgroundTaskManager)
↳ test_kill_on_windows_fallback(self, tmp_manager: BackgroundTaskManager)
class TestBackgroundAgentRunner
Tests for BackgroundAgentRunner async task execution.
class TestEdgeCases
Edge cases and integration scenarios.
↳ test_concurrent_task_count(self, tmp_manager: BackgroundTaskManager)
Verify that active task count is accurate with multiple tasks.
↳ test_task_sort_order(self, tmp_store: BackgroundTaskStore)
Tasks should be listed newest first.
↳ test_empty_store_list_views(self, tmp_store: BackgroundTaskStore)
An empty store should return an empty list of views.
↳ test_empty_store_list_task_ids(self, tmp_store: BackgroundTaskStore)
An empty store should return an empty list of task IDs.
↳ test_read_output_beyond_eof(self, tmp_store: BackgroundTaskStore, sample_spec: TaskSpec)
Reading beyond EOF should return empty text with same offset.
↳ test_kill_all_active_with_no_active(self, tmp_manager: BackgroundTaskManager)
kill_all_active with no active tasks should return empty list.
↳ test_wait_on_nonexistent_task(self, tmp_manager: BackgroundTaskManager)
Waiting on a nonexistent task should raise FileNotFoundError.
↳ test_task_id_prefix(self)
Generated task IDs should have the correct kind prefix.
↳ test_terminal_statuses_are_mutually_exclusive(self)
A status should not be both terminal and non-terminal.
↳ test_config_defaults_complete(self)
All config keys should have default values.
Functions (3)
def tmp_store(tmp_path: Path)
Return a fresh BackgroundTaskStore backed by a temp directory.
def tmp_manager(tmp_path: Path)
Return a fresh BackgroundTaskManager with a temp session directory.
def sample_spec()
Return a sample TaskSpec for testing.
Imports
__future__, asyncio, dulus_tools.background_tasks, json, os, pathlib, pytest, signal, sys, time, unittest.mock
▶tests/test_checkpoint.py458 LOC
Tests for the checkpoint system.
Classes (5)
class FakeState
class TestTypes
↳ test_file_backup_roundtrip(self)
↳ test_file_backup_none_filename(self)
↳ test_snapshot_roundtrip(self)
class TestStore
↳ test_track_file_edit_existing_file(self, tmp_home, tmp_path)
↳ test_track_file_edit_nonexistent(self, tmp_home, tmp_path)
↳ test_track_file_edit_large_file_skipped(self, tmp_home, tmp_path)
↳ test_make_snapshot_basic(self, tmp_home, tmp_path)
↳ test_make_snapshot_incremental(self, tmp_home, tmp_path)
↳ test_list_snapshots(self, tmp_home)
↳ test_get_snapshot(self, tmp_home)
↳ test_rewind_files(self, tmp_home, tmp_path)
↳ test_rewind_deletes_new_file(self, tmp_home, tmp_path)
↳ test_max_snapshots_sliding_window(self, tmp_home)
↳ test_files_changed_since(self, tmp_home, tmp_path)
↳ test_delete_session_checkpoints(self, tmp_home)
↳ test_cleanup_old_sessions(self, tmp_home)
class TestHooks
↳ test_set_session_and_tracking(self, tmp_home, tmp_path)
↳ test_reset_tracked(self, tmp_home, tmp_path)
↳ test_install_hooks_wraps_tools(self)
Verify install_hooks wraps Write/Edit/NotebookEdit without error.
class TestIntegration
↳ test_write_snapshot_rewind_cycle(self, tmp_home, tmp_path)
Simulate: write file → snapshot → modify → rewind → verify restored.
↳ test_initial_snapshot(self, tmp_home)
Initial snapshot should be id=1 with empty messages and prompt '(initial state)'.
↳ test_throttle_skips_when_no_changes(self, tmp_home)
Snapshot should be skipped when no files changed and message_index is same.
↳ test_throttle_creates_when_messages_grew(self, tmp_home)
Snapshot should be created when messages grew even without file changes.
↳ test_throttle_conversation_rewind_works(self, tmp_home)
After throttled snapshots, conversation rewind via message_index still works.
Functions (2)
def tmp_home(tmp_path)
Redirect ~/.dulus/checkpoints to a temp directory.
def reset_versions()
Reset file version counters between tests.
Imports
__future__, dataclasses, json, os, pathlib, pytest, shutil, tempfile, unittest.mock
▶tests/test_clipboard_utils.py259 LOC
Tests for ClipboardUtils (Feature 18).
Classes (6)
class TestGetClipboardCommand
Tests for platform-specific clipboard command detection.
↳ test_darwin_platform(self)
macOS should return pbcopy/pbpaste.
↳ test_win32_platform(self)
Windows should return clip/None.
↳ test_linux_no_tool(self)
Linux with no clipboard tool should return (None, None).
↳ test_linux_xclip_found(self)
Linux with xclip should return xclip commands.
class TestCopyText
Tests for copy_text.
↳ test_copy_text_success(self)
copy_text should return True on success.
↳ test_copy_text_no_tool(self)
copy_text should return False when no clipboard tool is available.
↳ test_copy_text_failure(self)
copy_text should return False on subprocess failure.
↳ test_copy_text_windows(self)
copy_text should use utf-16-le encoding on Windows.
class TestPasteText
Tests for paste_text.
↳ test_paste_text_success(self)
paste_text should return clipboard content.
↳ test_paste_text_no_tool(self)
paste_text should return empty string when no tool is available.
↳ test_paste_text_failure(self)
paste_text should return empty string on failure.
class TestCopyFileContent
Tests for copy_file_content.
↳ test_copy_full_file(self, tmp_path: Path)
copy_file_content should copy the entire file.
↳ test_copy_line_range(self, tmp_path: Path)
copy_file_content should copy a specific line range.
↳ test_copy_missing_file(self, tmp_path: Path)
copy_file_content should return False for missing files.
↳ test_copy_empty_file(self, tmp_path: Path)
copy_file_content should handle empty files.
class TestIsImageInClipboard
Tests for is_image_in_clipboard.
↳ test_darwin_png_signature(self)
Should detect PNG signature on macOS.
↳ test_darwin_jpeg_signature(self)
Should detect JPEG signature on macOS.
↳ test_darwin_gif_signature(self)
Should detect GIF signature on macOS.
↳ test_darwin_text_not_image(self)
Should return False for text on macOS.
↳ test_non_darwin_platform(self)
Should return False on non-macOS platforms (simplified).
↳ test_is_image_error(self)
Should return False on exception.
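The signature checks exercised by TestIsImageInClipboard rely on well-known file magic numbers. A minimal sketch of such a check, assuming the clipboard bytes have already been read (the helper name `looks_like_image` is hypothetical and the real is_image_in_clipboard may work differently):

```python
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
JPEG_SIGNATURE = b"\xff\xd8\xff"
GIF_SIGNATURES = (b"GIF87a", b"GIF89a")


def looks_like_image(data):
    """Return True if data starts with a PNG, JPEG, or GIF signature."""
    try:
        return data.startswith((PNG_SIGNATURE, JPEG_SIGNATURE) + GIF_SIGNATURES)
    except Exception:
        # Mirror the "return False on exception" behaviour described above.
        return False
```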
class TestConvenienceFunctions
Tests for copy_to_clipboard and paste_from_clipboard.
↳ test_copy_to_clipboard(self)
copy_to_clipboard should delegate to ClipboardUtils.copy_text.
↳ test_paste_from_clipboard(self)
paste_from_clipboard should delegate to ClipboardUtils.paste_text.
Imports
__future__, dulus_tools.clipboard_utils, pathlib, pytest, sys, unittest.mock
▶tests/test_compaction.py 190 LOC
Tests for compaction.py — token estimation, context limits, snipping, split point.
Classes (4)
class TestEstimateTokens
↳ test_simple_messages(self)
↳ test_empty_messages(self)
↳ test_empty_content(self)
↳ test_tool_result_messages(self)
↳ test_structured_content(self)
Content that is a list of dicts (e.g. Anthropic tool_result blocks).
↳ test_with_tool_calls(self)
class TestGetContextLimit
↳ test_anthropic(self)
↳ test_gemini(self)
↳ test_deepseek(self)
↳ test_openai(self)
↳ test_qwen(self)
↳ test_unknown_model_fallback(self)
↳ test_explicit_provider_prefix(self)
class TestSnipOldToolResults
↳ test_old_tool_results_get_truncated(self)
↳ test_recent_tool_results_preserved(self)
↳ test_short_tool_results_not_touched(self)
↳ test_non_tool_messages_untouched(self)
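The four TestSnipOldToolResults cases suggest a rule of this shape: truncate long tool results outside a recent window and leave everything else alone. The sketch below is only an illustration under assumed names; the message layout (`role`/`content` keys), the `keep_recent` and `max_chars` parameters, and the `[snipped]` marker are not taken from compaction.py.

```python
def snip_old_tool_results(messages, keep_recent=2, max_chars=200):
    """Return a new list where old, long tool results are truncated."""
    cutoff = max(len(messages) - keep_recent, 0)
    snipped = []
    for index, message in enumerate(messages):
        content = message.get("content")
        is_old_tool = index < cutoff and message.get("role") == "tool"
        if is_old_tool and isinstance(content, str) and len(content) > max_chars:
            # Copy the message so the caller's list is not mutated.
            message = {**message, "content": content[:max_chars] + " [snipped]"}
        snipped.append(message)
    return snipped
```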
class TestFindSplitPoint
↳ test_returns_reasonable_index(self)
↳ test_single_message(self)
↳ test_empty_messages(self)
↳ test_split_preserves_recent(self)
Imports
__future__, compaction, os, sys
▶tests/test_diff_view.py 50 LOC
Functions (6)
def test_generate_unified_diff()
def test_generate_unified_diff_empty_old()
def test_edit_returns_diff(tmp_path)
def test_write_existing_returns_diff(tmp_path)
def test_write_new_file_no_diff(tmp_path)
def test_diff_truncation()
Imports
os, pytest, sys, tempfile
▶tests/test_diff_visualization.py 208 LOC
Tests for DiffVisualizer (Feature 19).
Classes (4)
class TestRenderCli
Tests for CLI diff rendering.
↳ test_render_cli_contains_path(self, simple_diff_block: dict[str, str])
CLI output should contain the file path.
↳ test_render_cli_contains_diff_markers(self, simple_diff_block: dict[str, str])
CLI output should contain unified diff markers.
↳ test_render_cli_ansi_colors(self, simple_diff_block: dict[str, str])
CLI output should include ANSI color codes.
↳ test_render_cli_no_changes(self, empty_diff_block: dict[str, str])
CLI output should indicate no changes for identical text.
↳ test_render_cli_removed_line(self, simple_diff_block: dict[str, str])
CLI output should show the removed line.
↳ test_render_cli_added_line(self, simple_diff_block: dict[str, str])
CLI output should show the added line.
↳ test_render_cli_default_path(self)
CLI output should use 'unknown' when path is missing.
class TestRenderHtml
Tests for HTML diff rendering.
↳ test_render_html_contains_path(self, simple_diff_block: dict[str, str])
HTML output should contain the file path.
↳ test_render_html_has_diff_block_class(self, simple_diff_block: dict[str, str])
HTML output should have diff-block wrapper.
↳ test_render_html_has_pre_tag(self, simple_diff_block: dict[str, str])
HTML output should use <pre> for diff content.
↳ test_render_html_diff_add_class(self, simple_diff_block: dict[str, str])
HTML output should mark additions with diff-add class.
↳ test_render_html_diff_del_class(self, simple_diff_block: dict[str, str])
HTML output should mark deletions with diff-del class.
↳ test_render_html_escapes_content(self)
HTML output should escape special characters.
↳ test_render_html_no_changes(self, empty_diff_block: dict[str, str])
HTML output should handle no changes.
class TestRenderTelegram
Tests for Telegram diff rendering.
↳ test_render_telegram_contains_path(self, simple_diff_block: dict[str, str])
Telegram output should contain the file path.
↳ test_render_telegram_has_file_emoji(self, simple_diff_block: dict[str, str])
Telegram output should have file emoji.
↳ test_render_telegram_line_counts(self, simple_diff_block: dict[str, str])
Telegram output should report line change counts.
↳ test_render_telegram_markdown_bold(self, simple_diff_block: dict[str, str])
Telegram output should use Markdown bold for filename.
↳ test_render_telegram_empty_diff(self, empty_diff_block: dict[str, str])
Telegram output should handle no-change diffs.
class TestGenerateUnifiedDiff
Tests for generate_unified_diff.
↳ test_generate_unified_diff(self)
Should produce a standard unified diff.
↳ test_generate_unified_diff_new_file(self)
Should handle new file (empty old text).
↳ test_generate_unified_diff_deleted_file(self)
Should handle deleted file (empty new text).
↳ test_generate_unified_diff_empty_both(self)
Should handle both texts being empty.
↳ test_generate_unified_diff_multiline(self)
Should handle multi-line diffs correctly.
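The TestGenerateUnifiedDiff cases (new file, deleted file, both empty, multi-line) map naturally onto the standard library's `difflib.unified_diff`. Whether DiffVisualizer uses difflib is not stated here, so the following is a sketch under that assumption, with a hypothetical signature and hypothetical `a/` and `b/` path prefixes:

```python
import difflib


def generate_unified_diff(old_text, new_text, path="unknown"):
    """Return a unified diff as one string, or "" when nothing changed."""
    old_lines = old_text.splitlines(keepends=True)
    new_lines = new_text.splitlines(keepends=True)
    diff = difflib.unified_diff(
        old_lines,
        new_lines,
        fromfile=f"a/{path}",
        tofile=f"b/{path}",
    )
    return "".join(diff)
```

An empty old text yields a diff containing only additions, and two empty inputs yield an empty string because difflib emits no hunks.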
Functions (2)
def simple_diff_block()
Return a simple diff block for testing.
def empty_diff_block()
Return a diff block with identical old and new text.
Imports
__future__, dulus_tools.diff_visualizer, pathlib, pytest, sys
▶tests/test_display_blocks.py 454 LOC
Tests for the Display Blocks System (display_blocks.py).
Classes (5)
class TestRenderCli
Test suite for CLI rendering.
↳ test_render_unknown_block(self)
Rendering an unknown block type should not crash.
↳ test_render_block_missing_type(self)
Rendering a block with no type should handle gracefully.
↳ test_render_diff_cli(self)
Diff block should show path and changes.
↳ test_render_diff_cli_empty(self)
Diff with no changes should indicate no changes.
↳ test_render_diff_cli_summary(self)
Diff summary should be compact.
↳ test_render_todo_cli(self)
Todo block should show items with icons.
↳ test_render_todo_cli_empty(self)
Empty todo block should show empty message.
↳ test_render_shell_cli(self)
Shell block should show the command.
↳ test_render_bg_task_cli(self)
Background task block should show task info.
↳ test_render_think_cli(self)
Think block should show thought with indentation.
↳ test_render_think_cli_multiline(self)
Think block should handle multiline thoughts.
↳ test_render_code_cli(self)
Code block should show code with markers.
↳ test_render_table_cli(self)
Table block should show formatted table.
↳ test_render_table_cli_no_headers(self)
Table block without headers should still render.
↳ test_render_table_cli_empty(self)
Empty table should show empty message.
↳ test_render_error_cli(self)
Error block should show the error message.
class TestRenderHtml
Test suite for HTML rendering.
↳ test_render_todo_html(self)
Todo block should render as HTML list.
↳ test_render_todo_html_empty(self)
Empty todo should render as HTML.
↳ test_render_shell_html(self)
Shell block should render as HTML pre/code.
↳ test_render_think_html(self)
Think block should render as HTML.
↳ test_render_bg_task_html(self)
Background task should render as HTML.
↳ test_render_diff_html(self)
Diff should render as HTML.
↳ test_render_code_html(self)
Code should render as HTML with language class.
↳ test_render_table_html(self)
Table should render as HTML table.
↳ test_render_error_html(self)
Error should render as HTML with red color.
↳ test_render_unknown_html(self)
Unknown block should not crash in HTML mode.
↳ test_html_escapes_content(self)
HTML rendering should escape special characters.
class TestRenderTelegram
Test suite for Telegram rendering.
↳ test_render_todo_telegram(self)
Todo block should render for Telegram.
↳ test_render_todo_telegram_empty(self)
Empty todo should render for Telegram.
↳ test_render_shell_telegram(self)
Shell block should render as Telegram code block.
↳ test_render_think_telegram(self)
Think block should render for Telegram.
↳ test_render_bg_task_telegram(self)
Background task should render for Telegram.
↳ test_render_error_telegram(self)
Error should render for Telegram.
↳ test_render_unknown_telegram(self)
Unknown block should not crash in Telegram mode.
class TestRenderDispatch
Test suite for the render() dispatch method.
↳ test_render_cli_dispatch(self)
render() with 'cli' should call render_cli.
↳ test_render_html_dispatch(self)
render() with 'html' should call render_html.
↳ test_render_telegram_dispatch(self)
render() with 'telegram' should call render_telegram.
↳ test_render_invalid_format_fallback(self)
render() with unknown format should fall back to str().
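The dispatch behaviour described for TestRenderDispatch (three known formats, fallback to str() otherwise) could look roughly like this. The renderer bodies are placeholders and the table name `_RENDERERS` is an assumption; only the format names and the str() fallback come from the test descriptions.

```python
def render_cli(block):
    return f"[cli] {block.get('type', 'unknown')}"


def render_html(block):
    return f"<div class=\"block\">{block.get('type', 'unknown')}</div>"


def render_telegram(block):
    return f"*{block.get('type', 'unknown')}*"


# Hypothetical lookup table mapping format names to renderer functions.
_RENDERERS = {
    "cli": render_cli,
    "html": render_html,
    "telegram": render_telegram,
}


def render(block, fmt):
    """Dispatch to the renderer for fmt; fall back to str(block)."""
    renderer = _RENDERERS.get(fmt)
    if renderer is None:
        return str(block)
    return renderer(block)
```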
class TestEdgeCases
Edge cases and error handling.
↳ test_empty_block(self)
Rendering an empty block should not crash.
↳ test_none_values_in_block(self)
Blocks with None values should be handled gracefully.
↳ test_missing_required_fields(self)
Blocks missing expected fields should be handled.
↳ test_large_content(self)
Large content should not cause issues.
Imports
__future__, display_blocks, pytest
▶tests/test_export_import.py 299 LOC
Tests for SessionExporter and SessionImporter — Feature 13.
Covers:
* export_markdown (basic, with session_id/token_count, structured content)
* export_json (basic, round-trip)
* export_text (basic)
* import_from_file (missing file, JSON, Markdown, text, max_context_size)
* import_from_session_id (missing session, valid session)
Functions (27)
def sample_history()
A small conversation history for export tests.
def exporter()
def importer()
def tmp_output(tmp_path: Path)
A temporary file path for writing exports.
def test_export_markdown_basic(exporter: SessionExporter, sample_history: list, tmp_output: Path)
def test_export_markdown_with_metadata(exporter: SessionExporter, sample_history: list, tmp_output: Path)
def test_export_markdown_structured_content(exporter: SessionExporter, tmp_output: Path)
def test_export_markdown_creates_directories(exporter: SessionExporter, sample_history: list, tmp_path: Path)
def test_export_markdown_empty_history(exporter: SessionExporter, tmp_output: Path)
def test_export_json_basic(exporter: SessionExporter, sample_history: list, tmp_output: Path)
def test_export_json_with_metadata(exporter: SessionExporter, sample_history: list, tmp_output: Path)
def test_export_json_round_trip(exporter: SessionExporter, sample_history: list, tmp_output: Path)
Exported JSON should be importable and contain identical messages.
def test_export_json_empty(exporter: SessionExporter, tmp_output: Path)
def test_export_text_basic(exporter: SessionExporter, sample_history: list, tmp_output: Path)
def test_export_text_structured_content(exporter: SessionExporter, tmp_output: Path)
def test_export_text_empty(exporter: SessionExporter, tmp_output: Path)
def test_import_missing_file(importer: SessionImporter)
def test_import_json_file(importer: SessionImporter, tmp_path: Path)
def test_import_invalid_json_file(importer: SessionImporter, tmp_path: Path)
def test_import_markdown_file(importer: SessionImporter, tmp_path: Path)
def test_import_text_file(importer: SessionImporter, tmp_path: Path)
def test_import_with_max_context_size(importer: SessionImporter, tmp_path: Path)
def test_import_unknown_extension(importer: SessionImporter, tmp_path: Path)
Files with unknown extensions are treated as plain text.
def test_import_missing_session(importer: SessionImporter, tmp_path: Path)
def test_import_valid_session(importer: SessionImporter, tmp_path: Path)
def test_import_session_empty_wire(importer: SessionImporter, tmp_path: Path)
def test_import_session_default_root(importer: SessionImporter)
When sessions_root is None, it should default to ~/.dulus/sessions.
Imports
__future__, dulus_tools.export_import, json, pathlib, pytest, sys, tempfile
▶tests/test_hook_engine.py 368 LOC
Tests for HookEngine.
Classes (5)
class TestHookDef
Test suite for HookDef dataclass.
↳ test_create_minimal(self)
Create a minimal HookDef.
↳ test_create_full(self)
Create a HookDef with all fields.
↳ test_compiled_matcher_valid(self)
A valid regex should compile.
↳ test_compiled_matcher_empty(self)
An empty matcher should return None.
↳ test_compiled_matcher_invalid(self)
An invalid regex should return None.
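The three compiled_matcher cases (valid regex compiles, empty matcher gives None, invalid regex gives None) can be illustrated with a small dataclass. The field names `event`, `command`, and `matcher` are assumptions for this sketch and may not match the real HookDef.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class HookDef:
    event: str          # assumed field name
    command: str        # assumed field name
    matcher: str = ""   # regex source; empty means "no matcher"

    @property
    def compiled_matcher(self) -> Optional[re.Pattern]:
        """Compile the matcher, returning None when empty or invalid."""
        if not self.matcher:
            return None
        try:
            return re.compile(self.matcher)
        except re.error:
            return None
```

Returning None for an invalid pattern lets callers treat a broken matcher like a missing one instead of failing at hook-dispatch time.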
class TestHookResult
Test suite for HookResult dataclass.
↳ test_create(self)
Create a HookResult.
class TestHookEngine
Test suite for HookEngine.
↳ test_init_empty(self)
Create an empty HookEngine.
↳ test_init_with_hooks(self)
Create a HookEngine with hooks.
↳ test_add_hook(self)
Add a hook to the engine.
↳ test_remove_hook(self)
Remove hooks by event type.
↳ test_remove_hook_no_match(self)
Remove hooks with no match should return 0.
class TestHookEngineConfig
Test suite for HookEngine config loading.
↳ test_from_config_nonexistent(self)
Loading from nonexistent config should return empty engine.
↳ test_from_config_valid_toml(self, tmp_path: Path)
Loading from a valid TOML config.
↳ test_from_config_invalid_toml(self, tmp_path: Path)
Loading from an invalid TOML should return empty engine.
↳ test_parse_hooks_toml_simple(self)
Test the simple TOML parser fallback.
↳ test_parse_hooks_toml_simple_empty(self)
Test parsing empty TOML.
↳ test_parse_hooks_toml_simple_no_hooks(self)
Test parsing TOML without hooks.
↳ test_ensure_default_config(self, tmp_path: Path)
ensure_default_config should create a default config file.
↳ test_ensure_default_config_no_overwrite(self, tmp_path: Path)
ensure_default_config should not overwrite existing config.
class TestHookEngineEdgeCases
Edge case tests for HookEngine.
Imports
__future__, asyncio, dulus_tools.hook_engine, pathlib, pytest, sys, tempfile
▶tests/test_injection_fix.py 65 LOC
Functions (1)
def test_consolidation()
Imports
os, providers, sys
▶tests/test_license.py 208 LOC
Tests for Dulus license system.
Classes (5)
class TestLicenseValidation
↳ test_valid_pro_key(self)
↳ test_valid_enterprise_key(self)
↳ test_invalid_signature_wrong_secret(self)
↳ test_expired_key(self)
↳ test_free_tier_no_key(self)
↳ test_malformed_prefix(self)
↳ test_malformed_base64(self)
↳ test_payload_tampering_tier_changed(self)
An attacker modifies the tier in the payload but reuses the original signature.
↳ test_payload_tampering_expiry_extended(self)
An attacker extends the expiration but reuses the original signature.
↳ test_expired_exact_boundary(self)
A key that expires exactly NOW must be invalid.
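The validation cases above (wrong secret, malformed prefix, malformed base64, payload tampering, exact-boundary expiry) are all consequences of signing the payload with an HMAC and checking it before trusting any field. The sketch below illustrates that idea only. The `DEMO-` prefix, the `body.signature` layout, the `tier` and `exp` field names, and the choice of SHA-256 are assumptions, not the format used by license_manager.

```python
import base64
import hashlib
import hmac
import json
import time

PREFIX = "DEMO-"  # hypothetical key prefix


def make_key(payload, secret):
    """Serialize payload, sign it with HMAC-SHA256, and wrap it as a key."""
    body = json.dumps(payload, sort_keys=True).encode()
    signature = hmac.new(secret, body, hashlib.sha256).hexdigest().encode()
    return PREFIX + base64.urlsafe_b64encode(body + b"." + signature).decode()


def validate_key(key, secret, now=None):
    """Return the payload dict for a valid key, or None otherwise."""
    if not key.startswith(PREFIX):
        return None
    try:
        raw = base64.urlsafe_b64decode(key[len(PREFIX):].encode())
        body, signature = raw.rsplit(b".", 1)
        payload = json.loads(body)
    except ValueError:
        # Covers bad base64, a missing separator, and invalid JSON.
        return None
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest().encode()
    if not hmac.compare_digest(signature, expected):
        return None  # wrong secret or tampered payload
    if not isinstance(payload, dict):
        return None
    current = time.time() if now is None else now
    if payload.get("exp", 0) <= current:
        return None  # a key expiring exactly now is already invalid
    return payload
```

Because the signature covers the serialized payload, changing the tier or the expiry without the secret invalidates the key, which is what the two tampering tests check.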
class TestFeatureGates
↳ test_free_limits(self)
↳ test_pro_limits(self)
↳ test_enterprise_limits(self)
↳ test_pro_vs_free_features(self)
class TestRevocation
↳ test_revoked_key_simulated(self)
Revocation simulation: the manager has no native revocation,
but the server does. This test documents the expected behavior.
class TestCryptoConsistency
↳ test_manager_vs_server_signature_algorithm(self)
Manager and server must use the same HMAC algorithm (raw secret).
↳ test_cross_validation_manager_to_server(self)
A key generated by license_manager must validate in license_server.
class TestMachineFingerprint
↳ test_machine_locked_key(self)
Once implemented, a key generated for machine A
must fail on machine B.
Imports
base64, json, license_manager, pathlib, sys, time, unittest
▶tests/test_mcp.py 396 LOC
Tests for the MCP package (mcp/).
Classes (5)
class TestTypes
↳ test_server_config_from_dict_stdio(self)
↳ test_server_config_from_dict_sse(self)
↳ test_server_config_defaults(self)
↳ test_server_config_disabled(self)
↳ test_mcp_tool_schema(self)
↳ test_make_request(self)
↳ test_make_request_with_params(self)
↳ test_make_notification(self)
↳ test_init_params_structure(self)
class TestConfig
↳ test_load_empty(self, tmp_config)
↳ test_load_user_config(self, tmp_config)
↳ test_load_project_config_overrides_user(self, tmp_config, monkeypatch)
↳ test_add_server_to_user_config(self, tmp_config)
↳ test_remove_server_from_user_config(self, tmp_config)
↳ test_remove_nonexistent(self, tmp_config)
↳ test_multiple_servers(self, tmp_config)
class TestMCPClient
↳ _make_client(self, transport_mock)
↳ test_list_tools_empty(self)
↳ test_list_tools_parses_tool(self)
↳ test_list_tools_read_only_hint(self)
↳ test_list_tools_no_tools_capability(self)
↳ test_call_tool_success(self)
↳ test_call_tool_error_flag(self)
↳ test_call_tool_image_content(self)
↳ test_call_tool_not_connected(self)
↳ test_qualified_name_sanitized(self)
↳ test_status_line_connected(self)
↳ test_status_line_error(self)
class TestMCPManager
↳ test_add_server(self)
↳ test_call_tool_unknown_server(self)
↳ test_call_tool_invalid_name(self)
↳ test_all_tools_empty_when_disconnected(self)
↳ test_all_tools_from_connected_server(self)
↳ test_singleton(self)
class TestStdioTransportEcho
Use Python's own interpreter as a trivial echo MCP server.
↳ test_full_round_trip(self, tmp_path)
Functions (2)
def reset_manager(monkeypatch)
Each test gets a fresh MCPManager singleton.
def tmp_config(tmp_path, monkeypatch)
Redirect MCP config paths to tmp_path.
Imports
__future__, dulus_mcp.client, dulus_mcp.config, dulus_mcp.types, json, pathlib, pytest, threading, time, unittest.mock
▶tests/test_memory.py 275 LOC
Tests for the memory package (memory/).
Classes (9)
class TestSaveAndLoad
↳ test_roundtrip(self)
↳ test_creates_file_on_disk(self)
↳ test_update_existing(self)
Save same name twice → only 1 entry with updated content.
↳ test_project_scope_stored_separately(self)
↳ test_load_index_all_combines_scopes(self)
class TestDelete
↳ test_delete_removes_file_and_index(self)
↳ test_delete_nonexistent_no_error(self)
↳ test_delete_from_project_scope(self)
class TestSearch
↳ test_search_by_keyword(self)
↳ test_search_case_insensitive(self)
↳ test_search_in_content(self)
↳ test_search_across_scopes(self)
class TestGetMemoryContext
↳ test_returns_index_text(self)
↳ test_empty_when_no_memories(self)
↳ test_project_memories_labelled(self)
class TestTruncation
↳ test_no_truncation_within_limits(self)
↳ test_line_truncation(self)
↳ test_byte_truncation(self)
class TestSlugify
↳ test_basic(self)
↳ test_special_chars(self)
↳ test_max_length(self)
class TestParseFrontmatter
↳ test_parse(self)
↳ test_no_frontmatter(self)
class TestScanAndAge
↳ test_scan_memory_dir(self)
↳ test_format_manifest(self)
↳ test_memory_age_days_today(self)
↳ test_memory_age_days_old(self)
↳ test_memory_age_str(self)
↳ test_freshness_text_fresh(self)
↳ test_freshness_text_stale(self)
class TestMemoryTypes
↳ test_types_list(self)
Functions (2)
def redirect_memory_dirs(tmp_path, monkeypatch)
Redirect user and project memory dirs to tmp_path for all tests.
def _make_entry(name = 'test note', description = 'a test', type_ = 'user', content = 'hello world', scope = 'user')
Imports
memory.context, memory.scan, memory.store, memory.types, pathlib, pytest
▶tests/test_notification_manager.py 641 LOC
Tests for NotificationManager.
Classes (2)
class TestNotificationEvent
Test suite for NotificationEvent dataclass.
↳ test_create_minimal(self)
Create a minimal NotificationEvent.
↳ test_create_full(self)
Create a NotificationEvent with all fields.
↳ test_to_dict(self)
Serialize to dict.
↳ test_from_dict(self)
Deserialize from dict.
↳ test_from_dict_ignores_unknown_fields(self)
from_dict should ignore unknown fields.
class TestNotificationManager
Test suite for NotificationManager.
↳ tmp_root(self, tmp_path: Path)
Create a temporary root directory.
↳ test_init_creates_store(self, tmp_root: Path)
Initialization should create the notifications directory.
↳ test_new_id(self, tmp_root: Path)
new_id should generate unique IDs.
↳ test_publish(self, tmp_root: Path)
Publish a notification.
↳ test_publish_with_explicit_id(self, tmp_root: Path)
Publish a notification with an explicit ID.
↳ test_deduplication(self, tmp_root: Path)
Publishing with same dedupe_key should return existing.
↳ test_no_dedup_without_key(self, tmp_root: Path)
Publishing without dedupe_key should always create new.
↳ test_claim_for_sink(self, tmp_root: Path)
Claim pending notifications for a sink.
↳ test_claim_respects_targets(self, tmp_root: Path)
Claim should only return notifications targeting the sink.
↳ test_claim_empty(self, tmp_root: Path)
Claim with no pending notifications should return empty.
↳ test_claim_limit(self, tmp_root: Path)
Claim should respect the limit.
↳ test_ack(self, tmp_root: Path)
Acknowledge a claimed notification.
↳ test_ack_not_found(self, tmp_root: Path)
Ack a nonexistent notification.
↳ test_ack_not_claimed_by_you(self, tmp_root: Path)
Ack a notification claimed by another sink.
↳ test_ack_already_acked(self, tmp_root: Path)
Ack a notification twice should return already_acked.
↳ test_has_pending_for_sink(self, tmp_root: Path)
Check if there are pending notifications for a sink.
↳ test_recover_stale_claims(self, tmp_root: Path)
Recover notifications with stale claims.
↳ test_recover_no_stale(self, tmp_root: Path)
Recover with no stale claims should return 0.
↳ test_delete(self, tmp_root: Path)
Delete a notification.
↳ test_delete_nonexistent(self, tmp_root: Path)
Delete a nonexistent notification should return False.
↳ test_list_all(self, tmp_root: Path)
List all notifications.
↳ test_list_pending(self, tmp_root: Path)
List only pending notifications.
↳ test_persistence(self, tmp_root: Path)
Notifications should be persisted to disk.
↳ test_load_from_disk(self, tmp_root: Path)
Notifications should be loaded from disk on init.
↳ test_load_from_disk_with_dedupe(self, tmp_root: Path)
Deduplication index should be rebuilt from disk.
↳ test_delete_cleans_up_dedupe(self, tmp_root: Path)
Deleting a notification should clean up the dedupe index.
↳ test_load_skips_corrupted_files(self, tmp_root: Path)
Loading should skip corrupted JSON files.
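The publish, claim, ack, and delete behaviours listed for TestNotificationManager can be summarized with an in-memory sketch. It omits persistence, targets, and stale-claim recovery. The class name, the dict-based event layout, and every status string except `already_acked` are assumptions made for illustration.

```python
import itertools


class MiniNotificationStore:
    """In-memory illustration of dedupe, claim, ack, and delete."""

    def __init__(self):
        self._events = {}      # id -> event dict
        self._by_dedupe = {}   # dedupe_key -> id
        self._ids = itertools.count(1)

    def publish(self, title, dedupe_key=None):
        # A repeated dedupe_key returns the existing event unchanged.
        if dedupe_key is not None and dedupe_key in self._by_dedupe:
            return self._events[self._by_dedupe[dedupe_key]]
        event = {
            "id": f"n{next(self._ids)}",
            "title": title,
            "dedupe_key": dedupe_key,
            "claimed_by": None,
            "acked": False,
        }
        self._events[event["id"]] = event
        if dedupe_key is not None:
            self._by_dedupe[dedupe_key] = event["id"]
        return event

    def claim_for_sink(self, sink, limit=10):
        claimed = []
        for event in self._events.values():
            if len(claimed) >= limit:
                break
            if event["claimed_by"] is None and not event["acked"]:
                event["claimed_by"] = sink
                claimed.append(event)
        return claimed

    def ack(self, sink, event_id):
        event = self._events.get(event_id)
        if event is None:
            return "not_found"
        if event["acked"]:
            return "already_acked"
        if event["claimed_by"] != sink:
            return "not_claimed_by_you"
        event["acked"] = True
        return "ok"

    def delete(self, event_id):
        event = self._events.pop(event_id, None)
        if event is None:
            return False
        if event["dedupe_key"] is not None:
            # Free the key so a later publish creates a fresh event.
            self._by_dedupe.pop(event["dedupe_key"], None)
        return True
```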
Imports
__future__, dulus_tools.notification_manager, json, pathlib, pytest, sys, time
▶tests/test_plugin.py 350 LOC
Tests for the plugin package (plugin/).
Classes (4)
class TestPluginTypes
↳ test_parse_simple(self)
↳ test_parse_with_source(self)
↳ test_sanitize_name(self)
↳ test_manifest_from_dict(self)
↳ test_manifest_defaults(self)
↳ test_manifest_from_plugin_dir_json(self, tmp_path, local_plugin)
↳ test_manifest_from_plugin_dir_md(self, tmp_path)
↳ test_manifest_missing(self, tmp_path)
↳ test_entry_to_dict_roundtrip(self, tmp_path)
↳ test_entry_qualified_name(self, tmp_path)
class TestPluginStore
↳ test_list_empty(self)
↳ test_install_local(self, local_plugin)
↳ test_install_creates_dir(self, local_plugin)
↳ test_install_no_source_error(self)
↳ test_install_duplicate(self, local_plugin)
↳ test_install_force(self, local_plugin)
↳ test_get_plugin(self, local_plugin)
↳ test_get_plugin_missing(self)
↳ test_uninstall(self, local_plugin)
↳ test_uninstall_not_found(self)
↳ test_enable_disable(self, local_plugin)
↳ test_disable_all(self, local_plugin, tmp_path)
↳ test_update_local_path_rejected(self, local_plugin)
↳ test_update_not_found(self)
↳ test_project_scope(self, local_plugin)
class TestPluginRecommend
↳ test_empty_context(self)
↳ test_git_context(self)
↳ test_python_lint_context(self)
↳ test_sql_context(self)
↳ test_top_n(self)
↳ test_sorted_by_score(self)
↳ test_recommend_from_files(self, tmp_path)
↳ test_format_recommendations(self)
↳ test_format_empty(self)
class TestAskUserQuestion
↳ test_drain_empty(self)
drain_pending_questions returns False when nothing pending.
↳ test_roundtrip_with_freetext(self)
Submit a question, simulate user typing 'yes', collect result.
↳ test_roundtrip_with_option_selection(self)
Select option 1 from a numbered list.
↳ test_tool_schema_registered(self)
AskUserQuestion must appear in TOOL_SCHEMAS.
Functions (2)
def tmp_plugin_paths(tmp_path, monkeypatch)
Redirect all plugin config paths to tmp_path.
def local_plugin(tmp_path)
Create a minimal local plugin directory.
Imports
__future__, json, pathlib, plugin.recommend, plugin.store, plugin.types, pytest, shutil, threading, unittest.mock
▶tests/test_session_fork.py 380 LOC
Tests for SessionFork — Feature 11.
Covers:
* enumerate_turns on empty / missing files
* enumerate_turns with TurnBegin / TurnEnd records
* _extract_user_text with str, list, dict, and mixed input
* truncate_at_turn boundary correctness
* fork (full copy and at specific turn)
* undo (success and error cases)
Functions (26)
def _make_wire_record(msg_type: str, payload: dict | None = None)
Return a single JSONL line for the wire format.
def _build_wire_file(path: Path, records: list[str])
Write a list of JSON strings to *path* as JSONL.
def tmp_session(tmp_path: Path)
Provide a fresh SessionFork pointing at a temporary directory.
def test_enumerate_turns_empty_dir(tmp_session: SessionFork)
Missing wire.jsonl should return an empty list.
def test_enumerate_turns_no_turns(tmp_session: SessionFork)
Wire file with no TurnBegin should return an empty list.
def test_enumerate_turns_single_turn(tmp_session: SessionFork)
A single TurnBegin with a plain-string user_input.
def test_enumerate_turns_multiple_turns(tmp_session: SessionFork)
Multiple turns with different user input formats.
def test_enumerate_turns_skips_metadata(tmp_session: SessionFork)
Metadata records should be ignored when counting turns.
def test_enumerate_turns_ignores_bad_json(tmp_session: SessionFork)
Malformed JSON lines should be silently skipped.
def test_enumerate_turns_with_custom_path(tmp_session: SessionFork, tmp_path: Path)
Passing an explicit wire_path should work.
def test_extract_user_text_str(tmp_session: SessionFork)
def test_extract_user_text_str_multiline(tmp_session: SessionFork)
Only the first line is kept.
def test_extract_user_text_str_truncation(tmp_session: SessionFork)
Long strings are truncated to 80 characters.
def test_extract_user_text_list_of_dicts(tmp_session: SessionFork)
def test_extract_user_text_list_of_strs(tmp_session: SessionFork)
def test_extract_user_text_list_mixed(tmp_session: SessionFork)
def test_extract_user_text_list_no_text_key(tmp_session: SessionFork)
def test_extract_user_text_none(tmp_session: SessionFork)
def test_extract_user_text_int(tmp_session: SessionFork)
def test_truncate_at_turn_missing_file(tmp_session: SessionFork)
Should raise ValueError when wire file does not exist.
def test_truncate_at_turn_zero(tmp_session: SessionFork)
Truncate at the very first turn.
def test_truncate_at_turn_one(tmp_session: SessionFork)
Truncate at the second turn (index 1).
def test_truncate_at_turn_includes_metadata(tmp_session: SessionFork)
Metadata should always be carried forward.
def test_truncate_at_turn_beyond_range(tmp_session: SessionFork)
Turn index beyond available turns should include all existing turns.
def test_read_all_lines_missing(tmp_session: SessionFork)
Reading a non-existent path should return [].
def test_read_all_lines_basic(tmp_session: SessionFork)
Should return stripped non-empty lines.
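The _extract_user_text tests above describe a summary rule: keep only the first line, truncate to 80 characters, and accept str, list, or dict input. A minimal sketch of that rule follows. Joining list items with a space and returning an empty string for None or int input are assumptions, since the listing does not state those outcomes.

```python
def extract_user_text(user_input, max_len=80):
    """Return a short one-line summary of a user_input value."""
    if isinstance(user_input, str):
        text = user_input
    elif isinstance(user_input, dict):
        value = user_input.get("text", "")
        text = value if isinstance(value, str) else ""
    elif isinstance(user_input, list):
        parts = []
        for item in user_input:
            if isinstance(item, str):
                parts.append(item)
            elif isinstance(item, dict) and isinstance(item.get("text"), str):
                parts.append(item["text"])
        text = " ".join(parts)
    else:
        # Assumed behaviour for None, int, and other unsupported types.
        return ""
    stripped = text.strip()
    first_line = stripped.splitlines()[0] if stripped else ""
    return first_line[:max_len]
```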
Imports
__future__, dulus_tools.session_fork, json, pathlib, pytest, sys, tempfile
▶tests/test_shell_mode.py 175 LOC
Tests for ShellMode (Feature 16).
Classes (3)
class TestShellModeState
Tests for shell mode state toggling.
↳ test_default_inactive(self, shell: ShellMode)
Shell mode should start inactive.
↳ test_activate(self, shell: ShellMode)
activate() should set active to True.
↳ test_deactivate(self, shell: ShellMode)
deactivate() should set active to False.
↳ test_toggle_on(self, shell: ShellMode)
toggle() should flip from False to True.
↳ test_toggle_off(self, shell: ShellMode)
toggle() should flip from True to False.
↳ test_default_shell_from_env(self, monkeypatch: pytest.MonkeyPatch)
Shell should default from SHELL/COMSPEC environment variable.
↳ test_default_shell_fallback(self, monkeypatch: pytest.MonkeyPatch)
Shell should fall back to system default when SHELL/COMSPEC is unset.
class TestShellModeExecution
Tests for shell command execution via execute().
class TestShellModeHistory
Tests for command history tracking.
↳ test_history_is_copy(self, shell: ShellMode)
get_history() should return a copy, not a reference.
↳ test_history_empty_by_default(self, shell: ShellMode)
History should be empty for a new ShellMode.
Functions (1)
def shell()
Return a fresh ShellMode instance.
Imports
__future__, asyncio, dulus_tools.shell_mode, os, pathlib, pytest, sys
▶tests/test_skills.py 234 LOC
Functions (23)
def skill_dir(tmp_path, monkeypatch)
Create a temp skill directory with sample skills and patch _get_skill_paths.
def test_parse_list_field_bracket()
def test_parse_list_field_plain()
def test_parse_list_field_single()
def test_parse_skill_file(skill_dir)
def test_parse_skill_file_review(skill_dir)
def test_parse_skill_file_invalid(tmp_path)
def test_parse_skill_file_no_name(tmp_path)
def test_parse_skill_file_context_fork(tmp_path)
def test_parse_skill_file_allowed_tools(tmp_path)
def test_load_skills(skill_dir)
def test_load_skills_empty_dir(tmp_path, monkeypatch)
def test_load_skills_nonexistent_dir(tmp_path, monkeypatch)
def test_load_skills_builtins_present(monkeypatch)
Without patching, builtins (commit, review) should be present.
def test_load_skills_project_overrides_builtin(tmp_path, monkeypatch)
A project skill with the same name overrides the builtin.
def test_find_skill_commit(skill_dir)
def test_find_skill_review(skill_dir)
def test_find_skill_review_pr(skill_dir)
def test_find_skill_nonexistent(skill_dir)
def test_substitute_arguments_placeholder()
def test_substitute_named_args(tmp_path)
def test_substitute_missing_arg()
def test_substitute_no_placeholders()
Imports
__future__, pathlib, pytest, skill, skill.loader
▶tests/test_steer_input.py 248 LOC
Tests for the SteerInput system (steer_input.py).
Classes (3)
class TestSteerInput
Test suite for the SteerInput class.
↳ test_default_enabled(self)
SteerInput should be enabled by default.
↳ test_enable_disable(self)
Enable/disable should toggle the enabled state.
↳ test_get_next_steer_sync_no_loop(self)
Sync get without running loop should return None gracefully.
class TestSteerInputSingleton
Test suite for the module-level singleton.
↳ setup_method(self)
Reset singleton before each test.
↳ teardown_method(self)
Reset singleton after each test.
↳ test_get_steer_input_creates_singleton(self)
get_steer_input should create the singleton on first call.
↳ test_get_steer_input_returns_same_instance(self)
Multiple calls should return the same instance.
↳ test_reset_steer_input(self)
reset_steer_input should create a new instance on next get.
class TestSteerInputEdgeCases
Edge cases for SteerInput.
Imports
__future__, asyncio, pytest, steer_input
▶tests/test_subagent.py 136 LOC
Tests for the sub-agent system (subagent.py).
Classes (7)
class TestSpawnAndWait
↳ test_spawn_and_wait_completes(self, manager)
↳ test_spawn_returns_immediately(self, manager)
class TestListTasks
↳ test_list_tasks(self, manager)
class TestCancel
↳ test_cancel_running_task(self, slow_manager)
class TestDepthLimit
↳ test_spawn_at_max_depth_fails(self, manager)
class TestGetResult
↳ test_get_result_completed(self, manager)
↳ test_get_result_unknown_id(self, manager)
class TestExtractFinalText
↳ test_extracts_last_assistant(self)
↳ test_returns_none_for_empty(self)
↳ test_returns_none_no_assistant(self)
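The three TestExtractFinalText cases suggest a backwards scan for the last assistant message. A minimal sketch, assuming messages are dicts with `role` and string `content` keys (the actual neutral message format may carry more structure):

```python
def extract_final_text(messages):
    """Return the content of the last assistant message, or None."""
    for message in reversed(messages):
        if message.get("role") == "assistant":
            content = message.get("content")
            return content if isinstance(content, str) else None
    return None
```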
class TestWaitUnknown
↳ test_wait_unknown_returns_none(self, manager)
Functions (4)
def _make_mock_agent_run(sleep_per_iter = 0.05, iters = 3)
Return a mock _agent_run that simulates work and checks cancellation.
def _make_slow_mock(sleep_per_iter = 0.2, iters = 10)
Return a slow mock for cancellation testing.
def manager(monkeypatch)
Create a SubAgentManager with mocked _agent_run.
def slow_manager(monkeypatch)
Create a SubAgentManager with a slow mock for cancel testing.
Imports
multi_agent.subagent, pytest, threading, time
▶tests/test_task.py 292 LOC
Tests for the task package (task/).
Classes (3)
class TestTaskTypes
↳ test_default_status(self)
↳ test_status_icon(self)
↳ test_to_dict_roundtrip(self)
↳ test_from_dict_unknown_status_defaults_pending(self)
↳ test_one_line_no_blockers(self)
↳ test_one_line_with_blockers(self)
↳ test_one_line_resolved_blockers_hidden(self)
class TestTaskStore
↳ test_create_returns_task(self)
↳ test_ids_are_sequential(self)
↳ test_get_returns_task(self)
↳ test_get_unknown_returns_none(self)
↳ test_list_returns_all(self)
↳ test_list_empty(self)
↳ test_update_status(self)
↳ test_update_subject_and_description(self)
↳ test_update_owner(self)
↳ test_update_no_changes_returns_empty_fields(self)
↳ test_update_unknown_task(self)
↳ test_update_add_blocks(self)
↳ test_update_add_blocked_by(self)
↳ test_update_metadata_merge(self)
↳ test_delete_removes_task(self)
↳ test_delete_unknown(self)
↳ test_persistence_round_trip(self, tmp_path)
Tasks saved to disk are re-loaded correctly.
↳ test_clear_all(self)
↳ test_thread_safety(self)
Concurrent creates should produce unique IDs.
class TestTaskToolFunctions
Test the string-returning functions used by the registered tools.
↳ test_task_create_tool(self)
↳ test_task_update_tool_status(self)
↳ test_task_update_tool_delete(self)
↳ test_task_update_not_found(self)
↳ test_task_get_tool(self)
↳ test_task_get_not_found(self)
↳ test_task_list_tool_empty(self)
↳ test_task_list_tool_multiple(self)
↳ test_task_list_hides_resolved_blockers(self)
↳ test_tool_schemas_registered(self)
All four task tools must be registered in tool_registry.
↳ test_tool_schemas_in_tool_schemas_list(self)
Task tool schemas are also present in TOOL_SCHEMAS for Claude's tool list.
Functions (1)
def isolated_store(tmp_path, monkeypatch)
Each test gets a fresh in-memory + on-disk task store.
Imports
__future__, json, pathlib, pytest, task, task.store, task.types, threading
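The test_thread_safety entry above says concurrent creates should produce unique IDs. A minimal sketch of one way to satisfy that property, assuming a hypothetical MiniTaskStore (not the real task.store API) that allocates sequential IDs under a lock:

```python
from __future__ import annotations

import threading


class MiniTaskStore:
    """Hypothetical stand-in for a task store; not the real task.store API."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._next_id = 1
        self._tasks: dict[str, dict] = {}

    def create(self, subject: str) -> str:
        # Allocate the ID and insert the task under one lock, so two
        # threads can never read the same counter value.
        with self._lock:
            task_id = str(self._next_id)
            self._next_id += 1
            self._tasks[task_id] = {"subject": subject, "status": "pending"}
            return task_id


def create_concurrently(store: MiniTaskStore, n_threads: int, per_thread: int) -> list[str]:
    """Create tasks from several threads and return every ID that was issued."""
    ids: list[str] = []
    ids_lock = threading.Lock()

    def worker(idx: int) -> None:
        for j in range(per_thread):
            task_id = store.create(f"task {idx}-{j}")
            with ids_lock:
                ids.append(task_id)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return ids
```

A test in the spirit of test_thread_safety would then compare len(ids) with len(set(ids)).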
▶tests/test_telegram_buffer.py60 LOC
Functions (2)
def test_telegram_buffer_pruning()
Test that old Telegram messages are pruned from output buffer.
def test_sanitize_text()
Test that sanitize_text removes surrogates but keeps valid text/emojis.
Imports
common, input, os, sys
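test_sanitize_text above expects surrogates to be removed while valid text and emojis survive. A minimal sketch of that behavior, assuming a hypothetical sanitize_text_sketch helper (the real sanitize_text may be implemented differently):

```python
import re

# Lone UTF-16 surrogate code points (U+D800..U+DFFF) cannot be encoded as UTF-8.
_SURROGATES = re.compile(r"[\ud800-\udfff]")


def sanitize_text_sketch(text: str) -> str:
    """Drop lone surrogates; leave everything else (including emoji) untouched."""
    return _SURROGATES.sub("", text)
```

Real emoji are single code points above U+FFFF in a Python str, so the pattern does not touch them; only unpaired surrogate halves are dropped.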
▶tests/test_think_tool.py94 LOC
Tests for the Think tool (tools_think.py).
Classes (1)
class TestThinkTool
Test suite for the Think tool.
↳ test_tool_is_registered(self)
Think tool must be registered in the tool registry.
↳ test_tool_is_read_only(self)
Think tool should be read-only (doesn't mutate state).
↳ test_tool_is_concurrent_safe(self)
Think tool should be safe to run concurrently.
↳ test_think_execution(self)
Think tool should execute and return the thought.
↳ test_think_with_empty_string(self)
Think tool should handle empty string thoughts.
↳ test_think_with_long_thought(self)
Think tool should handle very long thoughts.
↳ test_think_schema_has_required_thought(self)
The schema should require the 'thought' parameter.
↳ test_think_description_is_meaningful(self)
The tool description should explain its purpose.
↳ test_think_execution_returns_string(self)
Think tool should always return a string.
↳ test_think_with_multiline_thought(self)
Think tool should handle multiline thoughts.
Functions (1)
def _clean_registry()
Reset registry before each test.
Imports
__future__, pytest, tool_registry, tools_think
▶tests/test_todo_tool.py222 LOC
Tests for the SetTodoList tool (tools_todo.py).
Classes (2)
class TestSetTodoListTool
Test suite for the SetTodoList tool.
↳ test_tool_is_registered(self)
SetTodoList tool must be registered.
↳ test_tool_is_not_read_only(self)
SetTodoList should NOT be read-only (it writes todos to disk).
↳ test_tool_is_concurrent_safe(self)
SetTodoList should be safe to run concurrently.
↳ test_get_empty_todo_list(self)
Getting todos when none exist should return empty message.
↳ test_set_todo_list(self)
Setting todos should persist them.
↳ test_set_and_get_todo_list(self)
Setting todos then getting them should return the list.
↳ test_todo_list_persistence(self)
Todos should persist to disk and be reloadable.
↳ test_todo_list_overwrite(self)
Setting new todos should replace old ones.
↳ test_empty_todo_array(self)
Setting an empty array should clear todos.
↳ test_todo_with_empty_title_is_filtered(self)
Todos with empty titles should be filtered out.
↳ test_todo_status_icons(self)
Each status should have the correct icon in output.
↳ test_todo_invalid_status(self)
Invalid statuses should use the default icon.
↳ test_schema_has_todos_property(self)
The schema should have a 'todos' property.
class TestTodoManager
Test suite for the TodoManager class directly.
↳ test_manager_init_default_dir(self)
TodoManager should default to cwd.
↳ test_manager_init_custom_dir(self, tmp_path)
TodoManager should accept custom session dir.
↳ test_load_todos_missing_file(self, tmp_path)
Loading todos when file doesn't exist should return empty list.
↳ test_load_todos_corrupt_file(self, tmp_path)
Loading todos from corrupt file should return empty list.
↳ test_read_todos_empty(self, tmp_path)
Reading empty todos should return empty message.
↳ test_write_and_read_todos(self, tmp_path)
Writing then reading todos should preserve data.
↳ test_save_creates_directories(self, tmp_path)
Saving todos should create the .dulus directory if needed.
↳ test_write_todos_filters_empty_titles(self, tmp_path)
write_todos should filter out empty titles.
Functions (1)
def _clean_registry(tmp_path, monkeypatch)
Reset registry and todo state before each test.
Imports
__future__, json, os, pytest, tool_registry, tools_todo
▶tests/test_todo_visualization.py230 LOC
Tests for TodoVisualizer (Feature 20).
Classes (4)
class TestRenderCli
Tests for CLI todo rendering.
↳ test_render_cli_header(self, sample_todos: dict)
CLI output should have a header.
↳ test_render_cli_pending_icon(self, sample_todos: dict)
CLI output should show pending items with [ ] icon.
↳ test_render_cli_in_progress_icon(self, sample_todos: dict)
CLI output should show in_progress items with [/] icon.
↳ test_render_cli_done_icon(self, sample_todos: dict)
CLI output should show done items with [x] icon and strikethrough.
↳ test_render_cli_strikethrough_done(self, sample_todos: dict)
CLI output should apply strikethrough to done items.
↳ test_render_cli_empty(self, empty_todos: dict)
CLI output should handle empty todo lists.
↳ test_render_cli_unknown_status(self)
CLI output should handle unknown status gracefully.
↳ test_render_cli_default_title(self)
CLI output should use 'Untitled' when title is missing.
↳ test_render_cli_separator_lines(self, sample_todos: dict)
CLI output should have separator lines.
↳ test_render_cli_ansi_codes(self, sample_todos: dict)
CLI output should include ANSI color codes.
class TestRenderHtml
Tests for HTML todo rendering.
↳ test_render_html_has_todo_block(self, sample_todos: dict)
HTML output should have todo-block wrapper.
↳ test_render_html_has_list(self, sample_todos: dict)
HTML output should use a <ul> list.
↳ test_render_html_pending_checkbox(self, sample_todos: dict)
HTML output should have unchecked box for pending.
↳ test_render_html_done_checkbox(self, sample_todos: dict)
HTML output should have checked box for done.
↳ test_render_html_status_classes(self, sample_todos: dict)
HTML output should apply CSS classes per status.
↳ test_render_html_escapes_titles(self)
HTML output should escape special characters in titles.
↳ test_render_html_empty(self, empty_todos: dict)
HTML output should handle empty todo lists.
↳ test_render_html_contains_titles(self, sample_todos: dict)
HTML output should contain all todo titles.
class TestRenderTelegram
Tests for Telegram todo rendering.
↳ test_render_telegram_header(self, sample_todos: dict)
Telegram output should have a header.
↳ test_render_telegram_pending_emoji(self, sample_todos: dict)
Telegram output should use white square for pending.
↳ test_render_telegram_in_progress_emoji(self, sample_todos: dict)
Telegram output should use refresh emoji for in_progress.
↳ test_render_telegram_done_emoji(self, sample_todos: dict)
Telegram output should use checkmark for done.
↳ test_render_telegram_empty(self, empty_todos: dict)
Telegram output should handle empty lists.
↳ test_render_telegram_unknown_status(self)
Telegram output should use question mark for unknown status.
↳ test_render_telegram_markdown_bold(self, sample_todos: dict)
Telegram output should use Markdown bold for header.
class TestStatusIcons
Tests for the status icon mappings.
↳ test_cli_status_icons_complete(self)
STATUS_ICONS_CLI should have all three statuses.
↳ test_cli_status_icon_values(self)
STATUS_ICONS_CLI should return icon and ANSI color.
↳ test_telegram_status_emojis_complete(self)
STATUS_EMOJI_TG should have all three statuses.
↳ test_telegram_status_emoji_values(self)
STATUS_EMOJI_TG should return emoji characters.
Functions (3)
def sample_todos()
Return a todo block with items in various states.
def empty_todos()
Return a todo block with no items.
def single_todo()
Return a todo block with a single item.
Imports
__future__, dulus_tools.todo_visualizer, pathlib, pytest, sys
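The TestRenderCli entries above describe [ ], [/] and [x] icons, strikethrough for done items, an 'Untitled' fallback, and graceful handling of unknown statuses. A rough sketch of a renderer with those properties follows. The block layout ({"items": [...]}), the colors, the header text and the "[?]" fallback icon are all assumptions, not taken from dulus_tools.todo_visualizer:

```python
# Hypothetical icon table; the real STATUS_ICONS_CLI may use other colors.
ICONS = {
    "pending": ("[ ]", "\x1b[33m"),
    "in_progress": ("[/]", "\x1b[36m"),
    "done": ("[x]", "\x1b[32m"),
}
RESET = "\x1b[0m"
STRIKE = "\x1b[9m"  # ANSI SGR 9: crossed-out text


def render_cli_sketch(block: dict) -> str:
    """Render a todo block as header, separator, one line per item, separator."""
    lines = ["Todos", "-" * 20]
    for item in block.get("items", []):
        status = item.get("status")
        # Unknown statuses fall back to a neutral icon with no color.
        icon, color = ICONS.get(status, ("[?]", ""))
        title = item.get("title") or "Untitled"
        if status == "done":
            title = f"{STRIKE}{title}{RESET}"
        lines.append(f"{color}{icon}{RESET} {title}")
    lines.append("-" * 20)
    return "\n".join(lines)
```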
▶tests/test_tool_registry.py160 LOC
Functions (12)
def _clean_registry()
Reset registry before each test.
def _make_echo_tool(name: str = 'echo', read_only: bool = False)
Helper to build a simple echo tool.
def test_register_and_get()
def test_get_unknown_returns_none()
def test_get_all_tools_empty()
def test_get_all_tools()
def test_get_tool_schemas()
def test_execute_tool()
def test_execute_unknown_tool()
def test_output_truncation()
def test_no_truncation_when_within_limit()
def test_duplicate_register_overwrites()
Imports
__future__, pytest, tool_registry
▶tests/test_voice.py296 LOC
Tests for the voice/ package (no hardware required).
All tests run without a microphone or STT library installed.
They cover the pure-Python helpers: WAV wrapping, keyterm extraction,
availability checks, and the REPL integration sentinel.
Classes (9)
class TestSplitIdentifier
↳ test_camel_case(self)
↳ test_kebab_case(self)
↳ test_snake_case(self)
↳ test_short_fragments_dropped(self)
↳ test_path_like(self)
class TestGetVoiceKeyterms
↳ test_returns_list(self)
↳ test_global_terms_present(self)
↳ test_max_length(self)
↳ test_deduplication(self)
↳ test_recent_files_passed(self)
class TestPcmToWav
↳ test_riff_header(self)
↳ test_data_chunk(self)
↳ test_roundtrip_length(self)
class TestKeytermsToPrompt
↳ test_empty(self)
↳ test_contains_terms(self)
↳ test_truncates_at_40(self)
class TestSttAvailability
↳ test_returns_tuple(self)
↳ test_backend_name_string(self)
↳ test_openai_api_available_when_key_set(self)
↳ test_unavailable_without_backends(self)
class TestRecorderAvailability
↳ test_returns_tuple(self)
↳ test_sounddevice_makes_available(self)
class TestVoiceInit
↳ test_check_voice_deps_returns_tuple(self)
↳ test_exports(self)
class TestReplVoiceIntegration
↳ test_voice_in_commands(self)
↳ test_voice_command_callable(self)
↳ test_handle_slash_voice_sentinel(self)
handle_slash('/voice ...') propagates __voice__ sentinel from cmd_voice.
↳ test_voice_status_no_crash(self, capsys)
'/voice status' should not raise even without audio hardware.
↳ test_voice_lang_set(self, capsys)
↳ test_wake_in_commands(self)
↳ test_wake_command_callable(self)
↳ test_wake_status_no_crash(self, capsys)
↳ test_wake_phrases_no_crash(self, capsys)
class TestWakeWord
↳ test_contains_wake_exact(self)
↳ test_contains_wake_case_insensitive(self)
↳ test_contains_wake_no_match(self)
↳ test_wake_phrases_list(self)
↳ test_listener_init(self)
↳ test_voice_package_exports_wake(self)
Functions (1)
def _make_pcm(n_samples: int = 1600)
Return silent int16 PCM (all zeros).
Imports
__future__, pathlib, pytest, struct, sys, unittest.mock
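TestPcmToWav above checks the RIFF header, the data chunk and the round-trip length of the WAV wrapper. A minimal sketch of such a wrapper for int16 PCM, using the standard 44-byte canonical WAV header; the function names here are illustrative and not the voice package's actual helpers:

```python
import struct


def make_silent_pcm(n_samples: int = 1600) -> bytes:
    """Return silent int16 PCM (two zero bytes per sample)."""
    return b"\x00\x00" * n_samples


def pcm_to_wav_sketch(pcm: bytes, sample_rate: int = 16000, channels: int = 1) -> bytes:
    """Prefix raw int16 PCM with a canonical 44-byte RIFF/WAVE header."""
    bits = 16
    block_align = channels * bits // 8
    byte_rate = sample_rate * block_align
    header = struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF", 36 + len(pcm), b"WAVE",   # RIFF chunk: size excludes first 8 bytes
        b"fmt ", 16, 1, channels,          # fmt chunk: 16 bytes, format 1 = PCM
        sample_rate, byte_rate, block_align, bits,
        b"data", len(pcm),                 # data chunk: payload length in bytes
    )
    return header + pcm
```

The result can be read back with the standard-library wave module, which is a convenient way to check the header fields.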
▶tests/test_wire_events.py245 LOC
Tests for WireEventBus and WireEvent classes (Feature 17).
Classes (2)
class TestWireEventTypes
Tests for individual event type constructors.
↳ test_base_wire_event_defaults(self)
WireEvent should have sensible defaults.
↳ test_turn_begin_event(self)
TurnBeginEvent should set event_type and store user_input.
↳ test_turn_end_event(self)
TurnEndEvent should set event_type.
↳ test_step_begin_event(self)
StepBeginEvent should set event_type and step_number.
↳ test_step_end_event(self)
StepEndEvent should set event_type and step_number.
↳ test_tool_call_event(self)
ToolCallEvent should set event_type and tool details.
↳ test_compaction_begin_event(self)
CompactionBeginEvent should set event_type.
↳ test_compaction_end_event(self)
CompactionEndEvent should set event_type.
class TestWireEventBus
Tests for the WireEventBus.
↳ test_subscribe_returns_token(self, bus: WireEventBus)
subscribe() should return a string token.
Functions (1)
def bus(tmp_path: Path)
Return a WireEventBus with a temporary session directory.
Imports
__future__, asyncio, dulus_tools.wire_events, json, pathlib, pytest, sys
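test_subscribe_returns_token above states that subscribe() returns a string token. A minimal sketch of a token-based bus, assuming a hypothetical MiniEventBus; the unsubscribe and publish methods are illustrative additions and say nothing about the real WireEventBus API:

```python
import uuid
from typing import Callable, Dict


class MiniEventBus:
    """Hypothetical in-memory bus; not the real WireEventBus."""

    def __init__(self) -> None:
        self._subs: Dict[str, Callable[[dict], None]] = {}

    def subscribe(self, handler: Callable[[dict], None]) -> str:
        # A random hex token identifies the subscription for later removal.
        token = uuid.uuid4().hex
        self._subs[token] = handler
        return token

    def unsubscribe(self, token: str) -> bool:
        return self._subs.pop(token, None) is not None

    def publish(self, event: dict) -> int:
        # Iterate over a snapshot so handlers may unsubscribe while running.
        handlers = list(self._subs.values())
        for handler in handlers:
            handler(event)
        return len(handlers)
```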
▶tmux_offloader.py194 LOC
TmuxOffloader - Alternative wrapper for TmuxOffload
Uses tmux directly because TmuxOffload has bugs
Functions (6)
def generate_session_name(prefix = 'job')
Generate a unique session name
def run_in_tmux(command, session_name = None, wait = False, timeout = None)
Run a command in a detached tmux session.
Args:
command: Command to run (string)
session_name: Session name (auto-generated if None)
wait: If True, wait for the command to finish and return its output
timeout: Maximum seconds to wait (if wait=True)
Returns:
If wait=False: sess
def get_session_output(session_name)
Capture the output of an existing tmux session.
Returns the output, or None if the session does not exist.
def is_session_active(session_name)
Check whether a tmux session is still active
def kill_session(session_name)
Kill a tmux session
def list_sessions()
List all active tmux sessions
Imports
pathlib, random, string, subprocess, time
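The entries above describe generating a unique session name and running a command in a detached tmux session. A sketch of how that might be done with `tmux new-session -d -s NAME CMD`; the name format, the suffix length and the helper names are assumptions, not the module's actual behavior:

```python
from __future__ import annotations

import random
import string
import subprocess


def generate_session_name_sketch(prefix: str = "job") -> str:
    """Return prefix plus a random 6-character suffix (format is an assumption)."""
    alphabet = string.ascii_lowercase + string.digits
    return f"{prefix}_{''.join(random.choices(alphabet, k=6))}"


def build_detached_argv(command: str, session_name: str | None = None) -> list[str]:
    """Build the argv for starting `command` in a detached tmux session."""
    name = session_name or generate_session_name_sketch()
    # -d: do not attach to the new session; -s: name it.
    return ["tmux", "new-session", "-d", "-s", name, command]


def run_detached(command: str, session_name: str | None = None) -> str:
    """Start the session and return its name (requires tmux on PATH)."""
    argv = build_detached_argv(command, session_name)
    subprocess.run(argv, check=True)
    return argv[4]
```

Passing an argv list to subprocess.run avoids a shell on the Python side; tmux itself still hands the command string to a shell inside the session.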
▶tmux_tools.py359 LOC
Tmux integration tools for Dulus.
Gives the AI model direct control over tmux sessions: create panes,
send commands, read output, and manage layouts. Auto-detected at
startup — tools are only registered when tmux is available on the host.
Functions (17)
def _find_tmux()
Locate a tmux binary.
def tmux_available()
Return True if a tmux-compatible binary exists on the system.
def _safe(value: str)
Sanitize a tmux target/session name to prevent shell injection.
def _t(params: dict, key: str = 'target')
Build a -t flag from params, or empty string if absent.
def _run(cmd: str, timeout: int = 10)
Run a tmux command and return combined stdout+stderr.
Replaces bare 'tmux' prefix with the detected binary path.
Unsets nesting guards ($TMUX / $PSMUX_SESSION) so commands work
from inside an existing session.
def _tmux_list_sessions(params: dict, config: dict)
def _tmux_new_session(params: dict, config: dict)
def _tmux_split_window(params: dict, config: dict)
def _tmux_send_keys(params: dict, config: dict)
def _tmux_capture_pane(params: dict, config: dict)
def _tmux_list_panes(params: dict, config: dict)
def _tmux_select_pane(params: dict, config: dict)
def _tmux_kill_pane(params: dict, config: dict)
def _tmux_new_window(params: dict, config: dict)
def _tmux_list_windows(params: dict, config: dict)
def _tmux_resize_pane(params: dict, config: dict)
def register_tmux_tools()
Register all tmux tools. Returns number of tools registered.
Imports
os, re, shlex, shutil, subprocess, sys, tool_registry
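_safe and _t above sanitize a target name and build a -t flag from params. A minimal sketch of that idea; the allowed character set is an assumption chosen to cover common tmux targets such as session:window.pane, %pane and @window, and the real _safe may filter differently:

```python
import re
import shlex
import shutil


def tmux_available_sketch() -> bool:
    """True if a `tmux` binary is found on PATH."""
    return shutil.which("tmux") is not None


def safe_target(value: str) -> str:
    """Strip characters outside a conservative allow-list, then shell-quote."""
    cleaned = re.sub(r"[^A-Za-z0-9_.:%@\-]", "", value)
    return shlex.quote(cleaned)


def target_flag(params: dict, key: str = "target") -> str:
    """Return '-t <target>' when params has a value for key, else ''."""
    value = params.get(key)
    return f"-t {safe_target(str(value))}" if value else ""
```

Stripping first and quoting second means the quoting step is a second line of defense rather than the only one.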
▶tool_registry.py214 LOC
Tool plugin registry for dulus.
Provides a central registry for tool definitions, lookup, schema export,
and dispatch with output truncation.
Classes (1)
class ToolDef
Definition of a single tool plugin.
Attributes:
name: unique tool identifier
schema: JSON-schema dict sent to the API (name, description, input_schema)
func: callable(params: dict, config: dict) -> str
read_only: True if the tool never mutates state
concurrent_safe: True if s
Functions (8)
def register_tool(tool_def: ToolDef)
Register a tool, overwriting any existing tool with the same name.
def get_tool(name: str)
Look up a tool by name. Returns None if not found.
def get_all_tools()
Return all registered tools (insertion order).
def get_tool_schemas()
Return the schemas of all registered tools (for API tool parameter).
def is_display_only(name: str)
Check if a tool is display-only (visual output, don't read back).
Returns True if the tool's output should not be fed back to the model,
typically for ASCII art, visual charts, or display-only content.
def execute_tool(name: str, params: Dict[str, Any], config: Dict[str, Any], max_output: int = 2500)
Dispatch a tool call by name.
Args:
name: tool name
params: tool input parameters dict
config: runtime configuration dict
max_output: maximum allowed output length in characters.
Default 2500 — applies uniformly to built-ins AND plugins.
Tools that need more MUST pag
def clear_last_output()
Reset the last_tool_output.txt file. Should be called at turn start.
def clear_registry()
Remove all registered tools. Intended for testing.
Imports
__future__, dataclasses, json, pathlib, typing
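The entries above describe a registry where register_tool overwrites by name, get_tool returns None for unknown names, and execute_tool truncates output at max_output (default 2500). A compact sketch of that flow; the error string and the truncation marker are assumptions, since the outline does not show them:

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class ToolDef:
    name: str
    schema: Dict[str, Any]
    func: Callable[[Dict[str, Any], Dict[str, Any]], str]
    read_only: bool = False
    concurrent_safe: bool = False


_registry: Dict[str, ToolDef] = {}


def register_tool(tool_def: ToolDef) -> None:
    # Same name replaces the earlier definition.
    _registry[tool_def.name] = tool_def


def get_tool(name: str) -> Optional[ToolDef]:
    return _registry.get(name)


def execute_tool(name: str, params: Dict[str, Any], config: Dict[str, Any],
                 max_output: int = 2500) -> str:
    tool = get_tool(name)
    if tool is None:
        return f"Error: unknown tool '{name}'"  # wording is an assumption
    result = tool.func(params, config)
    if len(result) > max_output:
        dropped = len(result) - max_output
        # Marker format is an assumption; the real registry may differ.
        result = result[:max_output] + f"\n[... truncated {dropped} chars ...]"
    return result
```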
▶tools/build_sandbox_bundle.py104 LOC
Build the sandbox tarball that ships inside the Dulus wheel.
Usage:
python tools/build_sandbox_bundle.py # uses existing sandbox/dist
python tools/build_sandbox_bundle.py --rebuild # also runs `npm run build` first
Output:
_bundles/sandbox.tar.gz (committed to git; consumed by sandbox_bootstrap.py)
Why a script and not a setuptools hook:
The Vite build needs Node.js and a populated node_modules. Wedging that
into the Python build chain would force every consu
Functions (4)
def _ensure_dist(rebuild: bool)
Make sure sandbox/dist/index.html exists, optionally rebuilding.
def _build_tarball()
Pack sandbox/dist/* into _bundles/sandbox.tar.gz.
def _summary()
Print size + file count for quick sanity checking.
def main()
Imports
__future__, argparse, os, pathlib, subprocess, sys, tarfile
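_build_tarball above packs sandbox/dist/* into _bundles/sandbox.tar.gz. A minimal sketch of that packing step with the standard tarfile module; the function name, the parameters and the choice to store paths relative to the dist directory are assumptions:

```python
import tarfile
from pathlib import Path


def build_tarball_sketch(dist_dir: Path, out_path: Path) -> int:
    """Pack every file under dist_dir into a gzip tarball; return the file count."""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with tarfile.open(out_path, "w:gz") as tar:
        # Sorted traversal keeps member order stable between runs.
        for path in sorted(dist_dir.rglob("*")):
            if path.is_file():
                # Store paths relative to dist_dir so the archive has no prefix.
                tar.add(path, arcname=path.relative_to(dist_dir).as_posix())
                count += 1
    return count
```

The returned count is the kind of figure a summary step could print next to the archive size.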
▶tools.py2941 LOC
Tool definitions and implementations for Dulus.
Functions (55)
def _is_in_tg_turn(config: dict)
Return True if the *current thread* is handling a Telegram interaction.
Checks the thread-local flag first (set by the slash-command runner thread),
then falls back to the config key (set by the main REPL for _bg_runner turns).
def _is_safe_bash(cmd: str)
def generate_unified_diff(old, new, filename, context_lines = 3)
def maybe_truncate_diff(diff_text, max_lines = 80)
def _read(file_path: str, limit: int = None, offset: int = None)
def _line_count(file_path: str)
def _ocr_extract(image_path: str, languages: str = 'en,es')
Local OCR backend for the ExtractTextFromImage tool.
Mirrors the engine-order logic of cmd_ocr in dulus.py: try pytesseract
first (fast, accurate, needs Tesseract binary), fall back to easyocr
(pure-Python, heavier, only if user installed it). Returns the
extracted text as a plain string, or a frie
def _print_last_output()
Print the full content of the last tool output directly.
Use this to display large outputs (ASCII art, logs, etc.) without re-writing them.
def _search_last_output(pattern: str = None, context: int = 2)
Search or summarize the tool outputs accumulated during this turn.
def _write(file_path: str, content: str)
def _edit(file_path: str, old_string: str, new_string: str, replace_all: bool = False)
def _kill_proc_tree(pid: int)
Kill a process and all its children.
def _find_windows_bash()
Return (kind, path) for the best bash available on Windows, or None.
def _find_shell_by_type(shell_type: str, forced_path: str = '')
Find a specific shell type on Windows. Returns (kind, path) or None.
def _win_to_posix(path_str: str, wsl: bool = False)
Convert a Windows path string to POSIX for bash/WSL.
C:\Users\foo → /c/Users/foo (gitbash)
C:\Users\foo → /mnt/c/Users/foo (wsl)
def _is_bash_safe(command: str)
Check if a bash command passes the safety filter.
Returns (is_safe, reason_if_unsafe).
def _rtk_binary()
def _rtk_enabled()
def _ensure_rtk_in_path()
Add the bundled rtk binary's directory to PATH so subshells resolve `rtk`.
Idempotent: re-checks PATH each call (flag may flip at runtime).
def _rtk_wrap_cmd(cmd: list)
Prepend the rtk binary so a subprocess argv list runs through rtk.
Used by tools that shell out directly via subprocess (GitStatus/Log/Diff,
Grep). For RTK-supported subcommands (git, grep, ls, find, diff, log, …)
this gets you token-optimized output; unsupported commands pass through.
Soft-fallbac
def _maybe_rewrite_with_rtk(command: str)
def _bash(command: str, timeout: int = 30)
def _glob(pattern: str, path: str = None)
def _has_rg()
def _grep_python_pure(pattern: str, search_path: Path, glob_pat: str = None, output_mode: str = 'files_with_matches', case_insensitive: bool = False, context: int = 0)
Pure-Python grep fallback for Windows or when grep/rg misbehave.
def _grep(pattern: str, path: str = None, glob: str = None, output_mode: str = 'files_with_matches', case_insensitive: bool = False, context: int = 0)
def _libretranslate_host()
Return the best LibreTranslate host URL.
In WSL2, localhost points to the WSL VM — use the Windows host IP instead
(read from /etc/resolv.conf nameserver line).
Falls back to localhost if not in WSL or can't parse.
def _clean_html(html: str)
Extract content text from HTML — only meaningful tags, strips noise.
def _libretranslate(text: str, source: str, target: str, host: str = None)
Translate via LibreTranslate (local). Returns None if unavailable.
Splits into 800-char chunks to stay within API limits.
def _libretranslate_available()
def _webfetch(url: str)
Fetch URL → plain text.
def _bravesearch(query: str, api_key: str, country: str = None)
Search using Brave Search API.
def _bochasearch(query: str, api_key: str)
Search using Bocha AI Search (博查) — native Chinese web search API.
def _websearch(query: str, config: dict = None, region: str = None)
def _parse_cell_id(cell_id: str)
Convert 'cell-N' shorthand to integer index; return None if not that form.
def _notebook_edit(notebook_path: str, new_source: str, cell_id: str = None, cell_type: str = None, edit_mode: str = 'replace')
def _detect_language(file_path: str)
def _run_quietly(cmd: list[str], cwd: str | None = None, timeout: int = 30)
Run a command, return (returncode, combined_output).
def _get_diagnostics(file_path: str, language: str = None)
def _ask_user_question(question: str, options: list[dict] | None = None, allow_freetext: bool = True, config: dict = None)
Block the agent loop and surface a question to the user in the terminal.
def ask_input_interactive(prompt: str, config: dict, menu_text: str = None)
Prompt the user for input, routing to Telegram if in a Telegram turn.
If menu_text is provided, it is sent ahead of the prompt.
def drain_pending_questions(config: dict)
Called by the REPL loop after each streaming turn.
Renders pending questions and collects user input.
Returns True if any questions were answered.
def _sleeptimer(seconds: int, config: dict)
def _print_to_console(content: str = '', style: str = 'normal', prefix: str = '', from_line: int = None, to_line: int = None, file_path: str = None, config: dict = None)
Print content to the user's console.
This tool displays text to the user WITHOUT consuming output tokens.
The content is shown immediately in the chat console.
If the conversation started via Telegram, also sends to Telegram.
Args:
content: Text to display (or use file_path to read from file)
def execute_tool(name: str, inputs: dict, permission_mode: str = 'auto', ask_permission: Optional[Callable[[str], bool]] = None, config: dict = None)
Dispatch tool execution; ask permission for write/destructive ops.
Permission checking is done here, then delegation goes to the registry.
The config dict is forwarded to tool functions so they can access
runtime context like _depth, _system_prompt, model, etc.
def _register_builtins()
Register all built-in tools into the central registry.
def _enter_plan_mode(params: dict, config: dict)
Enter plan mode: read-only except plan file.
def _exit_plan_mode(params: dict, config: dict)
Exit plan mode and present plan for user approval.
def _plugin_list(params: dict, config: dict)
Implement the PluginList tool to query installed tools dynamically.
def _plugin_tools_list(params: dict, config: dict)
List all tools exposed by installed plugins.
def _read_job(params: dict, config: dict)
Read a job result by its ID. Simple way to get TmuxOffload results.
def _git_diff(params: dict, _config: dict)
def _git_status(_params: dict, _config: dict)
def _git_log(params: dict, _config: dict)
def _launch_sandbox(params: dict, config: dict)
Imports
checkpoint.hooks, difflib, dulus_mcp.tools, glob, json, memory.offload, memory.tools, multi_agent.tools, os, pathlib, re, skill.tools, subprocess, task.tools, threading, tool_registry, typing
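The _win_to_posix entry above gives two mappings: C:\Users\foo becomes /c/Users/foo for gitbash and /mnt/c/Users/foo for WSL. A small sketch that reproduces those two cases; how the real function handles relative or UNC paths is not shown in the outline, so the fallback here is an assumption:

```python
import re


def win_to_posix_sketch(path_str: str, wsl: bool = False) -> str:
    """Convert a drive-letter Windows path to a gitbash or WSL style path."""
    match = re.match(r"^([A-Za-z]):[\\/]*(.*)$", path_str)
    if not match:
        # Assumption: paths without a drive letter only get separators flipped.
        return path_str.replace("\\", "/")
    drive = match.group(1).lower()
    rest = match.group(2).replace("\\", "/")
    prefix = f"/mnt/{drive}" if wsl else f"/{drive}"
    return f"{prefix}/{rest}" if rest else prefix
```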
▶tools_background.py416 LOC
Background Task Tools - Manage background tasks for Dulus.
Provides tools for listing, retrieving output from, and stopping background
tasks. These tools integrate with the agent's background task execution system
to give visibility into async operations.
Classes (2)
class BackgroundTask
Represents a single background task.
↳ duration(self)
Return elapsed seconds since start.
↳ is_running(self)
Check if the task is still running.
↳ stop(self, force: bool = False)
Signal the task to stop.
↳ append_output(self, line: str, is_stderr: bool = False)
Append an output line.
↳ to_dict(self)
Serialize to dict for display.
class BackgroundTaskStore
In-memory store for background tasks.
↳ __init__(self)
Initialize an empty task store.
↳ add_task(self, task: BackgroundTask)
Add a task to the store.
Args:
task: The BackgroundTask to store.
↳ get_task(self, task_id: str)
Retrieve a task by ID.
Args:
task_id: The task's unique ID.
Returns:
The BackgroundTask or None if not found.
↳ list_tasks(self)
List all tasks, most recent first.
Returns:
List of BackgroundTask objects.
↳ remove_task(self, task_id: str)
Remove a task from the store.
Args:
task_id: The task's unique ID.
Returns:
True if the task was removed, False if not found.
↳ update_status(self, task_id: str, status: str, exit_code: int | None = None)
Update a task's status.
Args:
task_id: The task's unique ID.
status: New status string.
exit_code: Optional exit code.
Functions (6)
def get_task_store()
Get the global BackgroundTaskStore singleton.
Returns:
The shared BackgroundTaskStore instance.
def create_background_task(description: str, kind: str, target: Callable, args: tuple = (), kwargs: dict | None = None)
Create and start a new background task.
Args:
description: Human-readable description of the task.
kind: Type of task (e.g., "bash", "python", "subagent").
target: Callable to run in the background thread.
args: Positional arguments for the target.
kwargs: Keyword arguments for
def _bg_task_list()
List all background tasks.
Returns:
Formatted string listing all tasks.
def _bg_task_output(task_id: str, block: bool = False, timeout: int = 60)
Get output from a background task.
Args:
task_id: The task's unique ID.
block: If True, wait for task completion.
timeout: Max seconds to wait when blocking.
Returns:
The task's output or an error message.
def _bg_task_stop(task_id: str, force: bool = False)
Stop a background task.
Args:
task_id: The task's unique ID.
force: If True, forcefully terminate.
Returns:
Confirmation or error message.
def _register()
Register all background task tools into the central registry.
Imports
__future__, dataclasses, threading, time, tool_registry, typing, uuid
▶tools_think.py68 LOC
Think Tool - Explicit agent reasoning for Dulus.
Provides a Think tool that allows the agent to perform explicit reasoning
without taking any action. The thought is logged and displayed to the user,
helping with transparency and debugging of the agent's decision-making process.
Functions (2)
def _think(thought: str)
Log a thought and return a display-friendly result.
Args:
thought: The reasoning text to log.
Returns:
A formatted string with the thought content for display.
def _register()
Register the Think tool into the central registry.
Imports
__future__, tool_registry
▶tools_todo.py196 LOC
SetTodoList Tool - Persistent todo management for Dulus.
Provides a SetTodoList tool that allows the agent to manage a persistent
todo list stored in the session directory. Supports setting, getting, and
updating todo items with status tracking (pending, in_progress, done).
Classes (1)
class TodoManager
Manages persistent todo list storage in the session directory.
↳ __init__(self, session_dir: str | None = None)
Initialize the todo manager.
Args:
session_dir: Directory for storing todos. Defaults to current working directory.
↳ _load_todos(self)
Load todos from disk.
Returns:
List of todo dicts, or empty list if file doesn't exist or is corrupt.
↳ _save_todos(self, todos: list[dict])
Save todos to disk.
Args:
todos: List of todo dicts to persist.
↳ read_todos(self)
Read todos and format for display.
Returns:
Formatted string of the current todo list.
↳ write_todos(self, todos: list[dict])
Update the todo list.
Args:
todos: New list of todo items.
Returns:
Confirmation message with count of updated items.
Functions (3)
def _get_manager(session_dir: str | None = None)
Get or create the TodoManager singleton.
Args:
session_dir: Optional directory override.
Returns:
The shared TodoManager instance.
def _set_todo_list(todos: list[dict] | None = None, config: dict | None = None)
Handle SetTodoList tool execution.
Args:
todos: List of todo items to set, or None to just read.
config: Runtime configuration dict.
Returns:
Formatted result string.
def _register()
Register the SetTodoList tool into the central registry.
Imports
__future__, json, os, pathlib, tool_registry, typing
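TodoManager above loads todos from disk (returning an empty list when the file is missing or corrupt) and persists new lists. The tests for it also mention a .dulus directory and the filtering of empty titles. A minimal sketch of that persistence logic, assuming a hypothetical MiniTodoManager; the file name todos.json is an assumption:

```python
import json
from pathlib import Path
from typing import List, Optional


class MiniTodoManager:
    """Hypothetical stand-in for TodoManager; file name is an assumption."""

    def __init__(self, session_dir: Optional[str] = None) -> None:
        base = Path(session_dir) if session_dir else Path.cwd()
        self.path = base / ".dulus" / "todos.json"

    def load(self) -> List[dict]:
        try:
            data = json.loads(self.path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Missing file (OSError) or corrupt JSON (ValueError): start empty.
            return []
        return data if isinstance(data, list) else []

    def write(self, todos: List[dict]) -> int:
        # Drop entries whose title is missing or blank before persisting.
        kept = [t for t in todos if str(t.get("title", "")).strip()]
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.path.write_text(json.dumps(kept, indent=2), encoding="utf-8")
        return len(kept)
```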
▶ui/__init__.py1 LOC
▶ui/input.py464 LOC
prompt_toolkit-based REPL input with typing-time slash-command autosuggest.
Optional dependency: when prompt_toolkit is not installed, HAS_PROMPT_TOOLKIT
is False and callers should fall through to readline-based input.
Dependency-injected: callers register command/meta providers via setup()
before calling read_line(). This module never imports Dulus core — keeping
the dependency one-way and eliminating any circular-import risk.
Classes (1)
class _OutputRedirector
Redirects stdout to the split layout output buffer.
↳ __init__(self, original)
↳ write(self, text: str)
↳ flush(self)
↳ isatty(self)
Functions (11)
def setup(commands_provider: Callable[[], dict], meta_provider: Callable[[], dict])
Register providers for the live command registry and metadata.
`commands_provider` returns the dispatcher's COMMANDS dict.
`meta_provider` returns the _CMD_META dict (descriptions + subcommands).
def reset_session()
Drop the cached session so the next read_line() rebuilds from scratch.
def _build_session(history_path: Optional[Path])
def read_line(prompt_ansi: str, history_path: Optional[Path] = None)
Read one line of input via prompt_toolkit; caches the session across calls.
The history file passed here MUST NOT be the readline history file — the
two line-editors use incompatible formats. See Dulus REPL for the
dedicated PT_HISTORY_FILE.
def read_line_split(prompt: str = '> ', history_path: Optional[Path] = None)
Read input with split layout - fixed bottom bar, scrollable output above.
Similar to Kimi Code and Claude Code interfaces.
def append_output(text: str)
Append text to the output buffer (for split layout mode).
Use this to display messages without interrupting the input bar.
def clear_split_output()
Clear the split layout output buffer.
def set_notification_callback(callback: Callable[[str], None])
Register a callback to handle background notifications.
The callback will be called with the notification text when it's safe
to display (during the next input cycle or when input is not active).
def queue_notification(text: str)
Queue a notification to be displayed safely.
This should be used by background threads (timers, jobs, etc.) to
display messages without corrupting the prompt_toolkit input bar.
def drain_notifications()
Drain all pending notifications from the queue.
Returns a list of notification texts. Should be called when it's
safe to display output (e.g., before showing a new prompt).
def safe_print_notification(text: str)
Print a notification in a prompt_toolkit-safe way.
If split layout is active, uses append_output.
Otherwise prints directly (which may cause display issues in sticky mode).
Imports
__future__, pathlib, queue, typing
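queue_notification and drain_notifications above let background threads hand messages to the input loop without writing to the terminal themselves. A minimal sketch of that pattern built on queue.Queue; the module-level queue and the _sketch names are illustrative, not the module's internals:

```python
from __future__ import annotations

import queue

# Thread-safe FIFO shared by producers (background threads) and the REPL.
_notifications: queue.Queue[str] = queue.Queue()


def queue_notification_sketch(text: str) -> None:
    """Called from any thread; never touches the terminal."""
    _notifications.put(text)


def drain_notifications_sketch() -> list[str]:
    """Called from the input loop when it is safe to print."""
    drained: list[str] = []
    while True:
        try:
            drained.append(_notifications.get_nowait())
        except queue.Empty:
            return drained
```

The REPL would call the drain function just before showing a new prompt and print whatever it returns.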
▶ui/render.py272 LOC
ui/render.py — All terminal rendering for Dulus.
Provides:
- ANSI color helpers (C, clr, info, ok, warn, err)
- Rich Markdown streaming (stream_text, flush_response)
- Spinner management
- Tool call display (print_tool_start, print_tool_end)
- Diff rendering (render_diff)
Functions (17)
def _truncate_err_global(s: str, max_len: int = 200)
def render_diff(text: str)
Print diff text with ANSI colors: red for removals, green for additions.
def _has_diff(text: str)
Check if text contains a unified diff.
def set_rich_live(enabled: bool)
Called from repl.py to apply the rich_live config setting.
def _make_renderable(text: str)
Return a Rich renderable: Markdown if text contains markup, else plain.
def _start_live()
Start a Rich Live block for in-place Markdown streaming (no-op if not Rich).
def stream_text(chunk: str)
Buffer chunk; update Live in-place when Rich available, else print directly.
Safety: if accumulated text exceeds _LIVE_LINE_LIMIT lines, auto-switch
from Rich Live to plain streaming to prevent terminal re-render duplication
on terminals that can't handle large Live areas (macOS Terminal, etc.).
def stream_thinking(chunk: str, verbose: bool)
def flush_response()
Commit buffered text to screen: stop Live (freezes rendered Markdown in place).
def _run_tool_spinner()
Background spinner on a single line using carriage return.
def _start_tool_spinner()
def _change_spinner_phrase()
Change the spinner phrase without stopping it.
def set_spinner_phrase(phrase: str)
Set a specific spinner phrase (used by SSJ debate mode).
def _stop_tool_spinner()
def _tool_desc(name: str, inputs: dict)
def print_tool_start(name: str, inputs: dict, verbose: bool)
Show tool invocation.
def print_tool_end(name: str, result: str, verbose: bool)
Imports
__future__, json, spinner, sys, threading
▶voice/__init__.py 64 LOC
Voice package for dulus.
Public API
----------
check_voice_deps() → (available: bool, reason: str | None)
record_once(...) → raw PCM bytes (int16, 16 kHz, mono)
transcribe(...) → text string
voice_input(...) → transcribed text (record + transcribe in one call)
WakeWordListener → background wake-word (hotword) detector
Functions (2)
def check_voice_deps()
Return (available, reason_if_not).
def voice_input(language: str = 'auto', max_seconds: int = 30, on_energy: 'callable | None' = None, device_index: 'int | None' = None)
Record until silence, then transcribe. Returns transcribed text.
Imports
audio_utils, keyterms, recorder, stt, tts, wake_word
▶voice/audio_utils.py 56 LOC
Audio utilities for dulus voice package.
Functions (1)
def beep(frequency: int = 1000, duration: int = 150)
Trigger a system beep using ffplay (if available), falling back to winsound or the terminal bell.
ffplay is preferred because it behaves more reliably with system volume mixers and works across platforms.
Imports
sys
▶voice/keyterms.py 179 LOC
Voice keyterms: domain-specific vocabulary hints for STT accuracy.
Passed as Whisper's `initial_prompt` so that coding terminology
(grep, MCP, TypeScript, JSON, …) is recognised correctly instead of being
mistranscribed as phonetically similar common words.
Inspired by Claude Code's voiceKeyterms.ts, but expanded for a multi-provider
setting and adapted to pull context from the Python runtime environment.
Functions (5)
def split_identifier(name: str)
Split camelCase / PascalCase / kebab-case / snake_case into words.
Fragments ≤ 2 chars or > 20 chars are discarded.
Examples:
"nano-claude-code" → ["nano", "claude", "code"]
"MyWebhookHandler" → ["My", "Webhook", "Handler"]
def _git_branch()
def _project_root()
Find the git root or fall back to cwd.
def _recent_py_files(root: Path, limit: int = 20)
Return the most-recently modified Python/TS/JS files in the repo.
def get_voice_keyterms(recent_files: list[str] | None = None)
Build a list of keyterms for the STT engine.
Combines:
• Hardcoded global coding vocabulary
• Project root directory name
• Git branch words
• Recent source file stem words
Returns up to MAX_KEYTERMS unique terms.
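The keyterm pipeline described above (split identifiers, merge several sources, de-duplicate, cap) can be sketched as follows. This is an illustration under stated assumptions: the value of `MAX_KEYTERMS`, the helper name `build_keyterms` and its parameters are hypothetical, and the length filter is applied strictly as written, so a two-letter fragment such as `my` is dropped.

```python
import re
from typing import Iterable, List

MAX_KEYTERMS = 50  # hypothetical value; the real constant is not shown in the index


def split_identifier(name: str) -> List[str]:
    """Split camelCase / PascalCase / kebab-case / snake_case into words."""
    spaced = re.sub(r"([a-z0-9])([A-Z])", r"\1 \2", name)       # fooBar -> foo Bar
    spaced = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1 \2", spaced)  # JSONFile -> JSON File
    parts = re.split(r"[^A-Za-z0-9]+", spaced)
    return [p for p in parts if 2 < len(p) <= 20]               # drop <= 2 or > 20 chars


def build_keyterms(
    global_terms: Iterable[str],
    project_name: str,
    branch: str,
    file_stems: Iterable[str],
    limit: int = MAX_KEYTERMS,
) -> List[str]:
    """Merge all sources, keep first-seen order, de-duplicate case-insensitively."""
    candidates = list(global_terms)
    for name in [project_name, branch, *file_stems]:
        candidates.extend(split_identifier(name))
    seen, result = set(), []
    for term in candidates:
        if len(result) >= limit:
            break
        key = term.lower()
        if key not in seen:
            seen.add(key)
            result.append(term)
    return result
```

Putting the hardcoded vocabulary first means it survives the cap even in repositories with many recent files; whether the real function orders its sources this way is not stated.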
Imports
__future__, pathlib, re, subprocess
▶voice/recorder.py 274 LOC
Audio capture for voice input.
Backend priority (tried in order):
1. sounddevice — cross-platform, pure-Python wrapper around PortAudio.
Best option: works on macOS, Linux, Windows.
pip install sounddevice
2. arecord — Linux ALSA utility. No pip install needed.
3. sox rec — SoX command-line recorder. Supports silence detection.
sudo apt install sox / brew install sox
All backends capture raw PCM: 16 kHz, 16-
Functions (7)
def _has_cmd(cmd: str)
def check_recording_availability()
Return (available, reason_if_not).
def list_input_devices()
Return a list of available input devices with index and name.
def _record_sounddevice(max_seconds: int = 30, on_energy: 'callable | None' = None, device_index: 'int | None' = None, silence_secs: float = SILENCE_DURATION_SECS)
def _record_arecord(max_seconds: int = 30, on_energy: 'callable | None' = None, silence_secs: float = SILENCE_DURATION_SECS)
Record via arecord. Silence detection done in Python on the piped PCM.
def _record_sox(max_seconds: int = 30, on_energy: 'callable | None' = None, silence_secs: float = SILENCE_DURATION_SECS)
Record via SoX `rec` with built-in silence detection.
def record_until_silence(max_seconds: int = 30, on_energy: 'callable | None' = None, device_index: 'int | None' = None, silence_secs: float = SILENCE_DURATION_SECS)
Record from microphone until silence or max_seconds.
Returns raw PCM bytes: int16, 16 kHz, mono.
Tries backends in order: sounddevice → arecord → sox rec.
Raises RuntimeError if no backend is available.
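The ordered fallback described for `record_until_silence` can be illustrated with a small dispatcher. This is a sketch, not the module's code: the `Backend` tuple shape and the function name `record_with_fallback` are hypothetical, and falling through to the next backend when one raises is an assumption.

```python
from typing import Callable, List, Tuple

# (name, availability probe, recorder taking max_seconds and returning raw PCM)
Backend = Tuple[str, Callable[[], bool], Callable[[int], bytes]]


def record_with_fallback(backends: List[Backend], max_seconds: int = 30) -> bytes:
    """Try each backend in priority order; raise RuntimeError if none succeeds."""
    errors = []
    for name, is_available, record in backends:
        if not is_available():
            errors.append(f"{name}: not available")
            continue
        try:
            return record(max_seconds)
        except Exception as exc:  # assumption: a failing backend should not abort the chain
            errors.append(f"{name}: {exc}")
    raise RuntimeError("No recording backend worked: " + "; ".join(errors))
```

Collecting the per-backend reasons keeps the final error actionable, for example by showing that `sounddevice` is missing while `arecord` failed on a busy device.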
Imports
__future__, io, pathlib, shutil, subprocess, threading
▶voice/stt.py 476 LOC
Speech-to-text (STT) backends.
Backend priority (tried in order):
1. NVIDIA Riva — cloud, whisper-large-v3 via gRPC, needs NVIDIA_API_KEY.
pip install nvidia-riva-client
2. faster-whisper — local, offline, fast, best for coding vocab.
pip install faster-whisper
3. openai-whisper — local, offline, original OpenAI Whisper library.
pip install openai-whisper
4. OpenAI Whisper API — cloud, needs OPENAI_API_KEY.
Functions (16)
def prewarm_whisper()
Trigger the local Whisper model load + dummy transcribe.
Designed to be called from a background thread at REPL boot so the wake
word + /voice paths are instant on first real audio. Returns True if
the load completed, False if no local backend is installed.
def _riva_available()
Riva backend is usable iff the client lib is installed AND we have a key.
def _transcribe_nvidia_riva(pcm_bytes: bytes, language: Optional[str], translate: bool = False)
Transcribe via NVIDIA NVCF Riva (whisper-large-v3, gRPC).
Riva expects a real audio container — we wrap raw PCM in WAV.
`language=None` or "auto" → "multi" (Riva auto-detect).
`translate=True` adds custom_configuration "task:translate" so foreign
speech comes back as English.
def _audio_file_to_pcm(audio_bytes: bytes, suffix: str = '.ogg')
Convert an audio file (OGG, MP3, etc.) to raw int16 PCM (16kHz mono) via ffmpeg.
def _pcm_to_wav(pcm_bytes: bytes)
Wrap raw int16 PCM in a minimal WAV container.
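For readers unfamiliar with the container step, here is one way to wrap raw int16 mono PCM in a WAV file in memory. It uses the standard-library `wave` module, which is a substitution made for brevity; the module's imports suggest the original builds the header with `struct` instead.

```python
import io
import wave


def pcm_to_wav(pcm_bytes: bytes, sample_rate: int = 16000) -> bytes:
    """Wrap raw little-endian int16 mono PCM in a WAV container."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wav:
        wav.setnchannels(1)          # mono
        wav.setsampwidth(2)          # 2 bytes per sample = int16
        wav.setframerate(sample_rate)
        wav.writeframes(pcm_bytes)   # header sizes are patched when the writer closes
    return buf.getvalue()
```

The result is the PCM payload preceded by a 44-byte RIFF/WAVE header, which is the minimal form most speech APIs accept.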
def check_stt_availability()
Return (available, reason_if_not).
def get_stt_backend_name()
Return a human-readable name of the backend that will be used.
Must match the priority order in ``transcribe()`` — local Whisper
first, Riva only as a fallback when no local backend is installed.
def _get_faster_whisper_model()
def _has_cuda()
def _transcribe_faster_whisper(pcm_bytes: bytes, keyterms: List[str], language: Optional[str])
def _get_openai_whisper_model()
def _transcribe_openai_whisper(pcm_bytes: bytes, keyterms: List[str], language: Optional[str])
def _transcribe_openai_api(pcm_bytes: bytes, language: Optional[str])
def _keyterms_to_prompt(keyterms: List[str])
Convert a list of keywords into a Whisper initial_prompt string.
Whisper treats the initial_prompt as preceding context; sprinkling the
coding vocabulary terms nudges the model to prefer these spellings.
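A plausible shape for the keyterm-to-prompt conversion is shown below. The comma-separated format, the case-insensitive de-duplication and the `max_chars` cap are all assumptions for illustration; the source only says the terms end up in Whisper's `initial_prompt`.

```python
from typing import List


def keyterms_to_prompt(keyterms: List[str], max_chars: int = 800) -> str:
    """Join unique keyterms into one comma-separated prompt string."""
    seen = set()
    kept: List[str] = []
    length = 0
    for term in keyterms:
        term = term.strip()
        key = term.lower()
        if not term or key in seen:
            continue
        added = len(term) + (2 if kept else 0)  # account for the ", " separator
        if length + added > max_chars:
            break                               # hypothetical cap to keep the prompt short
        seen.add(key)
        kept.append(term)
        length += added
    return ", ".join(kept)
```

A string built this way would be passed as the `initial_prompt` argument of the local Whisper backends.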
def transcribe(pcm_bytes: bytes, keyterms: Optional[List[str]] = None, language: str = 'auto')
Transcribe raw PCM audio to text.
Args:
pcm_bytes: Raw int16 PCM, 16 kHz, mono.
keyterms: Coding-domain vocabulary hints (improves accuracy).
language: BCP-47 language code, or 'auto' for detection.
Returns:
Transcribed text, or empty string if audio contains no speech.
def transcribe_audio_file(audio_bytes: bytes, suffix: str = '.ogg', language: str = 'auto')
Transcribe an audio file (OGG, MP3, etc.) to text.
Converts to PCM via ffmpeg, then runs through the STT pipeline.
Falls back to OpenAI Whisper API (which accepts OGG natively) if
ffmpeg is not available.
Imports
__future__, io, os, pathlib, recorder, struct, tempfile, typing
▶voice/tts.py 735 LOC
Text-to-speech (TTS) backends.
Backend priority (tried in order):
1. pyttsx3 — local, offline, robotic system voices (Windows SAPI5).
pip install pyttsx3
2. edge-tts — Microsoft Edge voices, fast, free, cloud.
pip install edge-tts
3. gTTS — cloud, free, needs internet.
pip install gTTS
4. OpenAI TTS — cloud, high quality, needs OPENAI_API_KEY.
5. Azure — cloud, Microsoft Azure Spee
Functions (21)
def _watch_for_cancel()
Background thread: set _stop_event if user presses 'c'.
def _play_audio_file(file_path: str | Path)
Play an audio file, interruptible with 'c' key.
def _play_windows_mci(file_path: str)
Play via MCI, polling _stop_event every 50ms to allow 'c' cancel.
def _get_pyttsx3_engine()
def _azure_tts_available()
def _say_azure(text: str, voice: Optional[str] = None, lang: str = 'es')
def _riva_lang_code(lang: str)
def _riva_voice_for(lang: str)
Resolve voice via env var (per-language first, then global, then default).
Set DULUS_RIVA_TTS_VOICE_ES="Magpie-Multilingual.ES-US.Lupe" etc. to map
voices per language. Run `talk.py --list-voices` once to discover names.
def _pcm_to_wav(pcm: bytes, sample_rate: int = 44100)
Wrap raw int16 mono PCM in a minimal WAV container.
def _riva_tts_available()
def _split_for_riva(text: str, limit: int = _RIVA_TTS_MAX_CHARS)
Split text into <=limit-char chunks at sentence/clause/word boundaries.
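The boundary-preferring split described for `_split_for_riva` can be sketched as a greedy loop: take the longest prefix within the limit that ends at a sentence mark, else a clause mark, else a space, else cut hard. The function name `split_text`, the default limit and the exact separator sets are assumptions.

```python
from typing import List


def split_text(text: str, limit: int = 400) -> List[str]:
    """Split text into chunks of at most `limit` characters at natural breaks."""
    chunks: List[str] = []
    rest = text.strip()
    while len(rest) > limit:
        window = rest[:limit]
        cut = -1
        # Prefer sentence ends, then clause marks, then plain spaces.
        for seps in (".!?", ",;:", " "):
            cut = max(window.rfind(s) for s in seps)
            if cut > 0:
                break
        if cut <= 0:
            cut = limit - 1  # no usable boundary: hard split at the limit
        chunks.append(rest[: cut + 1].strip())
        rest = rest[cut + 1 :].strip()
    if rest:
        chunks.append(rest)
    return chunks
```

Each chunk would then be synthesized separately and played back in order. A decimal point such as the one in `3.14` counts as a sentence end in this sketch, which a production splitter would likely want to avoid.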
def _say_nvidia_riva(text: str, lang: str = 'es')
def _say_openai(text: str, voice: str = 'alloy', speed: float = 1.0)
def _say_gtts(text: str, lang: str = 'en')
def _say_edge_tts(text: str, lang: str = 'es')
def _say_pyttsx3(text: str, rate: Optional[int] = None)
def _clean_for_tts(text: str)
Strip markdown, HTML, emojis, and code blocks before speaking.
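A regex-based cleaner in the spirit of `_clean_for_tts` might look like this. The specific patterns and the emoji ranges are assumptions chosen for illustration; they cover common cases rather than the full Markdown or Unicode emoji specifications.

```python
import re


def clean_for_tts(text: str) -> str:
    """Strip code blocks, HTML tags, Markdown markers and common emoji."""
    text = re.sub(r"```.*?```", " ", text, flags=re.DOTALL)      # fenced code blocks
    text = re.sub(r"`([^`]*)`", r"\1", text)                     # inline code: keep the words
    text = re.sub(r"<[^>]+>", " ", text)                         # HTML tags
    text = re.sub(r"!?\[([^\]]*)\]\([^)]*\)", r"\1", text)       # links/images: keep the label
    text = re.sub(r"^\s{0,3}(#{1,6}|>|[-*+])\s+", "", text,
                  flags=re.MULTILINE)                            # headings, quotes, bullets
    text = re.sub(r"(\*\*|__|~~|\*)", "", text)                  # emphasis markers
    text = re.sub(r"[\U0001F300-\U0001FAFF\u2600-\u27BF\uFE0F]", "", text)  # common emoji
    return re.sub(r"\s+", " ", text).strip()                     # collapse whitespace
```

Fenced blocks are removed first so that backticks and asterisks inside code never reach the later patterns.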
def _elevenlabs_available()
True iff the elevenlabs SDK is importable AND an API key is configured.
def _say_elevenlabs(text: str, voice: Optional[str] = None)
Synthesize via ElevenLabs and play the resulting MP3.
Reads API key from env first, config second. Voice resolution order:
explicit arg > env var > config > module default (KevRojo's clone).
Returns True on successful playback.
def say(text: str, voice: Optional[str] = None, speed: float = 1.0, lang: str = 'es', provider: Optional[str] = None)
Speak text using the best available TTS backend. Press 'c' to stop.
Args:
provider: Explicit backend to use. "auto" or None tries in priority order.
Supported: "pyttsx3", "edge", "gtts", "openai", "azure", "riva".
def check_tts_availability()
Return (available, reason_if_not).
Imports
__future__, os, pathlib, re, struct, subprocess, tempfile, threading, time, typing
▶voice/wake_word.py 429 LOC
Wake-word (hotword) detection for Dulus.
Uses **energy-based VAD** — cheap RMS polling on small audio chunks.
Only runs STT when speech energy crosses a threshold (someone talking
loud / close to the mic, as you do when addressing an assistant).
Default wake phrases:
"dulus", "hey dulus", "okey dulus", "ok dulus"
Flow
----
1. Poll mic energy in tiny chunks (~0.3 s).
2. If energy > threshold → someone is speaking close/loud.
3. Capture a short burst (~2.5 s) and STT it.
4. If the text c
Classes (1)
class WakeWordListener
Background thread that listens for a wake phrase and then captures
the following voice command.
Parameters
----------
wake_phrases:
Override the default list of wake phrases.
rms_threshold:
Energy level that triggers STT (0..1). Increase if
background noise causes false wakes; decrease
↳ __init__(self, wake_phrases: list[str] | None = None, rms_threshold: float = _VAD_RMS_THRESHOLD, record_secs: float = _WAKE_RECORD_SECS, device_index: int | None = None, language: str = 'auto', debug: bool = False)
↳ start(self, on_wake: Callable[[str], None] | None = None, on_command: Callable[[str], None] | None = None)
Begin listening in a background thread.
`on_wake(phrase)` — called when the wake phrase is detected.
`on_command(text)` — called with the follow-up voice command.
↳ stop(self)
Signal the listener to stop and wait for the thread.
↳ is_running(self)
↳ _loop(self, on_wake: Callable[[str], None] | None, on_command: Callable[[str], None] | None)
One continuous InputStream with ring-buffer pre-roll.
No separate record_until_silence() calls — threshold detection,
wake-word capture, and command capture all use the same stream.
Zero audio gaps,
Functions (3)
def _rms_of_chunk(pcm: bytes)
Return RMS (0..1) of int16 PCM chunk.
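The normalized RMS used for the energy gate can be computed with the standard library alone. The sketch below assumes little-endian int16 samples and divides by 32768 to map the result into 0..1; the real function may use a different normalization constant or NumPy.

```python
import math
import struct


def rms_of_chunk(pcm: bytes) -> float:
    """Return the root-mean-square level of an int16 PCM chunk, scaled to 0..1."""
    n = len(pcm) // 2                 # number of whole 16-bit samples
    if n == 0:
        return 0.0
    samples = struct.unpack(f"<{n}h", pcm[: n * 2])
    mean_square = sum(s * s for s in samples) / n
    return math.sqrt(mean_square) / 32768.0
```

A listener would compare this value against `rms_threshold` for each short chunk and only start the more expensive STT step once the level exceeds it.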
def _contains_wake(text: str, phrases: list[str] | None = None)
Return the matched wake phrase (lower-case) or None.
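Matching a wake phrase in noisy STT output usually needs some normalization first. The following sketch lowercases the text, strips punctuation and checks phrases longest-first on word boundaries; the matching strategy and the name `contains_wake` are assumptions, while the default phrases are the ones listed in the module docstring.

```python
import re
from typing import List, Optional

DEFAULT_PHRASES = ["dulus", "hey dulus", "okey dulus", "ok dulus"]


def contains_wake(text: str, phrases: Optional[List[str]] = None) -> Optional[str]:
    """Return the matched wake phrase in lower case, or None if nothing matches."""
    normalized = re.sub(r"[^\w\s]", " ", text.lower())   # drop punctuation
    normalized = " ".join(normalized.split())            # collapse whitespace
    # Longest phrases first so "hey dulus" wins over the bare "dulus".
    for phrase in sorted(phrases or DEFAULT_PHRASES, key=len, reverse=True):
        if re.search(rf"\b{re.escape(phrase.lower())}\b", normalized):
            return phrase.lower()
    return None
```

The word-boundary check keeps longer words that merely contain a phrase from triggering a wake.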
def listen_once(wake_phrases: list[str] | None = None, rms_threshold: float = _VAD_RMS_THRESHOLD, record_secs: float = _WAKE_RECORD_SECS, device_index: int | None = None, language: str = 'auto', timeout: float | None = None)
Block until a wake phrase is detected, then return the matched phrase.
Returns None on timeout (if given) or on interrupt.
Imports
__future__, audio_utils, collections, os, queue, recorder, stt, sys, threading, time, typing
▶webbridge/__init__.py 5 LOC
Dulus WebBridge — Browser automation for AI agents via Playwright.
Imports
core
▶webbridge/core.py 694 LOC
Core WebBridge implementation using Playwright.
Classes (1)
class DulusWebBridge
Singleton browser automation controller using Playwright.
Uses a dedicated background worker thread so the browser stays alive
across multiple tool calls. Playwright objects are bound to the event
loop that created them; by always running Playwright code in the same
thread we avoid "browser has be
↳ __new__(cls)
↳ _ensure_playwright(self)
Raise a clear error if Playwright is not installed.
↳ _active_page(self)
Return the currently active page/tab.
↳ _is_browser_alive(self)
Check if the browser process is still responsive.
↳ _get_profile_dir(self)
Return the persistent profile directory for cookies/state.
↳ _get_lock_file(self)
Return the lock file path for cross-process browser detection.
↳ _read_lock_info(self)
Read lock file to find existing browser's CDP endpoint.
↳ _write_lock_info(self, cdp_endpoint: str | None = None)
Write lock file with current browser info.
↳ _clear_lock(self)
Remove lock file on clean shutdown.
↳ _ensure_worker(self)
Start the background worker thread if it isn't running.
↳ _worker_loop_target(self)
Target for the background thread — creates and runs an event loop forever.
↳ _sync(self, coro)
Run an async coroutine in the dedicated worker thread.
Playwright objects are bound to the event loop that created them.
By always submitting coroutines to the same background thread we
keep the brow
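The worker-thread arrangement described for `_ensure_worker`, `_worker_loop_target` and `_sync` is a common asyncio pattern: one daemon thread owns an event loop, and synchronous callers submit coroutines to it. A minimal sketch, with the class name `LoopWorker` and its attributes being hypothetical:

```python
import asyncio
import threading
from typing import Any, Coroutine, Optional


class LoopWorker:
    """Owns a single event loop that runs forever in a daemon thread."""

    def __init__(self) -> None:
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._ready = threading.Event()
        self._lock = threading.Lock()

    def _run(self) -> None:
        # Thread target: create the loop here so it belongs to this thread.
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        self._ready.set()
        self._loop.run_forever()

    def ensure_started(self) -> None:
        with self._lock:
            if self._loop is None:
                threading.Thread(target=self._run, daemon=True).start()
                self._ready.wait()  # block until the loop object exists

    def sync(self, coro: Coroutine[Any, Any, Any], timeout: Optional[float] = None) -> Any:
        """Run `coro` on the worker loop and block for its result."""
        self.ensure_started()
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)
```

Because every coroutine runs on the same loop and thread, objects created by one call (such as a Playwright browser) remain valid for the next call.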
↳ _build_selector(self, tag, soup, tag_name: str)
Build a concise unique-ish CSS selector for *tag*.
↳ status(self)
Return current browser status (sync, safe to call anytime).
↳ navigate_sync(self, url: str, headless: bool = False, tab_id: Optional[str] = None)
↳ click_sync(self, selector: str, force: bool = False, tab_id: Optional[str] = None)
↳ evaluate_sync(self, script: str, tab_id: Optional[str] = None)
↳ type_sync(self, selector: str, text: str, tab_id: Optional[str] = None)
↳ screenshot_sync(self, path: Optional[str] = None, tab_id: Optional[str] = None)
↳ get_text_sync(self, tab_id: Optional[str] = None)
↳ get_dom_sync(self, tab_id: Optional[str] = None)
↳ scroll_sync(self, direction: str = 'down', tab_id: Optional[str] = None)
↳ close_sync(self)
↳ new_tab_sync(self, url: str = 'about:blank')
↳ switch_tab_sync(self, tab_id: str)
↳ close_tab_sync(self, tab_id: str)
↳ list_tabs_sync(self)
Functions (1)
def _check_playwright()
Check if Playwright is installed. Cached.
Imports
__future__, asyncio, base64, concurrent.futures, os, pathlib, threading, typing
▶webbridge/tools.py 453 LOC
WebBridge tools registered in Dulus tool registry.
Provides 7 AI-callable tools for browser automation:
WebBridgeNavigate → open a URL
WebBridgeClick → click an element
WebBridgeType → type text into an input
WebBridgeScreenshot → capture a screenshot
WebBridgeExtract → extract page text or DOM structure
WebBridgeScroll → scroll up/down
WebBridgeClose → close the browser
Functions (13)
def _webbridge_navigate(params: dict, config: dict)
def _webbridge_click(params: dict, config: dict)
def _webbridge_evaluate(params: dict, config: dict)
def _webbridge_type(params: dict, config: dict)
def _webbridge_screenshot(params: dict, config: dict)
def _webbridge_extract(params: dict, config: dict)
def _webbridge_scroll(params: dict, config: dict)
def _webbridge_close(params: dict, config: dict)
def _webbridge_new_tab(params: dict, config: dict)
def _webbridge_switch_tab(params: dict, config: dict)
def _webbridge_close_tab(params: dict, config: dict)
def _webbridge_list_tabs(params: dict, config: dict)
def register_webbridge_tools()
Register all WebBridge tools into the Dulus tool registry.
Imports
__future__, core, json, pathlib, tool_registry
▶webchat.py 432 LOC
Dulus WebChat — standalone or in-process mirror of the terminal agent.
When launched via /webchat from backend.py, the in-process server in
webchat_server.py is used instead. This file remains usable as a
standalone fallback.
Functions (3)
def _run_agent_standalone(user_message: str)
Run agent loop with local state/config, yielding all events.
def create_app()
def main()
Imports
__future__, agent, argparse, common, config, context, dulus_mcp.tools, flask, json, memory.tools, multi_agent.tools, queue, skill.tools, task.tools, threading, time, tools, typing, uuid, webbrowser
▶webchat_server.py 4311 LOC
Dulus WebChat — in-process mirror of the terminal agent + Roundtable mode.
Classes (1)
class RoundtableAgent
↳ __init__(self, agent_id: str, model: str)
Functions (18)
def _resolve_dashboard_dir()
Find docs/dashboard whether running from source or installed package.
def _add_sse_client(q: queue.Queue)
def _remove_sse_client(q: queue.Queue)
def broadcast_event(event_type: str, payload: dict)
Broadcast JSON event to all connected SSE clients.
def _sse_heartbeat()
Send periodic ping to keep connections alive.
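The SSE helpers above suggest a fan-out design in which each connected client owns a queue and broadcasts push one serialized message to all of them. The sketch below illustrates that idea; the wire format (an `event:` line plus a JSON `data:` line), the dropping of clients whose queue is full, and the un-prefixed function names are assumptions.

```python
import json
import queue
import threading
from typing import List

_clients: List["queue.Queue[str]"] = []
_clients_lock = threading.Lock()


def add_client(q: "queue.Queue[str]") -> None:
    with _clients_lock:
        _clients.append(q)


def remove_client(q: "queue.Queue[str]") -> None:
    with _clients_lock:
        if q in _clients:
            _clients.remove(q)


def format_sse(event_type: str, payload: dict) -> str:
    """Serialize one Server-Sent Events message (terminated by a blank line)."""
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"


def broadcast_event(event_type: str, payload: dict) -> int:
    """Push the event to every client queue; return how many received it."""
    message = format_sse(event_type, payload)
    with _clients_lock:
        targets = list(_clients)      # copy so the lock is not held while putting
    delivered = 0
    for q in targets:
        try:
            q.put_nowait(message)
            delivered += 1
        except queue.Full:
            remove_client(q)          # assumption: drop consumers that cannot keep up
    return delivered
```

A heartbeat thread in this design would simply call `broadcast_event("ping", {})` on a timer so that idle connections are not closed by proxies.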
def _ensure_plugin_tools()
def _strip_ansi(text: str)
def _inject_mempalace(user_input: str, config: dict)
Inject relevant memories from MemPalace into the user message.
Mirrors the logic in dulus.py REPL for consistent behavior.
def _run_slash_command(cmd_line: str)
Run a slash command through the REPL's registered handler,
capturing stdout. Mirrors the Telegram bridge behavior
(dulus.py:_handle_slash_from_telegram).
Returns (output_text, assistant_reply_or_None).
`assistant_reply` is set when the slash triggered a model query
(cmd_type == "query") so the call
def _run_agent_mirror(user_message: str)
Run the agent loop with shared state/config, yielding all events.
def _event_to_dict(event)
def _sanitize_for_api(text: str)
Aggressive sanitize: remove control chars (except \n), surrogates, and normalize.
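A sanitizer along the lines described could be written as below. Keeping only the newline among control characters follows the docstring as far as it can be read; choosing NFC as the normalization form and also removing DEL (`\x7f`) are assumptions.

```python
import re
import unicodedata

_CONTROL = re.compile(r"[\x00-\x09\x0b-\x1f\x7f]")   # all C0 controls except \n, plus DEL
_SURROGATES = re.compile(r"[\ud800-\udfff]")          # lone UTF-16 surrogate code points


def sanitize_for_api(text: str) -> str:
    """Drop characters that commonly break JSON/UTF-8 encoding, then normalize."""
    text = _SURROGATES.sub("", text)
    text = _CONTROL.sub("", text)
    return unicodedata.normalize("NFC", text)
```

Lone surrogates are removed first because they cannot be encoded as UTF-8 and would otherwise raise when the request body is serialized.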
def _build_roundtable_prompt(agent: RoundtableAgent, user_msg: str, history: list[dict])
Build a lean prompt with ONLY the last text message per member.
history items: {"agent": str, "text": str}
def _run_agent_for_roundtable(agent: RoundtableAgent, user_msg: str, history: list[dict], q: queue.Queue)
def create_app()
def start(state: AgentState, config: dict, port: int = 5000, open_browser: bool = False)
def stop()
def is_running()
Imports
__future__, agent, backend.agents_bridge, backend.context, backend.marketplace, backend.personas, backend.plugins, common, context, dulus_mcp.tools, flask, gui.session_utils, json, memory.tools, multi_agent.tools, pathlib, queue, skill.tools, sys, task, task.tools, threading, time, tools, typing, uuid, webbrowser
▶welcome.py 290 LOC
First-run welcome wizard for Dulus.
Kept intentionally small per scope:
1. Pick provider + model
2. Prompt for API key (when needed)
3. Seed soul.md from baked default
4. Run ``mempalace init`` if the plugin is installed
No Telegram, no TTS, no name prompt yet — those stay in their own
``dulus setup`` subcommands later.
Functions (7)
def is_first_run(config_path: Optional[Path] = None)
def _prompt(question: str, default: str = '')
def _prompt_choice(question: str, choices, default_idx: int = 0)
def _prompt_secret(question: str)
def _mempalace_available()
def _run_mempalace_init()
def run_welcome_wizard(config: dict)
Imports
__future__, os, pathlib, shutil, subprocess, sys, typing