A forma mais prática de conseguir **multiagentes “sem chefe”** é tratar seu sistema como um **sistema distribuído**: você não remove a coordenação — você substitui “um coordenador” por **protocolos + estado compartilhado + regras de término**.

Abaixo vai um desenho bem concreto (com opções), incluindo **comunicação entre agentes (não all‑to‑all)** e **como qualquer agente pode pedir esclarecimentos ao usuário** sem virar bagunça.

---

## 1) Princípios de design para “sem hierarquia” (mas com ordem)

### O que você quer evitar

* Um **agente fixo** que:

  * interpreta a tarefa,
  * decide o plano,
  * atribui subtarefas,
  * consolida resultado.

### O que você ainda precisa ter

* Um jeito de:

  1. **quebrar tarefa em subtarefas**
  2. **alocar subtarefas**
  3. **coordenar dependências**
  4. **detectar quando acabou**
  5. **pedir dúvidas ao usuário** (human-in-the-loop)

Você pode fazer isso de forma “não hierárquica” com 2 padrões clássicos:

1. **Blackboard / Task Board (stigmergy):** agentes coordenam escrevendo/lendo um quadro comum (estado compartilhado).
   Há inclusive trabalhos recentes discutindo blackboard aplicado a LLM multi-agent. ([arxiv.org][1])

2. **Market-based / Contract Net:** cada subtarefa vira um “pedido”; agentes fazem “lances”/propostas; o “gerente” é **temporário por subtarefa** (quem abriu a subtarefa), não um chefe fixo. O Contract Net original (Smith, 1980) é o clássico aqui. ([reidgsmith.com][2])

Na prática, **misturar** os dois dá o melhor custo/benefício.

---

## 2) Componentes mínimos da arquitetura

Pense nestes blocos:

### A) Agentes (workers)

* Cada agente tem:

  * **perfil/capacidades** (“sou bom em X”)
  * ferramentas (HTTP, DB, código, etc.)
  * memória local (curta) + acesso a memória compartilhada (longa)
  * um **loop**: observar → decidir → agir → publicar estado

### B) Um “Task Board” (estado compartilhado)

Pode ser Postgres/Redis/document store.

* Guarda: tarefas, subtarefas, dependências, status, artefatos, perguntas ao usuário, votos de conclusão.

> Isso **não é um coordenador** (não decide nada). É uma “lousa”.

### C) Um barramento de mensagens (para evitar all-to-all)

* Pub/sub (NATS, Redis streams, Kafka, etc.) ou filas.
* Para mensagem direta: “mailbox” por agente.

### D) Um canal de usuário (UI/API)

* O usuário conversa com o sistema.
* Importante: **não precisa ser “agente-chefe”** — pode ser só um gateway que:

  * recebe mensagens do usuário e publica no board/bus
  * entrega perguntas dos agentes ao usuário

### E) Observabilidade

* tracing por tarefa/subtarefa, logs estruturados, replay.
* Isso vira essencial quando você tem vários agentes “falando”.

---

## 3) Como os agentes se organizam sem chefe

### 3.1 Protocolo de inicialização (planejamento distribuído)

Quando chega uma tarefa “T”:

1. O gateway cria um objeto `Task(T)` no board com:

   * objetivo
   * restrições
   * critérios de aceite (“Definition of Done”)
   * contexto inicial

2. Um “round” curto de propostas:

   * agentes interessados publicam **propostas de plano** (plan fragments) no board:

     * subtarefas sugeridas
     * dependências
     * riscos/perguntas

3. Em vez de “um agente consolidar”, você usa uma regra determinística:

   * **merge por união** + deduplicação (por hash/embedding)
   * conflitos resolvidos por:

     * votação simples (quorum)
     * ou “melhor justificativa” (cada proposta inclui custo/benefício)

> Isso é importante: “sem chefe” normalmente vira “sem merge” se você não padronizar esse passo.

### 3.2 Alocação de subtarefas (sem coordenador fixo)

Use um mecanismo tipo Contract Net, mas **por subtarefa**:

* Uma subtarefa `S` fica em estado `OPEN_FOR_BIDS`.

* Agentes enviam “bids” com:

  * confiança
  * custo estimado (tempo/tokens)
  * ferramentas necessárias
  * dependências

* A subtarefa é “claimada” pelo melhor bid via regra:

  * menor custo com confiança mínima
  * ou votação/ranqueamento

**Quem abriu a subtarefa age como “manager daquela subtarefa”** (função temporária). Isso mantém a ideia não hierárquica: o papel gira o tempo todo. ([reidgsmith.com][2])

---

## 4) Comunicação entre agentes sem all‑to‑all

Você tem 3 estratégias (pode combinar):

### Estratégia A — Pub/Sub por tópicos (recomendado)

* Em vez de mandar para todos, publique em tópicos:

  * `task.<task_id>`
  * `capability.search`
  * `capability.coding`
  * `artifact.review`
* Só quem “assina” recebe.

Isso reduz ruído e escala melhor.

### Estratégia B — Diretório de capacidades + roteamento

* Cada agente publica no board algo como:

  * `agent_id`, `skills`, `tools`, `current_load`
* Para pedir ajuda, o agente consulta o diretório e manda direto para 1–3 candidatos.

### Estratégia C — “Gossip” para descoberta (se você quiser mais descentralizado)

Se você realmente quiser menos centralização e tolerância a falhas, pode usar protocolos tipo gossip para espalhar:

* presença de agentes
* carga
* “quem está fazendo o quê”
* atualizações de estado

Isso é comum em sistemas P2P; por exemplo, Cassandra descreve gossip como troca periódica de estado entre nós (peer‑to‑peer). ([Apache Cassandra][3])

> Para a maioria dos projetos, A + B já resolve sem complexidade de C.

---

## 5) Como “saber que acabou” (detecção de término)

Esse é o ponto que mais quebra sistemas multiagente.

### Abordagem prática (90% dos casos)

Defina um **Definition of Done (DoD)** e um **protocolo de fechamento**:

1. O board mantém:

   * subtarefas com status (`OPEN`, `CLAIMED`, `IN_PROGRESS`, `BLOCKED`, `NEEDS_USER`, `DONE`)
   * dependências
   * artefatos produzidos
   * checklist de DoD

2. Quando um agente acha que terminou:

   * marca sua subtarefa como `DONE` + anexa evidências (links, outputs, testes)
   * publica um evento `READY_FOR_CLOSE(task_id)`

3. “Fechamento por quorum”:

   * cada agente faz uma checagem rápida:

     * “há subtarefa aberta?”
     * “DoD satisfeito?”
     * “há pergunta pendente ao usuário?”
   * se **k de n** confirmam, o task vira `CLOSED`.
   * se alguém vetar, reabre com justificativa.

Vantagens: simples, funciona bem quando existe um board confiável.

### Abordagem formal (se você quiser “modo distribuído sério”)

Use um algoritmo de **distributed termination detection**, como o **Dijkstra–Scholten**, que trata “processos ativos/passivos” e mensagens em trânsito para detectar término global. ([Wikipedia][4])

Isso faz mais sentido se:

* você tem execução realmente distribuída,
* sem um estado central confiável,
* e quer provar que “não tem mensagem pendente”.

---

## 6) Como permitir que agentes perguntem ao usuário (sem virar spam)

Você quer “qualquer agente pode falar com o usuário”, mas precisa de **um protocolo social**.

### Padrão recomendado: “Question Tickets”

Em vez de cada agente mandar mensagem direto:

1. O agente cria um `QuestionTicket` no board:

   * pergunta
   * por que precisa
   * opções sugeridas (A/B/C)
   * urgência
   * que subtarefas estão bloqueadas

2. Regras anti-spam:

   * **apenas 1 ticket ativo por tarefa** (ou por thread)
   * tickets têm *cooldown*
   * tickets devem ser “fecháveis” (tem como responder objetivamente)

3. Quem pergunta ao usuário?

* Opção 1 (mais limpa): o gateway/UI puxa o ticket mais urgente e pergunta ao usuário.
* Opção 2 (mais “peer”): o agente que criou o ticket adquire um **lock** `user_question_token` no board e pergunta diretamente; ao terminar, libera o token.

O efeito: **qualquer agente pode requisitar dúvida**, mas o sistema mantém cadência.

> Frameworks modernos já têm noções de “human-in-the-loop” embutidas; por exemplo, o OpenAI Agents SDK lista mecanismos de human-in-the-loop, sessões (memória persistente) e tracing. ([openai.github.io][5])
> LangGraph também enfatiza fluxos stateful/long-running e human-in-the-loop. ([DeepWiki][6])

---

## 7) Um protocolo de mensagens que funciona (bem pé no chão)

Padronize mensagens como eventos JSON (ou Pydantic), por exemplo:

* `TASK_PROPOSAL`
* `SUBTASK_CREATED`
* `BID_SUBMITTED`
* `SUBTASK_CLAIMED`
* `ARTIFACT_PUBLISHED`
* `QUESTION_TICKET_CREATED`
* `READY_FOR_CLOSE`
* `CLOSE_VOTE`

Campos essenciais:

* `task_id`, `subtask_id`
* `sender_agent_id`, `target_agent_id` (opcional)
* `type`
* `payload`
* `requires_ack`
* `correlation_id` (para encadear)
* `timestamp`

Isso facilita replay, auditoria e debug.

---

## 8) Sugestões de implementação (com frameworks atuais)

Você pode implementar “do zero” (fila + board), mas vale conhecer ferramentas que já resolvem partes chatas:

### OpenAI Agents SDK (produção, primitives simples)

* A documentação descreve o SDK como upgrade “production‑ready” do Swarm, com primitives como **Agents**, **Handoffs** (delegação entre agentes), **Guardrails**, além de **sessions**, **human‑in‑the‑loop** e **tracing**. ([openai.github.io][5])
* O repositório do Swarm deixa claro que era um framework educacional e não mantém estado entre chamadas. ([GitHub][7])

**Como encaixa no seu objetivo:** você implementa o *protocolo descentralizado* (board + bids + quorum) e usa o SDK para rodar agentes, handoffs, validação e tracing.

### AutoGen (multi-agent por conversas)

* AutoGen é um framework open-source voltado a aplicações com múltiplos agentes conversando. ([arxiv.org][8])
* A própria Microsoft Research menciona evolução do AutoGen (ex.: v0.4). ([microsoft.com][9])

**Como encaixa:** bom para patterns de conversação e coordenação por diálogo; você ainda pode forçar um modo não hierárquico com protocolos.

### LangGraph (orquestração por grafo, stateful)

* LangGraph se posiciona como framework de orquestração para fluxos com agentes, incluindo multi-agent, com foco em fluxos realistas e robustez. ([langchain.com][10])
* Documentação/visão geral menciona execução durável, streaming e human-in-the-loop. ([DeepWiki][6])

**Como encaixa:** excelente para modelar “quem fala com quem” como um grafo (evita all-to-all por design).

### CrewAI (times/flows)

* CrewAI se descreve como framework open-source para orquestrar agentes autônomos com “Crews” e “Flows”. ([CrewAI Docs][11])

**Como encaixa:** é ótimo para equipes com papéis; você pode adaptar para menos hierarquia, mas ele tende a ter uma “estrutura de fluxo”.

---

## 9) Leituras para expandir a ideia (do clássico ao moderno)

### Multi-agent “clássico” (coordenação e comunicação)

* **Contract Net Protocol (Smith, 1980)** – alocação distribuída por propostas/contratos. ([reidgsmith.com][2])
* **FIPA ACL** – estrutura de mensagens de comunicação entre agentes (performatives etc.). ([PPGIA PUCPR][12])

### Coordenação por estado compartilhado (blackboard/stigmergy)

* Paper recente sobre **LLM multi-agent + blackboard** (ideia de repetição de seleção/execução até consenso no quadro). ([arxiv.org][1])
* Artigo clássico sobre coordenação por **stigmergy** (em contexto industrial, mas o princípio é o mesmo). ([ScienceDirect][13])

### Detecção de término (para “saber quando acabou”)

* **Dijkstra–Scholten termination detection** (referência clássica) e discussões sobre término em sistemas distribuídos. ([Wikipedia][4])

### Fundamentos de execução concorrente por mensagens

* **Actor Model** (atores como primitivo de concorrência por mensagens; ótimo para pensar em agentes). ([arxiv.org][14])

### Estado replicado sem coordenador (se você for full P2P)

* **CRDTs** – estruturas de dados replicadas que convergem mesmo com falhas/reordenação. ([Perso Lip6][15])
* Para descoberta disseminada, veja **gossip** em sistemas reais (ex.: Cassandra). ([Apache Cassandra][3])

---

## 10) Um “template” de arquitetura que eu implementaria primeiro

Se eu fosse montar hoje uma primeira versão pragmática (sem chefe, mas simples):

1. **Task Board** em Postgres (ou Redis) com estados + lock leve.
2. **Pub/Sub** para eventos por `task_id` e por `capability`.
3. **Protocolo**:

   * round de propostas de plano
   * criação de subtarefas
   * bidding/claim
   * execução + publicação de artefatos
   * tickets de pergunta ao usuário
   * fechamento por quorum + DoD
4. **Rate limit** e “question token” para o usuário.
5. **Tracing** desde o início (você vai agradecer depois).

Depois, se precisar mais descentralização:

* replicar o board via CRDT + gossip (bem mais complexo).

---

Se você me disser **em que domínio** seus agentes vão atuar (ex.: engenharia de software, suporte, pesquisa, automações internas) e **quais ferramentas** eles precisam usar (web, DB, repositório git, APIs internas), eu consigo sugerir:

* uma divisão inicial de “papéis/capacidades” dos agentes,
* o esquema do Task Board (campos e estados),
* e um protocolo de mensagens minimamente completo (com exemplos de payloads).

[1]: https://arxiv.org/abs/2507.01701?utm_source=chatgpt.com "Exploring Advanced LLM Multi-Agent Systems Based on Blackboard Architecture"
[2]: https://reidgsmith.com/The_Contract_Net_Protocol_Dec-1980.pdf?utm_source=chatgpt.com "The Contract Net Protocol: High-Level Communication and Control in a ..."
[3]: https://cassandra.apache.org/doc/latest/cassandra/architecture/dynamo.html?utm_source=chatgpt.com "Dynamo | Apache Cassandra Documentation"
[4]: https://en.wikipedia.org/wiki/Dijkstra%E2%80%93Scholten_algorithm?utm_source=chatgpt.com "Dijkstra–Scholten algorithm"
[5]: https://openai.github.io/openai-agents-python/ "OpenAI Agents SDK"
[6]: https://deepwiki.com/langchain-ai/docs/2.2-langgraph-framework-documentation?utm_source=chatgpt.com "LangGraph Framework Documentation | langchain-ai/docs | DeepWiki"
[7]: https://github.com/openai/swarm?utm_source=chatgpt.com "GitHub - openai/swarm: Educational framework exploring ergonomic ..."
[8]: https://arxiv.org/abs/2308.08155?utm_source=chatgpt.com "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation"
[9]: https://www.microsoft.com/en-us/research/project/autogen/?utm_source=chatgpt.com "AutoGen - Microsoft Research"
[10]: https://www.langchain.com/langgraph?utm_source=chatgpt.com "LangGraph: Agent Orchestration Framework for Reliable AI Agents - LangChain"
[11]: https://docs.crewai.com/en/introduction?utm_source=chatgpt.com "Introduction - CrewAI"
[12]: https://ppgia.pucpr.br/~fabricio/ftp/Aulas/Mestrado/AS/Aula3/SC00061G_FIPA_ACL.pdf?utm_source=chatgpt.com "FIPA ACL Message Structure Specification"
[13]: https://www.sciencedirect.com/science/article/pii/S0166361503001234?utm_source=chatgpt.com "Multi-agent coordination and control using stigmergy"
[14]: https://arxiv.org/vc/arxiv/papers/1008/1008.1459v8.pdf?utm_source=chatgpt.com "Actor Model of Computation - arXiv.org"
[15]: https://perso.lip6.fr/Marc.Shapiro/papers/CRDTs_SSS-2011.pdf?utm_source=chatgpt.com "Conﬂict-freeReplicatedDataTyp"

SPLIT


Perfeito — dá para validar **só o “sistema nervoso”** (comunicação + estado compartilhado + fechamento) com uma versão mínima em Python, sem tools, e com uma **abstração bem fina** sobre OpenAI.

Abaixo eu te deixo um “mini‑swarm” **leaderless na prática**:

* **Sem agente coordenador fixo**
* Coordenação via:

  * **Board** (estado compartilhado in‑memory)
  * **MessageBus** (pub/sub por “capability” + mensagens diretas)
* Agentes:

  * `planner` cria o DAG de subtarefas (papel temporário, não “chefe”)
  * `asker` pergunta ao usuário
  * `writer` rascunha
  * `reviewer` revisa
  * `synth` sintetiza e fecha
* LLM:

  * `OpenAIResponsesLLM` (usa `AsyncOpenAI` + `client.responses.create(...)`)
  * fallback `MockLLM` (se não tiver `OPENAI_API_KEY`)

> A integração com OpenAI está no padrão atual do SDK: `client.responses.create(...)` e `response.output_text`, com versão async via `AsyncOpenAI`. ([GitHub][1])

---

# Versão mínima (um arquivo): `swarm_min.py`

> **Rode sem chave** (mock) ou com `OPENAI_API_KEY` (real).
> **Sem tools**, apenas para validar comunicação + protocolo.

```python
# swarm_min.py
"""
Mini-swarm não-hierárquico (MVP) — apenas comunicação + estado.

- MessageBus: pub/sub por capability + direct messages (sem all-to-all)
- Board: estado compartilhado (in-memory) com subtarefas e dependências
- Agents: planner -> ask -> draft -> review -> synth
- LLM: OpenAI Responses API (AsyncOpenAI) ou MockLLM

Como rodar:
  pip install openai
  export OPENAI_API_KEY="..."
  export OPENAI_MODEL="gpt-5.2"   # opcional (ou outro modelo que você tenha)
  python swarm_min.py

Sem OPENAI_API_KEY, roda com MockLLM (bom para testar o protocolo).
"""

from __future__ import annotations

import asyncio
import os
import time
import uuid
import datetime
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Set, List, Protocol


# ----------------------------
# Infra: mensagens + bus
# ----------------------------

@dataclass(frozen=True)
class Message:
    type: str
    sender: str
    topic: str
    task_id: Optional[str] = None
    subtask_id: Optional[str] = None
    payload: Dict[str, Any] = field(default_factory=dict)
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    ts: float = field(default_factory=lambda: time.time())


class MessageBus:
    """
    Bus simples:
      - publish(topic): entrega para todos os subscribers daquele topic
      - send(agent_id): entrega direto no inbox do agent
    """
    def __init__(self) -> None:
        self._subs: Dict[str, Set[str]] = {}
        self._inboxes: Dict[str, asyncio.Queue[Message]] = {}
        self._lock = asyncio.Lock()

    async def register(self, agent_id: str) -> asyncio.Queue[Message]:
        async with self._lock:
            if agent_id in self._inboxes:
                raise ValueError(f"agent_id already registered: {agent_id}")
            q: asyncio.Queue[Message] = asyncio.Queue()
            self._inboxes[agent_id] = q
            return q

    async def subscribe(self, agent_id: str, topic: str) -> None:
        async with self._lock:
            self._subs.setdefault(topic, set()).add(agent_id)

    async def publish(self, topic: str, msg: Message) -> None:
        async with self._lock:
            targets = list(self._subs.get(topic, set()))
        for agent_id in targets:
            q = self._inboxes.get(agent_id)
            if q is not None:
                await q.put(msg)

    async def send(self, agent_id: str, msg: Message) -> None:
        q = self._inboxes.get(agent_id)
        if q is None:
            raise ValueError(f"unknown agent_id: {agent_id}")
        await q.put(msg)


# ----------------------------
# Infra: board (estado compartilhado)
# ----------------------------

class Status:
    PENDING = "PENDING"   # criada, mas esperando dependências
    OPEN = "OPEN"         # pronta para claim
    CLAIMED = "CLAIMED"   # em execução por um agente
    DONE = "DONE"         # concluída


@dataclass
class SubtaskState:
    subtask_id: str
    title: str
    required_capability: str
    depends_on: Set[str] = field(default_factory=set)
    status: str = Status.PENDING
    owner: Optional[str] = None
    output: Optional[str] = None


@dataclass
class TaskState:
    task_id: str
    goal: str
    created_at: float = field(default_factory=lambda: time.time())
    subtasks: Dict[str, SubtaskState] = field(default_factory=dict)
    closed: bool = False
    final_output: Optional[str] = None


class Board:
    """
    Board in-memory com lock.
    Para produção, isso viraria Postgres/Redis + CAS/leases.
    """
    def __init__(self) -> None:
        self._tasks: Dict[str, TaskState] = {}
        self._lock = asyncio.Lock()

    async def create_task(self, goal: str) -> TaskState:
        async with self._lock:
            task_id = str(uuid.uuid4())
            t = TaskState(task_id=task_id, goal=goal)
            self._tasks[task_id] = t
            return t

    async def get_task(self, task_id: str) -> TaskState:
        async with self._lock:
            return self._tasks[task_id]

    async def add_subtasks(self, task_id: str, subtasks: List[SubtaskState]) -> List[SubtaskState]:
        """
        Adiciona subtarefas, marca OPEN se deps satisfeitas.
        Retorna as que ficaram OPEN (para publicar eventos).
        """
        newly_open: List[SubtaskState] = []
        async with self._lock:
            t = self._tasks[task_id]
            for st in subtasks:
                st.status = Status.OPEN if len(st.depends_on) == 0 else Status.PENDING
                t.subtasks[st.subtask_id] = st
                if st.status == Status.OPEN:
                    newly_open.append(st)
        return newly_open

    async def claim_subtask(self, task_id: str, subtask_id: str, agent_id: str) -> bool:
        async with self._lock:
            st = self._tasks[task_id].subtasks[subtask_id]
            if st.status != Status.OPEN:
                return False
            st.status = Status.CLAIMED
            st.owner = agent_id
            return True

    async def complete_subtask(self, task_id: str, subtask_id: str, agent_id: str, output: str) -> List[SubtaskState]:
        """
        Marca DONE, abre subtarefas que ficaram prontas,
        e fecha o task se tudo DONE.
        """
        newly_open: List[SubtaskState] = []
        async with self._lock:
            t = self._tasks[task_id]
            st = t.subtasks[subtask_id]
            if st.owner != agent_id:
                raise ValueError("only owner can complete subtask")
            st.status = Status.DONE
            st.output = output

            done_ids = {sid for sid, s in t.subtasks.items() if s.status == Status.DONE}
            for s in t.subtasks.values():
                if s.status == Status.PENDING and s.depends_on.issubset(done_ids):
                    s.status = Status.OPEN
                    newly_open.append(s)

            if (not t.closed) and all(s.status == Status.DONE for s in t.subtasks.values()):
                t.closed = True
                # Heurística: o output final vem do subtask de capability "synth"
                for s in t.subtasks.values():
                    if s.required_capability == "synth":
                        t.final_output = s.output
        return newly_open


# ----------------------------
# LLM: abstração fina
# ----------------------------

class LLM(Protocol):
    async def generate(self, *, instructions: str, input_text: str) -> str: ...


class MockLLM:
    async def generate(self, *, instructions: str, input_text: str) -> str:
        low = instructions.lower()
        if "asker" in low or "pergunta" in low:
            return "Qual formato você prefere para a resposta (bullet points, texto corrido, ou passo a passo)?"
        if "writer" in low:
            return "Rascunho (mock): uma resposta inicial com estrutura, explicações e passos."
        if "review" in low or "critic" in low:
            return "Revisão (mock): está claro. Sugestões: adicionar 1 exemplo e uma lista de próximos passos."
        if "synth" in low:
            return "Resposta final (mock): versão consolidada com base no rascunho e na revisão."
        if "planner" in low:
            return "Plano (mock): 1) esclarecer 2) rascunho 3) revisão 4) resposta final."
        return f"(mock) {input_text[:120]}"


class OpenAIResponsesLLM:
    """
    Wrapper mínimo do OpenAI Python SDK usando Responses API.
    """
    def __init__(self, model: str) -> None:
        from openai import AsyncOpenAI  # pip install openai

        self.model = model
        self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

    async def generate(self, *, instructions: str, input_text: str) -> str:
        resp = await self.client.responses.create(
            model=self.model,
            instructions=instructions,
            input=input_text,
        )
        # Conveniência do SDK: texto agregado
        return (resp.output_text or "").strip()

    async def aclose(self) -> None:
        await self.client.close()


# ----------------------------
# Agentes
# ----------------------------

def now_str() -> str:
    return datetime.datetime.now().strftime("%H:%M:%S")


class Agent:
    def __init__(self, agent_id: str, capabilities: Set[str], llm: LLM, bus: MessageBus, board: Board):
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.llm = llm
        self.bus = bus
        self.board = board
        self.inbox: asyncio.Queue[Message] = None  # type: ignore
        self._running = True

    async def start(self) -> None:
        self.inbox = await self.bus.register(self.agent_id)
        for cap in self.capabilities:
            await self.bus.subscribe(self.agent_id, f"cap.{cap}")
        await self.bus.subscribe(self.agent_id, "user.answer")

    def log(self, text: str) -> None:
        print(f"[{now_str()}] [{self.agent_id}] {text}")

    async def run(self) -> None:
        self.log("started")
        while self._running:
            msg = await self.inbox.get()
            if msg.type == "STOP":
                self._running = False
                self.log("stopping")
                break
            await self.handle(msg)

    async def handle(self, msg: Message) -> None:
        self.log(f"got {msg.type} on {msg.topic}")


async def publish_subtask_open(bus: MessageBus, sender: str, task_id: str, st: SubtaskState) -> None:
    await bus.publish(
        f"cap.{st.required_capability}",
        Message(
            type="SUBTASK_OPEN",
            sender=sender,
            topic=f"cap.{st.required_capability}",
            task_id=task_id,
            subtask_id=st.subtask_id,
            payload={"title": st.title},
        ),
    )


class PlannerAgent(Agent):
    def __init__(self, agent_id: str, llm: LLM, bus: MessageBus, board: Board):
        super().__init__(agent_id, {"plan"}, llm, bus, board)
        self._busy = False

    async def handle(self, msg: Message) -> None:
        if msg.type == "TASK_CREATED":
            if self._busy:
                return
            self._busy = True
            asyncio.create_task(self._plan(msg.task_id))
        elif msg.type == "DIRECT":
            self.log(f"direct from {msg.sender}: {msg.payload.get('text')}")

    async def _plan(self, task_id: Optional[str]) -> None:
        assert task_id
        t = await self.board.get_task(task_id)
        self.log(f"planning: {t.goal!r}")

        # MVP: DAG fixo (ask -> draft -> review -> synth)
        q_id = f"q-{uuid.uuid4().hex[:8]}"
        d_id = f"d-{uuid.uuid4().hex[:8]}"
        r_id = f"r-{uuid.uuid4().hex[:8]}"
        f_id = f"f-{uuid.uuid4().hex[:8]}"

        subtasks = [
            SubtaskState(q_id, "Esclarecer preferências com o usuário", "ask"),
            SubtaskState(d_id, "Produzir rascunho da resposta", "draft", depends_on={q_id}),
            SubtaskState(r_id, "Revisar rascunho e sugerir melhorias", "review", depends_on={d_id}),
            SubtaskState(f_id, "Sintetizar resposta final", "synth", depends_on={d_id, r_id}),
        ]
        newly_open = await self.board.add_subtasks(task_id, subtasks)
        for st in newly_open:
            await publish_subtask_open(self.bus, self.agent_id, task_id, st)

        self._busy = False


class AskAgent(Agent):
    def __init__(self, agent_id: str, llm: LLM, bus: MessageBus, board: Board):
        super().__init__(agent_id, {"ask"}, llm, bus, board)
        self._busy = False
        self._waiting: Dict[str, asyncio.Future[str]] = {}

    async def handle(self, msg: Message) -> None:
        if msg.type == "SUBTASK_OPEN" and msg.topic == "cap.ask":
            if self._busy:
                return
            assert msg.task_id and msg.subtask_id
            if not await self.board.claim_subtask(msg.task_id, msg.subtask_id, self.agent_id):
                return
            self._busy = True
            asyncio.create_task(self._do_ask(msg.task_id, msg.subtask_id))

        elif msg.type == "USER_ANSWER":
            if msg.subtask_id and msg.subtask_id in self._waiting:
                fut = self._waiting.pop(msg.subtask_id)
                if not fut.done():
                    fut.set_result(str(msg.payload.get("answer", "")))

        elif msg.type == "DIRECT":
            self.log(f"direct from {msg.sender}: {msg.payload.get('text')}")

    async def _do_ask(self, task_id: str, subtask_id: str) -> None:
        t = await self.board.get_task(task_id)
        instructions = "Você é um agente asker. Crie uma pergunta curta para esclarecer formato/nível de detalhe."
        question = await self.llm.generate(instructions=instructions, input_text=f"Tarefa: {t.goal}\nCrie UMA pergunta.")
        question = question.strip()

        self.log(f"asking user: {question}")
        await self.bus.publish(
            "user.ask",
            Message(
                type="USER_QUESTION",
                sender=self.agent_id,
                topic="user.ask",
                task_id=task_id,
                subtask_id=subtask_id,
                payload={"question": question},
            ),
        )

        fut: asyncio.Future[str] = asyncio.get_event_loop().create_future()
        self._waiting[subtask_id] = fut
        answer = await fut

        self.log(f"user answered: {answer!r}")
        newly_open = await self.board.complete_subtask(task_id, subtask_id, self.agent_id, output=answer)
        for st in newly_open:
            await publish_subtask_open(self.bus, self.agent_id, task_id, st)

        self._busy = False


class DraftAgent(Agent):
    def __init__(self, agent_id: str, llm: LLM, bus: MessageBus, board: Board, reviewer_id: str):
        super().__init__(agent_id, {"draft"}, llm, bus, board)
        self._busy = False
        self._reviewer_id = reviewer_id

    async def handle(self, msg: Message) -> None:
        if msg.type == "SUBTASK_OPEN" and msg.topic == "cap.draft":
            if self._busy:
                return
            assert msg.task_id and msg.subtask_id
            if not await self.board.claim_subtask(msg.task_id, msg.subtask_id, self.agent_id):
                return
            self._busy = True
            asyncio.create_task(self._do_draft(msg.task_id, msg.subtask_id))

        elif msg.type == "DIRECT":
            self.log(f"direct from {msg.sender}: {msg.payload.get('text')}")

    async def _do_draft(self, task_id: str, subtask_id: str) -> None:
        t = await self.board.get_task(task_id)
        user_pref = next((s.output for s in t.subtasks.values() if s.required_capability == "ask" and s.status == Status.DONE), None)

        instructions = "Você é um agente writer. Produza um rascunho curto, claro e bem estruturado."
        input_text = f"Tarefa: {t.goal}\nPreferência do usuário: {user_pref}\nEscreva o rascunho."
        draft = await self.llm.generate(instructions=instructions, input_text=input_text)

        self.log("draft ready")
        newly_open = await self.board.complete_subtask(task_id, subtask_id, self.agent_id, output=draft)

        # Mensagem direta (teste de comunicação point-to-point)
        await self.bus.send(
            self._reviewer_id,
            Message(type="DIRECT", sender=self.agent_id, topic="direct", task_id=task_id,
                    payload={"text": "Rascunho pronto. A revisão deve abrir agora."}),
        )

        for st in newly_open:
            await publish_subtask_open(self.bus, self.agent_id, task_id, st)

        self._busy = False


class ReviewAgent(Agent):
    def __init__(self, agent_id: str, llm: LLM, bus: MessageBus, board: Board, synth_id: str):
        super().__init__(agent_id, {"review"}, llm, bus, board)
        self._busy = False
        self._synth_id = synth_id

    async def handle(self, msg: Message) -> None:
        if msg.type == "SUBTASK_OPEN" and msg.topic == "cap.review":
            if self._busy:
                return
            assert msg.task_id and msg.subtask_id
            if not await self.board.claim_subtask(msg.task_id, msg.subtask_id, self.agent_id):
                return
            self._busy = True
            asyncio.create_task(self._do_review(msg.task_id, msg.subtask_id))

        elif msg.type == "DIRECT":
            self.log(f"direct from {msg.sender}: {msg.payload.get('text')}")

    async def _do_review(self, task_id: str, subtask_id: str) -> None:
        t = await self.board.get_task(task_id)
        draft_text = next((s.output for s in t.subtasks.values() if s.required_capability == "draft" and s.status == Status.DONE), "")

        instructions = "Você é um agente reviewer. Faça uma revisão concisa e liste melhorias."
        review = await self.llm.generate(instructions=instructions, input_text=f"Rascunho:\n{draft_text}")

        self.log("review ready")
        newly_open = await self.board.complete_subtask(task_id, subtask_id, self.agent_id, output=review)

        await self.bus.send(
            self._synth_id,
            Message(type="DIRECT", sender=self.agent_id, topic="direct", task_id=task_id,
                    payload={"text": "Revisão concluída. A síntese deve abrir agora."}),
        )

        for st in newly_open:
            await publish_subtask_open(self.bus, self.agent_id, task_id, st)

        self._busy = False


class SynthAgent(Agent):
    def __init__(self, agent_id: str, llm: LLM, bus: MessageBus, board: Board):
        super().__init__(agent_id, {"synth"}, llm, bus, board)
        self._busy = False

    async def handle(self, msg: Message) -> None:
        if msg.type == "SUBTASK_OPEN" and msg.topic == "cap.synth":
            if self._busy:
                return
            assert msg.task_id and msg.subtask_id
            if not await self.board.claim_subtask(msg.task_id, msg.subtask_id, self.agent_id):
                return
            self._busy = True
            asyncio.create_task(self._do_synth(msg.task_id, msg.subtask_id))

        elif msg.type == "DIRECT":
            self.log(f"direct from {msg.sender}: {msg.payload.get('text')}")

    async def _do_synth(self, task_id: str, subtask_id: str) -> None:
        t = await self.board.get_task(task_id)
        user_pref = next((s.output for s in t.subtasks.values() if s.required_capability == "ask" and s.status == Status.DONE), None)
        draft_text = next((s.output for s in t.subtasks.values() if s.required_capability == "draft" and s.status == Status.DONE), "")
        review_text = next((s.output for s in t.subtasks.values() if s.required_capability == "review" and s.status == Status.DONE), "")

        instructions = "Você é um agente synth. Gere a resposta final para o usuário."
        input_text = (
            f"Tarefa: {t.goal}\n\n"
            f"Preferência do usuário: {user_pref}\n\n"
            f"Rascunho:\n{draft_text}\n\n"
            f"Revisão:\n{review_text}\n\n"
            f"Agora gere a resposta final."
        )
        final = await self.llm.generate(instructions=instructions, input_text=input_text)

        self.log("final ready")
        await self.board.complete_subtask(task_id, subtask_id, self.agent_id, output=final)

        await self.bus.publish(
            "task.closed",
            Message(type="TASK_CLOSED", sender=self.agent_id, topic="task.closed", task_id=task_id, payload={"final": final}),
        )

        self._busy = False


# ----------------------------
# Gateway do usuário (infra, não agente)
# ----------------------------

async def user_gateway(bus: MessageBus, auto_answer: Optional[str] = None) -> None:
    """
    Recebe perguntas via tópico user.ask e devolve respostas em user.answer.
    """
    inbox = await bus.register("user-gateway")
    await bus.subscribe("user-gateway", "user.ask")

    while True:
        msg = await inbox.get()
        if msg.type == "STOP":
            break
        if msg.type != "USER_QUESTION":
            continue

        question = str(msg.payload.get("question", ""))
        print(f"\n[USER] Pergunta: {question}")

        if auto_answer is None:
            answer = await asyncio.to_thread(input, "[USER] Sua resposta: ")
        else:
            print(f"[USER] (auto) {auto_answer}")
            answer = auto_answer

        await bus.publish(
            "user.answer",
            Message(
                type="USER_ANSWER",
                sender="user",
                topic="user.answer",
                task_id=msg.task_id,
                subtask_id=msg.subtask_id,
                payload={"answer": answer},
            ),
        )


# ----------------------------
# Main
# ----------------------------

def make_llm() -> LLM:
    api_key = os.environ.get("OPENAI_API_KEY")
    if api_key:
        model = os.environ.get("OPENAI_MODEL", "gpt-5.2")
        return OpenAIResponsesLLM(model=model)
    return MockLLM()


async def main() -> None:
    bus = MessageBus()
    board = Board()
    llm = make_llm()

    planner = PlannerAgent("planner", llm, bus, board)
    asker = AskAgent("asker", llm, bus, board)
    synth = SynthAgent("synth", llm, bus, board)
    reviewer = ReviewAgent("reviewer", llm, bus, board, synth_id="synth")
    writer = DraftAgent("writer", llm, bus, board, reviewer_id="reviewer")
    agents = [planner, asker, writer, reviewer, synth]

    for a in agents:
        await a.start()

    # Main listener
    main_inbox = await bus.register("main")
    await bus.subscribe("main", "task.closed")

    # User gateway
    auto = os.environ.get("AUTO_USER_ANSWER")  # se setar, não pergunta no terminal
    gw_task = asyncio.create_task(user_gateway(bus, auto_answer=auto))

    # Start agent loops
    agent_tasks = [asyncio.create_task(a.run()) for a in agents]

    # Create task
    goal = os.environ.get("GOAL")
    if not goal:
        goal = input("Objetivo do usuário: ").strip()

    task = await board.create_task(goal)
    task_id = task.task_id

    # Publish to planning capability (não é broadcast geral)
    await bus.publish(
        "cap.plan",
        Message(type="TASK_CREATED", sender="user", topic="cap.plan", task_id=task_id, payload={"goal": goal}),
    )

    # Wait for closure
    closed = await main_inbox.get()
    final = closed.payload.get("final", "")

    print("\n" + "=" * 40)
    print("FINAL")
    print("=" * 40)
    print(final)

    # Stop everything
    stop = Message(type="STOP", sender="main", topic="stop")
    await bus.send("user-gateway", stop)
    for a in agents:
        await bus.send(a.agent_id, stop)

    await asyncio.gather(*agent_tasks, return_exceptions=True)
    await gw_task

    # Se estiver usando OpenAIResponsesLLM, fecha conexões
    if isinstance(llm, OpenAIResponsesLLM):
        await llm.aclose()


if __name__ == "__main__":
    asyncio.run(main())
```

---

# Como rodar (2 modos)

## 1) Mock (sem API) — ótimo pra validar o protocolo

```bash
python swarm_min.py
```

Ele roda com respostas “mock”, mas você vê:

* eventos `SUBTASK_OPEN` indo por tópico `cap.*` (sem all-to-all)
* mensagem direta `DIRECT` entre writer → reviewer e reviewer → synth
* pergunta ao usuário via `user.ask` e resposta via `user.answer`

## 2) OpenAI real (Responses API)

A lib oficial recomenda **Responses API** como interface principal e o SDK expõe `output_text`. ([GitHub][1])

```bash
pip install openai
export OPENAI_API_KEY="..."
export OPENAI_MODEL="gpt-5.2"   # ou o modelo que você tiver
python swarm_min.py
```

O uso assíncrono com `AsyncOpenAI` + `await client.responses.create(...)` é o fluxo padrão do SDK. ([GitHub][1])

---

# Por que isso já valida o “sistema de comunicação”

Você já consegue testar, com logs:

1. **Roteamento por capability**

   * `TASK_CREATED` vai só para `cap.plan`
   * Subtarefas “abertas” vão só para `cap.ask`, `cap.draft`, `cap.review`, `cap.synth`

2. **Mensagens diretas**

   * `bus.send(...)` para “ping”/coordenação pontual (sem broadcast)

3. **Human-in-the-loop**

   * Agente pergunta via evento
   * Gateway pergunta ao usuário
   * Resposta volta como evento (e desbloqueia o DAG)

4. **Detecção de término (mínima)**

   * Board fecha quando **todas** as subtarefas estão `DONE`
   * Publica `TASK_CLOSED`

---

# Próximos passos (sem aumentar muito a complexidade)

Quando você estiver satisfeito com o “nervo”:

1. **Vários planners (de verdade leaderless)**

   * mais de um agente em `cap.plan` propondo planos
   * board faz merge por regra (ex.: union + dedup)
2. **Leases** para claims (evitar subtarefa presa se agente morrer)
3. **Persistência** do Board (Postgres/Redis)
4. **Tools/funções** (depois)
   A Responses API foi desenhada para incluir tool calling / function calling quando você quiser evoluir. ([platform.openai.com][2])

Se você quiser, eu posso te mandar uma segunda versão do MVP onde:

* existem **2–3 planners concorrentes**,
* merge de plano é “leaderless” (CAS no board),
* e a pergunta ao usuário vira uma “fila” (um ticket por vez), sem precisar de “controle central” além do board.

[1]: https://github.com/openai/openai-python "GitHub - openai/openai-python: The official Python library for the OpenAI API"
[2]: https://platform.openai.com/docs/api-reference/responses/create?lang=python "Responses | OpenAI API Reference"

SPLIT

Dá para chegar bem perto desse “mundo ideal” (plugar agente novo e ele *automaticamente* começa a colaborar) se você tratar agentes como **plugins** com um **contrato mínimo** e uma **infra comum** (bus + board). O segredo é: o core não “conhece” agentes específicos — ele só conhece **capabilities, mensagens e estado**.

Abaixo está um desenho bem pragmático (e minimalista) para você evoluir o MVP que você já quer em Python.

---

## 1) O que precisa ser estável no core (o “contrato” do swarm)

Se você quer plugabilidade real, o core precisa definir **3 contratos** (pequenos!):

### A) Contrato de mensagens

Um envelope único (como você já tem):

* `type` (ex.: `TASK_CREATED`, `SUBTASK_OPEN`, `SUBTASK_DONE`, `HELP_REQUEST`, `QUESTION_TICKET`)
* `topic` (ex.: `cap.math`, `task.created`, `user.ask`)
* `task_id`, `subtask_id`
* `sender`
* `payload` (dict livre)
* `correlation_id` (pra request/response)

> Regra de ouro: agente novo só precisa aprender esses envelopes e alguns `type` essenciais.

### B) Contrato do Board (estado compartilhado)

Um esquema mínimo e versionado:

* Task

  * `goal`
  * `closed`
* Subtask

  * `required_capability`
  * `depends_on`
  * `status`
  * `owner`
  * `output`
  * **idempotency_key** (muito importante para “agente plugado não duplicar tudo”)

### C) Contrato de “capabilities”

Tudo que permite roteamento sem all-to-all.

* Ex.: `ask`, `draft`, `review`, `synth`, `examples`, `math`, `edge_cases`, `style`, `qa`, etc.
* Um agente novo entra e diz: “eu tenho capability X”, e pronto: ele passa a receber `SUBTASK_OPEN` daquela capability.

---

## 2) Duas formas de um agente novo “entrar no jogo”

Esse ponto é crucial: um agente novo só é útil se ele consegue **(1) ser acionado** e/ou **(2) se auto-acionar**.

### Modo 1 — Reativo (executa quando alguém pedir)

O agente novo só precisa:

* declarar `capabilities = {"examples"}`
* assinar `cap.examples`
* ao receber `SUBTASK_OPEN` com `required_capability="examples"`, ele faz `claim` e executa

✅ Vantagem: simples e controlado
⚠️ Desvantagem: alguém (um planner, ou outros agentes) precisa *criar* subtarefas “examples”.

### Modo 2 — Proativo (se pluga e já contribui)

O agente também assina `task.created` e **propõe subtarefas** no Board, sem ninguém precisar saber que ele existe.

Ex.: “sou um ExamplesAgent; sempre que surge uma tarefa, eu proponho uma subtarefa `Adicionar exemplos`”.

✅ Vantagem: plug and play de verdade
⚠️ Desvantagem: você precisa de deduplicação/idempotência e anti-loop

> Para o seu “mundo ideal”, o Modo 2 é o que faz o swarm crescer sem reescrever o core.

---

## 3) Interface de plugin mínima (Python puro)

Você não precisa de framework. Um contrato “tosco” já resolve:

* Um plugin é um módulo Python que exporta uma função `build(core) -> Agent`
* O core faz:

  * importar todos os módulos de `plugins/`
  * chamar `build(core)`
  * registrar e iniciar o agente

### O que é `core`?

Um objeto simples com:

* `bus`
* `board`
* `llm`
* `config` (opcional)

---

## 4) Loader de plugins (bem enxuto)

### Estrutura sugerida

```
swarm/
  swarm_core.py
  swarm_main.py
  plugins/
    examples_agent.py
    style_agent.py
    __init__.py   # opcional
```

### Loader (exemplo)

```python
import importlib.util
from pathlib import Path
from typing import List

def load_agents_from_dir(plugins_dir: str, core) -> List["Agent"]:
    agents = []
    p = Path(plugins_dir)
    for file in p.glob("*.py"):
        if file.name.startswith("_"):
            continue
        spec = importlib.util.spec_from_file_location(file.stem, file)
        if spec is None or spec.loader is None:
            continue
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)

        build = getattr(mod, "build", None)
        if callable(build):
            agent = build(core)
            agents.append(agent)
    return agents
```

Isso já permite “dropar arquivo novo” e reiniciar o processo.

> Se depois você quiser “plugin instalável via pip”, aí você migra para entry points (`importlib.metadata.entry_points`). Mas pro MVP, pasta `plugins/` é o caminho mais rápido.

---

## 5) Como os agentes “se descobrem” sem all‑to‑all

Você não precisa que agente conheça agente. Ele só precisa:

* conhecer capabilities
* conhecer task_id/subtask_id
* ler/escrever no Board

Mesmo assim, eu adicionaria um **registry opcional** no Board (infra, não coordenação):

* `board.register_agent(agent_id, capabilities, meta)`
* `board.find_agents(capability)`

Isso te habilita mensagens diretas eficientes (escolher 1–3 alvos), sem broadcast.

---

## 6) A peça que faz plugabilidade funcionar: “propostas” + idempotência

Sem idempotência, plugin proativo vira “gera 30 subtarefas iguais”.

### Adicione `idempotency_key` em Subtask

A ideia:

* O agente ao propor uma subtarefa gera uma chave determinística, ex.:

  * `examples:{task_id}`
  * `style:{task_id}:concise`
* O Board faz `upsert` por essa chave (se já existe, ignora).

Pseudo-API do Board:

```python
async def upsert_subtask(self, task_id: str, idempotency_key: str, spec: SubtaskSpec) -> bool:
    """
    Retorna True se inseriu algo novo, False se já existia.
    """
```

Isso é *o* item que transforma “plugar agente” em “não causar caos”.

---

## 7) Exemplo de agente plugável (proativo + reativo)

### `plugins/examples_agent.py`

Ele:

1. Ao ver `TASK_CREATED`, cria uma subtarefa `Adicionar exemplos` (idempotente)
2. Ao ver `SUBTASK_OPEN` em `cap.examples`, ele executa.

```python
from dataclasses import dataclass
from typing import Set
import uuid

# Você reutiliza suas classes Agent/Message/SubtaskState/Status do core.
# Aqui vou assumir que core expõe:
# core.AgentBase, core.Message, core.SubtaskState, core.Status

def build(core):
    return ExamplesAgent(core)

class ExamplesAgent(core.AgentBase):
    def __init__(self, core):
        super().__init__(
            agent_id="examples",
            capabilities={"examples"},
            llm=core.llm,
            bus=core.bus,
            board=core.board,
        )

    async def start(self):
        await super().start()
        # Proativo: assina criação de tarefa
        await self.bus.subscribe(self.agent_id, "task.created")

    async def handle(self, msg):
        # 1) Propor subtarefa ao surgir uma tarefa
        if msg.type == "TASK_CREATED" and msg.topic == "task.created":
            task_id = msg.task_id
            key = f"examples:{task_id}"

            # cria a subtask de forma idempotente
            created = await self.board.upsert_subtask(
                task_id=task_id,
                idempotency_key=key,
                title="Adicionar exemplos e casos concretos",
                required_capability="examples",
                depends_on=set(),  # ou depende do draft/review se você quiser
            )
            if created:
                # publica evento de open se já estiver pronta
                st = await self.board.get_subtask_by_key(task_id, key)
                if st.status == core.Status.OPEN:
                    await core.publish_subtask_open(self.bus, self.agent_id, task_id, st)

        # 2) Executar quando abrir subtask da capability
        if msg.type == "SUBTASK_OPEN" and msg.topic == "cap.examples":
            task_id, subtask_id = msg.task_id, msg.subtask_id
            if not await self.board.claim_subtask(task_id, subtask_id, self.agent_id):
                return

            t = await self.board.get_task(task_id)
            instructions = "Você é um agente que cria exemplos práticos e curtos."
            examples = await self.llm.generate(
                instructions=instructions,
                input_text=f"Objetivo:\n{t.goal}\nCrie 3 exemplos concretos.",
            )

            newly_open = await self.board.complete_subtask(task_id, subtask_id, self.agent_id, examples)
            for st in newly_open:
                await core.publish_subtask_open(self.bus, self.agent_id, task_id, st)
```

Esse agente entra no swarm sem você alterar nenhum outro agente.

---

## 8) Como garantir que “agente novo interaja” com o core (sem acoplamento)

O core precisa adotar um padrão: **não hardcode pipeline fixo demais**.

O que você tinha (ask→draft→review→synth) funciona, mas se tudo estiver fechado nesse pipeline, um plugin “examples” nunca entra.

Três maneiras de abrir espaço sem virar bagunça:

### Opção A — Pipeline base + “slots”

No seu planner core, sempre crie também:

* `improvements` (capability genérica)
* `extras`
  E plugins podem competir (bids/claims) nesses slots.

### Opção B — Propostas sempre-on

Sempre publique `TASK_CREATED` em `task.created` e deixe plugins proporem subtarefas extras.
O core mantém apenas 3–4 subtarefas essenciais, e o resto “emerge”.

### Opção C — “Auction” por etapa

O core cria um estágio: “enriquecimento” e abre `OPEN_FOR_BIDS` — plugins entram com bids.

---

## 9) Regras de convivência (para o swarm não degradar conforme cresce)

Quando você pluga muitos agentes, aparecem problemas típicos. Eu colocaria **desde cedo**:

1. **Rate limit por agente** (ex.: máx. N propostas por task_id)
2. **Orçamento por task** (máx. subtarefas totais; ou exige quorum para criar mais)
3. **Namespace para capabilities e tipos**

   * capabilities: `text.examples`, `text.style`, `qa.checks`
   * tipos custom: `x.examples.SOMETHING`
4. **Política de pergunta ao usuário via tickets**

   * plugin não pergunta direto; ele cria `QuestionTicket` e o gateway escolhe 1 por vez

---

## 10) O que eu faria agora no seu MVP (passos concretos)

1. **Adicionar `task.created`** (tópico)

   * além de mandar para `cap.plan`, publique também:

     * `topic="task.created"`
       Só agentes que quiserem assinam.

2. **Implementar `upsert_subtask(idempotency_key)`**

   * isso habilita plugins proativos com segurança.

3. **Criar loader `plugins/`**

   * importa módulos, chama `build(core)`, adiciona no runtime.

4. **Converter seus agentes “core” em plugins também**

   * planner/asker/writer/reviewer/synth viram módulos plugáveis.
   * aí o core fica realmente estável.

5. **Testar com 1 plugin extra**

   * “ExamplesAgent” ou “EdgeCasesAgent”
   * e conferir se o fluxo fecha e não duplica subtarefa.

---

Se você quiser, eu posso te mandar a versão do seu `swarm_min.py` refatorada em:

* `core.py` (bus/board/llm/base agent + upsert)
* `runtime.py` (loader + start/stop)
* `plugins/core_*.py` (seus 5 agentes)
* `plugins/examples_agent.py` (um custom)

Tudo ainda “Python puro”, sem tools, e mantendo a abstração fininha de LLM.

