Python 3.11+ required.
pip install -e packages/arbiter-ops
# Pick what you need
pip install -e "packages/arbiter-ops[slack,neo4j,integrations,llm,kafka]"
# ML opt-ins (each adds xgboost + numpy + sklearn / scipy)
pip install -e "packages/arbiter-ops[ml-decision]" # XGBoost triage classifier
pip install -e "packages/arbiter-ops[ml-intelligence]" # XGBoost cost predictor
pip install -e "packages/arbiter-ops[ml-surrogate]" # GA surrogate fitness
# Or everything
pip install -e "packages/arbiter-ops[all]"
python -c "import arbiter_ops; print(arbiter_ops.__version__)"
# 1.0.0
python scripts/arbiter_ops_leak_check.py
# leaks: 0 across 482 files scanned
python -m pytest packages/arbiter-ops/tests/ -q
# 1034 passed, 8 skipped in ~62s
The fastest path from nothing to a working substrate: write an audit record using the default LocalAuditAdapter.
from arbiter_ops.governance import (
make_default_audit_port, AuditRequest, AuditRecord,
Verdict, VerdictOutcome,
)
audit = make_default_audit_port() # writes JSON-line evidence to stdout
# 1. Authorize a candidate action BEFORE side effects
verdict = await audit.authorize(AuditRequest(
actor="ops.bot",
action="restart_service",
subject="payments-api",
context={"incident_id": "INC-2031", "blast_radius": "low"},
))
assert verdict.outcome in (VerdictOutcome.ALLOW, VerdictOutcome.DENY)
# 2. Record what actually happened (post-hoc evidence)
await audit.record(AuditRecord(
actor="ops.bot",
action="restart_service",
subject="payments-api",
succeeded=True,
metadata={"verdict_id": verdict.verdict_id, "duration_ms": 412},
))
Set ARBITER_OPS_AUDIT_PATH=/var/log/arbiter-ops.jsonl
to write to a file, or swap to a hosted backend (Postgres, Kafka) by registering your own
AuditPort implementation through the application container.
Two C4-style diagrams orient the rest of the guide. C4 is Simon Brown's notation for software architecture · four nested levels (System Context · Container · Component · Code). The dev guide ships levels 1 and 2; the per-plane deep-dives below correspond to level 3.
arbiter-ops as a single system surrounded by the actors and systems it interacts with.
Internal decomposition · 9 planes flowing left-to-right, governance underpinning everything, supporting containers around the perimeter.
| # | Plane | Responsibility | Default adapter | ML opt-in? |
|---|---|---|---|---|
| 1 | sensing | Ingest from observability sources | in-memory + http | — |
| 2 | context | Entity + topology resolver | in-memory · neo4j extra | — |
| 3 | feature | Feature engineering | pure-python | — |
| 4 | intelligence | LLM + ML reasoners (provider-neutral) | HistoricalMeanCostPredictor (Welford) | ml-intelligence |
| 5 | reasoning | Ensemble + causal classifier | rule-based | — |
| 6 | decision | Policy engine + autonomy levels | HeuristicTriageClassifier | ml-decision |
| 7 | action | Invokers · 6 SOAR vendors + adjacent | per-vendor REST + simulation | — |
| 8 | evidence | Facade over governance audit | LocalAuditAdapter | — |
| 9 | improvement | Offline GA · policy evolution | ReplayFitnessEvaluator | ml-surrogate |
Plus first-class supporting containers: governance (audit primitive · the foundation everything else depends on),
control (operator HTTP plane), hil (Temporal worker), workflow (agent-workflow boards),
triage_room (Slack/Teams bot), operator (recipe-driven CLI), conformance (architecture probe),
identity (tenancy + auth), knowledge (runbooks · post-mortems), agent (long-running loop).
Each plane has a domain/ directory (pure Python · zero infra imports),
an application/ directory (orchestration · uses ports), and an adapters/ directory (concrete tech).
You swap a plane by registering a different adapter through the DI container — never by editing engine code.
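The adapter swap described above can be sketched with a toy container. This is a minimal illustration, not the package's actual DI container: `Container`, `TriagePort`, and both adapter classes are hypothetical names invented for the example.

```python
from typing import Callable, Dict, Protocol


class TriagePort(Protocol):
    """Hypothetical port: the only surface the engine is allowed to see."""

    def predict(self, blast_score: float) -> str: ...


class HeuristicAdapter:
    def predict(self, blast_score: float) -> str:
        return "auto_approved" if blast_score < 0.2 else "route_to_human"


class AlwaysEscalateAdapter:
    def predict(self, blast_score: float) -> str:
        return "route_to_human"


class Container:
    """Toy DI container: maps a port name to a factory for its adapter."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], TriagePort]] = {}

    def register(self, port: str, factory: Callable[[], TriagePort]) -> None:
        self._factories[port] = factory  # the latest registration wins

    def resolve(self, port: str) -> TriagePort:
        return self._factories[port]()


def engine(container: Container, blast_score: float) -> str:
    # Engine code depends on the port name only, never on a concrete adapter.
    return container.resolve("triage").predict(blast_score)


container = Container()
container.register("triage", HeuristicAdapter)
before = engine(container, 0.1)

container.register("triage", AlwaysEscalateAdapter)  # swap; engine() is untouched
after = engine(container, 0.1)
```

The point of the sketch is that `engine()` never changes between the two calls; only the registration does.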
Foundation everything else depends on. Two-step decision audit: authorize() before, record() after.
LocalAuditAdapter
from arbiter_ops.governance import make_default_audit_port
audit = make_default_audit_port()
Writes JSON-line evidence to stdout, or to a file at $ARBITER_OPS_AUDIT_PATH.
from arbiter_ops.governance.audit_port import AuditPort, AuditRecord, AuditRequest, Verdict
class PostgresAuditAdapter(AuditPort):
    def __init__(self, conn):
        self.conn = conn

    async def authorize(self, req: AuditRequest) -> Verdict:
        # consult your policy engine, write the request row
        ...

    async def record(self, rec: AuditRecord) -> None:
        # append to your evidence table
        ...
Then wire it into the application container; the rest of arbiter-ops is unchanged.
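To make the authorize-then-record contract concrete, here is a self-contained sketch of a two-step audit flow. It does not import arbiter_ops; `Request`, `Decision`, `InMemoryAudit`, and the deny-list policy are hypothetical stand-ins, and the real AuditPort signatures may differ.

```python
import asyncio
import json
import uuid
from dataclasses import asdict, dataclass
from typing import List, Set


@dataclass
class Request:  # hypothetical stand-in for AuditRequest
    actor: str
    action: str
    subject: str


@dataclass
class Decision:  # hypothetical stand-in for Verdict
    outcome: str
    verdict_id: str


class InMemoryAudit:
    """Toy audit port: deny-list policy plus an append-only JSON-lines log."""

    def __init__(self, denied_actions: Set[str]) -> None:
        self.denied_actions = denied_actions
        self.lines: List[str] = []

    async def authorize(self, req: Request) -> Decision:
        outcome = "DENY" if req.action in self.denied_actions else "ALLOW"
        decision = Decision(outcome=outcome, verdict_id=str(uuid.uuid4()))
        self.lines.append(json.dumps({"kind": "authorize", **asdict(req), **asdict(decision)}))
        return decision

    async def record(self, req: Request, verdict_id: str, succeeded: bool) -> None:
        row = {"kind": "record", **asdict(req), "verdict_id": verdict_id, "succeeded": succeeded}
        self.lines.append(json.dumps(row))


async def guarded_restart(audit: InMemoryAudit, subject: str) -> bool:
    req = Request(actor="ops.bot", action="restart_service", subject=subject)
    verdict = await audit.authorize(req)  # step 1: before any side effect
    if verdict.outcome != "ALLOW":
        return False
    # ... the real side effect would run here ...
    await audit.record(req, verdict.verdict_id, succeeded=True)  # step 2: evidence
    return True


audit = InMemoryAudit(denied_actions={"drop_database"})
ran = asyncio.run(guarded_restart(audit, "payments-api"))
```

Carrying `verdict_id` from the first row into the second is what lets a reviewer join the decision to its outcome later.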
Every incident gets a triage prediction: auto_approved, route_to_human, route_to_lead, or rejected · plus a confidence score and feature importances. Two interchangeable adapters via TriageClassifierPort.
HeuristicTriageClassifier (no ML dep)
from arbiter_ops.decision.adapters.heuristic_triage import HeuristicTriageClassifier
from arbiter_ops.decision.domain.models import DecisionInput
clf = HeuristicTriageClassifier()
assert clf.is_loaded() # always True
prediction = clf.predict(DecisionInput(
incident_id="INC-2031",
blast_score=0.18, # fraction of fleet at risk
asset_score=0.42, # asset criticality
reversibility_score=0.85, # 1.0 = fully reversible
confidence_score=0.91, # upstream model confidence
reasoning_status="succeeded",
))
print(prediction.predicted_outcome) # DecisionOutcome.AUTO_APPROVED
print(prediction.confidence) # 0.97
print(prediction.feature_importances) # {'reasoning_status': 0.30, 'blast_score': 0.20, ...}
XGBoostTriageClassifier
# Step 1: install the extra
pip install -e "packages/arbiter-ops[ml-decision]"
# Step 2: train (or use the bundled reference model)
python packages/arbiter-ops/scripts/train_triage_classifier.py \
--corpus packages/arbiter-ops/artifacts/triage_classifier/example_corpus.jsonl \
--output packages/arbiter-ops/artifacts/triage_classifier/v1/
# Step 3: load + predict
from arbiter_ops.decision.adapters.heuristic_triage import HeuristicTriageClassifier
from arbiter_ops.decision.adapters.xgboost_triage import XGBoostTriageClassifier
clf = XGBoostTriageClassifier(
    model_path="packages/arbiter-ops/artifacts/triage_classifier/v1/model.json",
    class_map_path="packages/arbiter-ops/artifacts/triage_classifier/v1/class_map.json",
    metadata_path="packages/arbiter-ops/artifacts/triage_classifier/v1/metadata.json",
)
if not clf.is_loaded():
    clf = HeuristicTriageClassifier()  # graceful fallback
Both adapters return a TriagePrediction with the same shape: outcome_probabilities,
predicted_outcome, confidence, feature_importances, model_version,
prediction_id. Swap the adapter behind TriageClassifierPort; downstream code never changes.
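The fallback pattern generalizes to an ordered list of candidates. The sketch below is an assumption about how one might wire it, not the package's own loader: `Classifier`, `BrokenModel`, `Heuristic`, and `first_loaded` are hypothetical, and the 0.2 threshold is invented for illustration.

```python
from typing import Protocol, Sequence


class Classifier(Protocol):
    def is_loaded(self) -> bool: ...
    def predict(self, blast_score: float) -> str: ...


class BrokenModel:
    """Stands in for an ML adapter whose artifacts failed to load."""

    def is_loaded(self) -> bool:
        return False

    def predict(self, blast_score: float) -> str:
        raise RuntimeError("model not loaded")


class Heuristic:
    def is_loaded(self) -> bool:
        return True  # no artifacts to load, so always ready

    def predict(self, blast_score: float) -> str:
        return "auto_approved" if blast_score < 0.2 else "route_to_human"


def first_loaded(candidates: Sequence[Classifier]) -> Classifier:
    """Return the first candidate that reports itself as usable."""
    for candidate in candidates:
        if candidate.is_loaded():
            return candidate
    raise LookupError("no classifier available")


clf = first_loaded([BrokenModel(), Heuristic()])  # preferred adapter first
outcome = clf.predict(0.18)
```

Downstream code calls `clf.predict(...)` the same way regardless of which candidate won.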
Predicts per-(model, task) cost · success probability · latency, then picks the model with the highest expected value subject to RoutingPolicy constraints.
HistoricalMeanCostPredictor (Welford streaming · no ML dep)
from arbiter_ops.intelligence.adapters import HistoricalMeanCostPredictor
pred = HistoricalMeanCostPredictor()
# Observe historical executions (live · stream from your billing pipe)
for record in cost_records:
    pred.observe(
        model_id=record["model_id"],
        task_class=record["task_class"],
        cost_usd=record["cost"],
        latency_ms=record["latency_ms"],
        succeeded=record["success"],
    )
# Cold below 10 samples per (model, task): the call falls back to the policy default
if pred.is_loaded(model_id="claude-sonnet", task_class="triage"):
    p = pred.predict(model_id="claude-sonnet", task_class="triage")
    print(p.expected_cost_usd, p.cost_confidence_interval, p.expected_value)
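For readers unfamiliar with Welford's method, the following standalone sketch shows the streaming mean/variance update together with the 10-sample cold threshold mentioned above. It is not the adapter's source; `RunningStats`, `observe`, and `is_loaded` here are simplified, hypothetical equivalents that track cost only.

```python
from dataclasses import dataclass
from typing import Dict, Tuple

MIN_SAMPLES = 10  # cold-start threshold quoted in this guide


@dataclass
class RunningStats:
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0  # running sum of squared deviations from the mean

    def observe(self, x: float) -> None:
        # Welford's update: one pass, no stored history.
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    @property
    def variance(self) -> float:
        # Sample variance; defined as 0.0 until there are two observations.
        return self.m2 / (self.n - 1) if self.n > 1 else 0.0


_stats: Dict[Tuple[str, str], RunningStats] = {}


def observe(model_id: str, task_class: str, cost_usd: float) -> None:
    _stats.setdefault((model_id, task_class), RunningStats()).observe(cost_usd)


def is_loaded(model_id: str, task_class: str) -> bool:
    s = _stats.get((model_id, task_class))
    return s is not None and s.n >= MIN_SAMPLES


key = ("claude-sonnet", "triage")
for i in range(1, 10):  # nine observations: still cold
    observe(*key, cost_usd=0.01 * i)
cold = is_loaded(*key)

observe(*key, cost_usd=0.10)  # the tenth observation warms the pair
warm = is_loaded(*key)
mean_cost = _stats[key].mean
cost_variance = _stats[key].variance
```

Because the update needs only `n`, `mean`, and `m2`, each (model, task) pair costs constant memory no matter how long the billing stream runs.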
XGBoostCostPredictor (3 boosters)
pip install -e "packages/arbiter-ops[ml-intelligence]"
python packages/arbiter-ops/scripts/train_cost_predictor.py \
--corpus packages/arbiter-ops/artifacts/cost_predictor/example_corpus.jsonl \
--output packages/arbiter-ops/artifacts/cost_predictor/
# Loads cost_regressor.json + success_classifier.json + latency_regressor.json
from arbiter_ops.intelligence.adapters.xgboost_cost_predictor import XGBoostCostPredictor
pred = XGBoostCostPredictor(model_dir="packages/arbiter-ops/artifacts/cost_predictor/")
CostAwareRouter
from arbiter_ops.intelligence.application.cost_aware_router import CostAwareRouter
from arbiter_ops.intelligence.domain.models import RoutingPolicy
router = CostAwareRouter(predictor=pred)
policy = RoutingPolicy(
candidates=["claude-haiku", "claude-sonnet", "gpt-4o-mini"],
capabilities={"reasoning", "json_mode"},
compliance={"soc2"},
max_cost_per_call_usd=0.05,
default_model_id="claude-haiku", # fallback when no candidate qualifies
)
choice = router.route(task_class="triage", policy=policy)
print(choice.model_id, choice.expected_value)
# claude-sonnet 0.84 # highest expected_value under the cost cap
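The routing rule (highest expected value under the cost cap, default model when nothing qualifies) can be sketched in a few lines. This is an illustrative reduction, not CostAwareRouter itself: `Estimate` and `route` are hypothetical, the capability and compliance filters are omitted, and the per-model numbers are made up.

```python
from dataclasses import dataclass
from typing import Dict, Sequence


@dataclass(frozen=True)
class Estimate:
    expected_cost_usd: float
    expected_value: float


def route(
    candidates: Sequence[str],
    estimates: Dict[str, Estimate],
    max_cost_per_call_usd: float,
    default_model_id: str,
) -> str:
    # Keep only candidates that have an estimate and fit under the cost cap.
    eligible = [
        m for m in candidates
        if m in estimates and estimates[m].expected_cost_usd <= max_cost_per_call_usd
    ]
    if not eligible:
        return default_model_id  # nothing qualifies: fall back
    return max(eligible, key=lambda m: estimates[m].expected_value)


# Invented figures, for illustration only.
estimates = {
    "claude-haiku": Estimate(expected_cost_usd=0.004, expected_value=0.71),
    "claude-sonnet": Estimate(expected_cost_usd=0.031, expected_value=0.84),
    "gpt-4o-mini": Estimate(expected_cost_usd=0.006, expected_value=0.69),
}
candidates = ["claude-haiku", "claude-sonnet", "gpt-4o-mini"]

choice = route(candidates, estimates, 0.05, "claude-haiku")
tight = route(candidates, estimates, 0.001, "claude-haiku")
```

With the 0.05 cap all three candidates qualify and the highest expected value wins; with the 0.001 cap none qualify and the default is returned.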
The provider-neutral LLMInvokerPort takes a completion callable that matches the OpenAI chat-completions shape. Two production adapters ship — pick by SDK · the rest of the substrate is identical:
# Option A · LiteLLM gateway (the original ADR-0006 default)
pip install -e "packages/arbiter-ops[litellm]"
from litellm import acompletion
from arbiter_ops.intelligence.adapters import GatedLLMInvoker
# (LiteLLMInvoker is the planned production wrapper · for v0.1 you can
# inject acompletion directly into the audit-gated invoker chain)
invoker = ...
# Option B · Portkey AI Gateway (v0.1+ · this release)
pip install -e "packages/arbiter-ops[portkey]"
import os

from portkey_ai import AsyncPortkey
from arbiter_ops.intelligence.adapters import PortkeyInvoker
pk = AsyncPortkey(api_key=os.environ["PORTKEY_API_KEY"])
invoker = PortkeyInvoker(
completion=pk.chat.completions.create,
virtual_key="vk-tenant-acme-anthropic", # per-tenant credential vault
portkey_config="cfg-prod-fallback-v3", # gateway-side fallback chain
default_metadata={"environment": "prod"},
)
outcome = await invoker.invoke(
model=model_descriptor,
request=invocation_request,
)
# outcome.cost_usd · outcome.input_tokens · outcome.output_tokens
# outcome.metadata["portkey_trace_id"] pins back to the AIDecisionRecord
Portkey adds gateway value on top of the provider SDKs: per-tenant virtual keys, fallback chains (FallbackChain), semantic caching (gateway-side), and per-request observability metadata with trace_id correlation back to the substrate's AIDecisionRecord.decision_id. LiteLLM gives a slimmer pure-Python abstraction over the same set of provider SDKs without the gateway value-add.
The same gateway choice applies to the reasoning plane · LiteLLMReasoner and PortkeyReasoner are interchangeable at the ReasonerPort surface · both consume a completion callable and produce a Hypothesis.
The GA evolves policy configurations against historical replay. Ground-truth evaluation is expensive (replay ≈ seconds per genome) — the surrogate is fast (XGBoost prediction ≈ μs). HybridFitnessEvaluator blends both: surrogate every generation · ground-truth every N generations · drift detection emits an audit row when the surrogate diverges from reality.
pip install -e "packages/arbiter-ops[ml-surrogate]"
python packages/arbiter-ops/scripts/train_fitness_surrogate.py \
--corpus packages/arbiter-ops/artifacts/fitness_surrogate/example_corpus.jsonl \
--output packages/arbiter-ops/artifacts/fitness_surrogate/
from arbiter_ops.improvement.adapters.xgboost_fitness_surrogate import XGBoostFitnessSurrogate
from arbiter_ops.improvement.application.hybrid_fitness import HybridFitnessEvaluator
from arbiter_ops.governance import make_default_audit_port
# (replay-based ground-truth evaluator is your existing FitnessEvaluatorPort)
evaluator = HybridFitnessEvaluator(
surrogate=XGBoostFitnessSurrogate(model_dir="packages/arbiter-ops/artifacts/fitness_surrogate/"),
ground_truth=replay_evaluator,
audit=make_default_audit_port(),
validation_every_n_generations=5,
validation_sample_size=8,
divergence_threshold=0.05, # MAE on scalar fitness
fallback_window_generations=5, # pin to ground-truth on drift
rng_seed=42,
)
scores = await evaluator.evaluate_population(genomes, generation=27)
# Inspect what happened
print(evaluator.metrics.surrogate_calls, # cheap calls
evaluator.metrics.ground_truth_calls, # validation calls
evaluator.metrics.using_fallback, # True if drift kicked in
evaluator.metrics.last_validation_mae)
On divergence, the evaluator emits a surrogate.drift_detected audit record via the governance port,
pins to ground-truth for the next fallback_window_generations, and exposes the event on
HybridMetrics.drift_events. Wire this into your alerting; surrogate divergence is a leading indicator of
policy-corpus drift.
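The drift gate can be sketched as a small state machine: compute MAE between surrogate and ground-truth scores on a validation sample, and pin to ground truth for a window when it exceeds the threshold. `DriftGate` is a hypothetical simplification; the real evaluator also samples genomes and writes the audit record.

```python
from dataclasses import dataclass, field
from typing import List, Sequence


def mean_absolute_error(pred: Sequence[float], truth: Sequence[float]) -> float:
    return sum(abs(p - t) for p, t in zip(pred, truth)) / len(truth)


@dataclass
class DriftGate:
    divergence_threshold: float = 0.05
    fallback_window_generations: int = 5
    pinned_until: int = -1  # last generation that must use ground truth
    drift_events: List[int] = field(default_factory=list)

    def validate(self, generation: int, surrogate: Sequence[float], truth: Sequence[float]) -> float:
        mae = mean_absolute_error(surrogate, truth)
        if mae > self.divergence_threshold:
            # Record the event and pin the next N generations to ground truth.
            self.drift_events.append(generation)
            self.pinned_until = generation + self.fallback_window_generations
        return mae

    def using_fallback(self, generation: int) -> bool:
        return generation <= self.pinned_until


gate = DriftGate()
ok_mae = gate.validate(10, surrogate=[0.80, 0.60], truth=[0.81, 0.62])
bad_mae = gate.validate(15, surrogate=[0.80, 0.60], truth=[0.60, 0.50])
```

Here the generation-10 check passes quietly, while the generation-15 check trips the gate and keeps generations 16 through 20 on ground truth.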
6 SOAR vendors shipped. All capabilities ship with reversibility=COSTLY · requires_simulation=True ·
requires_rollback_artifact=True · the Action plane refuses to invoke without a simulation pass and a recorded
rollback artifact.
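A pre-invocation guard of this kind might look like the sketch below. The flag names mirror the ones above, but `Capability`, `Preflight`, `ActionRefused`, and `check_preconditions` are hypothetical; the Action plane's real types are not shown in this guide.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Capability:
    capability_id: str
    requires_simulation: bool = True
    requires_rollback_artifact: bool = True


@dataclass(frozen=True)
class Preflight:
    simulation_passed: bool = False
    rollback_artifact_id: Optional[str] = None


class ActionRefused(Exception):
    """Raised before any vendor call when a precondition is missing."""


def check_preconditions(cap: Capability, pre: Preflight) -> None:
    if cap.requires_simulation and not pre.simulation_passed:
        raise ActionRefused(f"{cap.capability_id}: no passing simulation")
    if cap.requires_rollback_artifact and pre.rollback_artifact_id is None:
        raise ActionRefused(f"{cap.capability_id}: no rollback artifact recorded")


cap = Capability("swimlane.run_playbook")

try:
    check_preconditions(cap, Preflight(simulation_passed=True))
    refusal = None
except ActionRefused as exc:
    refusal = str(exc)

# Both preconditions satisfied: the guard returns without raising.
check_preconditions(cap, Preflight(simulation_passed=True, rollback_artifact_id="rb-001"))
allowed = True
```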
| Vendor | Capabilities | Auth |
|---|---|---|
| Splunk SOAR (Phantom) | run_playbook · create_container | token |
| Cortex XSOAR | run_playbook · create_incident | API key + ID |
| Tines | send_to_story (webhook) · create_record | per-story webhook secret |
| Swimlane (Turbine) | run_playbook · create_record | Private-Token header |
| Google Chronicle SOAR | run_playbook · create_case | OAuth Bearer (static or callable) |
| Microsoft Sentinel | run_playbook (Logic Apps) · update_incident (ARM) | SAS-signed URL · Azure AD Bearer |
from arbiter_ops.action.adapters.invokers.swimlane import (
SwimlaneInvoker, build_swimlane_tool,
)
from arbiter_ops.action.domain.models import ProposedAction
tool = build_swimlane_tool(
tool_id="swimlane/turbine",
base_url="https://swimlane.acme.io",
)
invoker = SwimlaneInvoker(
base_url="https://swimlane.acme.io",
api_token="",
http_client=httpx_client, # injected
)
action = ProposedAction(
tool_id="swimlane/turbine",
capability_id="swimlane.run_playbook",
parameters={"playbook_id": "pb-block-user", "inputs": {"user_id": "u-123"}},
aiops_request_id="req-abc",
aiops_decision_id="dec-xyz",
aiops_tenant_id="tenant-acme",
idempotency_key="idem-2031-01",
)
result = await invoker.invoke(tool, action)
# {'capability': 'swimlane.run_playbook',
# 'playbook_id': 'pb-block-user',
# 'run_id': 'swim-run-123',
# 'status_code': 200, ...}
from arbiter_ops.action.adapters.invokers.sentinel import SentinelInvoker
invoker = SentinelInvoker(
default_callback_url="https://prod-15.eastus.logic.azure.com/...&sig=...",
azure_ad_token_provider=lambda: get_azure_ad_token(), # for ARM update_incident
http_client=httpx_client,
)
# Logic Apps webhook — signed URL, no auth header
res = await invoker.invoke(tool, ProposedAction(
tool_id="sentinel",
capability_id="sentinel.run_playbook",
parameters={"trigger_inputs": {"alert_id": "A-77"}},
...
))
print(res["workflow_run_id"]) # extracted from x-ms-workflow-run-id response header
Workflows ship as YAML cards under src/arbiter_ops/workflow/workflows/<name>/. The default substrate is
the agent-workflow open-standard board protocol (Apache-2.0).
# Example: aiops-incident workflow card
src/arbiter_ops/workflow/workflows/aiops-incident/
├── workflow.yaml # state machine + transitions
└── card.schema.json # JSON Schema for the workflow card
# Drive a board run
from arbiter_ops.workflow.adapters import LocalBoardClient
client = LocalBoardClient()
run_id = await client.start("aiops-incident", inputs={"incident_id": "INC-2031"})
status = await client.status(run_id)
FastAPI app · 8 routers · launchable via console entry-point.
arbiter-ops-control --host 0.0.0.0 --port 8001 --hil-gateway in_process
# Environment overrides
ARBITER_OPS_CONTROL_HOST=0.0.0.0
ARBITER_OPS_CONTROL_PORT=8001
ARBITER_OPS_CONTROL_HIL_GATEWAY=auto_approve # auto_approve | auto_reject | in_process
| Router | Purpose | Sample endpoints |
|---|---|---|
| tenant | Per-tenant config + approach-band overlays | GET/POST /tenants · PUT /tenants/{id}/overlay |
| policy | Policy CRUD · evolution gates · drift kill rules | GET/POST /policies · POST /policies/{id}/promote |
| rbac | Roles · permissions · principal management | GET /principals · POST /roles/{id}/permissions |
| killswitch | Global + per-tenant + per-capability emergency stops | POST /killswitch/global · POST /killswitch/tenant/{id} |
| workflow | Start / status / cancel workflow runs | POST /workflow/start · GET /workflow/{run_id} |
| conformance | Live 9-plane conformance probe | GET /conformance/probe · GET /conformance/status |
| substrate | Substrate health + version + readiness | GET /healthz · GET /readyz · GET /version |
| telemetry | Metrics surface + decision-event tap | GET /metrics · GET /events?since=... |
curl -H "x-arbiter-ops-operator-subject: alice@acme.io" \
-H "x-arbiter-ops-operator-role: PLATFORM_ADMIN" \
-H "x-arbiter-ops-tenant-scope: tenant-acme,tenant-globex" \
http://localhost:8001/healthz
Operator identity is resolved through arbiter_ops.control.application.authz.make_identity_resolver(...).
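As an illustration of what a header-based resolver has to do with the three headers in the curl call, here is a minimal sketch. `OperatorIdentity` and `resolve_identity` are hypothetical and are not the output of make_identity_resolver; a production resolver would also verify the caller rather than trust the headers.

```python
from dataclasses import dataclass
from typing import Mapping, Tuple


@dataclass(frozen=True)
class OperatorIdentity:
    subject: str
    role: str
    tenant_scope: Tuple[str, ...]


def resolve_identity(headers: Mapping[str, str]) -> OperatorIdentity:
    # HTTP header names are case-insensitive, so normalize before lookup.
    h = {k.lower(): v for k, v in headers.items()}
    subject = h.get("x-arbiter-ops-operator-subject", "").strip()
    role = h.get("x-arbiter-ops-operator-role", "").strip()
    if not subject or not role:
        raise PermissionError("missing operator identity headers")
    raw_scope = h.get("x-arbiter-ops-tenant-scope", "")
    scope = tuple(t.strip() for t in raw_scope.split(",") if t.strip())
    return OperatorIdentity(subject=subject, role=role, tenant_scope=scope)


identity = resolve_identity({
    "X-Arbiter-Ops-Operator-Subject": "alice@acme.io",
    "X-Arbiter-Ops-Operator-Role": "PLATFORM_ADMIN",
    "X-Arbiter-Ops-Tenant-Scope": "tenant-acme, tenant-globex",
})
```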
| Command | Module | Use |
|---|---|---|
| arbiter-ops | arbiter_ops.cli | Top-level CLI · serve · migrate · smoke-test |
| arbiter-ops-control | arbiter_ops.control.server | Control plane HTTP server (port 8001 default) |
| arbiter-ops-hil-worker | arbiter_ops.hil.worker | Temporal HIL worker |
| arbiter-ops-hil-submit | arbiter_ops.hil.cli.submit | Submit a HIL gate from the CLI |
| arbiter-ops-agent | arbiter_ops.agent.server | Long-running agent loop (E201) |
| arbiter-ops-improve | arbiter_ops.improvement.server | GA campaign server |
| arbiter-ops-triage-rooms | arbiter_ops.triage_room.server | Slack/Teams triage-room bot |
| arbiter-opsctl | arbiter_ops.operator.cli.main | Recipe-driven incident-response CLI |
| Extra | Adds | Default fallback | Used by |
|---|---|---|---|
| ml-decision | xgboost · numpy · sklearn | HeuristicTriageClassifier | Triage classifier (decision plane) |
| ml-intelligence | xgboost · numpy · sklearn | HistoricalMeanCostPredictor | Cost predictor (intelligence plane) |
| ml-surrogate | xgboost · numpy · scipy | ReplayFitnessEvaluator (ground truth) | GA surrogate fitness (improvement plane) |
| slack | slack_sdk | — | Slack triage-room bot |
| neo4j | neo4j | in-memory context | Context plane (entity + topology) |
| integrations | requests · httpx | — | SOAR invokers (action plane) |
| llm | anthropic | — | Provider-neutral intelligence (direct SDK) |
| litellm | litellm>=1.50 | — | LiteLLM cross-provider reasoner (litellm_reasoner.py) |
| portkey | portkey-ai>=1.8 | — | Portkey AI Gateway · reasoner + invoker (portkey_reasoner.py · portkey_invoker.py) · virtual keys · semantic caching · trace_id |
| kafka | confluent-kafka | in-memory event bus | Event-stream sensing |
| smt | z3-solver>=4.13 | — | SMT verifier (reasoning Layer 2a · D-10) |
| all | everything above | — | — |
Every ML adapter degrades safely: is_loaded()
returns False on load failure, the call returns a graceful default (uniform distribution / cold-fallback / heuristic),
and the substrate keeps running. Operators install the extra when they're ready · zero behavior change for users who
stick with the default path.
python -m pytest packages/arbiter-ops/tests/ -q
# 1118 passed, 8 skipped in ~30s
python scripts/arbiter_ops_leak_check.py
# leaks: 0 across 482 files scanned
The regex catches references to upstream product names anywhere in the package tree. The full pattern lives in
scripts/arbiter_ops_leak_check.py as PAT. CI fails on any non-zero count.
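The shape of such a check is roughly the following. This is a sketch under stated assumptions, not the contents of scripts/arbiter_ops_leak_check.py: the pattern below is a placeholder (the real PAT is not reproduced in this guide), and the scan is limited to `*.py` files for brevity.

```python
import re
import tempfile
from pathlib import Path
from typing import List, Tuple

# Placeholder pattern with invented names; the real one lives in the script as PAT.
PAT = re.compile(r"\b(acme_legacy|oldproduct)\b", re.IGNORECASE)


def scan(root: Path) -> Tuple[int, List[Tuple[str, int]]]:
    """Return (files scanned, [(file name, line number), ...]) for every match."""
    files = 0
    leaks: List[Tuple[str, int]] = []
    for path in sorted(root.rglob("*.py")):
        files += 1
        text = path.read_text(encoding="utf-8")
        for lineno, line in enumerate(text.splitlines(), start=1):
            if PAT.search(line):
                leaks.append((path.name, lineno))
    return files, leaks


# Demonstrate on a throwaway tree with one clean and one leaking file.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "clean.py").write_text("x = 1\n", encoding="utf-8")
    (root / "dirty.py").write_text("# ported from OldProduct\ny = 2\n", encoding="utf-8")
    files_scanned, leaks = scan(root)

exit_code = 1 if leaks else 0  # CI treats any non-zero count as failure
```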
1. Pick the ...Port ABC under arbiter_ops.<plane>.domain.ports.
2. Implement it in arbiter_ops.<plane>.adapters.<your_adapter>.
3. Add tests/<plane>/test_<your_adapter>.py using the shared port-conformance fixture.
4. Run pytest + leak check.
packages/arbiter-ops/
├── src/arbiter_ops/
│ ├── action/ decision/ improvement/ intelligence/
│ ├── workflow/ sensing/ context/ feature/
│ ├── reasoning/ governance/ evidence/ hil/
│ ├── control/ agent/ triage_room/ operator/
│ ├── conformance/ identity/ knowledge/
│ └── cli.py
├── tests/ # 1034 passing
├── artifacts/ # trained reference models
│ ├── triage_classifier/v1/
│ ├── cost_predictor/
│ └── fitness_surrogate/
├── scripts/ # train_*.py CLIs
├── docs/ # this guide + per-feature .md
├── pyproject.toml # name = "arbiter-ops" · ml-* extras
└── README.md
XGBoostTriageClassifier.is_loaded() returns False
The model artifacts didn't load: likely a missing model.json, class_map.json, or
metadata.json in the expected directory, or an XGBoost-version mismatch with the file format.
Operations fall back gracefully to a uniform distribution; the substrate keeps running. To force the heuristic path,
inject HeuristicTriageClassifier directly.
HistoricalMeanCostPredictor reports is_loaded() == False
The predictor needs ≥10 observations per (model_id, task_class) pair before is_loaded() flips True. Below
that threshold the predictor is cold and the router falls back to RoutingPolicy.default_model_id. Pre-warm
with a replay step at startup.
TypeError: _estimator_type undefined when loading an XGBoost model
XGBoost 2.x sklearn-wrapper bug. Models in this package are saved with
model.get_booster().save_model(path) and loaded via xgb.Booster().load_model(path) —
which sidesteps the bug. If you trained with the sklearn API directly, re-export the booster.
If drift events fire too often, raise divergence_threshold (default 0.05 MAE on scalar fitness), increase
validation_sample_size, or extend fallback_window_generations. Drift events are also a signal
that the corpus has shifted — retraining the surrogate may be more useful than tuning the gate.
run_playbook returns 401
Logic Apps callback URLs are SAS-signed and contain the auth in the URL itself, with no Authorization
header. If the URL has expired (Logic Apps SAS rotates), regenerate it from the Logic App in Azure portal. ARM
update_incident uses Azure AD Bearer; check azure_ad_token_provider wiring.
Default LocalAuditAdapter writes to stdout. Set ARBITER_OPS_AUDIT_PATH to a writable file
path, or register a hosted backend through the application container. Records flush per call — no buffering.
ModuleNotFoundError: agentic_ops
Stale install from before the rename. Run pip uninstall agentic-ops -y followed by
pip install -e packages/arbiter-ops to clear it.