yaml_shredder.structure_analyzer

Analyze and detect repeating structures in YAML/JSON data.

  1"""Analyze and detect repeating structures in YAML/JSON data."""
  2
  3from collections import defaultdict
  4from typing import Any
  5
  6
  7class StructureAnalyzer:
  8    """Analyze nested structures and detect repeating patterns."""
  9
 10    def __init__(self):
 11        """Initialize the structure analyzer."""
 12        self.arrays_found = []
 13        self.structure_patterns = defaultdict(list)
 14
 15    def analyze(self, data: dict[str, Any], path: str = "") -> dict[str, Any]:
 16        """
 17        Analyze data structure and detect repeating patterns.
 18
 19        Args:
 20            data: Dictionary to analyze
 21            path: Current path in the structure (for nested objects)
 22
 23        Returns:
 24            Analysis results including arrays and patterns
 25        """
 26        self._traverse(data, path)
 27
 28        return {
 29            "total_arrays": len(self.arrays_found),
 30            "arrays": self.arrays_found,
 31            "structure_patterns": dict(self.structure_patterns),
 32        }
 33
 34    def _traverse(self, obj: Any, path: str = "") -> None:
 35        """
 36        Recursively traverse the object structure.
 37
 38        Args:
 39            obj: Object to traverse
 40            path: Current path
 41        """
 42        if isinstance(obj, dict):
 43            for key, value in obj.items():
 44                current_path = f"{path}.{key}" if path else key
 45                self._traverse(value, current_path)
 46
 47        elif isinstance(obj, list) and obj:
 48            # Found an array
 49            array_info = self._analyze_array(obj, path)
 50            self.arrays_found.append(array_info)
 51
 52            # Continue traversing into array elements
 53            for i, item in enumerate(obj):
 54                item_path = f"{path}[{i}]"
 55                self._traverse(item, item_path)
 56
 57    def _analyze_array(self, array: list[Any], path: str) -> dict[str, Any]:
 58        """
 59        Analyze an array to detect its structure and patterns.
 60
 61        Args:
 62            array: Array to analyze
 63            path: Path to this array
 64
 65        Returns:
 66            Analysis of the array
 67        """
 68        if not array:
 69            return {"path": path, "length": 0, "type": "empty", "element_types": []}
 70
 71        # Determine element types
 72        element_types = [type(item).__name__ for item in array]
 73        unique_types = list(set(element_types))
 74
 75        # For arrays of objects, detect common keys
 76        if all(isinstance(item, dict) for item in array):
 77            all_keys = [set(item.keys()) for item in array]
 78            common_keys = set.intersection(*all_keys) if all_keys else set()
 79            all_keys_union = set.union(*all_keys) if all_keys else set()
 80            optional_keys = all_keys_union - common_keys
 81
 82            # Detect structure pattern
 83            structure_signature = tuple(sorted(common_keys))
 84            self.structure_patterns[structure_signature].append(path)
 85
 86            return {
 87                "path": path,
 88                "length": len(array),
 89                "type": "object_array",
 90                "element_types": unique_types,
 91                "common_keys": sorted(common_keys),
 92                "optional_keys": sorted(optional_keys),
 93                "all_keys": sorted(all_keys_union),
 94                "structure_signature": structure_signature,
 95                "is_homogeneous": len(all_keys) == 1,
 96            }
 97        else:
 98            return {
 99                "path": path,
100                "length": len(array),
101                "type": "primitive_array" if len(unique_types) == 1 else "mixed_array",
102                "element_types": unique_types,
103            }
104
105    def get_table_candidates(self) -> list[dict[str, Any]]:
106        """
107        Get arrays that are good candidates for conversion to tables.
108
109        Returns:
110            List of array information suitable for table generation
111        """
112        candidates = []
113
114        for array_info in self.arrays_found:
115            if array_info.get("type") == "object_array":
116                # Object arrays are good table candidates
117                candidates.append(
118                    {
119                        "path": array_info["path"],
120                        "table_name": self._path_to_table_name(array_info["path"]),
121                        "row_count": array_info["length"],
122                        "columns": array_info["all_keys"],
123                        "required_columns": array_info["common_keys"],
124                        "optional_columns": array_info["optional_keys"],
125                    }
126                )
127
128        return candidates
129
130    def _path_to_table_name(self, path: str) -> str:
131        """
132        Convert a path to a suggested table name.
133
134        Args:
135            path: Path like "actions" or "communities"
136
137        Returns:
138            Suggested table name
139        """
140        # Remove array indices and clean up
141        parts = path.replace("[", ".").replace("]", "").split(".")
142        parts = [p for p in parts if p and not p.isdigit()]
143
144        # Use the last meaningful part or join all
145        if parts:
146            return "_".join(parts).upper()
147        return "UNKNOWN_TABLE"
148
149    def print_summary(self, analysis: dict[str, Any]) -> None:
150        """
151        Print a human-readable summary of the analysis.
152
153        Args:
154            analysis: Analysis results from analyze()
155        """
156        print(f"\n{'=' * 60}")
157        print("STRUCTURE ANALYSIS SUMMARY")
158        print(f"{'=' * 60}")
159
160        print(f"\nTotal arrays found: {analysis['total_arrays']}")
161
162        print(f"\n{'-' * 60}")
163        print("ARRAYS DETECTED:")
164        print(f"{'-' * 60}")
165
166        for i, array_info in enumerate(analysis["arrays"], 1):
167            print(f"\n{i}. Path: {array_info['path']}")
168            print(f"   Type: {array_info['type']}")
169            print(f"   Length: {array_info['length']}")
170
171            if array_info["type"] == "object_array":
172                print(f"   Homogeneous: {array_info['is_homogeneous']}")
173                print(f"   Common keys ({len(array_info['common_keys'])}): {', '.join(array_info['common_keys'][:5])}")
174                if len(array_info["common_keys"]) > 5:
175                    print(f"      ... and {len(array_info['common_keys']) - 5} more")
176                if array_info["optional_keys"]:
177                    print(
178                        f"   Optional keys ({len(array_info['optional_keys'])}): {', '.join(list(array_info['optional_keys'])[:3])}"
179                    )
180
181        print(f"\n{'-' * 60}")
182        print("TABLE CANDIDATES:")
183        print(f"{'-' * 60}")
184
185        candidates = self.get_table_candidates()
186        for i, candidate in enumerate(candidates, 1):
187            print(f"\n{i}. Table: {candidate['table_name']}")
188            print(f"   Source: {candidate['path']}")
189            print(f"   Rows: {candidate['row_count']}")
190            print(f"   Columns: {len(candidate['columns'])}")
191            print(f"   Required: {len(candidate['required_columns'])}")
192
193        print(f"\n{'=' * 60}\n")
class StructureAnalyzer:
  8class StructureAnalyzer:
  9    """Analyze nested structures and detect repeating patterns."""
 10
 11    def __init__(self):
 12        """Initialize the structure analyzer."""
 13        self.arrays_found = []
 14        self.structure_patterns = defaultdict(list)
 15
 16    def analyze(self, data: dict[str, Any], path: str = "") -> dict[str, Any]:
 17        """
 18        Analyze data structure and detect repeating patterns.
 19
 20        Args:
 21            data: Dictionary to analyze
 22            path: Current path in the structure (for nested objects)
 23
 24        Returns:
 25            Analysis results including arrays and patterns
 26        """
 27        self._traverse(data, path)
 28
 29        return {
 30            "total_arrays": len(self.arrays_found),
 31            "arrays": self.arrays_found,
 32            "structure_patterns": dict(self.structure_patterns),
 33        }
 34
 35    def _traverse(self, obj: Any, path: str = "") -> None:
 36        """
 37        Recursively traverse the object structure.
 38
 39        Args:
 40            obj: Object to traverse
 41            path: Current path
 42        """
 43        if isinstance(obj, dict):
 44            for key, value in obj.items():
 45                current_path = f"{path}.{key}" if path else key
 46                self._traverse(value, current_path)
 47
 48        elif isinstance(obj, list) and obj:
 49            # Found an array
 50            array_info = self._analyze_array(obj, path)
 51            self.arrays_found.append(array_info)
 52
 53            # Continue traversing into array elements
 54            for i, item in enumerate(obj):
 55                item_path = f"{path}[{i}]"
 56                self._traverse(item, item_path)
 57
 58    def _analyze_array(self, array: list[Any], path: str) -> dict[str, Any]:
 59        """
 60        Analyze an array to detect its structure and patterns.
 61
 62        Args:
 63            array: Array to analyze
 64            path: Path to this array
 65
 66        Returns:
 67            Analysis of the array
 68        """
 69        if not array:
 70            return {"path": path, "length": 0, "type": "empty", "element_types": []}
 71
 72        # Determine element types
 73        element_types = [type(item).__name__ for item in array]
 74        unique_types = list(set(element_types))
 75
 76        # For arrays of objects, detect common keys
 77        if all(isinstance(item, dict) for item in array):
 78            all_keys = [set(item.keys()) for item in array]
 79            common_keys = set.intersection(*all_keys) if all_keys else set()
 80            all_keys_union = set.union(*all_keys) if all_keys else set()
 81            optional_keys = all_keys_union - common_keys
 82
 83            # Detect structure pattern
 84            structure_signature = tuple(sorted(common_keys))
 85            self.structure_patterns[structure_signature].append(path)
 86
 87            return {
 88                "path": path,
 89                "length": len(array),
 90                "type": "object_array",
 91                "element_types": unique_types,
 92                "common_keys": sorted(common_keys),
 93                "optional_keys": sorted(optional_keys),
 94                "all_keys": sorted(all_keys_union),
 95                "structure_signature": structure_signature,
 96                "is_homogeneous": len(all_keys) == 1,
 97            }
 98        else:
 99            return {
100                "path": path,
101                "length": len(array),
102                "type": "primitive_array" if len(unique_types) == 1 else "mixed_array",
103                "element_types": unique_types,
104            }
105
106    def get_table_candidates(self) -> list[dict[str, Any]]:
107        """
108        Get arrays that are good candidates for conversion to tables.
109
110        Returns:
111            List of array information suitable for table generation
112        """
113        candidates = []
114
115        for array_info in self.arrays_found:
116            if array_info.get("type") == "object_array":
117                # Object arrays are good table candidates
118                candidates.append(
119                    {
120                        "path": array_info["path"],
121                        "table_name": self._path_to_table_name(array_info["path"]),
122                        "row_count": array_info["length"],
123                        "columns": array_info["all_keys"],
124                        "required_columns": array_info["common_keys"],
125                        "optional_columns": array_info["optional_keys"],
126                    }
127                )
128
129        return candidates
130
131    def _path_to_table_name(self, path: str) -> str:
132        """
133        Convert a path to a suggested table name.
134
135        Args:
136            path: Path like "actions" or "communities"
137
138        Returns:
139            Suggested table name
140        """
141        # Remove array indices and clean up
142        parts = path.replace("[", ".").replace("]", "").split(".")
143        parts = [p for p in parts if p and not p.isdigit()]
144
145        # Use the last meaningful part or join all
146        if parts:
147            return "_".join(parts).upper()
148        return "UNKNOWN_TABLE"
149
150    def print_summary(self, analysis: dict[str, Any]) -> None:
151        """
152        Print a human-readable summary of the analysis.
153
154        Args:
155            analysis: Analysis results from analyze()
156        """
157        print(f"\n{'=' * 60}")
158        print("STRUCTURE ANALYSIS SUMMARY")
159        print(f"{'=' * 60}")
160
161        print(f"\nTotal arrays found: {analysis['total_arrays']}")
162
163        print(f"\n{'-' * 60}")
164        print("ARRAYS DETECTED:")
165        print(f"{'-' * 60}")
166
167        for i, array_info in enumerate(analysis["arrays"], 1):
168            print(f"\n{i}. Path: {array_info['path']}")
169            print(f"   Type: {array_info['type']}")
170            print(f"   Length: {array_info['length']}")
171
172            if array_info["type"] == "object_array":
173                print(f"   Homogeneous: {array_info['is_homogeneous']}")
174                print(f"   Common keys ({len(array_info['common_keys'])}): {', '.join(array_info['common_keys'][:5])}")
175                if len(array_info["common_keys"]) > 5:
176                    print(f"      ... and {len(array_info['common_keys']) - 5} more")
177                if array_info["optional_keys"]:
178                    print(
179                        f"   Optional keys ({len(array_info['optional_keys'])}): {', '.join(list(array_info['optional_keys'])[:3])}"
180                    )
181
182        print(f"\n{'-' * 60}")
183        print("TABLE CANDIDATES:")
184        print(f"{'-' * 60}")
185
186        candidates = self.get_table_candidates()
187        for i, candidate in enumerate(candidates, 1):
188            print(f"\n{i}. Table: {candidate['table_name']}")
189            print(f"   Source: {candidate['path']}")
190            print(f"   Rows: {candidate['row_count']}")
191            print(f"   Columns: {len(candidate['columns'])}")
192            print(f"   Required: {len(candidate['required_columns'])}")
193
194        print(f"\n{'=' * 60}\n")

Analyze nested structures and detect repeating patterns.

StructureAnalyzer()
11    def __init__(self):
12        """Initialize the structure analyzer."""
13        self.arrays_found = []
14        self.structure_patterns = defaultdict(list)

Initialize the structure analyzer.

arrays_found
structure_patterns
def analyze( self, data: dict[str, typing.Any], path: str = '') -> dict[str, typing.Any]:
16    def analyze(self, data: dict[str, Any], path: str = "") -> dict[str, Any]:
17        """
18        Analyze data structure and detect repeating patterns.
19
20        Args:
21            data: Dictionary to analyze
22            path: Current path in the structure (for nested objects)
23
24        Returns:
25            Analysis results including arrays and patterns
26        """
27        self._traverse(data, path)
28
29        return {
30            "total_arrays": len(self.arrays_found),
31            "arrays": self.arrays_found,
32            "structure_patterns": dict(self.structure_patterns),
33        }

Analyze data structure and detect repeating patterns.

Arguments:
  • data: Dictionary to analyze
  • path: Current path in the structure (for nested objects)
Returns:

Analysis results including arrays and patterns

def get_table_candidates(self) -> list[dict[str, typing.Any]]:
106    def get_table_candidates(self) -> list[dict[str, Any]]:
107        """
108        Get arrays that are good candidates for conversion to tables.
109
110        Returns:
111            List of array information suitable for table generation
112        """
113        candidates = []
114
115        for array_info in self.arrays_found:
116            if array_info.get("type") == "object_array":
117                # Object arrays are good table candidates
118                candidates.append(
119                    {
120                        "path": array_info["path"],
121                        "table_name": self._path_to_table_name(array_info["path"]),
122                        "row_count": array_info["length"],
123                        "columns": array_info["all_keys"],
124                        "required_columns": array_info["common_keys"],
125                        "optional_columns": array_info["optional_keys"],
126                    }
127                )
128
129        return candidates

Get arrays that are good candidates for conversion to tables.

Returns:

List of array information suitable for table generation

def print_summary(self, analysis: dict[str, typing.Any]) -> None:
150    def print_summary(self, analysis: dict[str, Any]) -> None:
151        """
152        Print a human-readable summary of the analysis.
153
154        Args:
155            analysis: Analysis results from analyze()
156        """
157        print(f"\n{'=' * 60}")
158        print("STRUCTURE ANALYSIS SUMMARY")
159        print(f"{'=' * 60}")
160
161        print(f"\nTotal arrays found: {analysis['total_arrays']}")
162
163        print(f"\n{'-' * 60}")
164        print("ARRAYS DETECTED:")
165        print(f"{'-' * 60}")
166
167        for i, array_info in enumerate(analysis["arrays"], 1):
168            print(f"\n{i}. Path: {array_info['path']}")
169            print(f"   Type: {array_info['type']}")
170            print(f"   Length: {array_info['length']}")
171
172            if array_info["type"] == "object_array":
173                print(f"   Homogeneous: {array_info['is_homogeneous']}")
174                print(f"   Common keys ({len(array_info['common_keys'])}): {', '.join(array_info['common_keys'][:5])}")
175                if len(array_info["common_keys"]) > 5:
176                    print(f"      ... and {len(array_info['common_keys']) - 5} more")
177                if array_info["optional_keys"]:
178                    print(
179                        f"   Optional keys ({len(array_info['optional_keys'])}): {', '.join(list(array_info['optional_keys'])[:3])}"
180                    )
181
182        print(f"\n{'-' * 60}")
183        print("TABLE CANDIDATES:")
184        print(f"{'-' * 60}")
185
186        candidates = self.get_table_candidates()
187        for i, candidate in enumerate(candidates, 1):
188            print(f"\n{i}. Table: {candidate['table_name']}")
189            print(f"   Source: {candidate['path']}")
190            print(f"   Rows: {candidate['row_count']}")
191            print(f"   Columns: {len(candidate['columns'])}")
192            print(f"   Required: {len(candidate['required_columns'])}")
193
194        print(f"\n{'=' * 60}\n")

Print a human-readable summary of the analysis.

Arguments:
  • analysis: Analysis results from analyze()