
LoCAL2 — Phase 1b

Tool Bus: WebSearchTool · WebFetchTool · GeneratorAgent tool loop

Scope & Deliverables

Phase 1b wires Gemma's native tool calling to real capabilities. GeneratorAgent publishes tool requests to the bus and blocks until the matching result arrives or the tool timeout expires. WebSearchTool and WebFetchTool are independent bus participants. Phase 1b is complete when Stories S3 and S4 pass against the live stack.

New files

  • src/local/tools/web_search_tool.py
  • src/local/tools/web_fetch_tool.py
  • config/web_search.yaml
  • config/web_fetch.yaml
  • tests/test_web_search_tool.py
  • tests/test_web_fetch_tool.py
  • tests/stories/s3_web_search.yaml
  • tests/stories/s4_web_fetch.yaml

Modified files

  • config/generator.yaml: add tool schemas
  • src/local/agents/generator_agent.py: wire _execute_tool()
  • src/local/agents/generator_transitions.py: add AWAIT_TOOL transition (the AWAIT_TOOL action itself is added in generator_actions.py)
  • run_local.py: start tool agents

Build Order — Steps 6–8

Each step must be independently testable before moving on.

Step 6: WebSearchTool

Implement web_search_tool.py with mock + SearXNG/Brave/Tavily providers. Write config/web_search.yaml. Unit test: publish a tool.request.web_search envelope directly, then assert that tool.result.web_search arrives with the matching correlation_id and a non-empty result.

Step 7: WebFetchTool

Implement web_fetch_tool.py with httpx + BeautifulSoup extraction. Write config/web_fetch.yaml. Unit test: publish tool.request.web_fetch with a live URL and assert that the extracted text arrives, truncated to max_chars. Test the error path: an invalid URL produces a result whose error field contains an error string.

Step 8: GeneratorAgent tool loop + Stories S3 + S4 (completes Phase 1b)

Wire _execute_tool() to the bus (short-lived subscriber pattern). Add tool schemas to generator.yaml. Update the state machine with the AWAIT_TOOL transition. Start the full stack. Stories S3 (web_search fires, answer grounded in search results) and S4 (web_fetch fires, answer grounded in page content) pass. Phase 1b complete.

Step 6 — WebSearchTool

Behaviour

config/web_search.yaml

provider: searxng        # searxng | brave | tavily | mock
searxng_url: "http://localhost:8080"
max_results: 5
timeout: 10              # seconds per search request
# brave_api_key read from env BRAVE_API_KEY
# tavily_api_key read from env TAVILY_API_KEY
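One way the agent might apply defaults and resolve API keys, sketched under assumptions: the YAML has already been parsed into a dict (for example with PyYAML's yaml.safe_load), and the function name resolve_search_config and the api_key field are hypothetical. It fails fast when a hosted provider is selected without its key, instead of surfacing the problem later as a tool error in the middle of a conversation.

```python
import os
from collections.abc import Mapping

PROVIDERS = {"searxng", "brave", "tavily", "mock"}
# Env var each hosted provider needs (per the comments in web_search.yaml)
API_KEY_ENV = {"brave": "BRAVE_API_KEY", "tavily": "TAVILY_API_KEY"}


def resolve_search_config(raw: Mapping, env: Mapping[str, str] = os.environ) -> dict:
    """Apply defaults, validate the provider, and pull API keys from the environment.

    Hypothetical helper; the defaults mirror config/web_search.yaml.
    """
    cfg = {
        "provider": "searxng",
        "searxng_url": "http://localhost:8080",
        "max_results": 5,
        "timeout": 10,
        **raw,  # values from the YAML file override the defaults
    }
    provider = cfg["provider"]
    if provider not in PROVIDERS:
        raise ValueError(f"unknown provider {provider!r}; expected one of {sorted(PROVIDERS)}")
    env_var = API_KEY_ENV.get(provider)
    if env_var:
        key = env.get(env_var)
        if not key:
            raise ValueError(f"provider {provider!r} requires {env_var} to be set")
        cfg["api_key"] = key  # hypothetical field name
    return cfg
```

Passing env explicitly keeps the function testable without touching the real process environment.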

Result payload

{
  "result": "Today's date: 2026-05-31\n\n[1] Title: ...\n    Snippet: ...\n    URL: https://...\n\n[2] ...",
  "error": null,
  "query_id": "...",
  "session_id": "..."
}

Output format — exact string Gemma reads

Today's date: 2026-05-31

[1] Title: Bitcoin price today — CoinMarketCap
    Snippet: Bitcoin is currently trading at $103,450 USD...
    URL: https://coinmarketcap.com/currencies/bitcoin/

[2] Title: BTC/USD — Bloomberg
    Snippet: Bitcoin (BTC) hit a new high of $105,000 this week...
    URL: https://bloomberg.com/...

The date prefix grounds time-sensitive answers. URLs are included so Gemma can pass one to web_fetch when it needs the full page content.

Provider abstraction

from typing import Protocol

class SearchProvider(Protocol):
    def search(self, query: str, max_results: int) -> list[dict]:
        """Return [{"title": ..., "snippet": ..., "url": ...}, ...]."""
        ...

class SearXNGProvider: ...   # default for dev: self-hosted, free
class BraveProvider: ...     # reads BRAVE_API_KEY env var
class TavilyProvider: ...    # reads TAVILY_API_KEY; returns extracted content directly
class MockProvider: ...      # canned results; use in unit tests

Tavily returns extracted page content in snippets — Gemma may not need to call web_fetch separately when using Tavily. SearXNG is the default for local dev (no API key required).

Error cases

Unit test sketch

# tests/test_web_search_tool.py
def test_mock_search_publishes_result():
    # Start agent with mock provider in a thread
    # Subscribe to tool.result.web_search (before publishing, so the result cannot be missed)
    # Publish tool.request.web_search directly to bus
    # Assert result arrives within 2s with matching correlation_id
    # Assert "Today's date" prefix present in result
    ...

def test_no_results_is_not_an_error():
    # Mock provider returns []
    # Assert error field is null, result contains "No results found"
    ...

def test_provider_timeout_sets_error():
    # Mock provider raises httpx.TimeoutException
    # Assert error field is set
    ...
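The error semantics these tests pin down (an empty list is a normal result, a provider exception sets error) can live in one pure function that the bus loop calls, which also makes them testable without ZeroMQ. A sketch, assuming the request payload shape GeneratorAgent publishes ({"tool", "args", "session_id", "query_id"}); handle_web_search and the "missing query argument" / "search failed: ..." wording are hypothetical.

```python
from typing import Callable


def handle_web_search(
    request: dict,
    search: Callable[[str, int], list[dict]],
    render: Callable[[list[dict]], str],
    max_results: int = 5,
) -> dict:
    """Build the tool.result.web_search payload for one request (hypothetical helper)."""
    args = request.get("args") or {}
    payload = {
        "result": None,
        "error": None,
        "query_id": request.get("query_id"),
        "session_id": request.get("session_id"),
    }
    query = (args.get("query") or "").strip()
    if not query:
        payload["error"] = "missing query argument"
        return payload
    try:
        results = search(query, max_results)
    except Exception as exc:  # timeouts, HTTP errors, bad JSON from the provider
        payload["error"] = f"search failed: {exc}"
        return payload
    # An empty list is a valid answer, so it goes through the renderer, not the error field
    payload["result"] = render(results)
    return payload
```

The provider and renderer are passed in, so a test can substitute a mock provider that returns [] or raises.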

Step 7 — WebFetchTool

Behaviour

config/web_fetch.yaml

max_chars: 3000
timeout: 15              # seconds per fetch
user_agent: "LoCAL2/1.0 (research agent)"

Result payload

{
  "result": "URL: https://...\nExtracted text:\n[first 3000 chars of body]",
  "error": null,
  "query_id": "...",
  "session_id": "..."
}

Extraction logic

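import httpx
from bs4 import BeautifulSoup

def fetch_and_extract(url: str, cfg: dict) -> str:
    # httpx does not follow redirects by default, and raise_for_status() rejects 3xx too
    resp = httpx.get(url, timeout=cfg["timeout"],
                     headers={"User-Agent": cfg["user_agent"]}, follow_redirects=True)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    # Remove script, style, nav, footer, header tags before extracting text
    for tag in soup(["script", "style", "nav", "footer", "header"]):
        tag.decompose()
    text = soup.get_text(separator="\n", strip=True)
    # Drop blank lines left behind by removed elements
    text = "\n".join(line for line in text.splitlines() if line.strip())
    result = f"URL: {url}\nExtracted text:\n{text[:cfg['max_chars']]}"
    return result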

Error cases

Gemma reads the error string and can decide to try a different URL or acknowledge the failure. No retry logic in WebFetchTool — that is Gemma's decision.
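A sketch of how the tool might turn failures into the error strings the tests expect, with no retries. Assumptions: the fetch callable wraps the extraction logic above, and status errors expose response.status_code as httpx.HTTPStatusError does. handle_web_fetch and the "invalid url" wording are hypothetical; the "http://" / "https://" check mirrors the web_fetch tool schema.

```python
from typing import Callable


def handle_web_fetch(request: dict, fetch: Callable[[str], str]) -> dict:
    """Build the tool.result.web_fetch payload; no retries (Gemma decides what to do next)."""
    args = request.get("args") or {}
    payload = {
        "result": None,
        "error": None,
        "query_id": request.get("query_id"),
        "session_id": request.get("session_id"),
    }
    url = (args.get("url") or "").strip()
    if not url:
        payload["error"] = "missing url argument"
        return payload
    if not url.startswith(("http://", "https://")):
        payload["error"] = f"invalid url: {url!r}"
        return payload
    try:
        payload["result"] = fetch(url)
    except Exception as exc:
        # httpx.HTTPStatusError carries .response.status_code; network errors do not
        status = getattr(getattr(exc, "response", None), "status_code", None)
        payload["error"] = f"fetch failed: {status}" if status else f"fetch failed: {exc}"
    return payload
```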

Unit test sketch

# tests/test_web_fetch_tool.py
def test_fetch_extracts_body_text():
    # httpx.get patched to return known HTML
    # Assert result contains expected extracted text
    # Assert truncated to max_chars
    ...

def test_fetch_error_sets_error_field():
    # httpx.get raises HTTPStatusError(404)
    # Assert error field contains "fetch failed: 404"
    ...

def test_missing_url_sets_error():
    # Publish request with no url in args
    # Assert error: "missing url argument"
    ...

Step 8 — GeneratorAgent Tool Loop

Key design decision: short-lived subscriber per tool call
GeneratorAgent's main subscriber listens only to query.received. To receive tool results, _execute_tool() opens a dedicated ZmqSubscriber for that specific tool.result.* subject, polls until correlation_id matches (or timeout), then closes it. This avoids mixing incoming queries with tool results on the same socket, and requires no shared state or threading.

_execute_tool() implementation

import logging
import time

logger = logging.getLogger(__name__)

# TOOL_* subjects, ZmqSubscriber, PROXY_BACKEND_ADDR and GeneratorAction
# come from the project's existing bus and state-machine modules.
_TOOL_REQUEST = {
    "web_search": TOOL_REQUEST_WEB_SEARCH,
    "web_fetch":  TOOL_REQUEST_WEB_FETCH,
}
_TOOL_RESULT = {
    "web_search": TOOL_RESULT_WEB_SEARCH,
    "web_fetch":  TOOL_RESULT_WEB_FETCH,
}

def _execute_tool(self, name: str, args: dict, correlation_id: str) -> str:
    req_subject = _TOOL_REQUEST.get(name)
    res_subject = _TOOL_RESULT.get(name)
    if not req_subject:
        # Unknown tool: still walk DISPATCHING_TOOL → WAITING_FOR_TOOL → GENERATING
        # so _generate() finds the state it expects when this returns
        self._sm.transition(GeneratorAction.AWAIT_TOOL)
        self._sm.transition(GeneratorAction.TOOL_RESULT)
        return f"[unknown tool: {name!r}]"

    self._sm.transition(GeneratorAction.AWAIT_TOOL)   # DISPATCHING_TOOL → WAITING_FOR_TOOL

    # Subscribe BEFORE publishing so the result is not missed
    sub = ZmqSubscriber(PROXY_BACKEND_ADDR, subscriptions=[res_subject], bind=False)
    try:
        self._pub.publish(self._make_envelope(
            req_subject, "tool_request",
            {"tool": name, "args": args,
             "session_id": None, "query_id": correlation_id},
            correlation_id, None,
        ))
        deadline = time.monotonic() + self._tool_timeout   # monotonic: immune to clock changes
        while time.monotonic() < deadline:
            msg = sub.receive_with_timeout(200)   # poll in 200 ms slices
            if msg and msg.correlation_id == correlation_id:
                p = msg.payload
                self._sm.transition(GeneratorAction.TOOL_RESULT)   # WAITING_FOR_TOOL → GENERATING
                if p.get("error"):
                    return f"[tool error: {p['error']}]"
                return p.get("result") or ""
    finally:
        sub.close()

    self._sm.transition(GeneratorAction.TOOL_TIMEOUT)   # WAITING_FOR_TOOL → GENERATING
    logger.warning("tool %r timed out after %ss", name, self._tool_timeout)
    return f"[tool timeout after {self._tool_timeout}s]"

State machine update — add AWAIT_TOOL action + transition

# generator_actions.py — add:
AWAIT_TOOL = "await_tool"   # DISPATCHING_TOOL → WAITING_FOR_TOOL

# generator_transitions.py — add:
(S.DISPATCHING_TOOL, A.AWAIT_TOOL):   S.WAITING_FOR_TOOL,

# Existing transitions already cover result/timeout back to GENERATING:
# (S.WAITING_FOR_TOOL, A.TOOL_RESULT):  S.GENERATING  ← already present
# (S.WAITING_FOR_TOOL, A.TOOL_TIMEOUT): S.GENERATING  ← already present
# (S.WAITING_FOR_TOOL, A.FAIL):         S.ERROR        ← already present (via FAIL loop)

When a response contains several tool calls, the _generate() outer loop transitions GENERATING → DISPATCHING_TOOL again before calling _execute_tool() for each call after the first.

_generate() outer loop — updated transitions

self._sm.transition(GeneratorAction.DISPATCH_TOOL)   # GENERATING → DISPATCHING_TOOL
for i, tc in enumerate(tool_calls):
    if i > 0:
        # Previous call left the state at GENERATING; re-enter DISPATCHING_TOOL for this one
        self._sm.transition(GeneratorAction.DISPATCH_TOOL)
    fn = tc.get("function") or {}
    name = fn.get("name", "")
    args = fn.get("arguments") or {}
    # _execute_tool handles DISPATCHING_TOOL → WAITING_FOR_TOOL → GENERATING internally
    result = self._execute_tool(name, args, correlation_id)
    tool_call_log.append({"tool": name, "args": args, "result": str(result)})
    messages.append({"role": "tool", "content": str(result), "name": name})
# State is GENERATING after the last tool: ready for the next ollama.chat() iteration

Tool schemas — add to config/generator.yaml

tools:
  - name: web_search
    description: >
      Search the web for current information, news, prices, or recent events.
      Use when the answer requires up-to-date facts not in your training data.
    parameters:
      type: object
      properties:
        query:
          type: string
          description: Concise search query (English preferred)
      required: [query]

  - name: web_fetch
    description: >
      Fetch and extract the full text content of a specific URL.
      Use after web_search when snippets are insufficient and you need
      the complete page content. Requires a known URL.
    parameters:
      type: object
      properties:
        url:
          type: string
          description: Full URL to fetch (must start with http:// or https://)
      required: [url]
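Ollama's chat API expects each tool as {"type": "function", "function": {"name", "description", "parameters"}}, so the flat YAML entries above presumably need wrapping before they are passed as tools= to ollama.chat(). A sketch of that conversion; to_ollama_tools is a hypothetical helper, and it assumes the YAML has already been loaded into a list of dicts.

```python
def to_ollama_tools(entries: list[dict]) -> list[dict]:
    """Wrap flat YAML tool entries in Ollama's function-tool shape (hypothetical helper)."""
    tools = []
    for e in entries:
        tools.append({
            "type": "function",
            "function": {
                "name": e["name"],
                # Normalise whitespace, including the trailing newline a folded (>) scalar keeps
                "description": " ".join(e.get("description", "").split()),
                "parameters": e.get("parameters", {"type": "object", "properties": {}}),
            },
        })
    return tools
```

Doing the conversion once at startup keeps generator.yaml readable while the chat call receives the nested structure.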

run_local.py — start tool agents

import threading
import time

def _start_web_search() -> None:   # no args: Thread(target=...) below passes none
    from local.tools.web_search_tool import WebSearchTool
    WebSearchTool().run()

def _start_web_fetch() -> None:
    from local.tools.web_fetch_tool import WebFetchTool
    WebFetchTool().run()

# In main(), after the generator thread:
ws_thread = threading.Thread(target=_start_web_search, daemon=True, name="web_search")
ws_thread.start()
wf_thread = threading.Thread(target=_start_web_fetch, daemon=True, name="web_fetch")
wf_thread.start()
time.sleep(0.2)  # let tool agents subscribe before the generator accepts queries

State Machine — Complete Phase 1b Transitions

| From | Action | To | When |
|---|---|---|---|
| IDLE | RECEIVE | RECEIVING | query.received arrives |
| RECEIVING | START_GENERATION | GENERATING | messages built, ollama.chat() starting |
| GENERATING | DISPATCH_TOOL | DISPATCHING_TOOL | tool_calls detected in response |
| DISPATCHING_TOOL | AWAIT_TOOL | WAITING_FOR_TOOL | tool.request.* published, now polling for result |
| WAITING_FOR_TOOL | TOOL_RESULT | GENERATING | tool.result.* received with matching correlation_id |
| WAITING_FOR_TOOL | TOOL_TIMEOUT | GENERATING | no tool.result.* within tool_timeout seconds |
| GENERATING | PUBLISH | PUBLISHING | final text response, no tool calls |
| PUBLISHING | RESET | IDLE | response.generation published |
| ERROR | RESET | IDLE | error path cleanup |
| any non-IDLE | FAIL | ERROR | unhandled exception |

For multiple tool calls in one response: after WAITING_FOR_TOOL → GENERATING, if there are remaining tool calls in the list, transition GENERATING → DISPATCHING_TOOL again for the next one. The loop is in _generate().
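To sanity-check the table, here is a small standalone re-creation of it. The enum aliases S and A mirror those used in generator_transitions.py, but the StateMachine class and its ValueError on invalid transitions are assumptions for illustration, not the project's implementation. The walk-through in the test models a response with two tool calls where the second one times out.

```python
from enum import Enum, auto


class S(Enum):
    IDLE = auto()
    RECEIVING = auto()
    GENERATING = auto()
    DISPATCHING_TOOL = auto()
    WAITING_FOR_TOOL = auto()
    PUBLISHING = auto()
    ERROR = auto()


class A(Enum):
    RECEIVE = auto()
    START_GENERATION = auto()
    DISPATCH_TOOL = auto()
    AWAIT_TOOL = auto()
    TOOL_RESULT = auto()
    TOOL_TIMEOUT = auto()
    PUBLISH = auto()
    RESET = auto()
    FAIL = auto()


TRANSITIONS = {
    (S.IDLE, A.RECEIVE): S.RECEIVING,
    (S.RECEIVING, A.START_GENERATION): S.GENERATING,
    (S.GENERATING, A.DISPATCH_TOOL): S.DISPATCHING_TOOL,
    (S.DISPATCHING_TOOL, A.AWAIT_TOOL): S.WAITING_FOR_TOOL,
    (S.WAITING_FOR_TOOL, A.TOOL_RESULT): S.GENERATING,
    (S.WAITING_FOR_TOOL, A.TOOL_TIMEOUT): S.GENERATING,
    (S.GENERATING, A.PUBLISH): S.PUBLISHING,
    (S.PUBLISHING, A.RESET): S.IDLE,
    (S.ERROR, A.RESET): S.IDLE,
}


class StateMachine:
    def __init__(self) -> None:
        self.state = S.IDLE

    def transition(self, action: A) -> S:
        if action is A.FAIL and self.state is not S.IDLE:
            self.state = S.ERROR  # "any non-IDLE --FAIL--> ERROR" row
        elif (self.state, action) in TRANSITIONS:
            self.state = TRANSITIONS[(self.state, action)]
        else:
            raise ValueError(f"invalid transition {action.name} from {self.state.name}")
        return self.state
```

Issuing DISPATCH_TOOL twice in a row raises here, which is why _generate() re-enters DISPATCHING_TOOL only after the previous _execute_tool() call has returned the state to GENERATING.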

Stories S3 & S4

S3 — Web search fires

story_id: S3
title: "Web search — current data query"
turns:
  - query: "What is the current price of Bitcoin in USD?"
    expected_content:
      - "$"          # any price figure
    must_not_contain:
      - "I don't have access"
      - "I cannot browse"

expected_bus_events:
  present:
    - "tool.request.web_search"
    - "tool.result.web_search"
    - "response.generation"
  absent:
    - "tool.request.web_fetch"

response_generation_checks:
  answer_not_empty: true
  tool_calls_not_empty: true  # at least one tool call logged

S4 — Web fetch fires

story_id: S4
title: "Web fetch — URL content extraction"
turns:
  - query: "Fetch and summarize the content at https://example.com"
    expected_content:
      - "Example Domain"   # known h1 on example.com
    must_not_contain:
      - "I cannot access"
      - "I don't have the ability"

expected_bus_events:
  present:
    - "tool.request.web_fetch"
    - "tool.result.web_fetch"
    - "response.generation"

response_generation_checks:
  answer_not_empty: true
  tool_calls_not_empty: true
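A story runner has to interpret expected_content, must_not_contain and expected_bus_events. A hypothetical sketch of those checks, assuming the runner has the final answer text and the set of bus subjects observed during the turn. Two choices here are assumptions rather than part of the story format: expected_content is matched case-sensitively, while must_not_contain is matched case-insensitively with typographic apostrophes normalised, since a model may write "don’t" instead of "don't".

```python
def _norm(text: str) -> str:
    # Lower-case and map typographic apostrophes so "don’t" matches "don't"
    return text.replace("\u2019", "'").lower()


def check_turn(turn: dict, answer: str) -> list[str]:
    """Return failure messages for one story turn (empty list means pass)."""
    failures = []
    for needle in turn.get("expected_content", []):
        if needle not in answer:
            failures.append(f"missing expected content {needle!r}")
    for needle in turn.get("must_not_contain", []):
        if _norm(needle) in _norm(answer):
            failures.append(f"contains forbidden phrase {needle!r}")
    return failures


def check_bus_events(spec: dict, seen: set[str]) -> list[str]:
    """Compare observed bus subjects with a story's present/absent lists."""
    failures = [f"expected bus event {s!r} not seen" for s in spec.get("present", []) if s not in seen]
    failures += [f"unexpected bus event {s!r}" for s in spec.get("absent", []) if s in seen]
    return failures
```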

Notes on story reliability

Known Constraints & Gotchas

gemma4:e4b tool calling reliability
Default model is gemma4:e4b (Google's recommendation). Tool calling reliability scales with model size — if S3 consistently fails to trigger web_search, upgrade to gemma4:26b via the --model flag or by editing config/generator.yaml.
Tool result race condition
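The short-lived subscriber in _execute_tool() must be created BEFORE publishing the request; otherwise the result may arrive on the bus before the subscriber is connected, and the message is missed. The implementation above does this: sub = ZmqSubscriber(...) comes before self._pub.publish(...). ZeroMQ subscriptions still propagate asynchronously (the "slow joiner" behaviour), so this ordering narrows the window rather than closing it completely. The tool's own network latency normally covers the gap, but a mock provider that answers instantly in tests can still race.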
Streaming + tool calls
With stream=True, tool calls are assembled from stream chunks. In the current implementation iter_tool_calls takes the last non-None chunk.message.tool_calls value. Verify that the Ollama Python library delivers complete tool call objects in the final stream chunk: if tool call args are partial in intermediate chunks, intermediate values must not be used, and if different tool calls arrive in different chunks, keeping only the last value silently drops the earlier ones. If streaming tool calls prove unreliable, fall back to stream=False only when tool schemas are active.
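If verification shows that each chunk carries only the newly completed calls, accumulating across chunks avoids dropping earlier ones. A sketch, assuming chunks are dicts shaped like Ollama's /api/chat stream objects ({"message": {"tool_calls": [...]}}) and that each call is delivered exactly once; if the library instead repeats the cumulative list in later chunks, the existing keep-last approach is the correct one. collect_tool_calls is a hypothetical name.

```python
def collect_tool_calls(chunks) -> list:
    """Accumulate tool calls from every streamed chunk instead of keeping only the last."""
    calls = []
    for chunk in chunks:
        msg = chunk.get("message") or {}
        if msg.get("tool_calls"):
            calls.extend(msg["tool_calls"])
    return calls
```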
SearXNG must be running locally
The default provider is SearXNG at http://localhost:8080. If it is not running, web_search will time out and Gemma will receive a tool error. For initial testing, set provider: brave or provider: tavily with the relevant API key, or use a live SearXNG instance.
Tool timeout must be long enough for slow search providers
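The default tool_timeout in the generator config should be at least 15s, and should sit above the tools' own timeouts (timeout: 10 in web_search.yaml, timeout: 15 in web_fetch.yaml) so that a tool's error result reaches Gemma before the generator gives up with a bare timeout. Leave some margin: httpx applies a numeric timeout to each phase (connect, read, write, pool) rather than as a total deadline, so a fetch can take longer than 15s overall. SearXNG aggregates multiple search engines and can be slow; Tavily is typically faster (<3s). Set per-provider latency expectations in tests using mocks.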
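A startup sanity check for that ordering could look like the sketch below. The function name check_tool_timeout and the 5s default margin are hypothetical choices, not values from the plan.

```python
def check_tool_timeout(tool_timeout: float, tool_timeouts: dict[str, float],
                       margin: float = 5.0) -> list[str]:
    """Return warnings for tools whose own timeout is not comfortably below the generator's."""
    return [
        f"{name}: timeout {t}s + {margin}s margin exceeds generator tool_timeout {tool_timeout}s"
        for name, t in tool_timeouts.items()
        if t + margin > tool_timeout
    ]
```

With the Phase 1b configs, a generator tool_timeout of 15 would flag web_fetch, while 20 would pass both tools.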
LoCAL2 Phase 1b Detailed Plan · 2026-05-31