Coverage for formkit_ninja / parser / generator.py: 5.89%

343 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-06 04:12 +0000

1""" 

2Code generator for FormKit schemas. 

3 

4This module provides the CodeGenerator class which generates Django models, 

5Pydantic schemas, admin classes, and API endpoints from FormKit schemas. 

6""" 

7 

8import ast 

9from typing import List, Union 

10 

11from formkit_ninja.formkit_schema import FormKitSchema 

12from formkit_ninja.parser.formatter import CodeFormatter, FormattingError 

13from formkit_ninja.parser.generation_pipeline import CallableStep, GenerationContext, GenerationPipeline 

14from formkit_ninja.parser.generator_config import GeneratorConfig, schema_name_to_filename 

15from formkit_ninja.parser.schema_walker import SchemaWalker 

16from formkit_ninja.parser.template_loader import TemplateLoader 

17from formkit_ninja.parser.type_convert import NodePath 

18 

19 

20class CodeGenerator: 

21 """ 

22 Code generator for FormKit schemas. 

23 

24 Generates Django models, Pydantic schemas, admin classes, and API endpoints 

25 from FormKit schema definitions. 

26 

27 Args: 

28 config: Generator configuration (app name, output dir, etc.) 

29 template_loader: Template loader for Jinja2 templates 

30 formatter: Code formatter (uses ruff) 

31 """ 

32 

33 def __init__( 

34 self, 

35 config: GeneratorConfig, 

36 template_loader: TemplateLoader, 

37 formatter: CodeFormatter, 

38 ) -> None: 

39 """Initialize CodeGenerator with configuration and dependencies.""" 

40 self.config = config 

41 self.template_loader = template_loader 

42 self.formatter = formatter 

43 

44 def _collect_nodepaths(self, schema: Union[List[dict], FormKitSchema]) -> List[NodePath]: 

45 """ 

46 Collect all NodePath instances from a FormKit schema recursively. 

47 

48 Traverses the schema structure and collects all nodes (groups, repeaters, 

49 and form fields) as NodePath instances. 

50 

51 Args: 

52 schema: FormKit schema (list of dicts or FormKitSchema object) 

53 

54 Returns: 

55 List of NodePath instances representing all nodes in the schema 

56 """ 

57 abstract_base_info: dict[str, bool] = {} # Use classname as key 

58 root_child_abstract_bases: dict[str, list[str]] = {} # Use classname as key 

59 

60 walker = SchemaWalker(config=self.config) 

61 nodepaths = walker.collect_nodepaths(schema, abstract_base_info=abstract_base_info) 

62 

63 # After collecting all nodepaths, identify abstract bases if merging is enabled 

64 if self.config.merge_top_level_groups: 

65 # Only process group nodes 

66 group_nodepaths = [np for np in nodepaths if np.is_group] 

67 for node_path in group_nodepaths: 

68 # Root-level groups (depth=1, not a child) 

69 if not node_path.is_child: 

70 # This is a root group - find its immediate child groups 

71 child_abstract_bases = [] 

72 for child_path in group_nodepaths: 

73 if ( 

74 child_path.is_child 

75 and len(child_path.nodes) == 2 # depth=2 (root + child) 

76 and child_path.nodes[0] == node_path.node # parent is this root 

77 ): 

78 # This is an immediate child group of the root 

79 abstract_base_info[child_path.classname] = True 

80 child_abstract_bases.append(child_path.abstract_class_name) 

81 root_child_abstract_bases[node_path.classname] = child_abstract_bases 

82 

83 # Set child_abstract_bases for all root nodes and update abstract_base_info 

84 for node_path in nodepaths: 

85 # Update abstract_base_info dict reference so all NodePaths share the same dict 

86 node_path._abstract_base_info = abstract_base_info 

87 # Only set child_abstract_bases for group nodes (they have classnames) 

88 if node_path.is_group: 

89 try: 

90 node_path._child_abstract_bases = root_child_abstract_bases.get(node_path.classname, []) 

91 except AttributeError: 

92 # Skip nodes without names (e.g., element nodes) 

93 node_path._child_abstract_bases = [] 

94 

95 return nodepaths 

96 

97 def _generate_file( 

98 self, 

99 template_name: str, 

100 output_filename: str, 

101 nodepaths: List[NodePath], 

102 root_classname: str | None = None, 

103 ) -> str: 

104 """ 

105 Generate a single file from a template. 

106 

107 Args: 

108 template_name: Name of the Jinja2 template 

109 output_filename: Name of the output file (for reference) 

110 nodepaths: List of NodePath instances to render 

111 root_classname: Optional root node classname for imports 

112 

113 Returns: 

114 Generated code as string (before formatting) 

115 """ 

116 env = self.template_loader.get_environment() 

117 template = env.get_template(template_name) 

118 

119 # Render template with nodepaths 

120 code = template.render( 

121 nodepaths=nodepaths, 

122 app_name=self.config.app_name, 

123 include_ordinality=self.config.include_ordinality, 

124 root_classname=root_classname, 

125 custom_imports=self.config.custom_imports, 

126 ) 

127 

128 return code 

129 

130 def _generate_per_schema_file( 

131 self, 

132 template_name: str, 

133 filename: str, 

134 subdirectory: str, 

135 nodepaths: List[NodePath], 

136 root_classname: str | None = None, 

137 ) -> None: 

138 """ 

139 Generate a per-schema file in a subdirectory. 

140 

141 Args: 

142 template_name: Name of the Jinja2 template to use 

143 filename: Name of the output file 

144 subdirectory: Subdirectory within output_dir (e.g., "schemas", "admin") 

145 nodepaths: List of NodePath instances to generate code from 

146 root_classname: Optional root node classname for imports 

147 """ 

148 # Generate code from template 

149 code = self._generate_file(template_name, filename, nodepaths, root_classname=root_classname) 

150 

151 # Format code (may raise FormattingError) 

152 try: 

153 formatted_code = self.formatter.format(code) 

154 except FormattingError: 

155 # If formatting fails, use unformatted code 

156 formatted_code = code 

157 

158 # Validate code syntax 

159 self._validate_code(formatted_code, filename) 

160 

161 # Write to file in subdirectory 

162 self._write_file(filename, formatted_code, subdirectory=subdirectory) 

163 

164 def _extract_classes_from_code(self, code: str, file_type: str) -> List[str]: 

165 """ 

166 Extract class/function names from generated code based on file type. 

167 

168 Args: 

169 code: Generated Python code as string 

170 file_type: Type of file ("models", "schemas", "schemas_in", "admin", "api") 

171 

172 Returns: 

173 List of class/function/variable names to import 

174 """ 

175 try: 

176 tree = ast.parse(code) 

177 except SyntaxError: 

178 return [] 

179 

180 extracted = [] 

181 

182 for node in tree.body: 

183 if file_type == "models": 

184 # Extract concrete (non-abstract) classes 

185 if isinstance(node, ast.ClassDef): 

186 # Check if this class is abstract by looking for Meta class with abstract = True 

187 is_abstract = False 

188 for child in node.body: 

189 if isinstance(child, ast.ClassDef) and child.name == "Meta": 

190 # Check if Meta class has abstract = True 

191 for meta_attr in child.body: 

192 if isinstance(meta_attr, ast.Assign) and len(meta_attr.targets) == 1 and isinstance(meta_attr.targets[0], ast.Name) and meta_attr.targets[0].id == "abstract": 

193 # Check if the value is True 

194 if isinstance(meta_attr.value, ast.Constant) and meta_attr.value.value is True: 

195 is_abstract = True 

196 break 

197 if is_abstract: 

198 break 

199 # Only include concrete (non-abstract) classes 

200 if not is_abstract: 

201 extracted.append(node.name) 

202 elif file_type == "schemas": 

203 # Extract classes ending with "Schema" that inherit from Schema 

204 if isinstance(node, ast.ClassDef) and node.name.endswith("Schema"): 

205 # Check if class inherits from Schema 

206 for base in node.bases: 

207 if isinstance(base, ast.Name) and base.id == "Schema": 

208 extracted.append(node.name) 

209 break 

210 elif isinstance(base, ast.Attribute): 

211 # Handle cases like "schema_out.Schema" 

212 if isinstance(base.value, ast.Name) and base.attr == "Schema": 

213 extracted.append(node.name) 

214 break 

215 elif file_type == "schemas_in": 

216 # Extract all classes (BaseModel subclasses) 

217 if isinstance(node, ast.ClassDef): 

218 extracted.append(node.name) 

219 elif file_type == "admin": 

220 # Extract classes ending with "Admin" or "Inline" (exclude ReadOnlyInline) 

221 if isinstance(node, ast.ClassDef): 

222 if (node.name.endswith("Admin") or node.name.endswith("Inline")) and node.name != "ReadOnlyInline": 

223 extracted.append(node.name) 

224 elif file_type == "api": 

225 # Extract functions and router variable 

226 if isinstance(node, ast.FunctionDef): 

227 extracted.append(node.name) 

228 elif isinstance(node, ast.Assign): 

229 # Check if assigning to "router" 

230 for target in node.targets: 

231 if isinstance(target, ast.Name) and target.id == "router": 

232 extracted.append("router") 

233 

234 return extracted 

235 

236 def _generate_init_file( 

237 self, 

238 subdirectory: str, 

239 module_name: str, 

240 file_type: str, 

241 generated_file_content: str, 

242 existing_init_content: str | None = None, 

243 ) -> str: 

244 """ 

245 Generate or update __init__.py file for a subdirectory. 

246 

247 Args: 

248 subdirectory: Subdirectory name (e.g., "schemas", "admin") 

249 module_name: Name of the module file (without .py extension) 

250 file_type: Type of file ("models", "schemas", "schemas_in", "admin", "api") 

251 generated_file_content: Content of the generated per-schema file 

252 existing_init_content: Existing __init__.py content if updating 

253 

254 Returns: 

255 Generated __init__.py content as string 

256 """ 

257 # Extract classes/functions from generated file 

258 extracted_items = self._extract_classes_from_code(generated_file_content, file_type) 

259 

260 if not extracted_items: 

261 # No items to import, return existing content or empty 

262 return existing_init_content or "" 

263 

264 # Parse existing __init__.py to preserve imports 

265 existing_import_lines = [] 

266 existing_all = [] 

267 if existing_init_content: 

268 # Extract existing import statements and __all__ 

269 try: 

270 tree = ast.parse(existing_init_content) 

271 for node in tree.body: 

272 if isinstance(node, ast.ImportFrom) and node.module: 

273 # Check if it's a relative import (level > 0 means relative) 

274 is_relative = node.level > 0 

275 if is_relative: 

276 # Relative import - preserve the original line 

277 imported_names = [alias.name for alias in node.names] 

278 names_str = ", ".join(imported_names) 

279 # Reconstruct the line with proper relative import syntax 

280 # node.module doesn't include the dot, so we add it 

281 module_path = "." * node.level + (node.module or "") 

282 # Only add if not from current module 

283 if not module_path.endswith(f".{module_name}") and module_path != f".{module_name}": 

284 line = f"from {module_path} import {names_str} # noqa: F401" 

285 existing_import_lines.append(line) 

286 elif isinstance(node, ast.Assign): 

287 # Check for __all__ assignment 

288 for target in node.targets: 

289 if isinstance(target, ast.Name) and target.id == "__all__": 

290 if isinstance(node.value, (ast.List, ast.Tuple)): 

291 existing_all = [ 

292 (elt.value if isinstance(elt, ast.Constant) else (elt.s if isinstance(elt, ast.Str) else str(elt))) 

293 for elt in node.value.elts 

294 if isinstance(elt, (ast.Constant, ast.Str)) 

295 ] 

296 except SyntaxError: 

297 # If parsing fails, try to extract imports manually 

298 for line in existing_init_content.split("\n"): 

299 stripped = line.strip() 

300 if stripped.startswith("from .") and "import" in stripped: 

301 # Check if it's not from current module 

302 if f".{module_name}" not in stripped: 

303 existing_import_lines.append(stripped) 

304 elif stripped.startswith("__all__"): 

305 # Extract __all__ values manually - simple regex-like extraction 

306 import re 

307 

308 match = re.search(r"\[(.*?)\]", stripped) 

309 if match: 

310 all_content = match.group(1) 

311 # Extract quoted strings 

312 all_items = re.findall(r'"([^"]+)"', all_content) 

313 existing_all.extend(all_items) 

314 

315 # Build new __init__.py content 

316 init_lines = [] 

317 

318 # Add docstring if this is a new file 

319 if not existing_init_content: 

320 init_lines.append(f'"""{subdirectory.capitalize()} for all schemas."""') 

321 init_lines.append("") 

322 

323 # Special handling for API router merging 

324 if file_type == "api": 

325 # Collect all module names that have routers 

326 schema_modules = set() 

327 all_functions = [] 

328 

329 # Parse existing imports to get module names 

330 import re 

331 

332 for import_line in existing_import_lines: 

333 # Extract module name from "from .module import router, ..." 

334 match = re.search(r"from \.(\w+) import", import_line) 

335 if match and "router" in import_line: 

336 schema_modules.add(match.group(1)) 

337 # Extract function names from imports 

338 if "router" not in import_line: 

339 # This is a function import, preserve it 

340 all_functions.append(import_line) 

341 

342 # Add current module 

343 if extracted_items: 

344 has_router = "router" in extracted_items 

345 function_items = [item for item in extracted_items if item != "router"] 

346 

347 if has_router: 

348 schema_modules.add(module_name) 

349 

350 # Add function imports for current module 

351 if function_items: 

352 items_str = ", ".join(function_items) 

353 all_functions.append(f"from .{module_name} import {items_str} # noqa: F401") 

354 

355 # Import all functions 

356 for func_import in all_functions: 

357 init_lines.append(func_import) 

358 

359 # Create combined router 

360 init_lines.append("") 

361 init_lines.append("from ninja import Router") 

362 init_lines.append("") 

363 init_lines.append('router = Router(tags=["forms"])') 

364 

365 # Add router merging for all schema routers 

366 for schema_module in sorted(schema_modules): 

367 init_lines.append(f"from .{schema_module} import router as {schema_module}_router") 

368 init_lines.append(f'router.add_router("", {schema_module}_router)') 

369 

370 # Build __all__ list (include router and all functions) 

371 all_items = existing_all.copy() 

372 all_items.extend(extracted_items) 

373 # Remove duplicates while preserving order 

374 seen = set() 

375 unique_all = [] 

376 for item in all_items: 

377 if item not in seen: 

378 seen.add(item) 

379 unique_all.append(item) 

380 

381 if unique_all: 

382 init_lines.append("") 

383 all_str = ", ".join(f'"{str(item)}"' for item in unique_all) 

384 init_lines.append(f"__all__ = [{all_str}]") 

385 else: 

386 # Standard handling for other file types 

387 # Add existing imports (excluding the current module) 

388 for import_line in existing_import_lines: 

389 if f".{module_name}" not in import_line: 

390 init_lines.append(import_line) 

391 

392 # Add import for current module 

393 if extracted_items: 

394 items_str = ", ".join(extracted_items) 

395 init_lines.append(f"from .{module_name} import {items_str} # noqa: F401") 

396 

397 # Build __all__ list 

398 all_items = existing_all.copy() 

399 all_items.extend(extracted_items) 

400 # Remove duplicates while preserving order 

401 seen = set() 

402 unique_all = [] 

403 for item in all_items: 

404 if item not in seen: 

405 seen.add(item) 

406 unique_all.append(item) 

407 

408 if unique_all: 

409 init_lines.append("") 

410 all_str = ", ".join(f'"{str(item)}"' for item in unique_all) 

411 init_lines.append(f"__all__ = [{all_str}]") 

412 

413 return "\n".join(init_lines) 

414 

415 def _validate_code(self, code: str, filename: str) -> None: 

416 """ 

417 Validate that generated code is valid Python. 

418 

419 Args: 

420 code: Python code string to validate 

421 filename: Name of file (for error messages) 

422 

423 Raises: 

424 SyntaxError: If code is not valid Python 

425 """ 

426 try: 

427 ast.parse(code) 

428 except SyntaxError as e: 

429 raise SyntaxError( 

430 f"Generated {filename} has syntax errors: {e.msg} at line {e.lineno}", 

431 ) from e 

432 

433 def _write_file(self, filename: str, content: str, subdirectory: str | None = None) -> None: 

434 """ 

435 Write content to a file, creating directory if needed. 

436 

437 Args: 

438 filename: Name of the file (relative to output_dir or subdirectory) 

439 content: Content to write 

440 subdirectory: Optional subdirectory within output_dir (e.g., "models") 

441 """ 

442 # Ensure output directory exists 

443 self.config.output_dir.mkdir(parents=True, exist_ok=True) 

444 

445 if subdirectory: 

446 file_path = self.config.output_dir / subdirectory / filename 

447 file_path.parent.mkdir(parents=True, exist_ok=True) 

448 else: 

449 file_path = self.config.output_dir / filename 

450 

451 file_path.write_text(content, encoding="utf-8") 

452 

453 def _select_root_nodepath(self, nodepaths: List[NodePath]) -> NodePath | None: 

454 root_nodepath = next((np for np in nodepaths if not np.is_child and np.is_group), None) 

455 if not root_nodepath: 

456 root_nodepath = next((np for np in nodepaths if np.is_group), None) 

457 if not root_nodepath: 

458 root_nodepath = nodepaths[0] if nodepaths else None 

459 return root_nodepath 

460 

461 def _filter_descendants(self, root_nodepath: NodePath, nodepaths: List[NodePath]) -> List[NodePath]: 

462 filtered = [root_nodepath] 

463 for np in nodepaths: 

464 if np == root_nodepath: 

465 continue 

466 if len(np.nodes) > len(root_nodepath.nodes): 

467 is_descendant = all(np.nodes[i] == root_nodepath.nodes[i] for i in range(len(root_nodepath.nodes))) 

468 if is_descendant: 

469 filtered.append(np) 

470 return filtered 

471 

472 @staticmethod 

473 def _deduplicate_nodepaths(nodepaths: List[NodePath]) -> List[NodePath]: 

474 seen_classnames = set() 

475 unique_nodepaths = [] 

476 for np in nodepaths: 

477 if np.classname not in seen_classnames: 

478 seen_classnames.add(np.classname) 

479 unique_nodepaths.append(np) 

480 return unique_nodepaths 

481 

482 def _sort_abstract_bases_first(self, nodepaths: List[NodePath]) -> List[NodePath]: 

483 if not self.config.merge_top_level_groups: 

484 return nodepaths 

485 abstract_bases = [np for np in nodepaths if np.is_abstract_base] 

486 concrete_classes = [np for np in nodepaths if not np.is_abstract_base] 

487 return abstract_bases + concrete_classes 

488 

489 def _generate_models_with_root(self, nodepaths: List[NodePath], root_classname: str) -> None: 

490 models_filename = f"{schema_name_to_filename(root_classname)}.py" 

491 models_subdirectory = "models" 

492 

493 models_code = self._generate_file( 

494 "models.py.jinja2", 

495 models_filename, 

496 nodepaths, 

497 root_classname=root_classname, 

498 ) 

499 try: 

500 formatted_models_code = self.formatter.format(models_code) 

501 except FormattingError: 

502 formatted_models_code = models_code 

503 self._validate_code(formatted_models_code, models_filename) 

504 self._write_file(models_filename, formatted_models_code, subdirectory=models_subdirectory) 

505 

506 module_name = schema_name_to_filename(root_classname) 

507 init_file_path = self.config.output_dir / models_subdirectory / "__init__.py" 

508 existing_init_content = init_file_path.read_text() if init_file_path.exists() else None 

509 

510 init_content = self._generate_init_file( 

511 subdirectory=models_subdirectory, 

512 module_name=module_name, 

513 file_type="models", 

514 generated_file_content=formatted_models_code, 

515 existing_init_content=existing_init_content, 

516 ) 

517 try: 

518 formatted_init_code = self.formatter.format(init_content) 

519 except FormattingError: 

520 formatted_init_code = init_content 

521 self._validate_code(formatted_init_code, "__init__.py") 

522 self._write_file("__init__.py", formatted_init_code, subdirectory=models_subdirectory) 

523 

524 def _generate_models_without_root(self, nodepaths: List[NodePath]) -> None: 

525 models_code = self._generate_file( 

526 "models.py.jinja2", 

527 "models.py", 

528 nodepaths, 

529 root_classname=None, 

530 ) 

531 try: 

532 formatted_models_code = self.formatter.format(models_code) 

533 except FormattingError: 

534 formatted_models_code = models_code 

535 self._validate_code(formatted_models_code, "models.py") 

536 self._write_file("models.py", formatted_models_code) 

537 

538 def _generate_signals_with_root(self, nodepaths: List[NodePath], root_classname: str) -> None: 

539 """Generate signals.py when there is a root class.""" 

540 # For simplicity, we currently generate a single signals.py in the root app dir 

541 # regardless of whether we are splitting other files. 

542 # Ideally, this might be split, but signals usually need to be app-wide. 

543 self._generate_signals_without_root(nodepaths) 

544 

545 def _generate_signals_without_root(self, nodepaths: List[NodePath]) -> None: 

546 """Generate signals.py.""" 

547 signals_code = self._generate_file( 

548 "signals.py.jinja2", 

549 "signals.py", 

550 nodepaths, 

551 root_classname=None, 

552 ) 

553 try: 

554 formatted_code = self.formatter.format(signals_code) 

555 except FormattingError: 

556 formatted_code = signals_code 

557 self._validate_code(formatted_code, "signals.py") 

558 self._write_file("signals.py", formatted_code) 

559 

560 def _generate_subdir_files_with_root(self, nodepaths: List[NodePath], root_classname: str) -> None: 

561 file_mappings = [ 

562 ("schemas.py.jinja2", "schemas", "schemas"), 

563 ("schemas_in.py.jinja2", "schemas_in", "schemas_in"), 

564 ("admin.py.jinja2", "admin", "admin"), 

565 ("api.py.jinja2", "api", "api"), 

566 ] 

567 schema_filename = f"{schema_name_to_filename(root_classname)}.py" 

568 

569 for template_name, subdirectory, file_type in file_mappings: 

570 self._generate_per_schema_file( 

571 template_name=template_name, 

572 filename=schema_filename, 

573 subdirectory=subdirectory, 

574 nodepaths=nodepaths, 

575 root_classname=root_classname, 

576 ) 

577 

578 generated_file_path = self.config.output_dir / subdirectory / schema_filename 

579 generated_file_content = generated_file_path.read_text() 

580 

581 init_file_path = self.config.output_dir / subdirectory / "__init__.py" 

582 existing_init_content = init_file_path.read_text() if init_file_path.exists() else None 

583 

584 init_content = self._generate_init_file( 

585 subdirectory=subdirectory, 

586 module_name=schema_name_to_filename(root_classname), 

587 file_type=file_type, 

588 generated_file_content=generated_file_content, 

589 existing_init_content=existing_init_content, 

590 ) 

591 

592 try: 

593 formatted_init = self.formatter.format(init_content) 

594 except FormattingError: 

595 formatted_init = init_content 

596 self._validate_code(formatted_init, "__init__.py") 

597 self._write_file("__init__.py", formatted_init, subdirectory=subdirectory) 

598 

599 def _generate_subdir_files_without_root(self, nodepaths: List[NodePath]) -> None: 

600 file_mappings = [ 

601 ("schemas.py.jinja2", "schemas", "schemas"), 

602 ("schemas_in.py.jinja2", "schemas_in", "schemas_in"), 

603 ("admin.py.jinja2", "admin", "admin"), 

604 ("api.py.jinja2", "api", "api"), 

605 ] 

606 

607 for template_name, subdirectory, _ in file_mappings: 

608 output_filename = f"{subdirectory}.py" 

609 code = self._generate_file(template_name, output_filename, nodepaths, root_classname=None) 

610 try: 

611 formatted_code = self.formatter.format(code) 

612 except FormattingError: 

613 formatted_code = code 

614 self._validate_code(formatted_code, output_filename) 

615 self._write_file(output_filename, formatted_code) 

616 

617 def generate(self, schema: Union[List[dict], FormKitSchema]) -> None: 

618 """ 

619 Generate all code files from a FormKit schema. 

620 

621 Generates: 

622 - models/<schema_name>.py: Django models (if schema_name is provided) 

623 - models/__init__.py: Imports from the generated model file 

624 - schemas.py: Django Ninja output schemas 

625 - schemas_in.py: Django Ninja input schemas (Pydantic BaseModel) 

626 - admin.py: Django admin classes 

627 - api.py: Django Ninja API endpoints 

628 

629 Args: 

630 schema: FormKit schema (list of dicts or FormKitSchema object) 

631 

632 Raises: 

633 SyntaxError: If generated code is not valid Python 

634 FormattingError: If code formatting fails 

635 """ 

636 context = GenerationContext(schema=schema, generator=self) 

637 

638 def collect_nodepaths_step(ctx: GenerationContext) -> None: 

639 all_nodepaths = self._collect_nodepaths(ctx.schema) 

640 ctx.data["all_nodepaths"] = all_nodepaths 

641 ctx.data["groups_and_repeaters"] = [np for np in all_nodepaths if np.is_group or np.is_repeater] 

642 

643 def select_root_step(ctx: GenerationContext) -> None: 

644 groups_and_repeaters = ctx.data["groups_and_repeaters"] 

645 root_nodepath = self._select_root_nodepath(groups_and_repeaters) 

646 ctx.data["root_nodepath"] = root_nodepath 

647 if root_nodepath: 

648 root_classname = root_nodepath.classname 

649 nodepaths = self._filter_descendants(root_nodepath, groups_and_repeaters) 

650 nodepaths = self._sort_abstract_bases_first(nodepaths) 

651 nodepaths = self._deduplicate_nodepaths(nodepaths) 

652 ctx.data["nodepaths"] = nodepaths 

653 ctx.data["root_classname"] = root_classname 

654 else: 

655 nodepaths = self._deduplicate_nodepaths(groups_and_repeaters) 

656 ctx.data["nodepaths"] = nodepaths 

657 

658 def generate_models_step(ctx: GenerationContext) -> None: 

659 root_nodepath = ctx.data["root_nodepath"] 

660 nodepaths = ctx.data["nodepaths"] 

661 if root_nodepath: 

662 self._generate_models_with_root(nodepaths, ctx.data["root_classname"]) 

663 else: 

664 self._generate_models_without_root(nodepaths) 

665 

666 def generate_signals_step(ctx: GenerationContext) -> None: 

667 root_nodepath = ctx.data["root_nodepath"] 

668 nodepaths = ctx.data["nodepaths"] 

669 if root_nodepath: 

670 self._generate_signals_with_root(nodepaths, ctx.data["root_classname"]) 

671 else: 

672 self._generate_signals_without_root(nodepaths) 

673 

674 def generate_subdirs_step(ctx: GenerationContext) -> None: 

675 root_nodepath = ctx.data["root_nodepath"] 

676 nodepaths = ctx.data["nodepaths"] 

677 if root_nodepath: 

678 self._generate_subdir_files_with_root(nodepaths, ctx.data["root_classname"]) 

679 else: 

680 self._generate_subdir_files_without_root(nodepaths) 

681 

682 pipeline = GenerationPipeline( 

683 [ 

684 CallableStep(collect_nodepaths_step), 

685 CallableStep(select_root_step), 

686 CallableStep(generate_models_step), 

687 CallableStep(generate_subdirs_step), 

688 CallableStep(generate_signals_step), 

689 ] 

690 ) 

691 pipeline.run(context)