Coverage for intelligence_toolkit/generate_mock_data/schema_builder.py: 68%
335 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
3#
4import json
5from enum import Enum
7import jsonschema
8import pandas as pd
class ValidationResult(Enum):
    '''Outcome reported when an object is validated against a schema.'''

    VALID = 1
    SCHEMA_INVALID = 2
    OBJECT_INVALID = 3
class StringFormat(Enum):
    '''
    Names that can be stored in the 'format' entry of a string field.

    set_string_format writes the selected member's value into the field.
    '''
    # Dates and times
    DATE = 'date'
    TIME = 'time'
    DATE_TIME = 'date-time'
    DURATION = 'duration'
    # Addresses and network identifiers
    URI = 'uri'
    EMAIL = 'email'
    IDN_EMAIL = 'idn-email'
    HOSTNAME = 'hostname'
    IDN_HOSTNAME = 'idn-hostname'
    IPV4 = 'ipv4'
    IPV6 = 'ipv6'
    # Patterns, identifiers and pointers
    REGEX = 'regex'
    UUID = 'uuid'
    JSON_POINTER = 'json-pointer'
    RELATIVE_JSON_POINTER = 'relative-json-pointer'
    # Further URI/IRI variants
    IRI = 'iri'
    IRI_REFERENCE = 'iri-reference'
    URI_REFERENCE = 'uri-reference'
    URI_TEMPLATE = 'uri-template'
    URI_TEMPLATE_EXPRESSION = 'uri-template-expression'
    URI_TEMPLATE_FRAGMENT = 'uri-template-fragment'
class FieldType(Enum):
    '''Type names for schema fields, covering both containers and primitives.'''
    # Container types
    OBJECT = 'object'
    ARRAY = 'array'
    # Primitive types
    STRING = 'string'
    NUMBER = 'number'
    BOOLEAN = 'boolean'
class ArrayFieldType(Enum): # Disallow arrays of arrays
    '''
    Type names allowed for the items of an array field.

    There is deliberately no ARRAY member, so an array cannot contain arrays.
    '''
    OBJECT = 'object'
    STRING = 'string'
    NUMBER = 'number'
    BOOLEAN = 'boolean'
class PrimitiveFieldType(Enum):
    '''Type names for non-container fields, as accepted by add_primitive_field.'''
    STRING = 'string'
    NUMBER = 'number'
    BOOLEAN = 'boolean'
53def _get_field_label_number(schema, field_label):
54 '''
55 Returns the number of times the field_label appears in the schema.
56 '''
57 count = 0
58 for key, value in schema.items():
59 root_label = '_'.join(key.split('_')[:-1])
60 if root_label == field_label:
61 count += 1
62 if isinstance(value, dict):
63 count += _get_field_label_number(value, field_label)
64 return count
66def _get_unique_field_label(schema, field_label):
67 if field_label != "":
68 new_suffix = _get_field_label_number(schema, field_label) + 1
69 field_label = field_label + '_' + str(new_suffix)
70 return field_label
def get_subobject(json_obj, field_labels):
    '''
    Follow field_labels down from json_obj and return the properties found there.

    Each label is looked up directly in the current node, then in its
    'properties', then in the 'properties' of its 'items'; a label found in
    none of these leaves the current node unchanged. For the node finally
    reached, the 'properties' of its 'items' are returned if it has 'items',
    otherwise its own 'properties' if present, otherwise the node itself.
    '''
    node = json_obj
    for name in field_labels:
        if name in node:
            node = node[name]
            continue
        if 'properties' in node and name in node['properties']:
            node = node['properties'][name]
            continue
        if 'items' in node and 'properties' in node['items']:
            item_properties = node['items']['properties']
            if name in item_properties:
                node = item_properties[name]
    if 'items' in node:
        return node['items']['properties']
    if 'properties' in node:
        return node['properties']
    return node
def get_required_list(json_obj, field_labels):
    '''
    Follow field_labels down from json_obj and return the 'required' list found there.

    Labels are resolved as in get_subobject: directly in the current node,
    then in its 'properties', then in the 'properties' of its 'items'. If the
    node finally reached has 'items', the list is read from those items,
    otherwise from the node itself. The list object stored in the schema is
    returned, not a copy. Raises KeyError when there is no 'required' entry.
    '''
    node = json_obj
    for name in field_labels:
        if name in node:
            node = node[name]
            continue
        if 'properties' in node and name in node['properties']:
            node = node['properties'][name]
            continue
        if 'items' in node and 'properties' in node['items']:
            item_properties = node['items']['properties']
            if name in item_properties:
                node = item_properties[name]
    owner = node['items'] if 'items' in node else node
    return owner['required']
def create_boilerplate_schema(
    schema_field="https://json-schema.org/draft/2020-12/schema",
    title_field="Example Schema",
    description_field="An example schema ready to be edited and populated with fields.",
):
    '''
    Build a fresh starter schema whose only property is a 'records' array.

    The root object requires 'records' and forbids additional properties;
    each record is an object with no properties yet, an empty 'required'
    list and additional properties forbidden.

    schema_field is stored under '$schema'. Its default is the identifier of
    the JSON Schema 2020-12 meta-schema, which uses the https scheme; the
    http spelling is not a registered meta-schema identifier.
    title_field and description_field are stored under 'title' and
    'description'.

    Every call returns new dict and list objects, so callers may mutate the
    result freely.
    '''
    record_items = {
        "type": "object",
        "properties": {},
        "required": [],
        "additionalProperties": False,
    }
    records = {
        "type": "array",
        "description": "An array of records",
        "items": record_items,
    }
    return {
        "$schema": schema_field,
        "title": title_field,
        "description": description_field,
        "type": "object",
        "properties": {"records": records},
        "required": ["records"],
        "additionalProperties": False,
    }
def add_object_field(
    global_schema,
    field_location,
    field_label="object",
    field_description=""
):
    '''
    Insert a new, empty object field into field_location and return its label.

    The label is made unique with _get_unique_field_label, which searches
    global_schema. The new object has no properties, an empty 'required'
    list and forbids additional properties.
    '''
    unique_label = _get_unique_field_label(global_schema, field_label)
    new_field = {
        "type": "object",
        "description": field_description,
        "properties": {},
        "required": [],
        "additionalProperties": False,
    }
    field_location[unique_label] = new_field
    return unique_label
def add_array_field(
    global_schema,
    field_location,
    field_label="",
    field_description="",
    item_type: ArrayFieldType = ArrayFieldType.STRING
):
    '''
    Insert a new array field into field_location and return its label.

    An empty field_label defaults to "<item type>_array". The label is made
    unique with _get_unique_field_label, which searches global_schema.
    Arrays of objects get an empty object definition (no properties, empty
    'required', additional properties forbidden) as their items; any other
    item_type yields items carrying only that type name.
    '''
    if field_label == "":
        field_label = f"{item_type.value}_array"
    unique_label = _get_unique_field_label(global_schema, field_label)
    if item_type == ArrayFieldType.OBJECT:
        items = {
            "type": "object",
            "properties": {},
            "required": [],
            "additionalProperties": False,
        }
    else:
        items = {"type": item_type.value}
    field_location[unique_label] = {
        "type": "array",
        "description": field_description,
        "items": items,
    }
    return unique_label
def add_primitive_field(
    global_schema,
    field_location,
    field_label="",
    field_description="",
    field_type: PrimitiveFieldType = PrimitiveFieldType.STRING
):
    '''
    Insert a new primitive field into field_location and return its label.

    An empty field_label defaults to the type name. The label is made unique
    with _get_unique_field_label, which searches global_schema. The new
    field stores only its type and description.
    '''
    base_label = field_type.value if field_label == "" else field_label
    unique_label = _get_unique_field_label(global_schema, base_label)
    field_location[unique_label] = {
        "type": field_type.value,
        "description": field_description,
    }
    return unique_label
def set_string_min_length(string_field, min_length):
    '''Store min_length under 'minLength', or remove that entry when min_length is None.'''
    if min_length is not None:
        string_field['minLength'] = min_length
        return
    string_field.pop('minLength', None)
def set_string_max_length(string_field, max_length):
    '''Store max_length under 'maxLength', or remove that entry when max_length is None.'''
    if max_length is not None:
        string_field['maxLength'] = max_length
        return
    string_field.pop('maxLength', None)
def set_string_pattern(string_field, pattern):
    '''Store pattern under 'pattern', or remove that entry when pattern is None.'''
    if pattern is not None:
        string_field['pattern'] = pattern
        return
    string_field.pop('pattern', None)
def set_string_format(string_field, string_format: StringFormat | None):
    '''Store the member's value under 'format', or remove that entry when string_format is None.'''
    if string_format is not None:
        string_field['format'] = string_format.value
        return
    string_field.pop('format', None)
def clear_string_constraints(string_field):
    '''Remove the 'minLength', 'maxLength', 'pattern' and 'format' entries, if present.'''
    for keyword in ('minLength', 'maxLength', 'pattern', 'format'):
        string_field.pop(keyword, None)
def set_number_minimum(number_field, minimum, exclusive):
    '''
    Set or clear the lower bound of a number field.

    exclusive selects the entry that is written: 'exclusiveMinimum' when
    truthy, 'minimum' otherwise. Setting one bound removes the other. When
    minimum is None only the selected entry is removed; the other one is
    left as it is.
    '''
    if exclusive:
        selected, other = 'exclusiveMinimum', 'minimum'
    else:
        selected, other = 'minimum', 'exclusiveMinimum'
    if minimum is None:
        number_field.pop(selected, None)
        return
    number_field[selected] = minimum
    number_field.pop(other, None)
def set_number_maximum(number_field, maximum, exclusive):
    '''
    Set or clear the upper bound of a number field.

    exclusive selects the entry that is written: 'exclusiveMaximum' when
    truthy, 'maximum' otherwise. Setting one bound removes the other. When
    maximum is None only the selected entry is removed; the other one is
    left as it is.
    '''
    if exclusive:
        selected, other = 'exclusiveMaximum', 'maximum'
    else:
        selected, other = 'maximum', 'exclusiveMaximum'
    if maximum is None:
        number_field.pop(selected, None)
        return
    number_field[selected] = maximum
    number_field.pop(other, None)
def set_number_multiple_of(number_field, multiple_of):
    '''Store multiple_of under 'multipleOf', or remove that entry when multiple_of is None.'''
    if multiple_of is not None:
        number_field['multipleOf'] = multiple_of
        return
    number_field.pop('multipleOf', None)
def clear_number_constraints(number_field):
    '''Remove every bound and the 'multipleOf' entry from a number field, if present.'''
    for keyword in (
        'minimum',
        'maximum',
        'exclusiveMinimum',
        'exclusiveMaximum',
        'multipleOf',
    ):
        number_field.pop(keyword, None)
def rename_field(global_schema, field_location, nesting, old_label, new_label):
    '''
    Rename old_label to new_label inside field_location, keeping its position.

    The old label is first removed from the 'required' list addressed by
    nesting. Every key is then popped and re-inserted in its original order,
    with old_label replaced by new_label, so the renamed field stays where
    it was. Finally new_label is marked required, which also re-sorts the
    'required' list into field order.

    NOTE(review): the renamed field always ends up required, whether or not
    old_label was required before; confirm this is intended.
    '''
    set_required_field_status(global_schema, nesting, old_label, False)
    for existing in list(field_location):
        replacement = new_label if existing == old_label else existing
        field_location[replacement] = field_location.pop(existing)
    set_required_field_status(global_schema, nesting, new_label, True)
def delete_field(global_schema, nesting, field_location, key):
    '''
    Remove key from field_location and from the matching 'required' list.

    Raises KeyError, before the 'required' list is touched, when key is not
    in field_location.
    '''
    del field_location[key]
    set_required_field_status(global_schema, nesting, key, required=False)
def move_field_up(global_schema, nesting, field_location, label):
    '''
    Swap label with the field before it in field_location.

    Does nothing when label is already first. Otherwise the affected keys
    are popped and re-inserted so the dict iterates in the new order, and
    the 'required' list addressed by nesting is re-sorted to match. Raises
    ValueError when label is not a key of field_location.
    '''
    order = list(field_location)
    position = order.index(label)
    if position == 0:
        return
    order[position - 1], order[position] = order[position], order[position - 1]
    # Re-inserting everything from the swap point onward moves those keys to
    # the end in their new relative order; earlier keys stay untouched.
    for key in order[position - 1:]:
        field_location[key] = field_location.pop(key)
    required_labels = get_required_list(global_schema, nesting)
    required_labels.sort(key=order.index)
def move_field_down(global_schema, nesting, field_location, label):
    '''
    Swap label with the field after it in field_location.

    Does nothing when label is already last. Otherwise the affected keys are
    popped and re-inserted so the dict iterates in the new order, and the
    'required' list addressed by nesting is re-sorted to match. Raises
    ValueError when label is not a key of field_location.
    '''
    order = list(field_location)
    position = order.index(label)
    if position >= len(order) - 1:
        return
    order[position], order[position + 1] = order[position + 1], order[position]
    # Re-inserting everything from the swap point onward moves those keys to
    # the end in their new relative order; earlier keys stay untouched.
    for key in order[position:]:
        field_location[key] = field_location.pop(key)
    required_labels = get_required_list(global_schema, nesting)
    required_labels.sort(key=order.index)
def set_required_field_status(schema, nesting, field_label, required):
    '''
    Add field_label to, or remove it from, the 'required' list at nesting.

    The list is modified in place and then sorted to follow the key order of
    the properties returned by get_subobject for the same nesting. Sorting
    raises ValueError if the list names a label that is not among those
    properties.
    '''
    required_labels = get_required_list(schema, nesting)
    already_required = field_label in required_labels
    if required and not already_required:
        required_labels.append(field_label)
    elif not required and already_required:
        required_labels.remove(field_label)
    field_order = list(get_subobject(schema, nesting))
    required_labels.sort(key=field_order.index)
def set_enum_field_status(schema, nesting, field_label, constrained):
    '''
    Turn the 'enum' constraint of a field on or off; return whether anything changed.

    For an array field the constraint lives on its 'items', otherwise on the
    field itself. Turning it on when absent seeds a starter list: [""] for
    strings, [0] for numbers and, for non-array fields only, [True, False]
    for booleans. Other types are left untouched and report no change.
    Turning it off removes an existing 'enum' entry.
    '''
    def starter_values(type_name, allow_boolean):
        # A fresh list per call so fields never share one enum list.
        if type_name == 'string':
            return [""]
        if type_name == 'number':
            return [0]
        if allow_boolean and type_name == 'boolean':
            return [True, False]
        return None

    field = get_subobject(schema, nesting)[field_label]
    is_array = field['type'] == 'array'
    target = field['items'] if is_array else field
    has_enum = 'enum' in target

    if constrained and not has_enum:
        seed = starter_values(target['type'], allow_boolean=not is_array)
        if seed is None:
            return False
        target['enum'] = seed
        return True
    if not constrained and has_enum:
        target.pop('enum')
        return True
    return False
def set_additional_field_status(schema, nesting, field_label, additional):
    '''
    Set 'additionalProperties' on an object field or on an array's object items.

    The field is looked up by field_label in the properties that
    get_subobject returns for nesting. Fields of any other type are left
    untouched.

    Returns True when the stored value was added or differs from the
    previous one, False otherwise. This mirrors the change flag returned by
    set_enum_field_status.
    '''
    field = get_subobject(schema, nesting)[field_label]
    field_type = field['type']
    if field_type == 'object':
        target = field
    elif field_type == 'array' and field['items']['type'] == 'object':
        target = field['items']
    else:
        return False

    # A private sentinel distinguishes "entry absent" from any stored value.
    absent = object()
    previous = target.get('additionalProperties', absent)
    target['additionalProperties'] = additional
    return previous is absent or previous != additional
def generate_object_from_schema(json_schema):
    '''
    Build a template object that mirrors the structure of json_schema.

    The json_schema is a JSON Schema in which values are described as follows:
        "value": {
            "description": "Description of the value",
            "type": "<type of the value>"
        }
    The type can be "string", "number", "boolean", "array", or "object".

    Primitives take the first entry of a non-empty 'enum' when they have
    one; otherwise strings become '', booleans False and numbers the result
    of _get_constrained_value. Arrays of strings or numbers hold the first
    enum entry of their items, or are empty without one; arrays of booleans
    are always empty; arrays of anything else hold a single template of
    their items. Nodes without 'type', objects without 'properties' and
    unknown types yield None.
    '''
    def has_enum(node):
        return 'enum' in node and len(node['enum']) > 0

    def build(node):
        if 'type' not in node:
            return None
        kind = node['type']
        if kind == 'object':
            if 'properties' not in node:
                return None
            return {name: build(child) for name, child in node['properties'].items()}
        if kind == 'array':
            items = node['items']
            item_kind = items['type']
            if item_kind == 'string' or item_kind == 'number':
                return [items['enum'][0]] if has_enum(items) else []
            if item_kind == 'boolean':
                return []
            return [build(items)]
        if kind == 'string':
            return node['enum'][0] if has_enum(node) else ''
        if kind == 'number':
            return node['enum'][0] if has_enum(node) else _get_constrained_value(node)
        if kind == 'boolean':
            return node['enum'][0] if has_enum(node) else False
        return None

    return build(json_schema)
412def convert_to_dataframe(json_obj):
413 df = pd.json_normalize(json_obj)
414 return df
416def _get_constrained_value(schema):
417 if 'minimum' in schema:
418 if 'multipleOf' in schema:
419 min = schema['minimum']
420 mult = schema['multipleOf']
421 # find the smallest multiple of mult that is greater than or equal to min
422 return min + mult - (min % mult)
423 else:
424 return schema['minimum']
425 elif 'maximum' in schema:
426 if 'multipleOf' in schema:
427 max = schema['maximum']
428 mult = schema['multipleOf']
429 # find the largest multiple of mult that is less than or equal to max
430 return max - (max % mult)
431 else:
432 return schema['maximum']
433 else:
434 return 0
def evaluate_object_and_schema(obj, schema):
    '''
    Validate obj against schema and report the outcome as a ValidationResult.

    Returns VALID when jsonschema.validate accepts the object,
    OBJECT_INVALID when it raises ValidationError and SCHEMA_INVALID when it
    raises SchemaError. Every failed validation maps to OBJECT_INVALID, so
    the function never falls through and returns None.
    '''
    try:
        jsonschema.validate(obj, schema)
    except jsonschema.exceptions.ValidationError:
        return ValidationResult.OBJECT_INVALID
    except jsonschema.exceptions.SchemaError:
        return ValidationResult.SCHEMA_INVALID
    return ValidationResult.VALID
def evaluate_schema(schema):
    '''Validate schema against the template object generated from it.'''
    return evaluate_object_and_schema(generate_object_from_schema(schema), schema)
def test():
    '''
    Smoke-test the builder by growing a schema step by step.

    Starts from the boilerplate schema, adds fields to the records, to a
    nested object and to the objects of a nested array, printing the
    evaluate_schema result after every step and the final schema at the end.

    The add_* helpers take the whole schema as global_schema plus the
    properties dict to insert into as field_location; get_subobject resolves
    that dict from a list of labels.
    '''
    print('Creating schema')
    schema = create_boilerplate_schema()
    print(evaluate_schema(schema))

    # Properties of each record in the top-level 'records' array.
    record_fields = get_subobject(schema, ['records'])

    print('Adding first string field')
    add_primitive_field(
        global_schema=schema,
        field_location=record_fields,
        field_type=PrimitiveFieldType.STRING
    )
    print(evaluate_schema(schema))

    print('Adding second string field')
    add_primitive_field(
        global_schema=schema,
        field_location=record_fields,
        field_type=PrimitiveFieldType.STRING
    )
    print(evaluate_schema(schema))

    print('Adding nested object field')
    obj_label = add_object_field(
        global_schema=schema,
        field_location=record_fields
    )
    print(evaluate_schema(schema))

    # Properties of the object that was just added to each record.
    object_fields = get_subobject(schema, ['records', obj_label])

    print('Adding nested string field')
    add_primitive_field(
        global_schema=schema,
        field_location=object_fields,
        field_type=PrimitiveFieldType.STRING
    )
    print(evaluate_schema(schema))

    print('Adding nested string field')
    add_primitive_field(
        global_schema=schema,
        field_location=object_fields,
        field_type=PrimitiveFieldType.STRING
    )
    print(evaluate_schema(schema))

    print('Adding nested number field')
    add_primitive_field(
        global_schema=schema,
        field_location=object_fields,
        field_type=PrimitiveFieldType.NUMBER
    )
    print(evaluate_schema(schema))

    print('Adding nested boolean field')
    add_primitive_field(
        global_schema=schema,
        field_location=object_fields,
        field_type=PrimitiveFieldType.BOOLEAN
    )
    print(evaluate_schema(schema))

    print('Adding nested string array')
    add_array_field(
        global_schema=schema,
        field_location=object_fields,
        item_type=ArrayFieldType.STRING
    )
    print(evaluate_schema(schema))

    print('Adding nested object array')
    arr_label = add_array_field(
        global_schema=schema,
        field_location=object_fields,
        item_type=ArrayFieldType.OBJECT
    )
    print(evaluate_schema(schema))

    # Properties of the objects held by the nested array.
    array_item_fields = get_subobject(schema, ['records', obj_label, arr_label])

    print('Adding boolean to objects of nested array')
    add_primitive_field(
        global_schema=schema,
        field_location=array_item_fields,
        field_type=PrimitiveFieldType.BOOLEAN
    )
    print(evaluate_schema(schema))

    print('Final schema')
    print(json.dumps(schema, indent=2))
def main():
    '''Run the module's smoke test.'''
    test()


# Run the smoke test only when this file is executed as a script.
if __name__ == "__main__":
    main()