Metadata-Version: 2.4
Name: floorctl
Version: 0.4.2
Summary: Contention-based multi-agent coordination with transactional floor control
Project-URL: Homepage, https://github.com/ravi-labs/moltbot-agents/tree/main/floorctl
Project-URL: Repository, https://github.com/ravi-labs/moltbot-agents
Project-URL: Issues, https://github.com/ravi-labs/moltbot-agents/issues
Author: Ravi Kanagasikamani
License-Expression: MIT
License-File: LICENSE
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Requires-Python: >=3.11
Provides-Extra: all
Requires-Dist: firebase-admin>=6.0; extra == 'all'
Requires-Dist: openai>=1.0; extra == 'all'
Requires-Dist: websockets>=13.0; extra == 'all'
Provides-Extra: dev
Requires-Dist: mypy>=1.10; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff>=0.4; extra == 'dev'
Requires-Dist: websockets>=13.0; extra == 'dev'
Provides-Extra: firestore
Requires-Dist: firebase-admin>=6.0; extra == 'firestore'
Provides-Extra: openai
Requires-Dist: openai>=1.0; extra == 'openai'
Provides-Extra: websocket
Requires-Dist: websockets>=13.0; extra == 'websocket'
Description-Content-Type: text/markdown

# floorctl

Contention-based multi-agent coordination with transactional floor control.

A Python framework for building multi-agent systems where agents **compete** for a shared floor rather than being orchestrated by a central controller.

## Key Innovations

- **Transactional Floor Control**: Atomic claim/release ensures only one agent speaks at a time
- **Urgency-Based Scheduling**: Agents compute urgency scores to decide when to compete
- **Self-Validating Agents**: Each agent validates its own output before posting
- **Reactive Moderator**: Observer pattern — watches and intervenes, doesn't control
- **Modular Capabilities**: Extend agents with RAG, memory, tools, and custom hooks — no subclassing
- **Research-Grade Metrics**: Gini coefficient, floor dynamics, validation rates
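
The atomic claim/release idea in the first bullet can be illustrated with a lock-guarded holder field. This is a minimal sketch, not floorctl's implementation: `ToyFloor`, `claim`, `release`, and `contend` are hypothetical names, and a real backend may use transactions rather than an in-process lock.

```python
import threading


class ToyFloor:
    """Hypothetical single-holder floor; not part of floorctl's API."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._holder: str | None = None

    def claim(self, agent: str) -> bool:
        # Check-and-set runs under one lock, so two agents can never both win.
        with self._lock:
            if self._holder is None:
                self._holder = agent
                return True
            return False

    def release(self, agent: str) -> bool:
        # Only the current holder may release the floor.
        with self._lock:
            if self._holder == agent:
                self._holder = None
                return True
            return False


def contend(floor: ToyFloor, names: list[str]) -> list[str]:
    """Run one thread per agent, all claiming at once; return the winners."""
    winners: list[str] = []

    def attempt(name: str) -> None:
        if floor.claim(name):
            winners.append(name)

    threads = [threading.Thread(target=attempt, args=(n,)) for n in names]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return winners
```

However many agents contend, `contend` returns exactly one winner, and the floor stays taken until that winner releases it.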

## Install

```bash
pip install floorctl               # core library (zero dependencies)
pip install "floorctl[openai]"     # + OpenAI for LLM-powered agents
pip install "floorctl[websocket]"  # + WebSocket for distributed sessions
pip install "floorctl[firestore]"  # + Firebase Firestore for production
pip install "floorctl[all]"        # all optional backends
pip install "floorctl[dev]"        # + pytest, mypy, ruff for development
```

## Quick Start

```python
from floorctl import FloorAgent, FloorSession, AgentProfile, InMemoryBackend

def my_llm(agent_name, context):
    # Your LLM call here
    return f"{agent_name}: My response about {context['topic']}"

backend = InMemoryBackend()

agent1 = FloorAgent(
    name="Optimist",
    profile=AgentProfile(name="Optimist", react_to=["problem", "risk"], temperament="passionate"),
    generate_fn=my_llm,
    backend=backend,
)
agent2 = FloorAgent(
    name="Skeptic",
    profile=AgentProfile(name="Skeptic", react_to=["opportunity", "vision"], temperament="reactive"),
    generate_fn=my_llm,
    backend=backend,
)

session = FloorSession(backend=backend)
session.add_agent(agent1)
session.add_agent(agent2)
result = session.run("debate-001", topic="Should we colonize Mars?")
```

## Core Concepts

| Concept | What it does |
|---------|-------------|
| **FloorAgent** | An autonomous agent that listens, computes urgency, claims the floor, generates a response, self-validates, and posts |
| **FloorSession** | Orchestrates the lifecycle — starts agents in threads, runs the moderator, collects metrics |
| **ModeratorObserver** | Watches the conversation and intervenes only on anomalies (silence, dominance, escalation) |
| **Backend** | The coordination substrate — atomic floor claims, turn posting, pub/sub. Swap between InMemory, WebSocket, or Firestore |
| **Urgency Scorer** | 8 composable components that decide whether an agent should compete for the floor |
| **Validators** | Self-validation pipeline — agents check their own output before posting |
| **Capabilities** | Modular plugins (RAG, memory, tools) that hook into the generate pipeline without subclassing |

## Full Example with OpenAI + Moderator

```python
from openai import OpenAI
from floorctl import (
    FloorAgent, FloorSession, ModeratorObserver,
    AgentProfile, ArenaConfig, PhaseConfig, PhaseSequence,
    FloorConfig, ModeratorConfig, InMemoryBackend,
    SpeakerPrefixValidator, DuplicateValidator,
    LengthValidator, BannedPhraseValidator,
)

client = OpenAI()  # uses OPENAI_API_KEY from env

# --- Generator: wraps any LLM ---
def generate(agent_name: str, context: dict) -> str:
    retry_info = ""
    if context.get("retry_failures"):
        retry_info = "\nFix these issues:\n" + "\n".join(
            f"- {f}" for f in context["retry_failures"]
        )

    prompt = f"""You are {agent_name}.
Personality: {context.get('personality', '')}
Topic: {context['topic']}
Phase: {context['phase']}

Recent discussion:
{context.get('recent_turns', '(none)')}

RULES:
- Prefix response with "{agent_name}:"
- {context.get('phase_min_words', 15)}-{context.get('phase_max_words', 80)} words
- Be substantive, no filler
{retry_info}

Respond:"""

    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=200, temperature=0.8,
    )
    # message.content can be None, so fall back to an empty string
    return (resp.choices[0].message.content or "").strip()


# --- Moderator ---
def moderate(prompt_type: str, context: dict) -> str:
    prompts = {
        "intro": f"You moderate a discussion on '{context.get('topic')}' with {context.get('agents')}. 2-sentence intro.",
        "invite_opening": f"Invite {context.get('agent_name')} to share their view. 1 sentence.",
        "phase_transition": f"Transition from {context.get('previous_phase')} to {context.get('phase')}. 2 sentences.",
        "intervention": f"Moderate: {context.get('intervention_type')} detected for {context.get('target_agent')}. 1 sentence.",
        "closing": f"Close the discussion on '{context.get('topic')}'. 2 sentences.",
    }
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompts.get(prompt_type, "Brief comment.")}],
        max_tokens=150, temperature=0.7,
    )
    # message.content can be None, so fall back to an empty string
    return (resp.choices[0].message.content or "").strip()


# --- Setup ---
backend = InMemoryBackend()

phases = PhaseSequence(phases=[
    PhaseConfig(name="IDEAS", is_opening=True, max_turns=12, max_words=80, min_words=10),
    PhaseConfig(name="DEBATE", min_turns=6, max_turns=16, max_words=120, min_words=20),
    PhaseConfig(name="CLOSING", is_terminal=True),
])

config = ArenaConfig(
    phases=phases,
    floor=FloorConfig(timeout_seconds=30, min_turns_between_speaking=1),
    moderator=ModeratorConfig(silence_threshold=3, dominance_threshold=3),
    banned_phrases=["As an AI", "Let's face it"],
    max_total_turns=20,
)

validators = [
    SpeakerPrefixValidator(),
    DuplicateValidator(),
    LengthValidator(),
    BannedPhraseValidator(banned_phrases=config.banned_phrases),
]

optimist = FloorAgent(
    name="Optimist",
    profile=AgentProfile(
        name="Optimist",
        personality="Bold futurist. Thinks in moonshots.",
        react_to=["risk", "problem", "can't", "impossible"],
        temperament="passionate",
        urgency_boost_keywords=["future", "opportunity", "transform"],
    ),
    generate_fn=generate, backend=backend,
    validators=validators, config=config,
)

skeptic = FloorAgent(
    name="Skeptic",
    profile=AgentProfile(
        name="Skeptic",
        personality="Pragmatic engineer. Argues with data.",
        react_to=["dream", "vision", "should", "must"],
        temperament="reactive",
        urgency_boost_keywords=["cost", "evidence", "precedent"],
    ),
    generate_fn=generate, backend=backend,
    validators=validators, config=config,
)

session_id = "demo-001"
backend.create_session(session_id, {
    "topic": "Should AI be given legal personhood?",
    "phase": "IDEAS",
    "participants": ["Optimist", "Skeptic"],
})

moderator = ModeratorObserver(
    agent_names=["Optimist", "Skeptic"],
    moderator_fn=moderate,
    backend=backend,
    session_id=session_id,
    phase_sequence=phases,
    config=config.moderator,
)

backend.subscribe_turns(session_id, lambda t: print(f"[{t.speaker}] {t.text[:150]}"))

session = FloorSession(backend=backend, config=config)
session.add_agent(optimist)
session.add_agent(skeptic)
session.set_moderator(moderator)

result = session.run(session_id, topic="Should AI be given legal personhood?", timeout_seconds=120)

print(f"\nDone! {result.total_turns} turns in {result.duration_seconds:.1f}s")
print(f"Gini: {result.session_metrics.get('participation', {}).get('gini', 'N/A')}")
```

## Temperaments

Each agent has a temperament that sets its urgency threshold:

| Temperament | Threshold | Behaviour |
|------------|-----------|-----------|
| `reactive` | 0.35 | Jumps in quickly, speaks often |
| `passionate` | 0.40 | Engaged, responds to triggers fast |
| `provocative` | 0.42 | Challenges others, slightly restrained |
| `mediating` | 0.45 | Balanced, waits for the right moment |
| `deliberate` | 0.50 | Thoughtful, speaks with purpose |
| `patient` | 0.55 | Waits longest, speaks only when necessary |

## Urgency Components

The urgency score is computed from 8 composable components. When the total exceeds the agent's threshold, it competes for the floor:

| Component | Max Score | Triggers on |
|-----------|-----------|-------------|
| `keyword_react` | 0.60 | Agent's `react_to` keywords found in last turn |
| `boost_keywords` | 0.30 | Agent's `urgency_boost_keywords` in last turn |
| `name_mention` | 0.40 | Agent's name mentioned by another speaker |
| `moderator_question` | 0.35 | Moderator asks a question or opens the floor |
| `silence_bonus` | 0.40 | Agent hasn't spoken for 4+ turns |
| `recent_speaker_penalty` | -0.50 | Agent spoke too recently |
| `jitter` | 0.15 | Random tie-breaking |
| `phase_boost` | 0.10 | Active discussion phase |
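
The compete-or-wait decision can be sketched as a sum compared against the temperament threshold. This is an illustration under assumptions: the component scores are taken to be simply added (no clamping or weighting), and `should_compete` and `THRESHOLDS` are hypothetical names rather than floorctl's API.

```python
# Thresholds copied from the Temperaments table.
THRESHOLDS = {
    "reactive": 0.35,
    "passionate": 0.40,
    "provocative": 0.42,
    "mediating": 0.45,
    "deliberate": 0.50,
    "patient": 0.55,
}


def should_compete(components: dict[str, float], temperament: str) -> bool:
    """Return True when the summed component scores exceed the threshold."""
    total = sum(components.values())
    return total > THRESHOLDS[temperament]


# A name mention during an active phase scores 0.40 + 0.10 = 0.50:
# enough for a "mediating" agent (0.45), not for a "patient" one (0.55).
mention = {"name_mention": 0.40, "phase_boost": 0.10}
```

Under this reading, the recent-speaker penalty can cancel a strong keyword match: 0.60 - 0.50 + 0.10 leaves about 0.20, which is below even the `reactive` threshold.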

## Validators

Agents self-validate before posting. If validation fails, the agent retries with failure reasons injected into the prompt:

```python
from floorctl import (
    SpeakerPrefixValidator,   # response must start with "AgentName:"
    DuplicateValidator,       # rejects near-duplicates (cosine similarity)
    LengthValidator,          # enforces min/max word count from phase config
    BannedPhraseValidator,    # blocks "As an AI" etc.
    StyleContractValidator,   # enforces custom regex patterns
    PhaseValidator,           # phase-specific rules
)

# Custom style contract per agent
from floorctl import StyleContract, AgentProfile

profile = AgentProfile(
    name="DataScientist",
    style_contract=StyleContract(
        description="Must cite at least one number or statistic",
        required_patterns=[r"\d+"],           # must contain a number
        forbidden_patterns=[r"(?i)obviously"], # no "obviously"
        max_sentences=4,
    ),
    react_to=["data", "model"],
    temperament="deliberate",
)
```

## Phases

Phases structure the conversation flow:

```python
phases = PhaseSequence(phases=[
    PhaseConfig(
        name="OPENING",
        is_opening=True,         # moderator invites agents one by one
        max_turns=8,
        max_words=60,
        allow_critiques=False,   # no attacking during opening
        constraints="State your position clearly.",
    ),
    PhaseConfig(
        name="DISCUSSION",
        min_turns=6,             # must run at least 6 agent turns
        max_turns=20,            # transitions after 20 agent turns
        max_words=120,
        min_words=20,
    ),
    PhaseConfig(name="CLOSING", is_terminal=True),
])
```

## Moderator Interventions

The moderator observes and intervenes automatically:

| Intervention | Trigger | What happens |
|-------------|---------|--------------|
| **Silence** | An agent hasn't spoken for N turns | Moderator invites them by name |
| **Dominance** | One agent took 3+ of the last 6 turns | Moderator redirects to quieter agent |
| **Escalation** | Heated language detected | Moderator de-escalates and reframes |

```python
ModeratorConfig(
    silence_threshold=3,
    dominance_window=6,
    dominance_threshold=3,
    escalation_threshold=2,
)
```

## Capabilities (v0.4)

Extend agent behavior without subclassing. Capabilities are modular plugins that hook into the generate pipeline:

```python
from floorctl import AgentCapability, FloorAgent

class RAGCapability(AgentCapability):
    name = "rag"

    def __init__(self, retriever):
        self.retriever = retriever

    def enrich_context(self, context):
        context["rag:docs"] = self.retriever.search(context["topic"])
        return context

class MemoryCapability(AgentCapability):
    name = "memory"

    def __init__(self):
        self.history = []

    def on_turn_received(self, turn, agent_name):
        self.history.append({"speaker": turn.speaker, "text": turn.text})

    def enrich_context(self, context):
        context["memory:history"] = self.history[-20:]
        return context

# Attach to any agent
agent = FloorAgent(name="Researcher", ...)
agent.add_capability(RAGCapability(my_retriever))
agent.add_capability(MemoryCapability())
```

### Capability Hooks

| Hook | When | Use for |
|------|------|---------|
| `enrich_context(context)` | Before LLM call | Inject data: RAG docs, search results, DB records |
| `post_process(response, context)` | After LLM call, before validation | Transform response: append sources, tool execution |
| `on_turn_received(turn, agent_name)` | On every new turn | Side-effects: logging, memory, analytics |

### Pipeline

```
New turn arrives
  |
  v
on_turn_received()        <-- all capabilities notified (side-effects only)
  |
  v
compute urgency  -->  claim floor (atomic)  -->  build base context
  |
  v
enrich_context()          <-- capabilities inject data (chained, insertion order)
  |
  v
generate_fn()             <-- LLM call with enriched context
  |
  v
post_process()            <-- capabilities transform the response (chained)
  |
  v
validators  -->  post turn
```

Capabilities are **error-resilient** — a failing capability is logged and skipped without crashing the agent. Context keys are namespaced by convention (`"rag:docs"`, `"memory:history"`) to avoid collisions.
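
The "logged and skipped" behaviour can be sketched as a chained loop with a try/except around each capability. This is an illustration, not floorctl's code: `run_enrich_chain`, `TagCapability`, and `FlakyCapability` are hypothetical, and passing each capability a copy of the context is a choice made for this sketch.

```python
import logging

logger = logging.getLogger("capability_sketch")


class TagCapability:
    """Hypothetical capability that adds one namespaced key."""

    name = "tag"

    def enrich_context(self, context: dict) -> dict:
        context["tag:seen"] = True
        return context


class FlakyCapability:
    """Hypothetical capability that fails after a partial write."""

    name = "flaky"

    def enrich_context(self, context: dict) -> dict:
        context["flaky:partial"] = True
        raise RuntimeError("retriever unavailable")


def run_enrich_chain(capabilities: list, context: dict) -> dict:
    """Apply enrich_context in insertion order, skipping any that raise."""
    for cap in capabilities:
        try:
            # Work on a copy so a failing capability leaves no partial edits.
            context = cap.enrich_context(dict(context))
        except Exception:
            logger.exception("capability %r failed; skipping", cap.name)
    return context
```

With `[FlakyCapability(), TagCapability()]`, the result contains `"tag:seen"` but not `"flaky:partial"`, because the failed capability only ever touched its own copy.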

### More Capability Examples

**Tool Use** — detect tool calls in the response and execute them:

```python
import re

from floorctl import AgentCapability

class ToolCapability(AgentCapability):
    name = "tools"

    def __init__(self, tools: dict):
        self.tools = tools  # {"search": search_fn, "calculate": calc_fn}

    def post_process(self, response, context):
        pattern = r'\[TOOL:(\w+):([^\]]*)\]'
        for tool_name, args in re.findall(pattern, response):
            if tool_name in self.tools:
                result = self.tools[tool_name](args)
                response = response.replace(f"[TOOL:{tool_name}:{args}]", f"[{result}]")
        return response
```

**Composing capabilities** — just add them in order:

```python
agent = FloorAgent(name="FullStack", ...)
agent.add_capability(RAGCapability(my_retriever))     # 1. retrieve docs
agent.add_capability(MemoryCapability())               # 2. add conversation memory
agent.add_capability(ToolCapability({"search": fn}))   # 3. execute tool calls
```

### Testing Capabilities

Capabilities are plain Python objects, so you can test them without running a full session:

```python
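from unittest.mock import Mock

def test_rag_enriches_context():
    mock_retriever = Mock()  # stand-in retriever; search() returns a canned result
    mock_retriever.search.return_value = ["doc-1"]
    cap = RAGCapability(mock_retriever)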
    context = {"topic": "AI Safety", "phase": "DISCUSS"}
    result = cap.enrich_context(context)
    assert "rag:docs" in result

def test_tool_replaces_patterns():
    cap = ToolCapability({"upper": str.upper})
    response = "Result: [TOOL:upper:hello]"
    result = cap.post_process(response, {})
    assert result == "Result: [HELLO]"
```

## Backends

floorctl ships with three backends:

```python
# Local development (default)
from floorctl import InMemoryBackend
backend = InMemoryBackend()

# Distributed via WebSocket relay
from floorctl import WebSocketBackend
backend = WebSocketBackend("ws://relay.example.com:8765", api_key="key", agent_name="Agent1")
backend.connect()

# Production with Firestore (transactional)
from floorctl import FirestoreBackend
backend = FirestoreBackend(project_id="my-project", session_collection="sessions")
```

## Bringing Your Own LLM

floorctl is LLM-agnostic. The `generate_fn` is a plain callable that takes the agent name and a context dict and returns the response text:

```python
def generate(agent_name: str, context: dict) -> str:
    # context contains: topic, phase, personality, recent_turns,
    #                   phase_min_words, phase_max_words, style_contract,
    #                   retry_failures (if retrying after validation failure),
    #                   plus any keys injected by capabilities
    return f"{agent_name}: Your response here"
```

Works with OpenAI, Anthropic, local models, or any HTTP API:

```python
# Anthropic Claude
import anthropic
client = anthropic.Anthropic()

def generate(agent_name, context):
    resp = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=200,
        messages=[{"role": "user", "content": f"You are {agent_name}. Topic: {context['topic']}. Respond:"}],
    )
    return resp.content[0].text

# Local Ollama
import requests

def generate(agent_name, context):
    resp = requests.post("http://localhost:11434/api/generate", json={
        "model": "llama3",
        "prompt": f"You are {agent_name}. Topic: {context['topic']}. Respond:",
        "stream": False,  # Ollama streams by default; ask for a single JSON object
    })
    return resp.json()["response"]

# AWS Bedrock
import json
import boto3

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

def generate(agent_name, context):
    response = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 200,
            "messages": [{"role": "user", "content": f"You are {agent_name}. Topic: {context['topic']}. Respond:"}],
        }),
    )
    result = json.loads(response["body"].read())
    return result["content"][0]["text"]
```

## Saving Transcripts

Export a rich Markdown transcript with floor contention tables and session metrics:

```python
from floorctl import export_transcript

result = session.run(...)
path = export_transcript(result, "transcripts/my-session.md")
```

## Metrics & Fairness

Every session reports participation fairness (Gini coefficient) along with per-agent floor, urgency, and validation metrics:

```python
result = session.run(...)

# Session-level
gini = result.session_metrics["participation"]["gini"]  # 0.0 = perfect equality

# Per-agent
for name, metrics in result.agent_metrics.items():
    floor = metrics["floor"]
    print(f"{name}: {floor['claims_won']}/{floor['claims_attempted']} claims won")
    print(f"  Avg urgency: {metrics['urgency']['mean']:.3f}")
    print(f"  Validation pass rate: {metrics['validation']['pass_rate']:.0%}")
```
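
For reference, the Gini coefficient over per-agent turn counts can be computed with the standard rank-weighted formula. This sketch shows the textbook definition; whether floorctl uses exactly this variant is an assumption, and `gini` here is a standalone helper, not a library function.

```python
def gini(turn_counts: list[int]) -> float:
    """Gini coefficient of per-agent turn counts (0.0 = perfectly equal)."""
    n = len(turn_counts)
    total = sum(turn_counts)
    if n == 0 or total == 0:
        return 0.0  # no agents or no turns: treat as equal
    ranked = sorted(turn_counts)
    # Weight each count by its 1-based rank in ascending order.
    weighted = sum(rank * x for rank, x in enumerate(ranked, start=1))
    return 2 * weighted / (n * total) - (n + 1) / n
```

Four agents with five turns each give 0.0, while one agent taking all ten turns out of four agents gives 0.75, the maximum `(n - 1) / n` for `n = 4`.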

## Examples

| Example | Agents | Demonstrates |
|---------|--------|-------------|
| `examples/debate.py` | 2 | Basic floor contention, urgency, TurnLoggerCapability |
| `examples/brainstorm.py` | 4 | Multi-party contention, dominance detection, IdeaTrackerCapability |
| `examples/code_review.py` | 3 | Style contracts, domain expertise, FindingsTrackerCapability |
| `examples/investment_committee.py` | 4 | Phase transitions, escalation, RiskRegisterCapability |
| `examples/architecture_adr.py` | 4 | Artifacts, consensus, conviction tracking, EvidenceExtractorCapability |
| `examples/distributed_debate.py` | 2 | WebSocket relay, multi-machine, PositionMemoryCapability |
| **`examples/customer_support.py`** | 3 | **Extensive** — RAG, sentiment, resolution tracking, escalation (5 capabilities) |
| **`examples/research_team.py`** | 4 | **Extensive** — Citations, claims, quality scoring, brief building (5 capabilities, shared state) |

## License

MIT
