Phase 1b wires Gemma's native tool calling to real capabilities. GeneratorAgent publishes tool requests to the bus and blocks on results. WebSearchTool and WebFetchTool are independent bus participants. Phase 1b is complete when Stories S3 and S4 pass against the live stack.
| File | Change |
|---|---|
| src/local/tools/web_search_tool.py | new |
| src/local/tools/web_fetch_tool.py | new |
| config/web_search.yaml | new |
| config/web_fetch.yaml | new |
| tests/test_web_search_tool.py | new |
| tests/test_web_fetch_tool.py | new |
| tests/stories/s3_web_search.yaml | new |
| tests/stories/s4_web_fetch.yaml | new |
| config/generator.yaml | modified: add tool schemas |
| src/local/agents/generator_agent.py | modified: wire _execute_tool() |
| src/local/agents/generator_transitions.py | modified: add AWAIT_TOOL action + transition |
| run_local.py | modified: start tool agents |

Each step must be independently testable before moving on.
Implement web_search_tool.py with mock + SearXNG/Brave/Tavily providers. Write config/web_search.yaml. Unit test: publish a tool.request.web_search envelope directly, assert tool.result.web_search arrives with correct correlation_id and non-empty result.
Implement web_fetch_tool.py with httpx + BeautifulSoup extraction. Write config/web_fetch.yaml. Unit test: publish tool.request.web_fetch with a live URL, assert extracted text arrives and is truncated to max_chars. Test error path: invalid URL → result contains error string.
Wire _execute_tool() to the bus (short-lived subscriber pattern). Add tool schemas to generator.yaml. Update state machine with AWAIT_TOOL transition. Start full stack. Stories S3 (web_search fires, answer grounded in search results) and S4 (web_fetch fires, answer grounded in page content) pass. Phase 1b complete.
WebSearchTool:

- Subscribes to tool.request.web_search
- Reads envelope.payload.args.query and envelope.correlation_id
- Publishes tool.result.web_search with the same correlation_id
- On failure, publishes the result with the error field set

config/web_search.yaml:

```yaml
provider: searxng        # searxng | brave | tavily | mock
searxng_url: "http://localhost:8080"
max_results: 5
timeout: 10              # seconds per search request
# brave_api_key read from env BRAVE_API_KEY
# tavily_api_key read from env TAVILY_API_KEY
```
Result payload:

```json
{
  "result": "Today's date: 2026-05-31\n\n[1] Title: ...\n Snippet: ...\n URL: https://...\n\n[2] ...",
  "error": null,
  "query_id": "...",
  "session_id": "..."
}
```
Example result text:

```text
Today's date: 2026-05-31

[1] Title: Bitcoin price today — CoinMarketCap
    Snippet: Bitcoin is currently trading at $103,450 USD...
    URL: https://coinmarketcap.com/currencies/bitcoin/

[2] Title: BTC/USD — Bloomberg
    Snippet: Bitcoin (BTC) hit a new high of $105,000 this week...
    URL: https://bloomberg.com/...
```
The date prefix grounds time-sensitive answers. URLs are included so Gemma can pass one to web_fetch if it needs the full page content.
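A minimal sketch of how the tool might render provider results into that text block. format_search_results is a hypothetical helper name; the 4-space indentation of the Snippet and URL lines, and the absence of a date prefix on the no-results message, are assumptions, since the plan does not pin them down.

```python
from datetime import date


def format_search_results(query: str, results: list[dict], today: date) -> str:
    """Render provider results into the text Gemma receives.

    Each result dict is assumed to carry "title", "snippet" and "url" keys,
    matching the SearchProvider contract.
    """
    if not results:
        # An empty result set is reported as text, not as an error.
        return f"No results found for: {query}"
    blocks = [f"Today's date: {today.isoformat()}"]
    for i, r in enumerate(results, start=1):
        blocks.append(
            f"[{i}] Title: {r.get('title', '')}\n"
            f"    Snippet: {r.get('snippet', '')}\n"
            f"    URL: {r.get('url', '')}"
        )
    # A blank line separates the date header and each numbered entry.
    return "\n\n".join(blocks)
```

Passing today explicitly (rather than calling date.today() inside) keeps the function deterministic in unit tests.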
```python
from typing import Protocol


class SearchProvider(Protocol):
    def search(self, query: str, max_results: int) -> list[dict]:
        # returns [{"title": ..., "snippet": ..., "url": ...}, ...]
        ...


class SearXNGProvider: ...  # default for dev: self-hosted, free
class BraveProvider: ...    # BRAVE_API_KEY env var
class TavilyProvider: ...   # TAVILY_API_KEY; returns extracted content directly
class MockProvider: ...     # canned results; use in unit tests
```
Tavily returns extracted page content in snippets — Gemma may not need to call web_fetch separately when using Tavily. SearXNG is the default for local dev (no API key required).
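A sketch of the SearXNG provider, under these assumptions: it calls SearXNG's /search endpoint with format=json and maps each result's title, content and url fields onto the title/snippet/url contract, applying max_results client-side by slicing. It uses urllib from the standard library (the real tool would more likely use httpx), and get_json is a hypothetical injection point so unit tests need no network. Note that many SearXNG installs only enable HTML output by default; json has to be listed under search.formats in settings.yml, otherwise the JSON request is rejected (typically with HTTP 403).

```python
import json
import urllib.parse
import urllib.request
from typing import Callable


def _http_get_json(url: str, timeout: float) -> dict:
    # Plain stdlib GET that decodes a JSON body.
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


class SearXNGProvider:
    """Sketch of a provider satisfying SearchProvider.search()."""

    def __init__(
        self,
        base_url: str = "http://localhost:8080",
        timeout: float = 10,
        get_json: Callable[[str, float], dict] = _http_get_json,
    ) -> None:
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self._get_json = get_json  # injectable so tests need no network

    def search(self, query: str, max_results: int) -> list[dict]:
        params = urllib.parse.urlencode({"q": query, "format": "json"})
        data = self._get_json(f"{self.base_url}/search?{params}", self.timeout)
        # SearXNG puts the snippet text in each result's "content" field.
        return [
            {"title": r.get("title", ""), "snippet": r.get("content", ""), "url": r.get("url", "")}
            for r in data.get("results", [])[:max_results]
        ]
```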
Error cases:

- No results: result: "No results found for: {query}", error: null. This is not an error; Gemma handles it gracefully.
- Provider timeout: result: "", error: "search timeout after 10s"
- Provider HTTP error: result: "", error: "search failed: 503"
- Missing args.query: result: "", error: "missing query argument"

```python
# tests/test_web_search_tool.py

def test_mock_search_publishes_result():
    # Start agent with mock provider in a thread
    # Subscribe to tool.result.web_search first, so the reply cannot be missed
    # Publish tool.request.web_search directly to bus
    # Assert result arrives within 2s with matching correlation_id
    # Assert "Today's date" prefix present in result
    ...


def test_no_results_is_not_an_error():
    # Mock provider returns []
    # Assert error field is None, result contains "No results found"
    ...


def test_provider_timeout_sets_error():
    # Mock provider raises httpx.TimeoutException
    # Assert error field is set
    ...
```
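The web_search error cases reduce to a small mapping that can be unit-tested without a bus. In this sketch handle_search_request and ProviderHTTPError are hypothetical names: providers are assumed to raise ProviderHTTPError(status) on a non-2xx response and the built-in TimeoutError on timeout. httpx.TimeoutException is not a subclass of TimeoutError, so an httpx-based provider would need to translate it (or the handler would catch it directly). Other exceptions propagate, since the plan defines no message for them.

```python
from typing import Callable


class ProviderHTTPError(Exception):
    """Hypothetical: raised by a provider when the search API answers with a non-2xx status."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


def handle_search_request(
    args: dict,
    search: Callable[[str, int], list],
    format_results: Callable[[str, list], str],
    max_results: int = 5,
    timeout: float = 10,  # only used in the message; the provider enforces it
) -> dict:
    """Map one request's args to the {"result", "error"} part of the reply payload."""
    query = (args or {}).get("query")
    if not query:
        return {"result": "", "error": "missing query argument"}
    try:
        hits = search(query, max_results)
    except TimeoutError:
        return {"result": "", "error": f"search timeout after {timeout:g}s"}
    except ProviderHTTPError as exc:
        return {"result": "", "error": f"search failed: {exc.status}"}
    if not hits:
        # Not an error: Gemma gets a readable message and carries on.
        return {"result": f"No results found for: {query}", "error": None}
    return {"result": format_results(query, hits), "error": None}
```

The tool's bus loop would then copy query_id and session_id into the reply and publish it with the request's correlation_id.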
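WebFetchTool:

- Subscribes to tool.request.web_fetch
- Reads envelope.payload.args.url and envelope.correlation_id
- Fetches the URL with httpx, extracts body text with BeautifulSoup
- Truncates the text to max_chars (default 3000)
- Publishes tool.result.web_fetch with the same correlation_id
- On failure, publishes the result with the error field set

config/web_fetch.yaml:

```yaml
max_chars: 3000
timeout: 15          # seconds per fetch
user_agent: "LoCAL2/1.0 (research agent)"
```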
Result payload:

```json
{
  "result": "URL: https://...\nExtracted text:\n[first 3000 chars of body]",
  "error": null,
  "query_id": "...",
  "session_id": "..."
}
```
```python
import httpx
from bs4 import BeautifulSoup

# cfg is the loaded config/web_fetch.yaml; url comes from args.url
resp = httpx.get(
    url,
    timeout=cfg["timeout"],
    headers={"User-Agent": cfg["user_agent"]},
    follow_redirects=True,  # httpx does not follow redirects by default
)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
# Remove script, style, nav, footer, header tags
for tag in soup(["script", "style", "nav", "footer", "header"]):
    tag.decompose()
text = soup.get_text(separator="\n", strip=True)
text = "\n".join(line for line in text.splitlines() if line.strip())  # drop blank lines
result = f"URL: {url}\nExtracted text:\n{text[:cfg['max_chars']]}"
```
Error cases:

- HTTP error status: error: "fetch failed: 404"
- Timeout: error: "fetch timeout after 15s: {url}"
- Any other exception: error: "fetch error: {exc}"
- Missing args.url: error: "missing url argument"

Gemma reads the error string and can decide to try a different URL or acknowledge the failure. WebFetchTool has no retry logic; retrying is Gemma's decision.
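A sketch of how WebFetchTool might produce those error strings. fetch_error_message and handle_fetch_request are hypothetical names. HTTP status errors are recognised by duck typing on exc.response.status_code, which matches the shape of httpx.HTTPStatusError; timeouts are matched against timeout_types, which defaults to the built-in TimeoutError and would be (httpx.TimeoutException,) in the real tool. Returning an empty result alongside the error mirrors the web_search payloads and is an assumption here.

```python
from typing import Callable


def fetch_error_message(
    exc: BaseException,
    url: str,
    timeout: float = 15,
    timeout_types: tuple = (TimeoutError,),
) -> str:
    """Turn an exception raised while fetching url into the error string Gemma sees."""
    if isinstance(exc, timeout_types):
        return f"fetch timeout after {timeout:g}s: {url}"
    # httpx.HTTPStatusError carries the failed response as exc.response.
    status = getattr(getattr(exc, "response", None), "status_code", None)
    if status is not None:
        return f"fetch failed: {status}"
    return f"fetch error: {exc}"


def handle_fetch_request(
    args: dict,
    fetch: Callable[[str], str],
    timeout: float = 15,
    timeout_types: tuple = (TimeoutError,),
) -> dict:
    """Map one request's args to the {"result", "error"} part of the reply payload."""
    url = (args or {}).get("url")
    if not url:
        return {"result": "", "error": "missing url argument"}
    try:
        return {"result": fetch(url), "error": None}
    except Exception as exc:  # every failure becomes an error string; no retries here
        return {"result": "", "error": fetch_error_message(exc, url, timeout, timeout_types)}
```

In the real tool, fetch would wrap the httpx + BeautifulSoup extraction shown earlier.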
```python
# tests/test_web_fetch_tool.py

def test_fetch_extracts_body_text():
    # httpx.get patched to return known HTML
    # Assert result contains expected extracted text
    # Assert truncated to max_chars
    ...


def test_fetch_error_sets_error_field():
    # httpx.get patched to raise httpx.HTTPStatusError for a 404 response
    # Assert error field contains "fetch failed: 404"
    ...


def test_missing_url_sets_error():
    # Publish request with no url in args
    # Assert error: "missing url argument"
    ...
```
GeneratorAgent's main subscriber listens only on query.received. To receive tool results, _execute_tool() opens a dedicated ZmqSubscriber for the specific tool.result.* subject, polls until the correlation_id matches (or the timeout expires), then closes it. This keeps incoming queries and tool results off the same socket, and requires no shared state or extra threads.

```python
# src/local/agents/generator_agent.py (excerpt)
import time

_TOOL_REQUEST = {
    "web_search": TOOL_REQUEST_WEB_SEARCH,
    "web_fetch": TOOL_REQUEST_WEB_FETCH,
}
_TOOL_RESULT = {
    "web_search": TOOL_RESULT_WEB_SEARCH,
    "web_fetch": TOOL_RESULT_WEB_FETCH,
}


# GeneratorAgent method
def _execute_tool(self, name: str, args: dict, correlation_id: str) -> str:
    req_subject = _TOOL_REQUEST.get(name)
    res_subject = _TOOL_RESULT.get(name)
    if not req_subject:
        # Walk DISPATCHING_TOOL → WAITING_FOR_TOOL → GENERATING anyway, so the
        # caller's loop (which expects GENERATING after each call) stays valid.
        self._sm.transition(GeneratorAction.AWAIT_TOOL)
        self._sm.transition(GeneratorAction.TOOL_RESULT)
        return f"[unknown tool: {name!r}]"

    self._sm.transition(GeneratorAction.AWAIT_TOOL)  # DISPATCHING_TOOL → WAITING_FOR_TOOL
    # Subscribe BEFORE publishing so a fast reply cannot be missed.
    sub = ZmqSubscriber(PROXY_BACKEND_ADDR, subscriptions=[res_subject], bind=False)
    try:
        self._pub.publish(self._make_envelope(
            req_subject, "tool_request",
            {"tool": name, "args": args,
             "session_id": None, "query_id": correlation_id},
            correlation_id, None,
        ))
        deadline = time.monotonic() + self._tool_timeout
        while time.monotonic() < deadline:
            msg = sub.receive_with_timeout(200)  # short poll so the deadline is re-checked
            if msg and msg.correlation_id == correlation_id:
                p = msg.payload
                self._sm.transition(GeneratorAction.TOOL_RESULT)  # WAITING_FOR_TOOL → GENERATING
                if p.get("error"):
                    return f"[tool error: {p['error']}]"
                return p.get("result") or ""
    finally:
        sub.close()

    self._sm.transition(GeneratorAction.TOOL_TIMEOUT)  # WAITING_FOR_TOOL → GENERATING
    logger.warning("tool %r timed out after %ss", name, self._tool_timeout)
    return f"[tool timeout after {self._tool_timeout}s]"
```
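To see why the subscriber is created before the request is published, and how replies carrying other correlation IDs are skipped, here is a self-contained sketch of the same request/reply pattern over an in-process bus. InMemoryBus and request_reply are hypothetical stand-ins, with a queue.Queue playing the role of the ZmqSubscriber; like a ZMQ SUB socket that has not subscribed yet, the toy bus silently drops messages for subjects nobody is subscribed to.

```python
import queue
import threading
import time
from collections import defaultdict


class InMemoryBus:
    """Toy pub/sub bus: a subject with no subscriber drops whatever is published to it."""

    def __init__(self) -> None:
        self._subs = defaultdict(list)  # subject -> list of queue.Queue
        self._lock = threading.Lock()

    def subscribe(self, subject: str) -> queue.Queue:
        q = queue.Queue()
        with self._lock:
            self._subs[subject].append(q)
        return q

    def unsubscribe(self, subject: str, q: queue.Queue) -> None:
        with self._lock:
            self._subs[subject].remove(q)

    def publish(self, subject: str, msg: dict) -> None:
        with self._lock:
            targets = list(self._subs.get(subject, ()))
        for q in targets:
            q.put(msg)


def request_reply(bus, req_subject, res_subject, payload, correlation_id, timeout):
    """Subscribe, publish, then poll until a reply with the same correlation_id
    arrives or the deadline passes (returns None on timeout)."""
    sub = bus.subscribe(res_subject)  # must exist before the request goes out
    try:
        bus.publish(req_subject, {"correlation_id": correlation_id, "payload": payload})
        deadline = time.monotonic() + timeout
        while (remaining := deadline - time.monotonic()) > 0:
            try:
                msg = sub.get(timeout=min(0.2, remaining))
            except queue.Empty:
                continue
            if msg["correlation_id"] == correlation_id:  # skip replies meant for others
                return msg["payload"]
        return None
    finally:
        bus.unsubscribe(res_subject, sub)
```

Swapping the two steps (publish, then subscribe) would let a sufficiently fast tool reply into a subject with no subscriber, and request_reply would then wait out the full timeout.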
```python
# generator_actions.py: add
AWAIT_TOOL = "await_tool"  # DISPATCHING_TOOL → WAITING_FOR_TOOL

# generator_transitions.py: add
(S.DISPATCHING_TOOL, A.AWAIT_TOOL): S.WAITING_FOR_TOOL,

# Existing transitions already cover result/timeout back to GENERATING:
# (S.WAITING_FOR_TOOL, A.TOOL_RESULT):  S.GENERATING  ← already present
# (S.WAITING_FOR_TOOL, A.TOOL_TIMEOUT): S.GENERATING  ← already present
# (S.WAITING_FOR_TOOL, A.FAIL):         S.ERROR       ← already present (via FAIL loop)
```
For multiple tool calls in one response, the outer loop in _generate() transitions back to DISPATCHING_TOOL before calling _execute_tool() for each subsequent tool.
```python
# Inside _generate(), once the model response contains tool_calls:
self._sm.transition(GeneratorAction.DISPATCH_TOOL)  # GENERATING → DISPATCHING_TOOL
for i, tc in enumerate(tool_calls):
    fn = tc.get("function") or {}
    name = fn.get("name", "")
    args = fn.get("arguments") or {}
    result = self._execute_tool(name, args, correlation_id)
    # _execute_tool handles DISPATCHING_TOOL → WAITING_FOR_TOOL → GENERATING internally.
    # After it returns, state is GENERATING; re-enter DISPATCHING_TOOL for the next call.
    if i < len(tool_calls) - 1:
        self._sm.transition(GeneratorAction.DISPATCH_TOOL)  # GENERATING → DISPATCHING_TOOL
    tool_call_log.append({"tool": name, "args": args, "result": str(result)})
    messages.append({"role": "tool", "content": str(result), "name": name})
# State is GENERATING after the last tool; ready for the next ollama.chat() iteration.
```
```yaml
# config/generator.yaml (excerpt)
tools:
  - name: web_search
    description: >
      Search the web for current information, news, prices, or recent events.
      Use when the answer requires up-to-date facts not in your training data.
    parameters:
      type: object
      properties:
        query:
          type: string
          description: Concise search query (English preferred)
      required: [query]
  - name: web_fetch
    description: >
      Fetch and extract the full text content of a specific URL.
      Use after web_search when snippets are insufficient and you need
      the complete page content. Requires a known URL.
    parameters:
      type: object
      properties:
        url:
          type: string
          description: Full URL to fetch (must start with http:// or https://)
      required: [url]
```
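ollama.chat(tools=...) accepts tool definitions in the OpenAI-style function format, where each entry is {"type": "function", "function": {...}}. If the generator does not already wrap these flat entries, a helper along these lines would do it. to_ollama_tools is a hypothetical name, and its input is assumed to be the tools list after generator.yaml has been loaded with a YAML parser.

```python
def to_ollama_tools(tool_defs: list[dict]) -> list[dict]:
    """Wrap flat {name, description, parameters} entries in the function-tool envelope."""
    return [
        {
            "type": "function",
            "function": {
                "name": t["name"],
                # YAML folded scalars (">") end with a newline; strip it.
                "description": t.get("description", "").strip(),
                "parameters": t.get("parameters", {"type": "object", "properties": {}}),
            },
        }
        for t in tool_defs
    ]
```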
```python
# run_local.py (excerpt)
import threading
import time


def _start_web_search() -> None:
    from local.tools.web_search_tool import WebSearchTool
    WebSearchTool().run()


def _start_web_fetch() -> None:
    from local.tools.web_fetch_tool import WebFetchTool
    WebFetchTool().run()


# In main(), after starting the generator thread:
ws_thread = threading.Thread(target=_start_web_search, daemon=True, name="web_search")
ws_thread.start()
wf_thread = threading.Thread(target=_start_web_fetch, daemon=True, name="web_fetch")
wf_thread.start()
time.sleep(0.2)  # let tool agents subscribe before generator accepts queries
```
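The fixed time.sleep(0.2) is a heuristic. One alternative is for each tool agent to signal readiness explicitly. This sketch assumes a hypothetical hook in which each agent's start function receives a threading.Event and sets it once its bus subscription exists; WebSearchTool and WebFetchTool have no such parameter in this plan, and start_agents is an illustrative name.

```python
import threading


def start_agents(targets: dict, timeout: float = 5.0) -> list:
    """Start one daemon thread per agent and wait until each reports ready.

    targets maps a thread name to a callable taking a threading.Event; the
    callable is expected to call ready.set() once its subscriptions exist.
    """
    threads, events = [], []
    for name, target in targets.items():
        ready = threading.Event()
        t = threading.Thread(target=target, args=(ready,), daemon=True, name=name)
        t.start()
        threads.append(t)
        events.append((name, ready))
    for name, ready in events:
        if not ready.wait(timeout):
            raise RuntimeError(f"agent {name!r} did not become ready within {timeout}s")
    return threads
```

Failing loudly when an agent never reports ready turns a silent missed-subscription bug into a startup error.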
| From | Action | To | When |
|---|---|---|---|
| IDLE | RECEIVE | RECEIVING | query.received arrives |
| RECEIVING | START_GENERATION | GENERATING | messages built, ollama.chat() starting |
| GENERATING | DISPATCH_TOOL | DISPATCHING_TOOL | tool_calls detected in response |
| DISPATCHING_TOOL | AWAIT_TOOL | WAITING_FOR_TOOL | tool.request.* published, now polling for result |
| WAITING_FOR_TOOL | TOOL_RESULT | GENERATING | tool.result.* received with matching correlation_id |
| WAITING_FOR_TOOL | TOOL_TIMEOUT | GENERATING | no tool.result.* within tool_timeout seconds |
| GENERATING | PUBLISH | PUBLISHING | final text response, no tool calls |
| PUBLISHING | RESET | IDLE | response.generation published |
| ERROR | RESET | IDLE | error path cleanup |
| any non-IDLE | FAIL | ERROR | unhandled exception |
For multiple tool calls in one response: after WAITING_FOR_TOOL → GENERATING, if there are remaining tool calls in the list, transition GENERATING → DISPATCHING_TOOL again for the next one. The loop is in _generate().
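The transition table can be checked mechanically. This sketch encodes the rows above as a dictionary keyed by (state, action) and replays the multi-tool loop. The S and A enums are illustrative stand-ins for the real state and GeneratorAction definitions, and run_tool_calls is a hypothetical helper mirroring the loop in _generate() (it assumes at least one tool call).

```python
from enum import Enum, auto


class S(Enum):
    IDLE = auto()
    RECEIVING = auto()
    GENERATING = auto()
    DISPATCHING_TOOL = auto()
    WAITING_FOR_TOOL = auto()
    PUBLISHING = auto()
    ERROR = auto()


class A(Enum):
    RECEIVE = auto()
    START_GENERATION = auto()
    DISPATCH_TOOL = auto()
    AWAIT_TOOL = auto()
    TOOL_RESULT = auto()
    TOOL_TIMEOUT = auto()
    PUBLISH = auto()
    RESET = auto()
    FAIL = auto()


TRANSITIONS = {
    (S.IDLE, A.RECEIVE): S.RECEIVING,
    (S.RECEIVING, A.START_GENERATION): S.GENERATING,
    (S.GENERATING, A.DISPATCH_TOOL): S.DISPATCHING_TOOL,
    (S.DISPATCHING_TOOL, A.AWAIT_TOOL): S.WAITING_FOR_TOOL,
    (S.WAITING_FOR_TOOL, A.TOOL_RESULT): S.GENERATING,
    (S.WAITING_FOR_TOOL, A.TOOL_TIMEOUT): S.GENERATING,
    (S.GENERATING, A.PUBLISH): S.PUBLISHING,
    (S.PUBLISHING, A.RESET): S.IDLE,
    (S.ERROR, A.RESET): S.IDLE,
}
# "any non-IDLE --FAIL--> ERROR"
TRANSITIONS.update({(s, A.FAIL): S.ERROR for s in S if s is not S.IDLE})


class StateMachine:
    def __init__(self) -> None:
        self.state = S.IDLE

    def transition(self, action: A) -> S:
        try:
            self.state = TRANSITIONS[(self.state, action)]
        except KeyError:
            raise ValueError(f"invalid transition: {self.state.name} --{action.name}-->") from None
        return self.state


def run_tool_calls(sm: StateMachine, n_calls: int) -> None:
    """One DISPATCH/AWAIT/RESULT cycle per tool call, re-dispatching between calls."""
    sm.transition(A.DISPATCH_TOOL)
    for i in range(n_calls):
        sm.transition(A.AWAIT_TOOL)
        sm.transition(A.TOOL_RESULT)
        if i < n_calls - 1:
            sm.transition(A.DISPATCH_TOOL)
```

An invalid step, such as a second DISPATCH_TOOL while still in DISPATCHING_TOOL, raises ValueError immediately.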
```yaml
# tests/stories/s3_web_search.yaml
story_id: S3
title: "Web search — current data query"
turns:
  - query: "What is the current price of Bitcoin in USD?"
    expected_content:
      - "$"   # any price figure
    must_not_contain:
      - "I don't have access"
      - "I cannot browse"
    expected_bus_events:
      present:
        - "tool.request.web_search"
        - "tool.result.web_search"
        - "response.generation"
      absent:
        - "tool.request.web_fetch"
    response_generation_checks:
      answer_not_empty: true
      tool_calls_not_empty: true   # at least one tool call logged
```
```yaml
# tests/stories/s4_web_fetch.yaml
story_id: S4
title: "Web fetch — URL content extraction"
turns:
  - query: "Fetch and summarize the content at https://example.com"
    expected_content:
      - "Example Domain"   # known h1 on example.com
    must_not_contain:
      - "I cannot access"
      - "I don't have the ability"
    expected_bus_events:
      present:
        - "tool.request.web_fetch"
        - "tool.result.web_fetch"
        - "response.generation"
    response_generation_checks:
      answer_not_empty: true
      tool_calls_not_empty: true
```
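A sketch of the per-turn checks tests/run_story.py would need for these story files. check_turn is a hypothetical function: it takes the final answer, the logged tool calls (entries shaped like the tool_call_log items built in _generate()), and optionally the set of bus subjects captured during the run. When no bus capture is available it infers subjects from the logged tool calls, which is one of the two options the plan suggests for run_story.py.

```python
from __future__ import annotations


def check_turn(turn: dict, answer: str, tool_calls: list[dict], bus_subjects: set[str] | None = None) -> list[str]:
    """Return failure messages for one story turn; an empty list means the turn passed."""
    failures = []
    for s in turn.get("expected_content", []):
        if s not in answer:
            failures.append(f"missing expected content: {s!r}")
    for s in turn.get("must_not_contain", []):
        if s in answer:
            failures.append(f"forbidden content present: {s!r}")
    if bus_subjects is None:
        # Proxy: each logged tool call implies a request and a result on the bus.
        bus_subjects = {f"tool.{kind}.{tc['tool']}" for tc in tool_calls for kind in ("request", "result")}
        if answer:
            bus_subjects.add("response.generation")
    events = turn.get("expected_bus_events", {})
    for subj in events.get("present", []):
        if subj not in bus_subjects:
            failures.append(f"bus event not seen: {subj}")
    for subj in events.get("absent", []):
        if subj in bus_subjects:
            failures.append(f"unexpected bus event: {subj}")
    checks = turn.get("response_generation_checks", {})
    if checks.get("answer_not_empty") and not answer.strip():
        failures.append("answer is empty")
    if checks.get("tool_calls_not_empty") and not tool_calls:
        failures.append("no tool calls logged")
    return failures
```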
- tests/run_story.py now needs to check tool_calls_not_empty and expected_bus_events.present/absent. Bus event checking requires subscribing to the bus during the run: either add a --check-bus flag, or use the tool_calls in the API response as a proxy for bus events.
- … --model flag or by editing config/generator.yaml.
- The short-lived subscriber in _execute_tool() must be created BEFORE publishing the request; otherwise the result may arrive on the bus before the subscriber is bound, and the message is missed. The implementation above does this correctly: sub = ZmqSubscriber(...) comes before self._pub.publish(...). Creating the socket first is necessary but not strictly sufficient, because ZeroMQ propagates subscriptions to the proxy asynchronously; with real network tools the window is negligible, but with very fast tools (such as the mock provider) it is small rather than zero.
- With stream=True, tool calls are assembled from stream chunks. In the current implementation, iter_tool_calls takes the last non-None chunk.message.tool_calls value. Verify that the Ollama Python library delivers complete tool call objects in the final stream chunk: if tool call arguments are partial in intermediate chunks, intermediate values must not be used, and if separate calls arrive in separate chunks, keeping only the last value drops the earlier ones. If streaming tool calls prove unreliable, fall back to stream=False only when tool schemas are active.
- The default SearXNG provider expects an instance at http://localhost:8080. If it is not running, web_search will time out and Gemma will receive a tool error. For initial testing, set provider: brave or provider: tavily with the relevant API key, or use a live SearXNG instance.
- tool_timeout in the generator config should be greater than the slowest tool's own timeout (15s for web_fetch), so that a tool-side timeout error reaches Gemma before the generator gives up, and to allow for slow search providers. SearXNG aggregates multiple search engines and can be slow; Tavily is typically faster (<3s). Set per-provider expectations in tests using mocks.
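If separate tool calls do arrive in separate chunks, accumulating them is safer than keeping the last non-None value. A minimal sketch, assuming each chunk exposes chunk.message.tool_calls as None or a list, and that each emitted call is complete and not repeated in later chunks. If chunks turned out to be cumulative, this version would double-count, which is exactly what the verification step for streaming should settle. collect_tool_calls is a hypothetical name, not the current iter_tool_calls.

```python
def collect_tool_calls(chunks) -> list:
    """Gather tool calls from every streamed chunk, in arrival order."""
    calls = []
    for chunk in chunks:
        message = getattr(chunk, "message", None)
        tool_calls = getattr(message, "tool_calls", None)
        if tool_calls:
            calls.extend(tool_calls)
    return calls
```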