Coverage for intelligence_toolkit/generate_mock_data/schema_builder.py: 68%

335 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3# 

4import json 

5from enum import Enum 

6 

7import jsonschema 

8import pandas as pd 

9 

# Outcome codes returned by evaluate_object_and_schema: the object validated
# (VALID), the schema itself was rejected (SCHEMA_INVALID), or the object
# failed validation against the schema (OBJECT_INVALID).
ValidationResult = Enum('ValidationResult', 'VALID SCHEMA_INVALID OBJECT_INVALID')

11 

class StringFormat(Enum):
    '''
    Names that can be stored in a string field's 'format' key.
    set_string_format writes the member's value verbatim into the field.
    '''
    # Dates and times
    DATE = 'date'
    TIME = 'time'
    DATE_TIME = 'date-time'
    DURATION = 'duration'
    # Addresses and host names
    URI = 'uri'
    EMAIL = 'email'
    IDN_EMAIL = 'idn-email'
    HOSTNAME = 'hostname'
    IDN_HOSTNAME = 'idn-hostname'
    IPV4 = 'ipv4'
    IPV6 = 'ipv6'
    # Miscellaneous identifiers
    REGEX = 'regex'
    UUID = 'uuid'
    JSON_POINTER = 'json-pointer'
    RELATIVE_JSON_POINTER = 'relative-json-pointer'
    # Resource identifier variants
    IRI = 'iri'
    IRI_REFERENCE = 'iri-reference'
    URI_REFERENCE = 'uri-reference'
    URI_TEMPLATE = 'uri-template'
    URI_TEMPLATE_EXPRESSION = 'uri-template-expression'
    URI_TEMPLATE_FRAGMENT = 'uri-template-fragment'

34 

class FieldType(Enum):
    '''All values this module uses for a field's 'type' key.'''
    OBJECT = 'object'
    ARRAY = 'array'
    STRING = 'string'
    NUMBER = 'number'
    BOOLEAN = 'boolean'

41 

class ArrayFieldType(Enum): # Disallow arrays of arrays
    '''
    Item types accepted by add_array_field. There is deliberately no ARRAY
    member, so an array of arrays cannot be requested.
    '''
    OBJECT = 'object'
    STRING = 'string'
    NUMBER = 'number'
    BOOLEAN = 'boolean'

47 

class PrimitiveFieldType(Enum):
    '''Scalar field types accepted by add_primitive_field.'''
    STRING = 'string'
    NUMBER = 'number'
    BOOLEAN = 'boolean'

52 

53def _get_field_label_number(schema, field_label): 

54 ''' 

55 Returns the number of times the field_label appears in the schema. 

56 ''' 

57 count = 0 

58 for key, value in schema.items(): 

59 root_label = '_'.join(key.split('_')[:-1]) 

60 if root_label == field_label: 

61 count += 1 

62 if isinstance(value, dict): 

63 count += _get_field_label_number(value, field_label) 

64 return count 

65 

66def _get_unique_field_label(schema, field_label): 

67 if field_label != "": 

68 new_suffix = _get_field_label_number(schema, field_label) + 1 

69 field_label = field_label + '_' + str(new_suffix) 

70 return field_label 

71 

def get_subobject(json_obj, field_labels):
    '''
    Walks json_obj along field_labels and returns the dict that holds the
    fields at that location.

    At each step the label is looked up directly on the current node, then
    under its 'properties', then under its 'items' -> 'properties'. A label
    found in none of these leaves the current node unchanged. After the walk,
    an array node is unwrapped to its items' properties and an object node to
    its properties; any other node is returned as is.
    '''
    node = json_obj
    for label in field_labels:
        if label in node:
            node = node[label]
            continue
        if 'properties' in node and label in node['properties']:
            node = node['properties'][label]
            continue
        if 'items' in node and 'properties' in node['items'] and label in node['items']['properties']:
            node = node['items']['properties'][label]
    if 'items' in node:
        return node['items']['properties']
    if 'properties' in node:
        return node['properties']
    return node

86 

def get_required_list(json_obj, field_labels):
    '''
    Walks json_obj along field_labels (same lookup order as get_subobject:
    direct key, then 'properties', then 'items' -> 'properties') and returns
    the 'required' list of the node reached. For an array node the list is
    taken from its 'items'. Raises KeyError if that node has no 'required'.
    '''
    node = json_obj
    for label in field_labels:
        if label in node:
            node = node[label]
            continue
        if 'properties' in node and label in node['properties']:
            node = node['properties'][label]
            continue
        if 'items' in node and 'properties' in node['items'] and label in node['items']['properties']:
            node = node['items']['properties'][label]
    holder = node['items'] if 'items' in node else node
    return holder['required']

99 

def create_boilerplate_schema(
        schema_field="http://json-schema.org/draft/2020-12/schema",
        title_field="Example Schema",
        description_field="An example schema ready to be edited and populated with fields.",
    ):
    '''
    Builds a fresh starter schema: a closed root object whose only (required)
    property is 'records', an array of closed objects with no fields yet.
    The three arguments fill the '$schema', 'title' and 'description' keys.
    '''
    # Shape of each element of the 'records' array; fields get added here later.
    record_template = {
        "type": "object",
        "properties": {},
        "required": [],
        "additionalProperties": False
    }
    records_field = {
        "type": "array",
        "description": "An array of records",
        "items": record_template
    }
    return {
        "$schema": schema_field,
        "title": title_field,
        "description": description_field,
        "type": "object",
        "properties": {"records": records_field},
        "required": ["records"],
        "additionalProperties": False
    }

126 

def add_object_field(
        global_schema,
        field_location,
        field_label="object",
        field_description=""
    ):
    '''
    Inserts an empty, closed object field into field_location and returns the
    label it was stored under. The label is field_label made unique across
    global_schema by _get_unique_field_label.
    '''
    unique_label = _get_unique_field_label(global_schema, field_label)
    new_field = {
        "type": "object",
        "description": field_description,
        "properties": {},
        "required": [],
        "additionalProperties": False,
    }
    field_location[unique_label] = new_field
    return unique_label

144 

def add_array_field(
        global_schema,
        field_location,
        field_label="",
        field_description="",
        item_type: ArrayFieldType=ArrayFieldType.STRING
    ):
    '''
    Inserts an array field into field_location and returns the label it was
    stored under. An empty field_label defaults to '<item type>_array'; the
    label is then made unique across global_schema.

    Arrays of objects get a closed, empty object as their item schema; every
    other item type gets an item schema holding only that type.
    '''
    label_root = field_label if field_label != "" else f"{item_type.value}_array"
    unique_label = _get_unique_field_label(global_schema, label_root)
    if item_type == ArrayFieldType.OBJECT:
        item_schema = {
            "type": "object",
            "properties": {},
            "required": [],
            "additionalProperties": False
        }
    else:
        item_schema = {"type": item_type.value}
    field_location[unique_label] = {
        "type": "array",
        "description": field_description,
        "items": item_schema
    }
    return unique_label

177 

def add_primitive_field(
        global_schema,
        field_location,
        field_label="",
        field_description="",
        field_type: PrimitiveFieldType=PrimitiveFieldType.STRING
    ):
    '''
    Inserts a string/number/boolean field into field_location and returns the
    label it was stored under. An empty field_label defaults to the type name;
    the label is then made unique across global_schema.
    '''
    label_root = field_label if field_label != "" else field_type.value
    unique_label = _get_unique_field_label(global_schema, label_root)
    new_field = {"type": field_type.value}
    new_field["description"] = field_description
    field_location[unique_label] = new_field
    return unique_label

195 

def set_string_min_length(string_field, min_length):
    '''Stores min_length under 'minLength', or removes the key when min_length is None.'''
    if min_length is not None:
        string_field['minLength'] = min_length
        return
    string_field.pop('minLength', None)

201 

def set_string_max_length(string_field, max_length):
    '''Stores max_length under 'maxLength', or removes the key when max_length is None.'''
    if max_length is not None:
        string_field['maxLength'] = max_length
        return
    string_field.pop('maxLength', None)

207 

def set_string_pattern(string_field, pattern):
    '''Stores pattern under 'pattern', or removes the key when pattern is None.'''
    if pattern is not None:
        string_field['pattern'] = pattern
        return
    string_field.pop('pattern', None)

213 

def set_string_format(string_field, string_format: StringFormat | None):
    '''
    Stores the value of the given StringFormat member under 'format', or
    removes the key when string_format is None.
    '''
    if string_format is not None:
        string_field['format'] = string_format.value
        return
    string_field.pop('format', None)

219 

def clear_string_constraints(string_field):
    '''Removes every string constraint key from string_field; absent keys are ignored.'''
    for constraint in ('minLength', 'maxLength', 'pattern', 'format'):
        string_field.pop(constraint, None)

225 

def set_number_minimum(number_field, minimum, exclusive):
    '''
    Sets or clears the lower bound of number_field.

    exclusive selects which key is addressed: 'exclusiveMinimum' when truthy,
    'minimum' otherwise. With minimum None only the addressed key is removed.
    With a value, the addressed key is set and the other bound key is removed
    so the two never coexist.
    '''
    if exclusive:
        chosen, other = 'exclusiveMinimum', 'minimum'
    else:
        chosen, other = 'minimum', 'exclusiveMinimum'
    if minimum is None:
        number_field.pop(chosen, None)
        return
    number_field[chosen] = minimum
    number_field.pop(other, None)

239 

def set_number_maximum(number_field, maximum, exclusive):
    '''
    Sets or clears the upper bound of number_field.

    exclusive selects which key is addressed: 'exclusiveMaximum' when truthy,
    'maximum' otherwise. With maximum None only the addressed key is removed.
    With a value, the addressed key is set and the other bound key is removed
    so the two never coexist.
    '''
    if exclusive:
        chosen, other = 'exclusiveMaximum', 'maximum'
    else:
        chosen, other = 'maximum', 'exclusiveMaximum'
    if maximum is None:
        number_field.pop(chosen, None)
        return
    number_field[chosen] = maximum
    number_field.pop(other, None)

253 

def set_number_multiple_of(number_field, multiple_of):
    '''Stores multiple_of under 'multipleOf', or removes the key when multiple_of is None.'''
    if multiple_of is not None:
        number_field['multipleOf'] = multiple_of
        return
    number_field.pop('multipleOf', None)

259 

def clear_number_constraints(number_field):
    '''Removes every numeric constraint key from number_field; absent keys are ignored.'''
    for constraint in ('minimum', 'maximum', 'exclusiveMinimum', 'exclusiveMaximum', 'multipleOf'):
        number_field.pop(constraint, None)

266 

def rename_field(global_schema, field_location, nesting, old_label, new_label):
    '''
    Renames old_label to new_label inside field_location, keeping the field at
    its current position, and updates the 'required' list found at nesting.

    NOTE(review): the renamed field is always marked required afterwards, even
    if old_label was not required before - confirm this is intended.
    '''
    set_required_field_status(global_schema, nesting, old_label, False)
    # Re-insert every key in its existing order so the renamed key keeps its slot.
    for existing in list(field_location):
        target = new_label if existing == old_label else existing
        field_location[target] = field_location.pop(existing)
    # Re-adding the label also re-sorts 'required' to match the field order.
    set_required_field_status(global_schema, nesting, new_label, True)

278 

def delete_field(global_schema, nesting, field_location, key):
    '''
    Removes key from field_location (KeyError if absent) and then drops it
    from the 'required' list found at nesting.
    '''
    del field_location[key]
    set_required_field_status(global_schema, nesting, key, False)

282 

def move_field_up(global_schema, nesting, field_location, label):
    '''
    Moves label one position earlier in field_location (no move if it is
    already first) and re-sorts the 'required' list found at nesting to follow
    the resulting field order. Raises ValueError if label is not a key.
    '''
    ordering = list(field_location)
    position = ordering.index(label)
    if position > 0:
        above = position - 1
        ordering[above], ordering[position] = ordering[position], ordering[above]
        # Re-insert from the swapped slot onward so the dict adopts the new order.
        for name in ordering[above:]:
            field_location[name] = field_location.pop(name)
    required = get_required_list(global_schema, nesting)
    required.sort(key=ordering.index)

293 

def move_field_down(global_schema, nesting, field_location, label):
    '''
    Moves label one position later in field_location (no move if it is already
    last) and re-sorts the 'required' list found at nesting to follow the
    resulting field order. Raises ValueError if label is not a key.
    '''
    ordering = list(field_location)
    position = ordering.index(label)
    if position < len(ordering) - 1:
        below = position + 1
        ordering[below], ordering[position] = ordering[position], ordering[below]
        # Re-insert from the swapped slot onward so the dict adopts the new order.
        for name in ordering[position:]:
            field_location[name] = field_location.pop(name)
    required = get_required_list(global_schema, nesting)
    required.sort(key=ordering.index)

305 

def set_required_field_status(schema, nesting, field_label, required):
    '''
    Adds field_label to, or removes it from, the 'required' list found at
    nesting, then sorts that list to follow the key order of the fields at the
    same location. Sorting raises ValueError if the list names a label that is
    not a key there.
    '''
    reqs = get_required_list(schema, nesting)
    listed = field_label in reqs
    if required and not listed:
        reqs.append(field_label)
    elif not required and listed:
        reqs.remove(field_label)
    positions = list(get_subobject(schema, nesting))
    reqs.sort(key=positions.index)

315 

316 

def set_enum_field_status(schema, nesting, field_label, constrained):
    '''
    Turns the 'enum' constraint of a field on or off and returns True if the
    schema was modified.

    For an array field the constraint lives on its 'items'; otherwise on the
    field itself. Turning it on seeds a placeholder list: [""] for strings,
    [0] for numbers and, for non-array fields only, [True, False] for
    booleans. Other types are left untouched. Turning it off removes 'enum'.
    '''
    field = get_subobject(schema, nesting)[field_label]
    if field['type'] != 'array':
        target = field
        seeds = (('string', [""]), ('number', [0]), ('boolean', [True, False]))
    else:
        target = field['items']
        seeds = (('string', [""]), ('number', [0]))

    has_enum = 'enum' in target
    if constrained and not has_enum:
        kind = target['type']
        for type_name, seed in seeds:
            if kind == type_name:
                target['enum'] = seed
                return True
        return False
    if not constrained and has_enum:
        target.pop('enum')
        return True
    return False

349 

def set_additional_field_status(schema, nesting, field_label, additional):
    '''
    Sets 'additionalProperties' to additional on an object field, or on the
    item schema of an array-of-objects field, and returns True if the stored
    value changed. Fields of any other type are left untouched (returns False).

    Previously the return value was always False because the change flag was
    never updated.
    '''
    field = get_subobject(schema, nesting)[field_label]
    typ = field['type']
    if typ == 'object':
        target = field
    elif typ == 'array' and field['items']['type'] == 'object':
        target = field['items']
    else:
        return False
    # A missing key counts as a change, since the key is about to be written.
    changed = (
        'additionalProperties' not in target
        or target['additionalProperties'] != additional
    )
    target['additionalProperties'] = additional
    return changed

359 

def generate_object_from_schema(json_schema):
    '''
    Builds a template object matching json_schema, a JSON Schema in which
    values are described as follows:
    "value": {
        "description": "Description of the value",
        "type": "<type of the value>"
    }
    The type can be "string", "number", "boolean", "array", or "object".

    The template holds the first 'enum' entry where a non-empty enum is given;
    otherwise '' for strings, False for booleans and the result of
    _get_constrained_value for numbers. Arrays of strings/numbers hold the
    first enum entry or are empty, arrays of booleans are empty, and arrays of
    anything else hold one template built from the item schema. Nodes with no
    'type', an unknown type, or an object without 'properties' become None.
    '''
    def has_choices(node):
        return 'enum' in node and len(node['enum']) > 0

    def build(node):
        if 'type' not in node:
            return None
        kind = node['type']
        if kind == 'object':
            if 'properties' not in node:
                return None
            return {name: build(sub) for name, sub in node['properties'].items()}
        if kind == 'array':
            element = node['items']
            element_kind = element['type']
            if element_kind in ('string', 'number'):
                return [element['enum'][0]] if has_choices(element) else []
            if element_kind == 'boolean':
                return []
            return [build(element)]
        if kind in ('string', 'number', 'boolean'):
            if has_choices(node):
                return node['enum'][0]
            if kind == 'string':
                return ''
            if kind == 'number':
                return _get_constrained_value(node)
            return False
        return None

    return build(json_schema)

411 

def convert_to_dataframe(json_obj):
    '''Returns json_obj flattened into a DataFrame by pandas.json_normalize.'''
    return pd.json_normalize(json_obj)

415 

416def _get_constrained_value(schema): 

417 if 'minimum' in schema: 

418 if 'multipleOf' in schema: 

419 min = schema['minimum'] 

420 mult = schema['multipleOf'] 

421 # find the smallest multiple of mult that is greater than or equal to min 

422 return min + mult - (min % mult) 

423 else: 

424 return schema['minimum'] 

425 elif 'maximum' in schema: 

426 if 'multipleOf' in schema: 

427 max = schema['maximum'] 

428 mult = schema['multipleOf'] 

429 # find the largest multiple of mult that is less than or equal to max 

430 return max - (max % mult) 

431 else: 

432 return schema['maximum'] 

433 else: 

434 return 0 

435 

def evaluate_object_and_schema(obj, schema):
    '''
    Validates obj against schema with jsonschema.validate.

    Returns ValidationResult.VALID on success and
    ValidationResult.SCHEMA_INVALID when jsonschema raises SchemaError.
    On ValidationError it returns ValidationResult.OBJECT_INVALID only if obj
    is a dict with a top-level value that is falsy or a non-blank string;
    in every other case it returns None.

    NOTE(review): the None result and the non-blank-string test look
    unintended for a check described as detecting empty fields - confirm the
    desired outcome before relying on this return value.
    '''
    try:
        jsonschema.validate(obj, schema)
    except jsonschema.exceptions.ValidationError:
        # check whether it is invalid because there's an empty field
        if isinstance(obj, dict) and any(
            not value or (isinstance(value, str) and value.strip() != '')
            for value in obj.values()
        ):
            return ValidationResult.OBJECT_INVALID
        return None
    except jsonschema.exceptions.SchemaError:
        return ValidationResult.SCHEMA_INVALID
    else:
        return ValidationResult.VALID

448 

def evaluate_schema(schema):
    '''
    Generates a template object from schema and validates it against that
    same schema, returning the result of evaluate_object_and_schema.
    '''
    return evaluate_object_and_schema(generate_object_from_schema(schema), schema)

452 

def test():
    '''
    Manual walkthrough of the builder functions: creates the boilerplate
    schema, adds primitive, object and array fields at increasing nesting
    depth, prints the evaluate_schema result after every step and finally
    prints the schema as JSON.

    The add_* helpers take the whole schema plus the dict that should receive
    the new field, so each insertion point is resolved with get_subobject.
    (The earlier calls passed 'schema=' / 'nesting=' keywords, which the
    helpers do not accept.)
    '''
    print('Creating schema')
    schema = create_boilerplate_schema()
    print(evaluate_schema(schema))

    # Fields of each record live under the 'records' array's item schema.
    record_path = ['records']

    print('Adding first string field')
    add_primitive_field(
        global_schema=schema,
        field_location=get_subobject(schema, record_path),
        field_type=PrimitiveFieldType.STRING
    )
    print(evaluate_schema(schema))

    print('Adding second string field')
    add_primitive_field(
        global_schema=schema,
        field_location=get_subobject(schema, record_path),
        field_type=PrimitiveFieldType.STRING
    )
    print(evaluate_schema(schema))

    print('Adding nested object field')
    obj_label = add_object_field(
        global_schema=schema,
        field_location=get_subobject(schema, record_path)
    )
    print(evaluate_schema(schema))

    object_path = record_path + [obj_label]

    print('Adding nested string field')
    add_primitive_field(
        global_schema=schema,
        field_location=get_subobject(schema, object_path),
        field_type=PrimitiveFieldType.STRING
    )
    print(evaluate_schema(schema))

    print('Adding nested string field')
    add_primitive_field(
        global_schema=schema,
        field_location=get_subobject(schema, object_path),
        field_type=PrimitiveFieldType.STRING
    )
    print(evaluate_schema(schema))

    print('Adding nested number field')
    add_primitive_field(
        global_schema=schema,
        field_location=get_subobject(schema, object_path),
        field_type=PrimitiveFieldType.NUMBER
    )
    print(evaluate_schema(schema))

    print('Adding nested boolean field')
    add_primitive_field(
        global_schema=schema,
        field_location=get_subobject(schema, object_path),
        field_type=PrimitiveFieldType.BOOLEAN
    )
    print(evaluate_schema(schema))

    print('Adding nested string array')
    add_array_field(
        global_schema=schema,
        field_location=get_subobject(schema, object_path),
        item_type=ArrayFieldType.STRING
    )
    print(evaluate_schema(schema))

    print('Adding nested object array')
    arr_label = add_array_field(
        global_schema=schema,
        field_location=get_subobject(schema, object_path),
        item_type=ArrayFieldType.OBJECT
    )
    print(evaluate_schema(schema))

    print('Adding boolean to objects of nested array')
    add_primitive_field(
        global_schema=schema,
        field_location=get_subobject(schema, object_path + [arr_label]),
        field_type=PrimitiveFieldType.BOOLEAN
    )
    print(evaluate_schema(schema))

    print('Final schema')
    print(json.dumps(schema, indent=2))

536 

def main():
    '''Script entry point: runs the test() walkthrough.'''
    test()

539 

# Run the walkthrough only when this file is executed as a script.
if __name__ == "__main__":
    main()