Skip to content

Compliance Checking Tutorial

This tutorial covers using the built-in compliance frameworks (EU AI Act, SOC2, HIPAA) to evaluate AI system operations against regulatory requirements.

Overview

Compliance checking evaluates audit entries against regulatory frameworks to identify:

  • Violations -- Specific rules that were not followed
  • Risk levels -- Severity of identified issues
  • Remediation -- Steps to fix violations
  • Gaps -- Areas needing improvement

Understanding Compliance Frameworks

Framework Structure

Each framework consists of rules organized by category:

Framework (e.g., EU AI Act)
├── Category: Transparency
│   ├── Rule: User Notification (EUAI-002)
│   └── ...
├── Category: Oversight
│   ├── Rule: Human Oversight Documentation (EUAI-001)
│   └── ...
├── Category: Risk Management
│   ├── Rule: Risk Assessment (EUAI-003)
│   └── ...
└── ...

Creating Audit Entries for Compliance

Audit entries need specific fields for compliance checking:

from datetime import datetime
from rotalabs_comply.frameworks.base import AuditEntry, RiskLevel

entry = AuditEntry(
    # Required identifiers
    entry_id="entry-001",
    timestamp=datetime.utcnow(),

    # Event classification
    event_type="inference",       # Type of operation
    actor="user@example.com",     # Who performed it
    action="AI response generation",

    # Resource being accessed
    resource="customer_support_model",
    system_id="prod-ai-001",

    # Risk and data classification
    risk_level=RiskLevel.HIGH,
    data_classification="confidential",

    # Compliance-relevant flags
    user_notified=True,           # Transparency: user knows about AI
    human_oversight=True,         # Oversight: human reviewed
    error_handled=True,           # Robustness: errors handled gracefully
    documentation_ref="DOC-001",  # Documentation: reference to docs

    # Framework-specific metadata
    metadata={
        # EU AI Act
        "risk_assessment_documented": True,
        "accuracy_monitored": True,
        "security_validated": True,
        "data_governance_documented": True,

        # SOC2
        "access_controlled": True,
        "monitored": True,
        "change_approved": True,

        # HIPAA (if PHI involved)
        "encryption_enabled": True,
        "authenticated": True,
        "purpose_documented": True,
        "minimum_necessary_applied": True,
    },
)

EU AI Act Compliance

The EU AI Act (2024) regulates AI systems based on risk level. The framework focuses on high-risk system requirements.

Rule Categories

Category Focus
transparency User notification of AI interaction
oversight Human oversight documentation
risk_management Risk assessment, error handling, accuracy
documentation Technical and data governance docs
security Cybersecurity measures

Basic Usage

import asyncio
from datetime import datetime
from rotalabs_comply.frameworks.eu_ai_act import EUAIActFramework
from rotalabs_comply.frameworks.base import AuditEntry, ComplianceProfile, RiskLevel

async def main():
    # Create framework
    framework = EUAIActFramework()

    # List available rules
    print("EU AI Act Rules:")
    for rule in framework.rules:
        print(f"  {rule.rule_id}: {rule.name} [{rule.severity.value}]")

    # Create an audit entry
    entry = AuditEntry(
        entry_id="eu-test-001",
        timestamp=datetime.utcnow(),
        event_type="inference",
        actor="api-user",
        action="AI chatbot response",
        risk_level=RiskLevel.HIGH,
        user_notified=True,
        human_oversight=True,
        metadata={
            "risk_assessment_documented": True,
            "security_validated": True,
        },
    )

    # Create profile
    profile = ComplianceProfile(
        profile_id="eu-ai-profile",
        name="EU AI Act Compliance",
        enabled_frameworks=["EU AI Act"],
    )

    # Check compliance
    result = await framework.check(entry, profile)

    print(f"\nCompliance Check Result:")
    print(f"  Compliant: {result.is_compliant}")
    print(f"  Rules checked: {result.rules_checked}")
    print(f"  Rules passed: {result.rules_passed}")

    if result.violations:
        print(f"\nViolations ({len(result.violations)}):")
        for v in result.violations:
            print(f"  [{v.severity.value.upper()}] {v.rule_name}")
            print(f"    Evidence: {v.evidence}")

asyncio.run(main())

Key Requirements

EUAI-001: Human Oversight

# For high-risk operations, human_oversight must be True
entry = AuditEntry(
    ...,
    risk_level=RiskLevel.HIGH,
    human_oversight=True,  # Required for high-risk
)

EUAI-002: Transparency

# For user-facing interactions, user must be notified
entry = AuditEntry(
    ...,
    event_type="inference",  # User-facing event
    user_notified=True,      # User knows it's AI
)

EUAI-003: Risk Assessment

# High-risk operations need documented risk assessment
entry = AuditEntry(
    ...,
    risk_level=RiskLevel.HIGH,
    metadata={"risk_assessment_documented": True},
)

SOC2 Compliance

SOC2 Type II evaluates operational effectiveness of security controls based on AICPA Trust Service Criteria.

Trust Service Categories

Category Code Focus
Security CC Access controls, monitoring, incident response
Availability A SLA monitoring, recovery objectives
Processing Integrity PI Input validation, data accuracy
Confidentiality C Data classification
Privacy P Privacy notices for personal data

Basic Usage

import asyncio
from datetime import datetime
from rotalabs_comply.frameworks.soc2 import SOC2Framework
from rotalabs_comply.frameworks.base import AuditEntry, ComplianceProfile, RiskLevel

async def main():
    framework = SOC2Framework()

    # List categories
    categories = framework.list_categories()
    print(f"SOC2 Categories: {categories}")

    # Create audit entry with SOC2-relevant metadata
    entry = AuditEntry(
        entry_id="soc2-test-001",
        timestamp=datetime.utcnow(),
        event_type="data_access",
        actor="admin@company.com",  # Authenticated user
        action="Query customer database",
        system_id="prod-db-001",
        data_classification="confidential",
        metadata={
            "access_controlled": True,
            "monitored": True,
            "change_approved": True,
        },
    )

    profile = ComplianceProfile(
        profile_id="soc2-profile",
        name="SOC2 Compliance",
        enabled_frameworks=["SOC2 Type II"],
    )

    result = await framework.check(entry, profile)

    print(f"\nSOC2 Check Result:")
    print(f"  Compliant: {result.is_compliant}")
    print(f"  Violations: {len(result.violations)}")

asyncio.run(main())

Key Requirements

CC6.1: Logical Access Controls

# All access events need authentication and authorization
entry = AuditEntry(
    ...,
    event_type="data_access",
    actor="user@company.com",  # Must be authenticated (not "anonymous")
    metadata={"access_controlled": True},
)

CC6.3: Change Management

# Changes need approval and documentation
entry = AuditEntry(
    ...,
    event_type="deployment",
    documentation_ref="CHG-001",  # Change ticket
    metadata={"change_approved": True},
)

C1.1: Confidentiality Classification

# Data must be classified
entry = AuditEntry(
    ...,
    event_type="data_access",
    data_classification="confidential",  # Not "unclassified"
)

HIPAA Compliance

HIPAA applies to systems processing Protected Health Information (PHI). Rules are only evaluated for PHI-related entries.

PHI Detection

The framework identifies PHI-related entries by data classification:

# These classifications trigger HIPAA evaluation
phi_classifications = {
    "PHI", "ePHI", "protected_health_information",
    "health_data", "medical", "clinical"
}

Basic Usage

import asyncio
from datetime import datetime
from rotalabs_comply.frameworks.hipaa import HIPAAFramework
from rotalabs_comply.frameworks.base import AuditEntry, ComplianceProfile, RiskLevel

async def main():
    framework = HIPAAFramework()

    # PHI-related entry (HIPAA rules apply)
    phi_entry = AuditEntry(
        entry_id="hipaa-test-001",
        timestamp=datetime.utcnow(),
        event_type="inference",
        actor="doctor@hospital.com",
        action="AI diagnostic assistance",
        data_classification="PHI",  # Triggers HIPAA
        metadata={
            "access_controlled": True,
            "encryption_enabled": True,
            "authenticated": True,
            "purpose_documented": True,
            "minimum_necessary_applied": True,
        },
    )

    # Non-PHI entry (HIPAA rules don't apply)
    non_phi_entry = AuditEntry(
        entry_id="hipaa-test-002",
        timestamp=datetime.utcnow(),
        event_type="inference",
        actor="user@company.com",
        action="General AI query",
        data_classification="internal",  # Not PHI
    )

    profile = ComplianceProfile(
        profile_id="hipaa-profile",
        name="HIPAA Compliance",
        enabled_frameworks=["HIPAA"],
    )

    # Check PHI entry
    result1 = await framework.check(phi_entry, profile)
    print(f"PHI Entry - Rules checked: {result1.rules_checked}")

    # Check non-PHI entry
    result2 = await framework.check(non_phi_entry, profile)
    print(f"Non-PHI Entry - Rules checked: {result2.rules_checked}")  # 0

asyncio.run(main())

Key Requirements

164.312(a): Access Control

# PHI access requires authentication, authorization, and encryption
entry = AuditEntry(
    ...,
    data_classification="PHI",
    actor="nurse@hospital.com",  # Authenticated
    metadata={
        "access_controlled": True,
        "encryption_enabled": True,
    },
)

164.312(d): Authentication

# Strong authentication required, MFA for high-risk
entry = AuditEntry(
    ...,
    event_type="bulk_access",  # High-risk operation
    data_classification="PHI",
    metadata={
        "authenticated": True,
        "mfa_verified": True,  # Required for high-risk
    },
)

164.502: Minimum Necessary

# PHI use must be limited to minimum necessary
entry = AuditEntry(
    ...,
    data_classification="PHI",
    metadata={
        "purpose_documented": True,
        "minimum_necessary_applied": True,
    },
)

Multi-Framework Compliance

Check against multiple frameworks simultaneously:

import asyncio
from datetime import datetime
from rotalabs_comply.frameworks.eu_ai_act import EUAIActFramework
from rotalabs_comply.frameworks.soc2 import SOC2Framework
from rotalabs_comply.frameworks.hipaa import HIPAAFramework
from rotalabs_comply.frameworks.base import AuditEntry, ComplianceProfile, RiskLevel

async def main():
    # Initialize all frameworks
    frameworks = {
        "eu_ai_act": EUAIActFramework(),
        "soc2": SOC2Framework(),
        "hipaa": HIPAAFramework(),
    }

    # Create comprehensive entry
    entry = AuditEntry(
        entry_id="multi-test-001",
        timestamp=datetime.utcnow(),
        event_type="inference",
        actor="clinician@hospital.eu",
        action="AI-assisted diagnosis",
        system_id="diag-ai-001",
        risk_level=RiskLevel.HIGH,
        data_classification="PHI",  # HIPAA relevant
        user_notified=True,
        human_oversight=True,
        documentation_ref="DOC-DIAG-001",
        metadata={
            # EU AI Act
            "risk_assessment_documented": True,
            "accuracy_monitored": True,
            "security_validated": True,

            # SOC2
            "access_controlled": True,
            "monitored": True,

            # HIPAA
            "encryption_enabled": True,
            "authenticated": True,
            "purpose_documented": True,
            "minimum_necessary_applied": True,
        },
    )

    profile = ComplianceProfile(
        profile_id="multi-framework",
        name="Multi-Framework Compliance",
        enabled_frameworks=["EU AI Act", "SOC2 Type II", "HIPAA"],
    )

    # Check against all frameworks
    all_violations = []
    for name, framework in frameworks.items():
        result = await framework.check(entry, profile)
        all_violations.extend(result.violations)
        print(f"\n{name}:")
        print(f"  Compliant: {result.is_compliant}")
        print(f"  Violations: {len(result.violations)}")

    # Overall summary
    print(f"\n=== Overall Summary ===")
    print(f"Total violations: {len(all_violations)}")

    # Group by severity
    for severity in ["critical", "high", "medium", "low"]:
        count = sum(
            1 for v in all_violations
            if v.severity.value.lower() == severity
        )
        if count > 0:
            print(f"  {severity.upper()}: {count}")

asyncio.run(main())

Filtering Compliance Checks

By Category

# Only check security-related rules
profile = ComplianceProfile(
    profile_id="security-only",
    name="Security Focus",
    enabled_categories=["security", "access_control"],
)

By Severity

# Only report medium severity and above
profile = ComplianceProfile(
    profile_id="high-priority",
    name="High Priority Only",
    min_severity=RiskLevel.MEDIUM,
)

Exclude Specific Rules

# Skip specific rules
profile = ComplianceProfile(
    profile_id="customized",
    name="Custom Profile",
    excluded_rules=["EUAI-007", "SOC2-CC8.1"],
)

Handling Violations

Analyzing Violations

from collections import defaultdict

# Group violations by framework
by_framework = defaultdict(list)
for v in all_violations:
    by_framework[v.framework].append(v)

# Group by severity
by_severity = defaultdict(list)
for v in all_violations:
    by_severity[v.severity.value].append(v)

# Group by category
by_category = defaultdict(list)
for v in all_violations:
    by_category[v.category].append(v)

Remediation Tracking

# Track remediation progress
remediations = {}

for violation in all_violations:
    remediations[violation.rule_id] = {
        "rule_name": violation.rule_name,
        "severity": violation.severity.value,
        "remediation": violation.remediation,
        "status": "pending",
        "assigned_to": None,
        "due_date": None,
    }

# Assign and track
remediations["EUAI-001"]["assigned_to"] = "compliance-team"
remediations["EUAI-001"]["due_date"] = "2026-02-15"
remediations["EUAI-001"]["status"] = "in_progress"

Custom Rule Checks

Add custom validation logic to rules:

from rotalabs_comply.frameworks.base import ComplianceRule, RiskLevel

def check_custom_requirement(entry):
    """Custom check: require specific metadata field."""
    return entry.metadata.get("custom_approval", False)

custom_rule = ComplianceRule(
    rule_id="CUSTOM-001",
    name="Custom Approval Required",
    description="All AI operations require custom approval flag",
    severity=RiskLevel.MEDIUM,
    category="custom",
    check_fn=check_custom_requirement,  # Custom check function
    remediation="Set custom_approval=True in metadata",
)

Best Practices

1. Document Everything

# Ensure documentation references exist for significant events
significant_events = ["deployment", "training", "model_update"]
if entry.event_type in significant_events:
    assert entry.documentation_ref, "Documentation required"

2. Consistent Metadata Schema

# Define required metadata by framework
required_metadata = {
    "eu_ai_act": ["risk_assessment_documented"],
    "soc2": ["access_controlled", "monitored"],
    "hipaa": ["encryption_enabled", "authenticated"],
}

# Validate before logging
def validate_metadata(entry, frameworks):
    for fw in frameworks:
        for field in required_metadata.get(fw, []):
            if field not in entry.metadata:
                raise ValueError(f"Missing {field} for {fw}")

3. Regular Compliance Scans

async def daily_compliance_scan(logger, frameworks, profile):
    """Run daily compliance scan on recent entries."""
    from datetime import datetime, timedelta

    end = datetime.utcnow()
    start = end - timedelta(days=1)

    entries = await logger.get_entries(start, end)

    all_violations = []
    for entry in entries:
        for fw in frameworks.values():
            result = await fw.check(entry, profile)
            all_violations.extend(result.violations)

    return {
        "entries_checked": len(entries),
        "total_violations": len(all_violations),
        "critical": sum(1 for v in all_violations if v.severity.value == "critical"),
        "high": sum(1 for v in all_violations if v.severity.value == "high"),
    }