Coverage for src/dataknobs_data/pandas/metadata.py: 0%

123 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:23 -0700

1"""Metadata preservation for DataKnobs-Pandas conversions.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6from enum import Enum 

7from typing import Any, TYPE_CHECKING 

8 

9import pandas as pd 

10 

11if TYPE_CHECKING: 

12 from dataknobs_data.records import Record 

13 

14 

class MetadataStrategy(Enum):
    """Enumerates the ways metadata can be carried through a conversion."""

    # Metadata is not preserved at all.
    NONE = "none"

    # Metadata is stored in ``DataFrame.attrs``.
    ATTRS = "attrs"

    # Metadata is stored as additional DataFrame columns.
    COLUMNS = "columns"

    # Metadata is stored in a multi-level column index.
    MULTI_INDEX = "multi_index"

21 

22 

@dataclass
class MetadataConfig:
    """Settings that control how metadata is handled during conversion.

    Attributes:
        strategy: Which ``MetadataStrategy`` to use; defaults to ``ATTRS``.
        include_record_metadata: Whether per-record metadata is extracted
            (and, for the column strategy, written out as columns).
        include_field_metadata: Whether per-field metadata is extracted.
        metadata_prefix: Prefix put in front of the key when metadata is
            stored as DataFrame columns.
        preserve_record_ids: Whether record IDs are stored alongside the
            metadata when the attrs strategy is used.
    """

    strategy: MetadataStrategy = MetadataStrategy.ATTRS
    include_record_metadata: bool = True
    include_field_metadata: bool = True
    metadata_prefix: str = "_meta_"
    preserve_record_ids: bool = True

31 

32 

33class MetadataHandler: 

34 """Handles metadata preservation during conversions.""" 

35 

36 def __init__(self, config: MetadataConfig | None = None): 

37 """Initialize metadata handler. 

38  

39 Args: 

40 config: Metadata configuration 

41 """ 

42 self.config = config or MetadataConfig() 

43 

44 def extract_metadata_from_records(self, records: list[Record]) -> dict[str, Any]: 

45 """Extract metadata from records. 

46  

47 Args: 

48 records: List of records 

49  

50 Returns: 

51 Dictionary of metadata 

52 """ 

53 metadata = { 

54 "record_count": len(records), 

55 "has_record_ids": all(r.id for r in records), 

56 "field_names": self._get_all_field_names(records), 

57 "field_types": self._get_field_types(records), 

58 } 

59 

60 if self.config.include_record_metadata: 

61 metadata["record_metadata"] = self._extract_record_metadata(records) 

62 

63 if self.config.include_field_metadata: 

64 metadata["field_metadata"] = self._extract_field_metadata(records) 

65 

66 return metadata 

67 

68 def apply_metadata_to_dataframe( 

69 self, 

70 df: pd.DataFrame, 

71 metadata: dict[str, Any], 

72 records: list[Record] | None = None 

73 ) -> pd.DataFrame: 

74 """Apply metadata to DataFrame based on strategy. 

75  

76 Args: 

77 df: Target DataFrame 

78 metadata: Metadata to apply 

79 records: Original records (for additional metadata) 

80  

81 Returns: 

82 DataFrame with metadata 

83 """ 

84 if self.config.strategy == MetadataStrategy.NONE: 

85 return df 

86 

87 elif self.config.strategy == MetadataStrategy.ATTRS: 

88 df.attrs.update(metadata) # type: ignore[arg-type] 

89 if records and self.config.preserve_record_ids: 

90 record_ids = [r.id for r in records] 

91 df.attrs["record_ids"] = record_ids 

92 

93 elif self.config.strategy == MetadataStrategy.COLUMNS: 

94 # Add metadata as columns 

95 if self.config.include_record_metadata and records: 

96 for key, values in self._get_record_metadata_columns(records).items(): 

97 col_name = f"{self.config.metadata_prefix}{key}" 

98 df[col_name] = values 

99 

100 elif self.config.strategy == MetadataStrategy.MULTI_INDEX: 

101 # Create multi-level column index with metadata 

102 if "field_types" in metadata: 

103 arrays = [ 

104 df.columns.tolist(), 

105 [metadata["field_types"].get(col, "unknown") for col in df.columns] 

106 ] 

107 df.columns = pd.MultiIndex.from_arrays( 

108 arrays, 

109 names=["field_name", "field_type"] 

110 ) 

111 

112 return df 

113 

114 def extract_metadata_from_dataframe(self, df: pd.DataFrame) -> dict[str, Any]: 

115 """Extract metadata from DataFrame. 

116  

117 Args: 

118 df: Source DataFrame 

119  

120 Returns: 

121 Dictionary of metadata 

122 """ 

123 metadata = {} 

124 

125 if self.config.strategy == MetadataStrategy.ATTRS: 

126 # Convert attrs keys to strings for consistency 

127 for key, value in df.attrs.items(): 

128 if key is not None: 

129 metadata[str(key)] = value 

130 

131 elif self.config.strategy == MetadataStrategy.COLUMNS: 

132 # Extract from metadata columns 

133 meta_cols = [col for col in df.columns if col.startswith(self.config.metadata_prefix)] 

134 for col in meta_cols: 

135 key = col.replace(self.config.metadata_prefix, "") 

136 metadata[key] = df[col].tolist() 

137 

138 elif self.config.strategy == MetadataStrategy.MULTI_INDEX: 

139 # Extract from multi-level index 

140 if isinstance(df.columns, pd.MultiIndex): 

141 metadata["field_names"] = df.columns.get_level_values(0).tolist() 

142 if df.columns.nlevels > 1: 

143 metadata["field_types"] = df.columns.get_level_values(1).tolist() 

144 

145 return metadata 

146 

147 def create_records_with_metadata( 

148 self, 

149 df: pd.DataFrame, 

150 base_records: list[Record], 

151 metadata: dict[str, Any] | None = None 

152 ) -> list[Record]: 

153 """Create records with preserved metadata. 

154  

155 Args: 

156 df: Source DataFrame 

157 base_records: Base records from conversion 

158 metadata: Additional metadata 

159  

160 Returns: 

161 Records with metadata 

162 """ 

163 if not metadata: 

164 metadata = self.extract_metadata_from_dataframe(df) 

165 

166 # Apply record IDs if preserved 

167 if "record_ids" in metadata and len(metadata["record_ids"]) == len(base_records): 

168 for record, record_id in zip(base_records, metadata["record_ids"], strict=False): 

169 if record_id: 

170 record.id = record_id 

171 

172 # Apply record metadata if present 

173 if "record_metadata" in metadata: 

174 record_meta = metadata["record_metadata"] 

175 for i, record in enumerate(base_records): 

176 if i < len(record_meta) and record_meta[i]: 

177 record.metadata = record_meta[i] 

178 

179 # Apply field metadata if present 

180 if "field_metadata" in metadata: 

181 field_meta = metadata["field_metadata"] 

182 for record in base_records: 

183 for field_name, field in record.fields.items(): 

184 if field_name in field_meta: 

185 field.metadata = field_meta[field_name] 

186 

187 return base_records 

188 

189 def _get_all_field_names(self, records: list[Record]) -> list[str]: 

190 """Get all unique field names from records.""" 

191 field_names = set() 

192 for record in records: 

193 field_names.update(record.fields.keys()) 

194 return sorted(field_names) 

195 

196 def _get_field_types(self, records: list[Record]) -> dict[str, str]: 

197 """Get field types from records.""" 

198 field_types = {} 

199 for record in records: 

200 for field_name, field in record.fields.items(): 

201 if field_name not in field_types and field.type: 

202 field_types[field_name] = field.type.value 

203 return field_types 

204 

205 def _extract_record_metadata(self, records: list[Record]) -> list[dict[str, Any]]: 

206 """Extract metadata from each record.""" 

207 return [r.metadata if r.metadata else {} for r in records] 

208 

209 def _extract_field_metadata(self, records: list[Record]) -> dict[str, dict[str, Any]]: 

210 """Extract metadata from fields.""" 

211 field_metadata = {} 

212 for record in records: 

213 for field_name, field in record.fields.items(): 

214 if field.metadata and field_name not in field_metadata: 

215 field_metadata[field_name] = field.metadata 

216 return field_metadata 

217 

218 def _get_record_metadata_columns(self, records: list[Record]) -> dict[str, list]: 

219 """Get record metadata as column data.""" 

220 columns = {} 

221 

222 # Collect all metadata keys 

223 all_keys = set() 

224 for record in records: 

225 if record.metadata: 

226 all_keys.update(record.metadata.keys()) 

227 

228 # Create column for each metadata key 

229 for key in all_keys: 

230 values = [] 

231 for record in records: 

232 value = record.metadata.get(key) if record.metadata else None 

233 values.append(value) 

234 columns[key] = values 

235 

236 return columns 

237 

238 def clean_dataframe_columns(self, df: pd.DataFrame) -> pd.DataFrame: 

239 """Remove metadata columns from DataFrame. 

240  

241 Args: 

242 df: DataFrame to clean 

243  

244 Returns: 

245 DataFrame without metadata columns 

246 """ 

247 if self.config.strategy == MetadataStrategy.COLUMNS: 

248 # Remove metadata columns 

249 meta_cols = [col for col in df.columns if col.startswith(self.config.metadata_prefix)] 

250 return df.drop(columns=meta_cols) 

251 

252 elif self.config.strategy == MetadataStrategy.MULTI_INDEX: 

253 # Flatten multi-index to single level 

254 if isinstance(df.columns, pd.MultiIndex): 

255 df.columns = df.columns.get_level_values(0) 

256 

257 return df