Metadata-Version: 2.3
Name: cyvest
Version: 5.4.0
Summary: Cybersecurity investigation model
Keywords: cybersecurity,investigation,threat-intel,security-analysis
Author: PakitoSec
Author-email: PakitoSec <jeromep83@gmail.com>
License: MIT
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Security
Requires-Dist: click>=8
Requires-Dist: logurich[click]>=0.9.0
Requires-Dist: pydantic>=2.12.5
Requires-Dist: rich>=13
Requires-Dist: typing-extensions>=4.15
Requires-Dist: pyvis>=0.3.2 ; extra == 'visualization'
Requires-Python: >=3.10
Project-URL: Homepage, https://github.com/PakitoSec/cyvest
Project-URL: Repository, https://github.com/PakitoSec/cyvest
Project-URL: Issues, https://github.com/PakitoSec/cyvest/issues
Provides-Extra: visualization
Description-Content-Type: text/markdown

# Cyvest - Cybersecurity Investigation Framework

[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

**Cyvest** is a Python framework for building, analyzing, and structuring cybersecurity investigations programmatically. It provides automatic scoring, level calculation, relationship tracking, and rich reporting capabilities.

## Features

- 🔍 **Structured Investigation Modeling**: Model investigations with observables, checks, threat intelligence, and enrichments
- 📊 **Automatic Scoring**: Dynamic score calculation and propagation through investigation hierarchy
- 🎯 **Level Classification**: Automatic security level assignment (TRUSTED, INFO, SAFE, NOTABLE, SUSPICIOUS, MALICIOUS)
- 🔗 **Relationship Tracking**: Lightweight relationship modeling between observables
- 🏷️ **Typed Helpers**: Built-in enums for observable types and relationships with autocomplete
- 📈 **Real-time Statistics**: Live metrics and aggregations throughout the investigation
- 🔄 **Investigation Merging**: Combine investigations from multiple threads or processes
- 🧵 **Multi-Threading Support**: Advanced thread-safe shared context available via `cyvest.shared`
- 💾 **Multiple Export Formats**: JSON and Markdown output for reporting and LLM consumption
- 🎨 **Rich Console Output**: Beautiful terminal displays with the Rich library
- 🧩 **Fluent helpers**: Convenient API with method chaining for rapid development
- 🔬 **Investigation Comparison**: Compare investigations with tolerance rules and visual diff output
- 🔎 **Observable Extraction**: Extract IOCs (IPs, URLs, domains, emails, hashes) from text, markdown, or web pages with defang/refang support

## Installation

### Using uv (recommended)

```bash
# Install uv if not already installed
curl -LsSf https://astral.sh/uv/install.sh | sh

# Clone the repository
git clone https://github.com/PakitoSec/cyvest.git
cd cyvest

# Install dependencies
uv sync

# Install in development mode
uv pip install -e .
```

### Using pip

```bash
pip install -e .
```

>  Install the optional visualization extra with\
> `pip install "cyvest[visualization]"` (or `uv pip install -e ".[visualization]"`).

## Quick Start

```python
from decimal import Decimal
from cyvest import Cyvest

# Create an investigation (root_data becomes the root observable extra)
cv = Cyvest(root_data={"type": "email"})

# For deterministic reports (enables diffing between runs), pass a custom investigation_id:
# cv = Cyvest(root_data={"type": "email"}, investigation_id="email-analysis-v1")

# Create observables
url = (
    cv.observable(cv.OBS.URL, "https://phishing-site.com", internal=False)
    .with_ti("virustotal", score=Decimal("8.5"), level=cv.LVL.MALICIOUS)
    .relate_to(cv.root(), cv.REL.RELATED_TO)
)

# Create checks
check = cv.check("url_analysis", "email_body", "Analyze suspicious URL")
check.link_observable(url)
check.with_score(Decimal("8.5"), "Malicious URL detected")

# Display results
print(f"Global Score: {cv.get_global_score()}")
print(f"Global Level: {cv.get_global_level()}")

# Export
cv.io_save_json("investigation.json")
```

### Model Proxies

Cyvest only exposes immutable model proxies. Helpers like `observable_create`, `check_create`, and the
fluent `cv.observable()`/`cv.check()` convenience methods return `ObservableProxy`, `CheckProxy`, `TagProxy`, etc.
These proxies reflect the live investigation state but raise `AttributeError` if you try to assign to their attributes.
All mutations are routed through the Investigation layer, so use the facade helpers (`cv.observable_set_level`,
`cv.check_update_score`, `cv.observable_add_threat_intel`) or the built-in fluent methods on the proxies themselves
(`with_ti`, `relate_to`, `link_observable`, `with_score`, `set_level`, …) so the score engine and audit log stay consistent.

Mutation helpers that reference existing objects (for example, `cv.observable_add_relationship`,
`cv.check_link_observable`, `cv.tag_add_check`) raise `KeyError` when a key is missing.

Safe metadata fields like `comment`, `extra`, or `internal` can be updated through the proxies without breaking score
consistency. Use `set_level()` to update the level without changing the score:

```python
url_obs.update_metadata(comment="triaged", internal=False, extra={"ticket": "INC-4242"})
check.update_metadata(description="New scope", extra={"playbook": "url-analysis"})
check.set_level(cv.LVL.SAFE, reason="Verified clean")
```

Dictionary fields merge by default; pass `merge_extra=False` (or `merge_data=False` for enrichments) to overwrite them.

### Threat Intel Drafts

When the observable is unknown yet, create a draft and attach it later:

```python
draft = cv.threat_intel_draft("vt", score=Decimal("4.2"), comment="Initial lookup")
obs = cv.observable(cv.OBS.DOMAIN, "example.com")
obs.with_ti_draft(draft)
```

Drafts are plain `ThreatIntel` objects with no `observable_key` yet; attaching generates the key.

## Core Concepts

### Observables

Observables represent cyber artifacts (URLs, IPs, domains, hashes, files, etc.).

```python
from cyvest import Cyvest

cv = Cyvest()

url_obs = cv.observable_create(cv.OBS.URL, "https://malicious.com", internal=False)

ip_obs = cv.observable_create(cv.OBS.IPV4, "192.0.2.1", internal=False)

cv.observable_add_relationship(
    url_obs,  # Can pass ObservableProxy directly
    ip_obs,   # Or use .key for string keys
    cv.REL.RELATED_TO,
    cv.DIR.BIDIRECTIONAL,
)
```

Cyvest exposes enums for observable types and relationships via the facade (`cv.OBS`, `cv.REL`, `cv.DIR`)
so IDEs can autocomplete the official vocabulary without extra imports.

### Checks

Checks represent verification steps in your investigation:

```python
check = cv.check_create(
    check_id="malware_detection",
    scope="endpoint",
    description="Verify file hash against threat intel",
    score=Decimal("8.0"),
    level=cv.LVL.MALICIOUS
)

# Link observables to checks
cv.check_link_observable(check.key, file_hash_obs.key)
```

### Threat Intelligence

Threat intelligence provides verdicts from external sources:

```python
cv.observable_add_threat_intel(
    observable.key,
    source="virustotal",
    score=Decimal("7.5"),
    level=cv.LVL.SUSPICIOUS,
    comment="15/70 vendors flagged as malicious",
    taxonomies=[cv.taxonomy(level=cv.LVL.MALICIOUS, name="scan", value="trojan")]
)
```

Taxonomies are unique by name per threat intel entry. Use the fluent helpers to add or remove them:

```python
ti = cv.observable_add_threat_intel(observable.key, source="vt", score=Decimal("7.5"))
ti.add_taxonomy(level=cv.LVL.SUSPICIOUS, name="confidence", value="medium")
ti.remove_taxonomy("confidence")
```

### Tags

Tags organize checks with automatic hierarchy based on `:` delimiter:

```python
# Simple: pass tag names directly (auto-creates tags)
check = cv.check("beacon_detection", "Detect C2 beacons")
check.tagged("network", "c2:detection", "suspicious")

# With description: create tag first, then reference it
tag = cv.tag("network:c2:detection", "C2 Detection Checks")
check.tagged(tag)

# Query hierarchy
children = cv.tag_get_children("network")  # ["network:c2"]
descendants = cv.tag_get_descendants("network")  # ["network:c2", "network:c2:detection"]
```

### Lookup Helpers

Use facade getters with either key strings or component parameters:

```python
url_obs = cv.observable_create(cv.OBS.URL, "https://malicious.com")
same_url = cv.observable_get(cv.OBS.URL, "https://malicious.com")
same_url_by_key = cv.observable_get(url_obs.key)

check = cv.check_create("malware_detection", "endpoint", "Verify file hash")
same_check = cv.check_get("malware_detection", "endpoint")
same_check_by_key = cv.check_get(check.key)

tag = cv.tag_create("network:analysis")
same_tag = cv.tag_get("network:analysis")
same_tag_by_key = cv.tag_get(tag.key)

enrichment = cv.enrichment_create("whois", {"registrar": "Example Inc"})
same_enrichment = cv.enrichment_get("whois")
same_enrichment_by_key = cv.enrichment_get(enrichment.key)
```

Low-level `Investigation` getters accept keys only; use the facade for component-based lookups.

### Multi-Threaded Investigations

**Advanced Feature**: Use `Cyvest.shared_context()` (or `SharedInvestigationContext` from `cyvest.shared`) for safe parallel task execution with automatic observable sharing:

```python
from cyvest import Cyvest
from concurrent.futures import ThreadPoolExecutor, as_completed

def email_analysis(shared_context):
    # create_cyvest() yields a task-local Cyvest that auto-merges on context exit
    with shared_context.create_cyvest() as cy:
        data = cy.root().extra
        cy.observable(cy.OBS.DOMAIN, data.get("domain"))

# Create shared context
main_cy = Cyvest(root_data=email_data, root_type=Cyvest.OBS.ARTIFACT)
shared = main_cy.shared_context()

# Run tasks in parallel - they can reference each other's observables
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(email_analysis, shared) for _ in tasks]
    for future in as_completed(futures):
        future.result()  # Auto-reconciled

# Get merged investigation (same object passed to shared_context)
final_cy = main_cy
```

See `examples/04_email.py` for a complete multi-threaded investigation example.

### Scoring & Levels

Scores and levels are automatically calculated and propagated:

- **Threat Intel → Observable**: Observable score = **max** of all threat intel scores (not sum)
- **Observable Hierarchy**: Parent observable scores include child observable scores based on relationship direction:
  - **OUTBOUND relationships**: target scores propagate to source (source is parent)
  - **INBOUND relationships**: source scores propagate to target (target is parent)
  - **BIDIRECTIONAL relationships**: no hierarchical propagation
- **Observable → Check (provenance-aware)**: Check score/level only considers observables reachable through *effective* links (`observable_links`)
  - A link is effective when `propagation_mode="GLOBAL"` or when the check's `origin_investigation_id` matches the current investigation id
- **Check → Global**: All check scores sum to global investigation score

Observable score aggregation is configurable via `score_mode_obs`:

```python
from cyvest import Cyvest
from cyvest.score import ScoreMode

cv = Cyvest(score_mode_obs=ScoreMode.MAX)  # default
cv = Cyvest(score_mode_obs=ScoreMode.SUM)  # accumulative children
```

**Provenance model**

- `Investigation.investigation_id` is a stable ULID included in exports.
- Checks keep a *canonical origin* (`origin_investigation_id`) for LOCAL_ONLY propagation; it is compared against the current investigation id.

**Audit log**

- All meaningful changes (including score/level changes) are recorded in the investigation-level audit log.
- Per-object histories are not stored; use `cv.investigation_get_audit_log()` to review changes.
- For compact, deterministic JSON output (useful for testing/diffing), exclude the audit log:
  ```python
  cv.io_save_json("output.json", include_audit_log=False)  # audit_log: null
  cv.io_to_invest(include_audit_log=False)  # schema.audit_log is None
  ```

To force cross-investigation propagation for a specific link, use a GLOBAL link:

```python
cv.check_link_observable(check.key, observable.key, propagation_mode="GLOBAL")
# or fluent:
cv.check("id", "scope", "desc").link_observable(observable, propagation_mode="GLOBAL")
```

Score to Level mapping:

- `< 0.0` → TRUSTED
- `== 0.0` → INFO
- `< 3.0` → NOTABLE
- `< 5.0` → SUSPICIOUS
- `>= 5.0` → MALICIOUS

**SAFE Level Protection:**

The SAFE level has special protection for trusted/whitelisted observables:

```python
# Mark a known-good domain as SAFE
trusted = cv.observable_create(
    cv.OBS.DOMAIN,
    "trusted.example.com",
    level=cv.LVL.SAFE
)

# Adding low-score threat intel won't downgrade to TRUSTED or INFO
cv.observable_add_threat_intel(trusted.key, "source1", score=Decimal("0"))
# Level stays SAFE, score updates to 0

# But high-score threat intel can still upgrade to MALICIOUS if warranted
cv.observable_add_threat_intel(trusted.key, "source2", score=Decimal("6.0"))
# Level upgrades to MALICIOUS, score updates to 6.0

# Threat intel with SAFE level can also mark observables as SAFE
uncertain = cv.observable_create(cv.OBS.DOMAIN, "example.com")
cv.observable_add_threat_intel(
    uncertain.key,
    "whitelist_service",
    score=Decimal("0"),
    level=cv.LVL.SAFE
)
# Observable upgraded to SAFE level with automatic downgrade protection
```

SAFE observables:
- Cannot be downgraded to lower levels (NONE, TRUSTED, INFO)
- Can be upgraded to higher levels (NOTABLE, SUSPICIOUS, MALICIOUS)
- Score values still update based on threat intelligence
- Protection is preserved during investigation merges
- Can be marked SAFE by threat intel sources (e.g., whitelists, reputation databases)

SAFE checks:
- Automatically inherit SAFE level when linked to SAFE observables (if all other observables are ≤ SAFE)
- Can still upgrade to higher levels when NOTABLE/SUSPICIOUS/MALICIOUS observables are linked

**Root Observable Barrier:**

The root observable (the investigation's entry point with `value="root"`) acts as a special barrier to prevent cross-contamination:
Its key is derived from type + value (e.g. `obs:file:root` or `obs:artifact:root`).

**Barrier as Child** - When root appears as a child of other observables, it is **skipped** in their score calculations.

**Barrier as Parent** - Root's propagation is asymmetric:
- Root **CAN** be updated when children change (aggregates child scores)
- Root **does NOT** propagate upward beyond itself (stops recursive propagation)
- Root **DOES** propagate to checks normally

This design enables flexible investigation structures while preventing unintended score contamination.

### Comparing Investigations

Compare two investigations to identify differences in checks, observables, and threat intelligence:

```python
from decimal import Decimal
from cyvest import Cyvest, ExpectedResult, Level, compare_investigations
from cyvest.io_rich import display_diff

# Create expected and actual investigations
expected = Cyvest(investigation_name="expected")
expected.check_create("domain-check", "Verify domain", score=Decimal("1.0"))

actual = Cyvest(investigation_name="actual")
actual.check_create("domain-check", "Verify domain", score=Decimal("2.0"))
actual.check_create("new-check", "New detection", score=Decimal("1.5"))

# Compare investigations
diffs = compare_investigations(actual, expected)
# diffs contains:
#   - MISMATCH for domain-check (score changed 1.0 -> 2.0)
#   - ADDED for new-check
```

**Tolerance Rules**

Use `result_expected` rules to define acceptable score variations:

```python
# Define tolerance rules
rules = [
    # Accept any score >= 1.0 for this check
    ExpectedResult(check_name="domain-check", score=">= 1.0"),
    # Accept any score < 3.0 for roger-ai
    ExpectedResult(key="chk:roger-ai", level=Level.SUSPICIOUS, score="< 3.0"),
]

# Compare with tolerance - checks satisfying rules are not flagged as diffs
diffs = compare_investigations(actual, expected, result_expected=rules)
```

Supported operators: `>=`, `<=`, `>`, `<`, `==`, `!=`

**Visual Diff Display**

Display differences in a rich table format:

```python
from cyvest.io_rich import display_diff
from logurich import logger

# Display diff table with tree structure showing observables and threat intel
display_diff(diffs, lambda r: logger.rich("INFO", r), title="Investigation Diff")
```

Output:
```
╭────────────────────────────────────────────────┬────────────────────┬─────────────────┬────────╮
│ Key                                            │      Expected      │     Actual      │ Status │
├────────────────────────────────────────────────┼────────────────────┼─────────────────┼────────┤
│ chk:new-check                                  │         -          │  NOTABLE 1.50   │   +    │
│ └── domain: example.com                        │         -          │   INFO 0.00     │        │
│     └── VirusTotal                             │         -          │   INFO 0.00     │        │
├────────────────────────────────────────────────┼────────────────────┼─────────────────┼────────┤
│ chk:domain-check                               │   NOTABLE 1.00     │  NOTABLE 2.00   │   ✗    │
╰────────────────────────────────────────────────┴────────────────────┴─────────────────┴────────╯
```

Status symbols: `+` (added), `-` (removed), `✗` (mismatch)

**Convenience Methods**

Use methods directly on Cyvest objects:

```python
# Compare and get diff items
diffs = actual.compare(expected=expected, result_expected=rules)

# Compare and display in one call
actual.display_diff(expected=expected, title="My Investigation Diff")
```

## Examples

See the `examples/` directory for complete examples:

- **01_email_basic.py**: Basic email phishing investigation
- **02_urls_and_ips.py**: Network investigation with URLs and IPs
- **03_merge_demo.py**: Multi-process investigation merging
- **04_email.py**: Multi-threaded investigation with SharedInvestigationContext
- **05_visualization.py**: Interactive HTML visualization showcasing scores, levels, and relationship flows
- **06_compare_investigations.py**: Compare investigations with tolerance rules and visual diff output

Run an example:

```bash
python examples/01_email_basic.py
python examples/04_email.py
python examples/05_visualization.py
```

## CLI Usage

Cyvest includes a command-line interface for working with investigation files:

```bash
# Display investigation
cyvest show investigation.json --graph

# Show statistics
cyvest stats investigation.json --detailed

# Export to markdown
cyvest export investigation.json -o report.md -f markdown

# Merge investigations with automatic deduplication
cyvest merge inv1.json inv2.json inv3.json -o merged.json

# Merge with statistics display
cyvest merge inv1.json inv2.json -o merged.json --stats

# Merge and display rich summary
cyvest merge inv1.json inv2.json -o merged.json -f rich --stats

# Generate an interactive visualization (requires visualization extra)
cyvest visualize investigation.json --min-level SUSPICIOUS --group-by-type

# Extract observables (IOCs) from text
echo "Check IP 192.168.1.1 and https://evil.com" | cyvest extract
cyvest extract report.txt -t url -t ip -o iocs.txt
cyvest extract --from-url https://example.com/indicators.txt -f json

# Output the JSON Schema describing serialized investigations and generate types
uv run cyvest schema -o ./schema/cyvest.schema.json && pnpm -C js/packages/cyvest-js run generate:types
```

### Observable Extraction

Extract cyber observables (IOCs) from raw text, markdown, or web pages:

```bash
# From stdin (pipe text)
echo "Malicious IP: 192[.]168[.]1[.]1, URL: hxxps://evil[.]com/malware" | cyvest extract

# From file
cyvest extract threat_report.txt

# Filter by type
cyvest extract report.txt -t url -t ip -t hash

# Output as JSON
cyvest extract report.txt -f json -o extracted.json

# Markdown output for LLM consumption
cyvest extract report.txt --format markdown --title "Threat IOCs"
cyvest extract report.txt --format markdown-table --defang-output

# Fetch from URL and extract
cyvest extract --from-url https://example.com/ioc-feed.txt

# Keep defanged format (don't refang)
cyvest extract -R < defanged_iocs.txt
```

**Supported observable types:**

| Type | Description | Examples |
|------|-------------|----------|
| `url` | URLs with various schemes | http, https, ftp, sftp, tcp, udp |
| `ip` / `ipv4` / `ipv6` | IP addresses | 192.168.1.1, 2001:db8::1 |
| `email` | Email addresses | user@example.com |
| `hash` | Cryptographic hashes | MD5, SHA1, SHA256, SHA512 |
| `domain` | Domain names | example.com |

**Defanged indicator support:**

- URLs: `hxxp://`, `hxxps://`, `[.]`, `[/]`
- IPs: `192[.]168[.]1[.]1`, `10(dot)0(dot)0(dot)1`
- Emails: `user[@]example.com`, `user at example.com`

**Programmatic usage:**

```python
from cyvest.extract import (
    extract_all,
    extract_from_url,
    refang,
    defang,
    observables_to_markdown,
    observables_to_markdown_table,
)

# Extract from text
text = "Check IP 192[.]168[.]1[.]1 and hxxps://evil[.]com"
observables = extract_all(text)
for obs in observables:
    print(f"{obs.obs_type.value}: {obs.value} (count: {obs.count})")

# Generate markdown for LLM consumption
md = observables_to_markdown(observables, title="Extracted IOCs", group_by_type=True)
print(md)

# Generate compact markdown table
table = observables_to_markdown_table(observables, defang_values=True)
print(table)

# Extract from URL
observables = extract_from_url("https://example.com/ioc-feed.txt")

# Refang/defang utilities
safe_text = defang("https://malware.com")  # -> hxxps://malware[.]com
original = refang("hxxps://malware[.]com")  # -> https://malware.com
```

## Development

### Setup Development Environment

```bash
# Install development dependencies
uv sync --all-extras

# Run tests
pytest

# Run tests with coverage
pytest --cov=cyvest --cov-report=html

# Format code
ruff format .

# Lint code
ruff check .
```

### Running Tests

```bash
# Run all tests
pytest

# Run specific test file
pytest tests/test_score.py

# Run with verbose output
pytest -v

# Run with coverage
pytest --cov=cyvest
```

## Documentation

Build the documentation with MkDocs:

```bash
# Install docs dependencies
uv sync --all-extras

# Serve documentation locally
mkdocs serve

# Build documentation
mkdocs build
```

## JavaScript packages

The repo includes a PNPM workspace under `js/` with three packages:

- `@cyvest/cyvest-js`: TypeScript types, schema validation, and helpers for Cyvest investigations.
- `@cyvest/cyvest-vis`: React components for graph visualization (Cytoscape + ELK/Dagre, depends on `@cyvest/cyvest-js`).
- `@cyvest/cyvest-app`: Vite demo that bundles the JS packages with sample investigations.

The JS packages track the generated schema; serialized investigations should include fields like
`investigation_id`, `investigation_name`, `audit_log`, `score_display`, `check_links`, and
`observable_links`. The investigation start time is recorded as an `INVESTIGATION_STARTED` event
in the `audit_log`.

See `docs/js-packages.md` for workspace commands and usage snippets.

## Contributing

Contributions are welcome! Please:

1. Fork the repository
2. Create a feature branch
3. Make your changes with tests
4. Run the test suite
5. Submit a pull request

## License

This project is licensed under the MIT License - see the LICENSE file for details.

## Use Cases

Cyvest is designed for:

- **Security Operations Centers (SOCs)**: Automate investigation workflows
- **Incident Response**: Structure and document incident investigations
- **Threat Hunting**: Build repeatable hunting methodologies
- **Malware Analysis**: Track relationships between artifacts
- **Phishing Analysis**: Analyze emails and linked resources
- **Integration**: Combine results from multiple security tools
- **Regression Testing**: Compare investigation outputs across rule or detection updates

## Architecture Highlights

- **Concurrency**: Advanced `SharedInvestigationContext` (via `cyvest.shared`) enables safe parallel task execution
- **Deterministic Keys**: Same objects always generate same keys for merging
- **Deterministic IDs**: Optional `investigation_id` parameter for reproducible reports and diffing
- **Score Propagation**: Automatic hierarchical score calculation
- **Flexible Export**: JSON for storage, Markdown for LLM analysis
- **Audit Trail**: Score change history for debugging
- **Investigation Comparison**: Compare investigations with tolerance rules for regression testing

## Future Enhancements

- Database persistence layer
- Additional export formats (PDF, HTML)
