rtmx
RTMX - Requirements Traceability Matrix toolkit for GenAI-driven development.
This package provides tools for managing requirements traceability in software projects, with special focus on compliance frameworks (CMMC, FedRAMP) and GenAI integration.
Example:
from rtmx import RTMDatabase, Status db = RTMDatabase.load("docs/rtm_database.csv") incomplete = db.filter(status=Status.MISSING) cycles = db.find_cycles()
1"""RTMX - Requirements Traceability Matrix toolkit for GenAI-driven development. 2 3This package provides tools for managing requirements traceability in software projects, 4with special focus on compliance frameworks (CMMC, FedRAMP) and GenAI integration. 5 6Example: 7 >>> from rtmx import RTMDatabase, Status 8 >>> db = RTMDatabase.load("docs/rtm_database.csv") 9 >>> incomplete = db.filter(status=Status.MISSING) 10 >>> cycles = db.find_cycles() 11""" 12 13from rtmx.config import RTMXConfig, load_config 14from rtmx.models import ( 15 Priority, 16 Requirement, 17 RequirementNotFoundError, 18 RTMDatabase, 19 RTMError, 20 RTMValidationError, 21 Status, 22) 23 24__version__ = "0.0.2" 25__all__ = [ 26 "RTMDatabase", 27 "Requirement", 28 "Status", 29 "Priority", 30 "RTMError", 31 "RequirementNotFoundError", 32 "RTMValidationError", 33 "RTMXConfig", 34 "load_config", 35 "__version__", 36]
317class RTMDatabase: 318 """Collection of requirements with query and modification operations. 319 320 The RTMDatabase is the primary interface for working with requirements data. 321 It supports loading from CSV files, querying requirements, and saving changes. 322 323 Example: 324 >>> db = RTMDatabase.load("docs/rtm_database.csv") 325 >>> req = db.get("REQ-SW-001") 326 >>> incomplete = db.filter(status=Status.MISSING) 327 >>> db.update("REQ-SW-001", status=Status.COMPLETE) 328 >>> db.save() 329 """ 330 331 def __init__(self, requirements: list[Requirement], path: Path | None = None) -> None: 332 """Initialize RTM database. 333 334 Args: 335 requirements: List of requirements 336 path: Optional path to the source CSV file 337 """ 338 self._requirements: dict[str, Requirement] = {r.req_id: r for r in requirements} 339 self._path = path 340 self._graph: DependencyGraph | None = None 341 342 @classmethod 343 def load(cls, path: str | Path | None = None) -> Self: 344 """Load RTM database from CSV file. 345 346 Args: 347 path: Path to CSV file. If None, searches for docs/rtm_database.csv 348 349 Returns: 350 RTMDatabase instance 351 352 Raises: 353 RTMError: If file not found or invalid 354 """ 355 from rtmx.parser import find_rtm_database, load_csv 356 357 resolved_path = find_rtm_database() if path is None else Path(path) 358 359 if not resolved_path.exists(): 360 raise RTMError(f"RTM database not found: {resolved_path}") 361 362 requirements = load_csv(resolved_path) 363 return cls(requirements, resolved_path) 364 365 def save(self, path: str | Path | None = None) -> None: 366 """Save RTM database to CSV file. 367 368 Args: 369 path: Path to save to. If None, uses original load path. 370 371 Raises: 372 RTMError: If no path specified and database was not loaded from file 373 """ 374 from rtmx.parser import save_csv 375 376 save_path = Path(path) if path else self._path 377 if save_path is None: 378 raise RTMError("No save path specified and database was not loaded from file") 379 380 save_csv(list(self._requirements.values()), save_path) 381 self._path = save_path 382 383 def get(self, req_id: str) -> Requirement: 384 """Get requirement by ID. 385 386 Args: 387 req_id: Requirement identifier 388 389 Returns: 390 Requirement instance 391 392 Raises: 393 RequirementNotFoundError: If requirement not found 394 """ 395 if req_id not in self._requirements: 396 available = list(self._requirements.keys())[:5] 397 raise RequirementNotFoundError( 398 f"Requirement {req_id} not found. Available: {', '.join(available)}..." 399 ) 400 return self._requirements[req_id] 401 402 def exists(self, req_id: str) -> bool: 403 """Check if requirement exists. 404 405 Args: 406 req_id: Requirement identifier 407 408 Returns: 409 True if requirement exists 410 """ 411 return req_id in self._requirements 412 413 def update(self, req_id: str, **fields: Any) -> Requirement: 414 """Update requirement fields. 415 416 Args: 417 req_id: Requirement identifier 418 **fields: Field name/value pairs to update 419 420 Returns: 421 Updated requirement 422 423 Raises: 424 RequirementNotFoundError: If requirement not found 425 """ 426 req = self.get(req_id) 427 428 for key, value in fields.items(): 429 if key == "status" and isinstance(value, str): 430 value = Status.from_string(value) 431 elif key == "priority" and isinstance(value, str): 432 value = Priority.from_string(value) 433 elif ( 434 key == "dependencies" 435 and isinstance(value, str) 436 or key == "blocks" 437 and isinstance(value, str) 438 ): 439 from rtmx.parser import parse_dependencies 440 441 value = parse_dependencies(value) 442 443 if hasattr(req, key): 444 setattr(req, key, value) 445 else: 446 req.extra[key] = value 447 448 # Invalidate cached graph 449 self._graph = None 450 451 return req 452 453 def add(self, requirement: Requirement) -> None: 454 """Add a new requirement. 455 456 Args: 457 requirement: Requirement to add 458 459 Raises: 460 RTMError: If requirement ID already exists 461 """ 462 if requirement.req_id in self._requirements: 463 raise RTMError(f"Requirement {requirement.req_id} already exists") 464 self._requirements[requirement.req_id] = requirement 465 self._graph = None 466 467 def remove(self, req_id: str) -> Requirement: 468 """Remove a requirement. 469 470 Args: 471 req_id: Requirement identifier 472 473 Returns: 474 Removed requirement 475 476 Raises: 477 RequirementNotFoundError: If requirement not found 478 """ 479 if req_id not in self._requirements: 480 raise RequirementNotFoundError(f"Requirement {req_id} not found") 481 req = self._requirements.pop(req_id) 482 self._graph = None 483 return req 484 485 def filter( 486 self, 487 *, 488 status: Status | None = None, 489 priority: Priority | None = None, 490 category: str | None = None, 491 subcategory: str | None = None, 492 phase: int | None = None, 493 has_test: bool | None = None, 494 ) -> list[Requirement]: 495 """Filter requirements by criteria. 496 497 Args: 498 status: Filter by status 499 priority: Filter by priority 500 category: Filter by category 501 subcategory: Filter by subcategory 502 phase: Filter by phase 503 has_test: Filter by test presence 504 505 Returns: 506 List of matching requirements 507 """ 508 results = list(self._requirements.values()) 509 510 if status is not None: 511 results = [r for r in results if r.status == status] 512 if priority is not None: 513 results = [r for r in results if r.priority == priority] 514 if category is not None: 515 results = [r for r in results if r.category == category] 516 if subcategory is not None: 517 results = [r for r in results if r.subcategory == subcategory] 518 if phase is not None: 519 results = [r for r in results if r.phase == phase] 520 if has_test is not None: 521 results = [r for r in results if r.has_test() == has_test] 522 523 return results 524 525 def all(self) -> list[Requirement]: 526 """Get all requirements. 527 528 Returns: 529 List of all requirements 530 """ 531 return list(self._requirements.values()) 532 533 def __len__(self) -> int: 534 """Return number of requirements.""" 535 return len(self._requirements) 536 537 def __iter__(self) -> Iterator[Requirement]: 538 """Iterate over requirements.""" 539 return iter(self._requirements.values()) 540 541 def __contains__(self, req_id: str) -> bool: 542 """Check if requirement exists.""" 543 return req_id in self._requirements 544 545 # Graph operations (delegate to DependencyGraph) 546 547 def _get_graph(self) -> DependencyGraph: 548 """Get or create dependency graph.""" 549 if self._graph is None: 550 from rtmx.graph import DependencyGraph 551 552 self._graph = DependencyGraph.from_database(self) 553 return self._graph 554 555 def find_cycles(self) -> list[list[str]]: 556 """Find circular dependency cycles. 557 558 Returns: 559 List of cycles, where each cycle is a list of requirement IDs 560 """ 561 return self._get_graph().find_cycles() 562 563 def transitive_blocks(self, req_id: str) -> set[str]: 564 """Get all requirements transitively blocked by a requirement. 565 566 Args: 567 req_id: Requirement identifier 568 569 Returns: 570 Set of blocked requirement IDs 571 """ 572 return self._get_graph().transitive_blocks(req_id) 573 574 def critical_path(self) -> list[str]: 575 """Get critical path through dependency graph. 576 577 Returns: 578 List of requirement IDs on critical path 579 """ 580 return self._get_graph().critical_path() 581 582 # Validation operations (delegate to validation module) 583 584 def validate(self) -> list[str]: 585 """Validate RTM structure and data. 586 587 Returns: 588 List of validation error messages (empty if valid) 589 """ 590 from rtmx.validation import validate_schema 591 592 return validate_schema(self) 593 594 def check_reciprocity(self) -> list[tuple[str, str, str]]: 595 """Check dependency/blocks reciprocity. 596 597 Returns: 598 List of (req_id, related_id, issue) tuples 599 """ 600 from rtmx.validation import check_reciprocity 601 602 return check_reciprocity(self) 603 604 def fix_reciprocity(self) -> int: 605 """Fix dependency/blocks reciprocity violations. 606 607 Returns: 608 Number of violations fixed 609 """ 610 from rtmx.validation import fix_reciprocity 611 612 return fix_reciprocity(self) 613 614 # Statistics 615 616 def status_counts(self) -> dict[Status, int]: 617 """Get count of requirements by status. 618 619 Returns: 620 Dictionary mapping status to count 621 """ 622 counts: dict[Status, int] = dict.fromkeys(Status, 0) 623 for req in self._requirements.values(): 624 counts[req.status] += 1 625 return counts 626 627 def completion_percentage(self) -> float: 628 """Calculate completion percentage. 629 630 PARTIAL requirements count as 50%. 631 632 Returns: 633 Completion percentage (0-100) 634 """ 635 if not self._requirements: 636 return 0.0 637 638 counts = self.status_counts() 639 complete = counts[Status.COMPLETE] 640 partial = counts[Status.PARTIAL] 641 total = len(self._requirements) 642 643 return ((complete + partial * 0.5) / total) * 100 644 645 @property 646 def path(self) -> Path | None: 647 """Get path to source file.""" 648 return self._path
Collection of requirements with query and modification operations.
The RTMDatabase is the primary interface for working with requirements data. It supports loading from CSV files, querying requirements, and saving changes.
Example:
db = RTMDatabase.load("docs/rtm_database.csv") req = db.get("REQ-SW-001") incomplete = db.filter(status=Status.MISSING) db.update("REQ-SW-001", status=Status.COMPLETE) db.save()
331 def __init__(self, requirements: list[Requirement], path: Path | None = None) -> None: 332 """Initialize RTM database. 333 334 Args: 335 requirements: List of requirements 336 path: Optional path to the source CSV file 337 """ 338 self._requirements: dict[str, Requirement] = {r.req_id: r for r in requirements} 339 self._path = path 340 self._graph: DependencyGraph | None = None
Initialize RTM database.
Args: requirements: List of requirements path: Optional path to the source CSV file
342 @classmethod 343 def load(cls, path: str | Path | None = None) -> Self: 344 """Load RTM database from CSV file. 345 346 Args: 347 path: Path to CSV file. If None, searches for docs/rtm_database.csv 348 349 Returns: 350 RTMDatabase instance 351 352 Raises: 353 RTMError: If file not found or invalid 354 """ 355 from rtmx.parser import find_rtm_database, load_csv 356 357 resolved_path = find_rtm_database() if path is None else Path(path) 358 359 if not resolved_path.exists(): 360 raise RTMError(f"RTM database not found: {resolved_path}") 361 362 requirements = load_csv(resolved_path) 363 return cls(requirements, resolved_path)
Load RTM database from CSV file.
Args: path: Path to CSV file. If None, searches for docs/rtm_database.csv
Returns: RTMDatabase instance
Raises: RTMError: If file not found or invalid
365 def save(self, path: str | Path | None = None) -> None: 366 """Save RTM database to CSV file. 367 368 Args: 369 path: Path to save to. If None, uses original load path. 370 371 Raises: 372 RTMError: If no path specified and database was not loaded from file 373 """ 374 from rtmx.parser import save_csv 375 376 save_path = Path(path) if path else self._path 377 if save_path is None: 378 raise RTMError("No save path specified and database was not loaded from file") 379 380 save_csv(list(self._requirements.values()), save_path) 381 self._path = save_path
Save RTM database to CSV file.
Args: path: Path to save to. If None, uses original load path.
Raises: RTMError: If no path specified and database was not loaded from file
383 def get(self, req_id: str) -> Requirement: 384 """Get requirement by ID. 385 386 Args: 387 req_id: Requirement identifier 388 389 Returns: 390 Requirement instance 391 392 Raises: 393 RequirementNotFoundError: If requirement not found 394 """ 395 if req_id not in self._requirements: 396 available = list(self._requirements.keys())[:5] 397 raise RequirementNotFoundError( 398 f"Requirement {req_id} not found. Available: {', '.join(available)}..." 399 ) 400 return self._requirements[req_id]
Get requirement by ID.
Args: req_id: Requirement identifier
Returns: Requirement instance
Raises: RequirementNotFoundError: If requirement not found
402 def exists(self, req_id: str) -> bool: 403 """Check if requirement exists. 404 405 Args: 406 req_id: Requirement identifier 407 408 Returns: 409 True if requirement exists 410 """ 411 return req_id in self._requirements
Check if requirement exists.
Args: req_id: Requirement identifier
Returns: True if requirement exists
413 def update(self, req_id: str, **fields: Any) -> Requirement: 414 """Update requirement fields. 415 416 Args: 417 req_id: Requirement identifier 418 **fields: Field name/value pairs to update 419 420 Returns: 421 Updated requirement 422 423 Raises: 424 RequirementNotFoundError: If requirement not found 425 """ 426 req = self.get(req_id) 427 428 for key, value in fields.items(): 429 if key == "status" and isinstance(value, str): 430 value = Status.from_string(value) 431 elif key == "priority" and isinstance(value, str): 432 value = Priority.from_string(value) 433 elif ( 434 key == "dependencies" 435 and isinstance(value, str) 436 or key == "blocks" 437 and isinstance(value, str) 438 ): 439 from rtmx.parser import parse_dependencies 440 441 value = parse_dependencies(value) 442 443 if hasattr(req, key): 444 setattr(req, key, value) 445 else: 446 req.extra[key] = value 447 448 # Invalidate cached graph 449 self._graph = None 450 451 return req
Update requirement fields.
Args: req_id: Requirement identifier **fields: Field name/value pairs to update
Returns: Updated requirement
Raises: RequirementNotFoundError: If requirement not found
453 def add(self, requirement: Requirement) -> None: 454 """Add a new requirement. 455 456 Args: 457 requirement: Requirement to add 458 459 Raises: 460 RTMError: If requirement ID already exists 461 """ 462 if requirement.req_id in self._requirements: 463 raise RTMError(f"Requirement {requirement.req_id} already exists") 464 self._requirements[requirement.req_id] = requirement 465 self._graph = None
Add a new requirement.
Args: requirement: Requirement to add
Raises: RTMError: If requirement ID already exists
467 def remove(self, req_id: str) -> Requirement: 468 """Remove a requirement. 469 470 Args: 471 req_id: Requirement identifier 472 473 Returns: 474 Removed requirement 475 476 Raises: 477 RequirementNotFoundError: If requirement not found 478 """ 479 if req_id not in self._requirements: 480 raise RequirementNotFoundError(f"Requirement {req_id} not found") 481 req = self._requirements.pop(req_id) 482 self._graph = None 483 return req
Remove a requirement.
Args: req_id: Requirement identifier
Returns: Removed requirement
Raises: RequirementNotFoundError: If requirement not found
485 def filter( 486 self, 487 *, 488 status: Status | None = None, 489 priority: Priority | None = None, 490 category: str | None = None, 491 subcategory: str | None = None, 492 phase: int | None = None, 493 has_test: bool | None = None, 494 ) -> list[Requirement]: 495 """Filter requirements by criteria. 496 497 Args: 498 status: Filter by status 499 priority: Filter by priority 500 category: Filter by category 501 subcategory: Filter by subcategory 502 phase: Filter by phase 503 has_test: Filter by test presence 504 505 Returns: 506 List of matching requirements 507 """ 508 results = list(self._requirements.values()) 509 510 if status is not None: 511 results = [r for r in results if r.status == status] 512 if priority is not None: 513 results = [r for r in results if r.priority == priority] 514 if category is not None: 515 results = [r for r in results if r.category == category] 516 if subcategory is not None: 517 results = [r for r in results if r.subcategory == subcategory] 518 if phase is not None: 519 results = [r for r in results if r.phase == phase] 520 if has_test is not None: 521 results = [r for r in results if r.has_test() == has_test] 522 523 return results
Filter requirements by criteria.
Args: status: Filter by status priority: Filter by priority category: Filter by category subcategory: Filter by subcategory phase: Filter by phase has_test: Filter by test presence
Returns: List of matching requirements
525 def all(self) -> list[Requirement]: 526 """Get all requirements. 527 528 Returns: 529 List of all requirements 530 """ 531 return list(self._requirements.values())
Get all requirements.
Returns: List of all requirements
555 def find_cycles(self) -> list[list[str]]: 556 """Find circular dependency cycles. 557 558 Returns: 559 List of cycles, where each cycle is a list of requirement IDs 560 """ 561 return self._get_graph().find_cycles()
Find circular dependency cycles.
Returns: List of cycles, where each cycle is a list of requirement IDs
563 def transitive_blocks(self, req_id: str) -> set[str]: 564 """Get all requirements transitively blocked by a requirement. 565 566 Args: 567 req_id: Requirement identifier 568 569 Returns: 570 Set of blocked requirement IDs 571 """ 572 return self._get_graph().transitive_blocks(req_id)
Get all requirements transitively blocked by a requirement.
Args: req_id: Requirement identifier
Returns: Set of blocked requirement IDs
574 def critical_path(self) -> list[str]: 575 """Get critical path through dependency graph. 576 577 Returns: 578 List of requirement IDs on critical path 579 """ 580 return self._get_graph().critical_path()
Get critical path through dependency graph.
Returns: List of requirement IDs on critical path
584 def validate(self) -> list[str]: 585 """Validate RTM structure and data. 586 587 Returns: 588 List of validation error messages (empty if valid) 589 """ 590 from rtmx.validation import validate_schema 591 592 return validate_schema(self)
Validate RTM structure and data.
Returns: List of validation error messages (empty if valid)
594 def check_reciprocity(self) -> list[tuple[str, str, str]]: 595 """Check dependency/blocks reciprocity. 596 597 Returns: 598 List of (req_id, related_id, issue) tuples 599 """ 600 from rtmx.validation import check_reciprocity 601 602 return check_reciprocity(self)
Check dependency/blocks reciprocity.
Returns: List of (req_id, related_id, issue) tuples
604 def fix_reciprocity(self) -> int: 605 """Fix dependency/blocks reciprocity violations. 606 607 Returns: 608 Number of violations fixed 609 """ 610 from rtmx.validation import fix_reciprocity 611 612 return fix_reciprocity(self)
Fix dependency/blocks reciprocity violations.
Returns: Number of violations fixed
616 def status_counts(self) -> dict[Status, int]: 617 """Get count of requirements by status. 618 619 Returns: 620 Dictionary mapping status to count 621 """ 622 counts: dict[Status, int] = dict.fromkeys(Status, 0) 623 for req in self._requirements.values(): 624 counts[req.status] += 1 625 return counts
Get count of requirements by status.
Returns: Dictionary mapping status to count
627 def completion_percentage(self) -> float: 628 """Calculate completion percentage. 629 630 PARTIAL requirements count as 50%. 631 632 Returns: 633 Completion percentage (0-100) 634 """ 635 if not self._requirements: 636 return 0.0 637 638 counts = self.status_counts() 639 complete = counts[Status.COMPLETE] 640 partial = counts[Status.PARTIAL] 641 total = len(self._requirements) 642 643 return ((complete + partial * 0.5) / total) * 100
Calculate completion percentage.
PARTIAL requirements count as 50%.
Returns: Completion percentage (0-100)
87@dataclass 88class Requirement: 89 """Single requirement in the RTM. 90 91 Attributes: 92 req_id: Unique identifier (e.g., REQ-SW-001) 93 category: High-level grouping (e.g., SOFTWARE, MODE, PERFORMANCE) 94 subcategory: Detailed classification within category 95 requirement_text: Human-readable requirement description 96 target_value: Quantitative acceptance criteria 97 test_module: Python test file implementing validation 98 test_function: Specific test function name 99 validation_method: Testing approach description 100 status: Current completion status 101 priority: Criticality level 102 phase: Development phase (1, 2, 3, etc.) 103 notes: Additional context 104 effort_weeks: Estimated effort in weeks 105 dependencies: Set of requirement IDs this depends on 106 blocks: Set of requirement IDs this blocks 107 assignee: Person responsible 108 sprint: Target sprint/version 109 started_date: When work began (YYYY-MM-DD) 110 completed_date: When completed (YYYY-MM-DD) 111 requirement_file: Path to detailed specification markdown 112 external_id: ID in external system (GitHub issue #, Jira key) 113 extra: Additional fields not in core schema 114 """ 115 116 req_id: str 117 category: str = "" 118 subcategory: str = "" 119 requirement_text: str = "" 120 target_value: str = "" 121 test_module: str = "" 122 test_function: str = "" 123 validation_method: str = "" 124 status: Status = Status.MISSING 125 priority: Priority = Priority.MEDIUM 126 phase: int | None = None 127 notes: str = "" 128 effort_weeks: float | None = None 129 dependencies: set[str] = field(default_factory=set) 130 blocks: set[str] = field(default_factory=set) 131 assignee: str = "" 132 sprint: str = "" 133 started_date: str = "" 134 completed_date: str = "" 135 requirement_file: str = "" 136 external_id: str = "" 137 extra: dict[str, Any] = field(default_factory=dict) 138 139 def has_test(self) -> bool: 140 """Check if requirement has an associated test.""" 141 return self.test_module not in ("", "MISSING") and self.test_function not in ("", "MISSING") 142 143 def is_complete(self) -> bool: 144 """Check if requirement is fully complete.""" 145 return self.status == Status.COMPLETE 146 147 def is_blocked(self, db: RTMDatabase) -> bool: 148 """Check if requirement is blocked by incomplete dependencies.""" 149 for dep_id in self.dependencies: 150 try: 151 dep = db.get(dep_id) 152 if dep.status != Status.COMPLETE: 153 return True 154 except RequirementNotFoundError: 155 pass 156 return False 157 158 # Convenience aliases for adapters 159 @property 160 def id(self) -> str: 161 """Alias for req_id.""" 162 return self.req_id 163 164 @property 165 def text(self) -> str: 166 """Alias for requirement_text.""" 167 return self.requirement_text 168 169 @property 170 def rationale(self) -> str: 171 """Get rationale from notes or extra fields.""" 172 return self.extra.get("rationale", self.notes) 173 174 @property 175 def acceptance(self) -> str: 176 """Get acceptance criteria from target_value or extra fields.""" 177 return self.extra.get("acceptance", self.target_value) 178 179 def to_dict(self) -> dict[str, Any]: 180 """Convert requirement to dictionary for serialization.""" 181 data = { 182 "req_id": self.req_id, 183 "category": self.category, 184 "subcategory": self.subcategory, 185 "requirement_text": self.requirement_text, 186 "target_value": self.target_value, 187 "test_module": self.test_module, 188 "test_function": self.test_function, 189 "validation_method": self.validation_method, 190 "status": self.status.value, 191 "priority": self.priority.value, 192 "phase": self.phase if self.phase is not None else "", 193 "notes": self.notes, 194 "effort_weeks": self.effort_weeks if self.effort_weeks is not None else "", 195 "dependencies": "|".join(sorted(self.dependencies)), 196 "blocks": "|".join(sorted(self.blocks)), 197 "assignee": self.assignee, 198 "sprint": self.sprint, 199 "started_date": self.started_date, 200 "completed_date": self.completed_date, 201 "requirement_file": self.requirement_file, 202 "external_id": self.external_id, 203 } 204 # Add extra fields 205 data.update(self.extra) 206 return data 207 208 @classmethod 209 def from_dict(cls, data: dict[str, Any]) -> Self: 210 """Create requirement from dictionary.""" 211 from rtmx.parser import parse_dependencies 212 213 # Extract known fields 214 req_id = str(data.get("req_id", data.get("Req_ID", ""))) 215 category = str(data.get("category", data.get("Category", ""))) 216 subcategory = str(data.get("subcategory", data.get("Subcategory", ""))) 217 218 # Parse status 219 status_str = str(data.get("status", data.get("Status", "MISSING"))) 220 status = Status.from_string(status_str) 221 222 # Parse priority 223 priority_str = str(data.get("priority", data.get("Priority", "MEDIUM"))) 224 priority = Priority.from_string(priority_str) 225 226 # Parse phase 227 phase_val = data.get("phase", data.get("Phase")) 228 phase: int | None = None 229 if phase_val not in (None, "", "phase"): 230 with contextlib.suppress(ValueError, TypeError): 231 phase = int(str(phase_val)) 232 233 # Parse effort_weeks 234 effort_val = data.get("effort_weeks", data.get("Effort_Weeks")) 235 effort_weeks: float | None = None 236 if effort_val not in (None, ""): 237 with contextlib.suppress(ValueError, TypeError): 238 effort_weeks = float(str(effort_val)) 239 240 # Parse dependencies and blocks 241 deps_str = str(data.get("dependencies", data.get("Dependencies", ""))) 242 blocks_str = str(data.get("blocks", data.get("Blocks", ""))) 243 244 # Collect extra fields 245 known_fields = { 246 "req_id", 247 "Req_ID", 248 "category", 249 "Category", 250 "subcategory", 251 "Subcategory", 252 "requirement_text", 253 "Requirement_Text", 254 "target_value", 255 "Target_Value", 256 "test_module", 257 "Test_Module", 258 "test_function", 259 "Test_Function", 260 "validation_method", 261 "Validation_Method", 262 "status", 263 "Status", 264 "priority", 265 "Priority", 266 "phase", 267 "Phase", 268 "notes", 269 "Notes", 270 "effort_weeks", 271 "Effort_Weeks", 272 "dependencies", 273 "Dependencies", 274 "blocks", 275 "Blocks", 276 "assignee", 277 "Assignee", 278 "sprint", 279 "Sprint", 280 "started_date", 281 "Started_Date", 282 "completed_date", 283 "Completed_Date", 284 "requirement_file", 285 "Requirement_File", 286 "external_id", 287 "External_ID", 288 } 289 extra = {k: v for k, v in data.items() if k not in known_fields} 290 291 return cls( 292 req_id=req_id, 293 category=category, 294 subcategory=subcategory, 295 requirement_text=str(data.get("requirement_text", data.get("Requirement_Text", ""))), 296 target_value=str(data.get("target_value", data.get("Target_Value", ""))), 297 test_module=str(data.get("test_module", data.get("Test_Module", ""))), 298 test_function=str(data.get("test_function", data.get("Test_Function", ""))), 299 validation_method=str(data.get("validation_method", data.get("Validation_Method", ""))), 300 status=status, 301 priority=priority, 302 phase=phase, 303 notes=str(data.get("notes", data.get("Notes", ""))), 304 effort_weeks=effort_weeks, 305 dependencies=parse_dependencies(deps_str), 306 blocks=parse_dependencies(blocks_str), 307 assignee=str(data.get("assignee", data.get("Assignee", ""))), 308 sprint=str(data.get("sprint", data.get("Sprint", ""))), 309 started_date=str(data.get("started_date", data.get("Started_Date", ""))), 310 completed_date=str(data.get("completed_date", data.get("Completed_Date", ""))), 311 requirement_file=str(data.get("requirement_file", data.get("Requirement_File", ""))), 312 external_id=str(data.get("external_id", data.get("External_ID", ""))), 313 extra=extra, 314 )
Single requirement in the RTM.
Attributes: req_id: Unique identifier (e.g., REQ-SW-001) category: High-level grouping (e.g., SOFTWARE, MODE, PERFORMANCE) subcategory: Detailed classification within category requirement_text: Human-readable requirement description target_value: Quantitative acceptance criteria test_module: Python test file implementing validation test_function: Specific test function name validation_method: Testing approach description status: Current completion status priority: Criticality level phase: Development phase (1, 2, 3, etc.) notes: Additional context effort_weeks: Estimated effort in weeks dependencies: Set of requirement IDs this depends on blocks: Set of requirement IDs this blocks assignee: Person responsible sprint: Target sprint/version started_date: When work began (YYYY-MM-DD) completed_date: When completed (YYYY-MM-DD) requirement_file: Path to detailed specification markdown external_id: ID in external system (GitHub issue #, Jira key) extra: Additional fields not in core schema
139 def has_test(self) -> bool: 140 """Check if requirement has an associated test.""" 141 return self.test_module not in ("", "MISSING") and self.test_function not in ("", "MISSING")
Check if requirement has an associated test.
143 def is_complete(self) -> bool: 144 """Check if requirement is fully complete.""" 145 return self.status == Status.COMPLETE
Check if requirement is fully complete.
147 def is_blocked(self, db: RTMDatabase) -> bool: 148 """Check if requirement is blocked by incomplete dependencies.""" 149 for dep_id in self.dependencies: 150 try: 151 dep = db.get(dep_id) 152 if dep.status != Status.COMPLETE: 153 return True 154 except RequirementNotFoundError: 155 pass 156 return False
Check if requirement is blocked by incomplete dependencies.
164 @property 165 def text(self) -> str: 166 """Alias for requirement_text.""" 167 return self.requirement_text
Alias for requirement_text.
169 @property 170 def rationale(self) -> str: 171 """Get rationale from notes or extra fields.""" 172 return self.extra.get("rationale", self.notes)
Get rationale from notes or extra fields.
174 @property 175 def acceptance(self) -> str: 176 """Get acceptance criteria from target_value or extra fields.""" 177 return self.extra.get("acceptance", self.target_value)
Get acceptance criteria from target_value or extra fields.
179 def to_dict(self) -> dict[str, Any]: 180 """Convert requirement to dictionary for serialization.""" 181 data = { 182 "req_id": self.req_id, 183 "category": self.category, 184 "subcategory": self.subcategory, 185 "requirement_text": self.requirement_text, 186 "target_value": self.target_value, 187 "test_module": self.test_module, 188 "test_function": self.test_function, 189 "validation_method": self.validation_method, 190 "status": self.status.value, 191 "priority": self.priority.value, 192 "phase": self.phase if self.phase is not None else "", 193 "notes": self.notes, 194 "effort_weeks": self.effort_weeks if self.effort_weeks is not None else "", 195 "dependencies": "|".join(sorted(self.dependencies)), 196 "blocks": "|".join(sorted(self.blocks)), 197 "assignee": self.assignee, 198 "sprint": self.sprint, 199 "started_date": self.started_date, 200 "completed_date": self.completed_date, 201 "requirement_file": self.requirement_file, 202 "external_id": self.external_id, 203 } 204 # Add extra fields 205 data.update(self.extra) 206 return data
Convert requirement to dictionary for serialization.
208 @classmethod 209 def from_dict(cls, data: dict[str, Any]) -> Self: 210 """Create requirement from dictionary.""" 211 from rtmx.parser import parse_dependencies 212 213 # Extract known fields 214 req_id = str(data.get("req_id", data.get("Req_ID", ""))) 215 category = str(data.get("category", data.get("Category", ""))) 216 subcategory = str(data.get("subcategory", data.get("Subcategory", ""))) 217 218 # Parse status 219 status_str = str(data.get("status", data.get("Status", "MISSING"))) 220 status = Status.from_string(status_str) 221 222 # Parse priority 223 priority_str = str(data.get("priority", data.get("Priority", "MEDIUM"))) 224 priority = Priority.from_string(priority_str) 225 226 # Parse phase 227 phase_val = data.get("phase", data.get("Phase")) 228 phase: int | None = None 229 if phase_val not in (None, "", "phase"): 230 with contextlib.suppress(ValueError, TypeError): 231 phase = int(str(phase_val)) 232 233 # Parse effort_weeks 234 effort_val = data.get("effort_weeks", data.get("Effort_Weeks")) 235 effort_weeks: float | None = None 236 if effort_val not in (None, ""): 237 with contextlib.suppress(ValueError, TypeError): 238 effort_weeks = float(str(effort_val)) 239 240 # Parse dependencies and blocks 241 deps_str = str(data.get("dependencies", data.get("Dependencies", ""))) 242 blocks_str = str(data.get("blocks", data.get("Blocks", ""))) 243 244 # Collect extra fields 245 known_fields = { 246 "req_id", 247 "Req_ID", 248 "category", 249 "Category", 250 "subcategory", 251 "Subcategory", 252 "requirement_text", 253 "Requirement_Text", 254 "target_value", 255 "Target_Value", 256 "test_module", 257 "Test_Module", 258 "test_function", 259 "Test_Function", 260 "validation_method", 261 "Validation_Method", 262 "status", 263 "Status", 264 "priority", 265 "Priority", 266 "phase", 267 "Phase", 268 "notes", 269 "Notes", 270 "effort_weeks", 271 "Effort_Weeks", 272 "dependencies", 273 "Dependencies", 274 "blocks", 275 "Blocks", 276 "assignee", 277 "Assignee", 278 "sprint", 279 "Sprint", 280 "started_date", 281 "Started_Date", 282 "completed_date", 283 "Completed_Date", 284 "requirement_file", 285 "Requirement_File", 286 "external_id", 287 "External_ID", 288 } 289 extra = {k: v for k, v in data.items() if k not in known_fields} 290 291 return cls( 292 req_id=req_id, 293 category=category, 294 subcategory=subcategory, 295 requirement_text=str(data.get("requirement_text", data.get("Requirement_Text", ""))), 296 target_value=str(data.get("target_value", data.get("Target_Value", ""))), 297 test_module=str(data.get("test_module", data.get("Test_Module", ""))), 298 test_function=str(data.get("test_function", data.get("Test_Function", ""))), 299 validation_method=str(data.get("validation_method", data.get("Validation_Method", ""))), 300 status=status, 301 priority=priority, 302 phase=phase, 303 notes=str(data.get("notes", data.get("Notes", ""))), 304 effort_weeks=effort_weeks, 305 dependencies=parse_dependencies(deps_str), 306 blocks=parse_dependencies(blocks_str), 307 assignee=str(data.get("assignee", data.get("Assignee", ""))), 308 sprint=str(data.get("sprint", data.get("Sprint", ""))), 309 started_date=str(data.get("started_date", data.get("Started_Date", ""))), 310 completed_date=str(data.get("completed_date", data.get("Completed_Date", ""))), 311 requirement_file=str(data.get("requirement_file", data.get("Requirement_File", ""))), 312 external_id=str(data.get("external_id", data.get("External_ID", ""))), 313 extra=extra, 314 )
Create requirement from dictionary.
48class Status(str, Enum): 49 """Requirement completion status.""" 50 51 COMPLETE = "COMPLETE" 52 PARTIAL = "PARTIAL" 53 MISSING = "MISSING" 54 NOT_STARTED = "NOT_STARTED" 55 56 @classmethod 57 def from_string(cls, value: str) -> Status: 58 """Parse status from string, handling variations.""" 59 normalized = value.strip().upper().replace("-", "_").replace(" ", "_") 60 for member in cls: 61 if member.value == normalized: 62 return member 63 # Default to MISSING for unknown values 64 return cls.MISSING
Requirement completion status.
56 @classmethod 57 def from_string(cls, value: str) -> Status: 58 """Parse status from string, handling variations.""" 59 normalized = value.strip().upper().replace("-", "_").replace(" ", "_") 60 for member in cls: 61 if member.value == normalized: 62 return member 63 # Default to MISSING for unknown values 64 return cls.MISSING
Parse status from string, handling variations.
67class Priority(str, Enum): 68 """Requirement priority level.""" 69 70 P0 = "P0" # Critical 71 HIGH = "HIGH" 72 MEDIUM = "MEDIUM" 73 LOW = "LOW" 74 75 @classmethod 76 def from_string(cls, value: str) -> Priority: 77 """Parse priority from string, handling variations.""" 78 normalized = value.strip().upper() 79 if normalized in ("P0", "CRITICAL"): 80 return cls.P0 81 for member in cls: 82 if member.value == normalized: 83 return member 84 return cls.MEDIUM
Requirement priority level.
75 @classmethod 76 def from_string(cls, value: str) -> Priority: 77 """Parse priority from string, handling variations.""" 78 normalized = value.strip().upper() 79 if normalized in ("P0", "CRITICAL"): 80 return cls.P0 81 for member in cls: 82 if member.value == normalized: 83 return member 84 return cls.MEDIUM
Parse priority from string, handling variations.
Base exception for RTM-related errors.
36class RequirementNotFoundError(RTMError): 37 """Raised when a requirement ID is not found in the RTM.""" 38 39 pass
Raised when a requirement ID is not found in the RTM.
Raised when RTM data fails validation.
173@dataclass 174class RTMXConfig: 175 """Complete RTMX configuration.""" 176 177 database: Path = field(default_factory=lambda: Path("docs/rtm_database.csv")) 178 requirements_dir: Path = field(default_factory=lambda: Path("docs/requirements")) 179 schema: str = "core" 180 pytest: PytestConfig = field(default_factory=PytestConfig) 181 agents: AgentsConfig = field(default_factory=AgentsConfig) 182 adapters: AdaptersConfig = field(default_factory=AdaptersConfig) 183 mcp: MCPConfig = field(default_factory=MCPConfig) 184 sync: SyncConfig = field(default_factory=SyncConfig) 185 186 # Path where config was loaded from (if any) 187 _config_path: Path | None = None 188 189 @classmethod 190 def from_dict(cls, data: dict[str, Any], config_path: Path | None = None) -> RTMXConfig: 191 """Create RTMXConfig from dictionary. 192 193 Args: 194 data: Configuration dictionary (usually from YAML) 195 config_path: Path where config was loaded from 196 197 Returns: 198 RTMXConfig instance 199 """ 200 rtmx_data = data.get("rtmx", data) 201 202 config = cls( 203 database=Path(rtmx_data.get("database", "docs/rtm_database.csv")), 204 requirements_dir=Path(rtmx_data.get("requirements_dir", "docs/requirements")), 205 schema=rtmx_data.get("schema", "core"), 206 ) 207 208 if "pytest" in rtmx_data: 209 config.pytest = PytestConfig.from_dict(rtmx_data["pytest"]) 210 if "agents" in rtmx_data: 211 config.agents = AgentsConfig.from_dict(rtmx_data["agents"]) 212 if "adapters" in rtmx_data: 213 config.adapters = AdaptersConfig.from_dict(rtmx_data["adapters"]) 214 if "mcp" in rtmx_data: 215 config.mcp = MCPConfig.from_dict(rtmx_data["mcp"]) 216 if "sync" in rtmx_data: 217 config.sync = SyncConfig.from_dict(rtmx_data["sync"]) 218 219 config._config_path = config_path 220 return config 221 222 def to_dict(self) -> dict[str, Any]: 223 """Convert config to dictionary for serialization.""" 224 return { 225 "rtmx": { 226 "database": str(self.database), 227 "requirements_dir": str(self.requirements_dir), 228 "schema": self.schema, 229 "pytest": { 230 "marker_prefix": self.pytest.marker_prefix, 231 "register_markers": self.pytest.register_markers, 232 }, 233 "agents": { 234 "claude": { 235 "enabled": self.agents.claude.enabled, 236 "config_path": self.agents.claude.config_path, 237 }, 238 "cursor": { 239 "enabled": self.agents.cursor.enabled, 240 "config_path": self.agents.cursor.config_path, 241 }, 242 "copilot": { 243 "enabled": self.agents.copilot.enabled, 244 "config_path": self.agents.copilot.config_path, 245 }, 246 "template_dir": self.agents.template_dir, 247 }, 248 "adapters": { 249 "github": { 250 "enabled": self.adapters.github.enabled, 251 "repo": self.adapters.github.repo, 252 "token_env": self.adapters.github.token_env, 253 "labels": self.adapters.github.labels, 254 "status_mapping": self.adapters.github.status_mapping, 255 }, 256 "jira": { 257 "enabled": self.adapters.jira.enabled, 258 "server": self.adapters.jira.server, 259 "project": self.adapters.jira.project, 260 "token_env": self.adapters.jira.token_env, 261 "issue_type": self.adapters.jira.issue_type, 262 "status_mapping": self.adapters.jira.status_mapping, 263 }, 264 }, 265 "mcp": { 266 "enabled": self.mcp.enabled, 267 "port": self.mcp.port, 268 "host": self.mcp.host, 269 }, 270 "sync": { 271 "conflict_resolution": self.sync.conflict_resolution, 272 }, 273 } 274 }
Complete RTMX configuration.
189 @classmethod 190 def from_dict(cls, data: dict[str, Any], config_path: Path | None = None) -> RTMXConfig: 191 """Create RTMXConfig from dictionary. 192 193 Args: 194 data: Configuration dictionary (usually from YAML) 195 config_path: Path where config was loaded from 196 197 Returns: 198 RTMXConfig instance 199 """ 200 rtmx_data = data.get("rtmx", data) 201 202 config = cls( 203 database=Path(rtmx_data.get("database", "docs/rtm_database.csv")), 204 requirements_dir=Path(rtmx_data.get("requirements_dir", "docs/requirements")), 205 schema=rtmx_data.get("schema", "core"), 206 ) 207 208 if "pytest" in rtmx_data: 209 config.pytest = PytestConfig.from_dict(rtmx_data["pytest"]) 210 if "agents" in rtmx_data: 211 config.agents = AgentsConfig.from_dict(rtmx_data["agents"]) 212 if "adapters" in rtmx_data: 213 config.adapters = AdaptersConfig.from_dict(rtmx_data["adapters"]) 214 if "mcp" in rtmx_data: 215 config.mcp = MCPConfig.from_dict(rtmx_data["mcp"]) 216 if "sync" in rtmx_data: 217 config.sync = SyncConfig.from_dict(rtmx_data["sync"]) 218 219 config._config_path = config_path 220 return config
Create RTMXConfig from dictionary.
Args: data: Configuration dictionary (usually from YAML) config_path: Path where config was loaded from
Returns: RTMXConfig instance
222 def to_dict(self) -> dict[str, Any]: 223 """Convert config to dictionary for serialization.""" 224 return { 225 "rtmx": { 226 "database": str(self.database), 227 "requirements_dir": str(self.requirements_dir), 228 "schema": self.schema, 229 "pytest": { 230 "marker_prefix": self.pytest.marker_prefix, 231 "register_markers": self.pytest.register_markers, 232 }, 233 "agents": { 234 "claude": { 235 "enabled": self.agents.claude.enabled, 236 "config_path": self.agents.claude.config_path, 237 }, 238 "cursor": { 239 "enabled": self.agents.cursor.enabled, 240 "config_path": self.agents.cursor.config_path, 241 }, 242 "copilot": { 243 "enabled": self.agents.copilot.enabled, 244 "config_path": self.agents.copilot.config_path, 245 }, 246 "template_dir": self.agents.template_dir, 247 }, 248 "adapters": { 249 "github": { 250 "enabled": self.adapters.github.enabled, 251 "repo": self.adapters.github.repo, 252 "token_env": self.adapters.github.token_env, 253 "labels": self.adapters.github.labels, 254 "status_mapping": self.adapters.github.status_mapping, 255 }, 256 "jira": { 257 "enabled": self.adapters.jira.enabled, 258 "server": self.adapters.jira.server, 259 "project": self.adapters.jira.project, 260 "token_env": self.adapters.jira.token_env, 261 "issue_type": self.adapters.jira.issue_type, 262 "status_mapping": self.adapters.jira.status_mapping, 263 }, 264 }, 265 "mcp": { 266 "enabled": self.mcp.enabled, 267 "port": self.mcp.port, 268 "host": self.mcp.host, 269 }, 270 "sync": { 271 "conflict_resolution": self.sync.conflict_resolution, 272 }, 273 } 274 }
Convert config to dictionary for serialization.
303def load_config(path: Path | str | None = None) -> RTMXConfig: 304 """Load RTMX configuration from file. 305 306 If path is not specified, searches upward from cwd for rtmx.yaml. 307 If no config file is found, returns default configuration. 308 309 Args: 310 path: Path to config file (optional) 311 312 Returns: 313 RTMXConfig instance 314 """ 315 config_path: Path | None = None 316 317 if path is not None: 318 config_path = Path(path) 319 if not config_path.exists(): 320 # Return defaults if specified path doesn't exist 321 return RTMXConfig() 322 else: 323 config_path = find_config_file() 324 325 if config_path is None: 326 # No config file found, return defaults 327 return RTMXConfig() 328 329 with config_path.open() as f: 330 data = yaml.safe_load(f) or {} 331 332 return RTMXConfig.from_dict(data, config_path)
Load RTMX configuration from file.
If path is not specified, searches upward from cwd for rtmx.yaml. If no config file is found, returns default configuration.
Args: path: Path to config file (optional)
Returns: RTMXConfig instance