Source code for jeevesagent.architecture.blackboard

"""Blackboard: shared state board with a coordinator picking the
next contributor each round.

Classical AI: Erman et al. 1980 (Hearsay-II). Han & Zhang 2025
revived for LLM agents (arXiv:2507.01701). Salemi et al. 2026
(arXiv:2510.01285) reports +13-57% relative improvement on
data-discovery tasks.

Pattern
-------

1. Initialize the blackboard with the user's problem statement.
2. Each round, the **coordinator** reads the blackboard and picks
   one agent to contribute next (or terminates).
3. The chosen agent runs with the blackboard view in its prompt and
   produces a contribution.
4. The contribution is appended to the blackboard.
5. Loop until the coordinator terminates or ``max_rounds`` is hit.
6. The **decider** synthesizes the final answer from the
   blackboard. ``decider=None`` falls back to the last
   ``answer``-kind contribution, or the most recent contribution
   if no answer-kind exists.

Coordinator API
---------------

The coordinator is a :class:`Agent` that produces JSON in this shape::

    {"terminate": bool, "next_agent": str|null, "instruction": str|null}

If ``terminate`` is true, the loop ends. Otherwise ``next_agent``
must name one of the configured agents. If the coordinator emits an
unknown name or malformed JSON, the round is skipped and a warning
event fires (Blackboard does not crash on coordinator misbehavior).

Set ``coordinator=None`` to fall back to round-robin selection —
useful for testing / prototyping but defeats the "contribute when
relevant" feature of the architecture.

Strengths
---------
* Decentralized contribution; agents react to current state rather
  than being forced into a fixed delegation graph.
* Transparent state — the blackboard is the audit log.

Weaknesses
----------
* Coordinator quality is critical. A bad coordinator picks wrong
  agents and the system stalls.
* Blackboard state grows monotonically; long sessions can blow up
  the LLM context.
* "Theoretically interesting but rarely outperforms hierarchical or
  graph in production" (2026 taxonomy guide). Reserve for
  exploratory research / data-discovery problems.
"""

from __future__ import annotations

import json
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import TYPE_CHECKING

from ..core.types import Event
from .base import AgentSession, Dependencies
from .helpers import SubagentInvocation

if TYPE_CHECKING:
    from ..agent.api import Agent


DEFAULT_COORDINATOR_INSTRUCTIONS = """\
You coordinate a team of specialist agents on a shared problem.

Read the blackboard state below and decide:
- Should we terminate now? (Has a satisfactory answer been written?)
- If continuing, which agent should contribute next?
- What specific instruction should that agent receive?

Available agents:
{agents}

Output JSON exactly:
{{"terminate": <bool>, "next_agent": <str|null>, "instruction": <str|null>}}

If terminating, set next_agent and instruction to null.
"""


DEFAULT_DECIDER_INSTRUCTIONS = """\
You synthesize the final answer from a multi-agent blackboard
discussion. Read the blackboard state below and produce the best
answer you can. Cite specific contributions when useful;
acknowledge dissent if it matters.
"""


[docs] @dataclass class BlackboardEntry: """One contribution on the blackboard.""" timestamp: datetime author: str content: str kind: str = "contribution"
[docs] @dataclass class Blackboard: """Public + per-agent private state for the architecture.""" public: list[BlackboardEntry] = field(default_factory=list) private: dict[str, list[BlackboardEntry]] = field( default_factory=dict )
[docs] def post( self, author: str, content: str, *, kind: str = "contribution", private_to: str | None = None, ) -> BlackboardEntry: entry = BlackboardEntry( timestamp=datetime.now(UTC), author=author, content=content, kind=kind, ) if private_to: self.private.setdefault(private_to, []).append(entry) else: self.public.append(entry) return entry
[docs] def render_for(self, agent_name: str) -> str: """Format the blackboard state as a string for ``agent_name``. Includes every public entry and the agent's own private scratchpad if any. """ public_lines = [ f"[{e.kind}] {e.author}: {e.content}" for e in self.public ] private_lines = [ f"[private:{e.kind}] {e.content}" for e in self.private.get(agent_name, []) ] parts = [] if public_lines: parts.append( "=== Public board ===\n" + "\n".join(public_lines) ) if private_lines: parts.append( "=== Your private notes ===\n" + "\n".join(private_lines) ) return "\n\n".join(parts) if parts else "(empty)"
@dataclass class _CoordinatorDecision: terminate: bool next_agent: str | None instruction: str | None raw: str
[docs] class BlackboardArchitecture: """Coordinator + agents + decider, mediated by a shared blackboard.""" name = "blackboard" def __init__( self, *, agents: dict[str, Agent], coordinator: Agent | None = None, decider: Agent | None = None, max_rounds: int = 10, coordinator_instructions: str | None = None, decider_instructions: str | None = None, ) -> None: if not agents: raise ValueError( "Blackboard requires at least one agent" ) if max_rounds < 1: raise ValueError("max_rounds must be >= 1") self._agents = dict(agents) self._coordinator = coordinator self._decider = decider self._max_rounds = max_rounds self._coordinator_instructions = ( coordinator_instructions or DEFAULT_COORDINATOR_INSTRUCTIONS ) self._decider_instructions = ( decider_instructions or DEFAULT_DECIDER_INSTRUCTIONS )
[docs] def declared_workers(self) -> dict[str, Agent]: workers = dict(self._agents) if self._coordinator is not None: workers["__coordinator"] = self._coordinator if self._decider is not None: workers["__decider"] = self._decider return workers
[docs] async def run( self, session: AgentSession, deps: Dependencies, prompt: str, ) -> AsyncIterator[Event]: bb = Blackboard() bb.post("user", prompt, kind="problem") yield Event.architecture_event( session.id, "blackboard.started", agents=list(self._agents), has_coordinator=self._coordinator is not None, has_decider=self._decider is not None, ) for round_num in range(1, self._max_rounds + 1): status = await deps.budget.allows_step() if status.blocked: session.interrupted = True session.interruption_reason = ( f"budget:{status.reason}" ) yield Event.budget_exceeded(session.id, status) return # Coordinator: stream its events through, capture decision. decision_holder: list[_CoordinatorDecision] = [] async for ev in self._coordinate_streaming( session, bb, round_num, decision_holder ): yield ev decision = decision_holder[0] yield Event.architecture_event( session.id, "blackboard.coordinator_decided", round=round_num, terminate=decision.terminate, next_agent=decision.next_agent, raw=decision.raw[:300], ) if decision.terminate: break if decision.next_agent is None: yield Event.architecture_event( session.id, "blackboard.no_contributor", round=round_num, ) continue if decision.next_agent not in self._agents: bb.post( "system", f"Coordinator picked unknown agent " f"{decision.next_agent!r}; skipping.", kind="error", ) yield Event.architecture_event( session.id, "blackboard.unknown_agent", round=round_num, agent_name=decision.next_agent, ) continue picked = self._agents[decision.next_agent] view = bb.render_for(decision.next_agent) instruction = ( decision.instruction or f"Contribute to the blackboard as {decision.next_agent}." ) agent_prompt = ( f"You are {decision.next_agent}.\n\n" f"Blackboard state:\n{view}\n\n" f"Coordinator instruction: {instruction}\n\n" f"Produce ONE contribution in plain text. Be " f"specific; cite prior contributions when useful." ) yield Event.architecture_event( session.id, "blackboard.invoking", round=round_num, agent=decision.next_agent, ) sub_session_id = ( f"{session.id}__bb_{decision.next_agent}_round_{round_num}" ) invocation = SubagentInvocation( picked, agent_prompt, session_id=sub_session_id ) async for ev in invocation.events(): yield ev picked_result = invocation.result session.turns += int(picked_result.get("turns", 0) or 0) output = str(picked_result.get("output", "")) bb.post( decision.next_agent, output, kind="contribution", ) yield Event.architecture_event( session.id, "blackboard.contribution", round=round_num, agent=decision.next_agent, content=output[:300], ) # === Synthesize === final_holder: list[str] = [] async for ev in self._decide_final_streaming( session, bb, final_holder ): yield ev final = final_holder[0] session.output = final yield Event.architecture_event( session.id, "blackboard.completed", final=final[:300], board_size=len(bb.public), )
# ---- helpers ----------------------------------------------------- async def _coordinate_streaming( self, session: AgentSession, bb: Blackboard, round_num: int, decision_holder: list[_CoordinatorDecision], ) -> AsyncIterator[Event]: """Run the coordinator (LLM Agent or round-robin fallback) and stream its events; write the decision into ``decision_holder[0]``.""" if self._coordinator is None: # Round-robin fallback — no LLM call, no events. names = list(self._agents) picked = names[(round_num - 1) % len(names)] decision_holder.append( _CoordinatorDecision( terminate=False, next_agent=picked, instruction=None, raw="(round-robin fallback)", ) ) return coord_prompt = self._coordinator_instructions.format( agents="\n".join( f" - {n}: {(a.instructions or '(no description)')[:120]}" for n, a in self._agents.items() ) ) + ( f"\n\nBlackboard state:\n{bb.render_for('__coordinator')}" ) sub_session_id = ( f"{session.id}__bb_coord_round_{round_num}" ) invocation = SubagentInvocation( self._coordinator, coord_prompt, session_id=sub_session_id ) async for ev in invocation.events(): yield ev coord_result = invocation.result session.turns += int(coord_result.get("turns", 0) or 0) decision_holder.append( _parse_coordinator_decision( str(coord_result.get("output", "")) ) ) async def _decide_final_streaming( self, session: AgentSession, bb: Blackboard, final_holder: list[str], ) -> AsyncIterator[Event]: """Run the decider (LLM Agent or fallback) and stream its events; write the final answer into ``final_holder[0]``.""" if self._decider is None: # Last "answer"-kind entry wins; fall through to last # public entry; finally the empty string. for entry in reversed(bb.public): if entry.kind == "answer": final_holder.append(entry.content) return for entry in reversed(bb.public): if entry.kind == "contribution": final_holder.append(entry.content) return final_holder.append("") return decide_prompt = self._decider_instructions + ( f"\n\nFull blackboard state:\n{bb.render_for('__decider')}" ) sub_session_id = f"{session.id}__bb_decider" invocation = SubagentInvocation( self._decider, decide_prompt, session_id=sub_session_id ) async for ev in invocation.events(): yield ev decider_result = invocation.result session.turns += int(decider_result.get("turns", 0) or 0) final_holder.append(str(decider_result.get("output", "")))
def _parse_coordinator_decision(text: str) -> _CoordinatorDecision: """Parse a coordinator's JSON output. Robust to markdown fences and free-form prose; falls back to "no contributor this round" when parsing fails entirely.""" cleaned = text.strip() if cleaned.startswith("```"): lines = cleaned.split("\n") if lines[0].startswith("```"): lines = lines[1:] while lines and lines[-1].strip().startswith("```"): lines = lines[:-1] cleaned = "\n".join(lines).strip() parsed: object try: parsed = json.loads(cleaned) except (json.JSONDecodeError, ValueError): parsed = None if isinstance(parsed, dict): terminate = bool(parsed.get("terminate", False)) next_agent_raw = parsed.get("next_agent") next_agent = ( str(next_agent_raw) if next_agent_raw is not None else None ) instruction_raw = parsed.get("instruction") instruction = ( str(instruction_raw) if instruction_raw is not None else None ) return _CoordinatorDecision( terminate=terminate, next_agent=next_agent, instruction=instruction, raw=text, ) # Parse failure → conservative no-op. return _CoordinatorDecision( terminate=False, next_agent=None, instruction=None, raw=text, )