yaml_shredder.structure_analyzer
Analyze and detect repeating structures in YAML/JSON data.
"""Analyze and detect repeating structures in YAML/JSON data."""

from collections import defaultdict
from typing import Any


class StructureAnalyzer:
    """Analyze nested structures and detect repeating patterns.

    Results are stored on the instance: every non-empty list met during
    traversal is described in ``arrays_found`` and every object-array key
    signature is recorded in ``structure_patterns``. Neither container is
    reset by ``analyze()``, so results accumulate across calls made on the
    same instance.
    """

    def __init__(self) -> None:
        """Initialize the structure analyzer with empty result containers."""
        # One info dict per non-empty list, in traversal order.
        self.arrays_found: list[dict[str, Any]] = []
        # Sorted tuple of common keys -> paths of the arrays sharing it.
        self.structure_patterns: defaultdict[tuple[Any, ...], list[str]] = defaultdict(list)

    def analyze(self, data: dict[str, Any], path: str = "") -> dict[str, Any]:
        """
        Analyze data structure and detect repeating patterns.

        Args:
            data: Dictionary to analyze
            path: Current path in the structure (for nested objects)

        Returns:
            Dict with ``total_arrays``, ``arrays`` (the instance's own
            ``arrays_found`` list, not a copy) and ``structure_patterns``
            (a plain-dict snapshot of the recorded signatures).
        """
        self._traverse(data, path)

        return {
            "total_arrays": len(self.arrays_found),
            "arrays": self.arrays_found,
            "structure_patterns": dict(self.structure_patterns),
        }

    def _traverse(self, obj: Any, path: str = "") -> None:
        """
        Recursively traverse the object structure.

        Dicts are descended into key by key, non-empty lists are analyzed and
        then descended into element by element; empty lists and every other
        value are ignored.

        Args:
            obj: Object to traverse
            path: Current path (dotted keys, ``[i]`` for list indices)
        """
        if isinstance(obj, dict):
            for key, value in obj.items():
                current_path = f"{path}.{key}" if path else key
                self._traverse(value, current_path)

        elif isinstance(obj, list) and obj:
            # Record the array itself before looking inside its elements.
            self.arrays_found.append(self._analyze_array(obj, path))

            for i, item in enumerate(obj):
                self._traverse(item, f"{path}[{i}]")

    def _analyze_array(self, array: list[Any], path: str) -> dict[str, Any]:
        """
        Analyze an array to detect its structure and patterns.

        For an array made only of dicts, the common-key signature is also
        appended to ``structure_patterns`` under ``path``.

        Args:
            array: Array to analyze
            path: Path to this array

        Returns:
            Analysis of the array; object arrays additionally carry key
            statistics and an ``is_homogeneous`` flag.
        """
        if not array:
            return {"path": path, "length": 0, "type": "empty", "element_types": []}

        # Sorted so the reported order is stable; set iteration order is not.
        unique_types = sorted({type(item).__name__ for item in array})

        if all(isinstance(item, dict) for item in array):
            key_sets = [set(item) for item in array]
            common_keys = set.intersection(*key_sets)
            all_keys_union = set.union(*key_sets)
            optional_keys = all_keys_union - common_keys

            # Arrays sharing the same required keys share a signature.
            structure_signature = tuple(sorted(common_keys))
            self.structure_patterns[structure_signature].append(path)

            return {
                "path": path,
                "length": len(array),
                "type": "object_array",
                "element_types": unique_types,
                "common_keys": sorted(common_keys),
                "optional_keys": sorted(optional_keys),
                "all_keys": sorted(all_keys_union),
                "structure_signature": structure_signature,
                # Every element has the same key set exactly when no key is
                # optional. (Counting the per-element key sets, as before,
                # only tested whether the array had a single element.)
                "is_homogeneous": not optional_keys,
            }

        return {
            "path": path,
            "length": len(array),
            "type": "primitive_array" if len(unique_types) == 1 else "mixed_array",
            "element_types": unique_types,
        }

    def get_table_candidates(self) -> list[dict[str, Any]]:
        """
        Get arrays that are good candidates for conversion to tables.

        Returns:
            One entry per recorded ``object_array``, in discovery order
        """
        return [
            {
                "path": array_info["path"],
                "table_name": self._path_to_table_name(array_info["path"]),
                "row_count": array_info["length"],
                "columns": array_info["all_keys"],
                "required_columns": array_info["common_keys"],
                "optional_columns": array_info["optional_keys"],
            }
            for array_info in self.arrays_found
            if array_info.get("type") == "object_array"
        ]

    def _path_to_table_name(self, path: str) -> str:
        """
        Convert a path to a suggested table name.

        Args:
            path: Path like "actions" or "communities"

        Returns:
            All non-numeric path segments joined with "_" and upper-cased,
            or "UNKNOWN_TABLE" when no segment is left
        """
        # Turn "a.b[0].c" into segments; purely numeric segments (array
        # indices, but also all-digit keys) are dropped.
        parts = path.replace("[", ".").replace("]", "").split(".")
        parts = [p for p in parts if p and not p.isdigit()]

        if parts:
            return "_".join(parts).upper()
        return "UNKNOWN_TABLE"

    def print_summary(self, analysis: dict[str, Any]) -> None:
        """
        Print a human-readable summary of the analysis.

        The table-candidate section is computed from this instance's own
        state via ``get_table_candidates()``, not from ``analysis``.

        Args:
            analysis: Analysis results from analyze()
        """
        print(f"\n{'=' * 60}")
        print("STRUCTURE ANALYSIS SUMMARY")
        print(f"{'=' * 60}")

        print(f"\nTotal arrays found: {analysis['total_arrays']}")

        print(f"\n{'-' * 60}")
        print("ARRAYS DETECTED:")
        print(f"{'-' * 60}")

        for i, array_info in enumerate(analysis["arrays"], 1):
            print(f"\n{i}. Path: {array_info['path']}")
            print(f" Type: {array_info['type']}")
            print(f" Length: {array_info['length']}")

            if array_info["type"] == "object_array":
                common_keys = array_info["common_keys"]
                optional_keys = array_info["optional_keys"]
                # Keys may be non-strings (e.g. integer YAML keys), so they
                # are converted before joining.
                shown_common = ", ".join(str(key) for key in common_keys[:5])
                print(f" Homogeneous: {array_info['is_homogeneous']}")
                print(f" Common keys ({len(common_keys)}): {shown_common}")
                if len(common_keys) > 5:
                    print(f" ... and {len(common_keys) - 5} more")
                if optional_keys:
                    shown_optional = ", ".join(str(key) for key in optional_keys[:3])
                    print(f" Optional keys ({len(optional_keys)}): {shown_optional}")

        print(f"\n{'-' * 60}")
        print("TABLE CANDIDATES:")
        print(f"{'-' * 60}")

        for i, candidate in enumerate(self.get_table_candidates(), 1):
            print(f"\n{i}. Table: {candidate['table_name']}")
            print(f" Source: {candidate['path']}")
            print(f" Rows: {candidate['row_count']}")
            print(f" Columns: {len(candidate['columns'])}")
            print(f" Required: {len(candidate['required_columns'])}")

        print(f"\n{'=' * 60}\n")
class
StructureAnalyzer:
class StructureAnalyzer:
    """Analyze nested structures and detect repeating patterns.

    Results are stored on the instance: every non-empty list met during
    traversal is described in ``arrays_found`` and every object-array key
    signature is recorded in ``structure_patterns``. Neither container is
    reset by ``analyze()``, so results accumulate across calls made on the
    same instance.
    """

    def __init__(self) -> None:
        """Initialize the structure analyzer with empty result containers."""
        # One info dict per non-empty list, in traversal order.
        self.arrays_found: list[dict[str, Any]] = []
        # Sorted tuple of common keys -> paths of the arrays sharing it.
        self.structure_patterns: defaultdict[tuple[Any, ...], list[str]] = defaultdict(list)

    def analyze(self, data: dict[str, Any], path: str = "") -> dict[str, Any]:
        """
        Analyze data structure and detect repeating patterns.

        Args:
            data: Dictionary to analyze
            path: Current path in the structure (for nested objects)

        Returns:
            Dict with ``total_arrays``, ``arrays`` (the instance's own
            ``arrays_found`` list, not a copy) and ``structure_patterns``
            (a plain-dict snapshot of the recorded signatures).
        """
        self._traverse(data, path)

        return {
            "total_arrays": len(self.arrays_found),
            "arrays": self.arrays_found,
            "structure_patterns": dict(self.structure_patterns),
        }

    def _traverse(self, obj: Any, path: str = "") -> None:
        """
        Recursively traverse the object structure.

        Dicts are descended into key by key, non-empty lists are analyzed and
        then descended into element by element; empty lists and every other
        value are ignored.

        Args:
            obj: Object to traverse
            path: Current path (dotted keys, ``[i]`` for list indices)
        """
        if isinstance(obj, dict):
            for key, value in obj.items():
                current_path = f"{path}.{key}" if path else key
                self._traverse(value, current_path)

        elif isinstance(obj, list) and obj:
            # Record the array itself before looking inside its elements.
            self.arrays_found.append(self._analyze_array(obj, path))

            for i, item in enumerate(obj):
                self._traverse(item, f"{path}[{i}]")

    def _analyze_array(self, array: list[Any], path: str) -> dict[str, Any]:
        """
        Analyze an array to detect its structure and patterns.

        For an array made only of dicts, the common-key signature is also
        appended to ``structure_patterns`` under ``path``.

        Args:
            array: Array to analyze
            path: Path to this array

        Returns:
            Analysis of the array; object arrays additionally carry key
            statistics and an ``is_homogeneous`` flag.
        """
        if not array:
            return {"path": path, "length": 0, "type": "empty", "element_types": []}

        # Sorted so the reported order is stable; set iteration order is not.
        unique_types = sorted({type(item).__name__ for item in array})

        if all(isinstance(item, dict) for item in array):
            key_sets = [set(item) for item in array]
            common_keys = set.intersection(*key_sets)
            all_keys_union = set.union(*key_sets)
            optional_keys = all_keys_union - common_keys

            # Arrays sharing the same required keys share a signature.
            structure_signature = tuple(sorted(common_keys))
            self.structure_patterns[structure_signature].append(path)

            return {
                "path": path,
                "length": len(array),
                "type": "object_array",
                "element_types": unique_types,
                "common_keys": sorted(common_keys),
                "optional_keys": sorted(optional_keys),
                "all_keys": sorted(all_keys_union),
                "structure_signature": structure_signature,
                # Every element has the same key set exactly when no key is
                # optional. (Counting the per-element key sets, as before,
                # only tested whether the array had a single element.)
                "is_homogeneous": not optional_keys,
            }

        return {
            "path": path,
            "length": len(array),
            "type": "primitive_array" if len(unique_types) == 1 else "mixed_array",
            "element_types": unique_types,
        }

    def get_table_candidates(self) -> list[dict[str, Any]]:
        """
        Get arrays that are good candidates for conversion to tables.

        Returns:
            One entry per recorded ``object_array``, in discovery order
        """
        return [
            {
                "path": array_info["path"],
                "table_name": self._path_to_table_name(array_info["path"]),
                "row_count": array_info["length"],
                "columns": array_info["all_keys"],
                "required_columns": array_info["common_keys"],
                "optional_columns": array_info["optional_keys"],
            }
            for array_info in self.arrays_found
            if array_info.get("type") == "object_array"
        ]

    def _path_to_table_name(self, path: str) -> str:
        """
        Convert a path to a suggested table name.

        Args:
            path: Path like "actions" or "communities"

        Returns:
            All non-numeric path segments joined with "_" and upper-cased,
            or "UNKNOWN_TABLE" when no segment is left
        """
        # Turn "a.b[0].c" into segments; purely numeric segments (array
        # indices, but also all-digit keys) are dropped.
        parts = path.replace("[", ".").replace("]", "").split(".")
        parts = [p for p in parts if p and not p.isdigit()]

        if parts:
            return "_".join(parts).upper()
        return "UNKNOWN_TABLE"

    def print_summary(self, analysis: dict[str, Any]) -> None:
        """
        Print a human-readable summary of the analysis.

        The table-candidate section is computed from this instance's own
        state via ``get_table_candidates()``, not from ``analysis``.

        Args:
            analysis: Analysis results from analyze()
        """
        print(f"\n{'=' * 60}")
        print("STRUCTURE ANALYSIS SUMMARY")
        print(f"{'=' * 60}")

        print(f"\nTotal arrays found: {analysis['total_arrays']}")

        print(f"\n{'-' * 60}")
        print("ARRAYS DETECTED:")
        print(f"{'-' * 60}")

        for i, array_info in enumerate(analysis["arrays"], 1):
            print(f"\n{i}. Path: {array_info['path']}")
            print(f" Type: {array_info['type']}")
            print(f" Length: {array_info['length']}")

            if array_info["type"] == "object_array":
                common_keys = array_info["common_keys"]
                optional_keys = array_info["optional_keys"]
                # Keys may be non-strings (e.g. integer YAML keys), so they
                # are converted before joining.
                shown_common = ", ".join(str(key) for key in common_keys[:5])
                print(f" Homogeneous: {array_info['is_homogeneous']}")
                print(f" Common keys ({len(common_keys)}): {shown_common}")
                if len(common_keys) > 5:
                    print(f" ... and {len(common_keys) - 5} more")
                if optional_keys:
                    shown_optional = ", ".join(str(key) for key in optional_keys[:3])
                    print(f" Optional keys ({len(optional_keys)}): {shown_optional}")

        print(f"\n{'-' * 60}")
        print("TABLE CANDIDATES:")
        print(f"{'-' * 60}")

        for i, candidate in enumerate(self.get_table_candidates(), 1):
            print(f"\n{i}. Table: {candidate['table_name']}")
            print(f" Source: {candidate['path']}")
            print(f" Rows: {candidate['row_count']}")
            print(f" Columns: {len(candidate['columns'])}")
            print(f" Required: {len(candidate['required_columns'])}")

        print(f"\n{'=' * 60}\n")
Analyze nested structures and detect repeating patterns.
StructureAnalyzer()
def __init__(self):
    """Create an analyzer whose result containers start out empty."""
    # Signature tuple -> list of paths; missing signatures default to [].
    self.structure_patterns = defaultdict(list)
    # Filled with one info dict per array as data is traversed.
    self.arrays_found = []
Initialize the structure analyzer.
def
analyze( self, data: dict[str, typing.Any], path: str = '') -> dict[str, typing.Any]:
def analyze(self, data: dict[str, Any], path: str = "") -> dict[str, Any]:
    """
    Walk ``data`` and report the arrays and key patterns seen so far.

    The instance's containers are not cleared first, so results from
    earlier calls on the same instance are included as well.

    Args:
        data: Dictionary to analyze
        path: Path prefix used for the top level of ``data``

    Returns:
        Dict with the array count, the instance's ``arrays_found`` list
        itself, and a plain-dict copy of ``structure_patterns``
    """
    self._traverse(data, path)

    found = self.arrays_found
    report: dict[str, Any] = {}
    report["total_arrays"] = len(found)
    report["arrays"] = found
    report["structure_patterns"] = dict(self.structure_patterns)
    return report
Analyze data structure and detect repeating patterns.
Arguments:
- data: Dictionary to analyze
- path: Current path in the structure (for nested objects)
Returns:
Analysis results including arrays and patterns
def
get_table_candidates(self) -> list[dict[str, typing.Any]]:
def get_table_candidates(self) -> list[dict[str, Any]]:
    """
    List the recorded arrays that could be turned into tables.

    Only entries whose ``type`` is ``"object_array"`` qualify; each one is
    mapped to a table description in the order it was recorded.

    Returns:
        List of table descriptions (path, suggested name, row count,
        all/required/optional columns)
    """
    object_arrays = (
        info for info in self.arrays_found if info.get("type") == "object_array"
    )
    return [
        {
            "path": info["path"],
            "table_name": self._path_to_table_name(info["path"]),
            "row_count": info["length"],
            "columns": info["all_keys"],
            "required_columns": info["common_keys"],
            "optional_columns": info["optional_keys"],
        }
        for info in object_arrays
    ]
Get arrays that are good candidates for conversion to tables.
Returns:
List of array information suitable for table generation
def
print_summary(self, analysis: dict[str, typing.Any]) -> None:
def print_summary(self, analysis: dict[str, Any]) -> None:
    """
    Print a human-readable summary of the analysis.

    The "ARRAYS DETECTED" section is driven by ``analysis``; the
    "TABLE CANDIDATES" section is computed from this instance's own state
    via ``get_table_candidates()``.

    Args:
        analysis: Analysis results from analyze()
    """
    print(f"\n{'=' * 60}")
    print("STRUCTURE ANALYSIS SUMMARY")
    print(f"{'=' * 60}")

    print(f"\nTotal arrays found: {analysis['total_arrays']}")

    print(f"\n{'-' * 60}")
    print("ARRAYS DETECTED:")
    print(f"{'-' * 60}")

    for i, array_info in enumerate(analysis["arrays"], 1):
        print(f"\n{i}. Path: {array_info['path']}")
        print(f" Type: {array_info['type']}")
        print(f" Length: {array_info['length']}")

        if array_info["type"] == "object_array":
            common_keys = array_info["common_keys"]
            optional_keys = array_info["optional_keys"]
            # Keys are not guaranteed to be strings (YAML allows e.g.
            # integer keys); str.join would raise TypeError on them.
            shown_common = ", ".join(str(key) for key in common_keys[:5])
            print(f" Homogeneous: {array_info['is_homogeneous']}")
            print(f" Common keys ({len(common_keys)}): {shown_common}")
            if len(common_keys) > 5:
                print(f" ... and {len(common_keys) - 5} more")
            if optional_keys:
                # Only the first three optional keys are listed.
                shown_optional = ", ".join(str(key) for key in list(optional_keys)[:3])
                print(f" Optional keys ({len(optional_keys)}): {shown_optional}")

    print(f"\n{'-' * 60}")
    print("TABLE CANDIDATES:")
    print(f"{'-' * 60}")

    for i, candidate in enumerate(self.get_table_candidates(), 1):
        print(f"\n{i}. Table: {candidate['table_name']}")
        print(f" Source: {candidate['path']}")
        print(f" Rows: {candidate['row_count']}")
        print(f" Columns: {len(candidate['columns'])}")
        print(f" Required: {len(candidate['required_columns'])}")

    print(f"\n{'=' * 60}\n")
Print a human-readable summary of the analysis.
Arguments:
- analysis: Analysis results from analyze()