schema_sentinel.metadata_manager.utils
1import os 2import random 3import string 4from configparser import ConfigParser 5 6 7def get_html(file_name): 8 with open(file_name) as file: 9 html = file.read() 10 return html 11 12 13def get_config(env: str, resources_path: str) -> ConfigParser: 14 """ 15 Reads a config file for and environment and returns a ConfigParser instance 16 :param env: Environment: dev, nonprod, cert or prod 17 :param resources_path: Resource path, where the config files are stored 18 :return: ConfigParser instance 19 """ 20 config = ConfigParser() 21 config.read_file(open(os.path.join(resources_path, f"db-{env}.properties")), "r") 22 return config 23 24 25def snake_case_split(s) -> str: 26 """ 27 The function would split a word in snake_case to separate words with first capital letter. 28 This was done to sve some space in page width when creating pdf 29 30 :param s: snake_case string like user_access_report 31 :return: split header as "User Access Report" 32 """ 33 return " ".join(x.capitalize() for x in s.split("_")) 34 35 36def camel_case_split(s): 37 """ 38 The function would split a word in camelCase to separate words. 
39 This was done to sve some space in page width when creating pdf 40 41 :param s: camelCase string like userAccessReport 42 :return: split header as "user Access Report" 43 """ 44 idx = list(map(str.isupper, s)) 45 # mark change of case 46 indices = [0] 47 # Sliding window pattern: pair each element with next (strict=False is correct as idx[1:] is shorter) 48 for i, (x, y) in enumerate(zip(idx, idx[1:], strict=False)): 49 if x and not y: # "Ul" 50 indices.append(i) 51 elif not x and y: # "lU" 52 indices.append(i + 1) 53 indices.append(len(s)) 54 # for "lUl", index of "U" will pop twice, have to filer it 55 # Sliding window pattern: pair consecutive indices (strict=False is correct as indices[1:] is shorter) 56 return " ".join([s[x:y] for x, y in zip(indices, indices[1:], strict=False) if x < y]) 57 58 59GET_SCHEMA_DISCREPANCY_SQL = """ 60SELECT 61 "SCHEMA_DISCREPANCY_ID", 62 "ENVIRONMENT", 63 "DATABASE_NAME", 64 "SCHEMA_NAME", 65 "DB_OBJECT_TYPE", 66 "DB_OBJECT_NAME", 67 "DB_OBJECT_PARENT_TYPE", 68 "DB_OBJECT_PARENT_NAME", 69 "DDL", 70 "ACTION", 71 "ACTION_APPROVED_BY", 72 "ACTION_APPROVED_AT", 73 "KEEP_UNTIL", 74 "OBJECT_DEPENDENCIES", 75 "SYSTEM_CREATE_DATETIME", 76 "SYSTEM_UPDATE_DATETIME" 77FROM "MIGRATIONS"."SCHEMA_DISCREPANCY" 78WHERE 79 "ENVIRONMENT" = :environment 80 AND "database_name" = :database_name 81 AND "SCHEMA_NAME"=:schema_name 82 AND "db_object_type"=:db_object_type 83 AND "db_object_name"=:db_object_name 84""" 85 86ACCOUNT_MAP = { 87 "dev": "YOUR_DEV_ACCOUNT", 88 "staging": "YOUR_STAGING_ACCOUNT", 89 "prod": "YOUR_PROD_ACCOUNT", 90} 91 92ENV_MAP = {"dev": "DEV", "staging": "STAGING", "prod": "PROD"} 93 94# Example: Custom view filters for specific business logic 95# This is a template - customize based on your data model 96CUSTOM_VIEW_FILTERS = { 97 # Example: Filter views that need specific row-level security 98 "FILTER_BY_ACCOUNT": { 99 "TABLE_LIST": [ 100 # Add your table names here 101 # "ORDERS", 102 # "TRANSACTIONS", 103 ], 104 "FILTER": 
"""AS {alias} WHERE EXISTS( 105 SELECT * 106 FROM SCHEMA.ACCOUNT AS A 107 WHERE {alias}.ACCOUNT_ID = A.ACCOUNT_ID)""", 108 }, 109 # Example: Exclude test data 110 "FILTER_OUT_TEST": { 111 "TABLE_LIST": [ 112 # "ACCOUNT", 113 ], 114 "FILTER": "WHERE NOT IS_TEST", 115 }, 116 # Example: Tables/views that don't need filtering 117 "NO_FILTER": { 118 "VIEWS": [ 119 # "REFERENCE_DATA", 120 # "LOOKUP_TABLES", 121 ] 122 }, 123 # Example: Tables to exclude from comparison 124 "EXCLUDE": { 125 "TABLE_LIST": [ 126 # "TEMP_TABLE", 127 # "STAGING_TABLE", 128 ] 129 }, 130} 131 132 133def exclude_table(table_name: str) -> bool: 134 """Check if table should be excluded from processing""" 135 return table_name in CUSTOM_VIEW_FILTERS.get("EXCLUDE", {}).get("TABLE_LIST", []) 136 137 138def get_filter(table_name: str) -> str: 139 """ 140 Get custom filter for a table based on configuration. 141 This is a template - customize based on your business logic. 142 """ 143 view_filter = "" 144 145 # Check each filter configuration 146 for filter_name, config in CUSTOM_VIEW_FILTERS.items(): 147 if filter_name == "EXCLUDE" or filter_name == "NO_FILTER": 148 continue 149 150 if table_name in config.get("TABLE_LIST", []): 151 view_filter = config["FILTER"] 152 if "{alias}" in view_filter: 153 view_filter = view_filter.replace("{alias}", get_alias(table_name)) 154 break 155 156 return view_filter 157 158 159def get_alias(table_name: str) -> str: 160 alias = "".join(x[0].upper() for x in table_name.split("_")) 161 alias += get_random_string(1) 162 return alias 163 164 165def get_random_string(length): 166 # choose from all lowercase letter 167 letters = string.ascii_lowercase 168 result_str = "".join(random.choice(letters) for i in range(length)) 169 return result_str
def
get_html(file_name):
def
get_config(env: str, resources_path: str) -> configparser.ConfigParser:
def get_config(env: str, resources_path: str) -> ConfigParser:
    """
    Reads the config file for an environment and returns a ConfigParser instance

    The file is looked up as ``db-<env>.properties`` inside ``resources_path``.

    :param env: Environment name used to build the file name
    :param resources_path: Resource path, where the config files are stored
    :return: ConfigParser instance
    :raises OSError: if the properties file cannot be opened
    """
    config = ConfigParser()
    config_path = os.path.join(resources_path, f"db-{env}.properties")
    # The mode belongs to open(), not read_file(); the with-block also makes
    # sure the handle is closed once parsing is done.
    with open(config_path, "r") as config_file:
        config.read_file(config_file)
    return config
Reads a config file for an environment and returns a ConfigParser instance
Parameters
- env: Environment: dev, nonprod, cert or prod
- resources_path: Resource path, where the config files are stored
Returns
ConfigParser instance
def
snake_case_split(s) -> str:
def snake_case_split(s: str) -> str:
    """
    The function would split a word in snake_case to separate words with first capital letter.
    This was done to save some space in page width when creating pdf

    Empty segments (leading, trailing or doubled underscores) are dropped so
    the result never contains stray spaces.

    :param s: snake_case string like user_access_report
    :return: split header as "User Access Report"
    """
    return " ".join(word.capitalize() for word in s.split("_") if word)
The function would split a word in snake_case to separate words with first capital letter. This was done to save some space in page width when creating pdf.
Parameters
- s: snake_case string like user_access_report
Returns
split header as "User Access Report"
def
camel_case_split(s):
37def camel_case_split(s): 38 """ 39 The function would split a word in camelCase to separate words. 40 This was done to sve some space in page width when creating pdf 41 42 :param s: camelCase string like userAccessReport 43 :return: split header as "user Access Report" 44 """ 45 idx = list(map(str.isupper, s)) 46 # mark change of case 47 indices = [0] 48 # Sliding window pattern: pair each element with next (strict=False is correct as idx[1:] is shorter) 49 for i, (x, y) in enumerate(zip(idx, idx[1:], strict=False)): 50 if x and not y: # "Ul" 51 indices.append(i) 52 elif not x and y: # "lU" 53 indices.append(i + 1) 54 indices.append(len(s)) 55 # for "lUl", index of "U" will pop twice, have to filer it 56 # Sliding window pattern: pair consecutive indices (strict=False is correct as indices[1:] is shorter) 57 return " ".join([s[x:y] for x, y in zip(indices, indices[1:], strict=False) if x < y])
The function would split a word in camelCase to separate words. This was done to save some space in page width when creating pdf.
Parameters
- s: camelCase string like userAccessReport
Returns
split header as "user Access Report"
# Query for one schema-discrepancy record, bound by named parameters.
# Quoted identifiers are case-sensitive on many engines, so the WHERE clause
# uses the same upper-case spelling as the SELECT list.
GET_SCHEMA_DISCREPANCY_SQL = """
SELECT
    "SCHEMA_DISCREPANCY_ID",
    "ENVIRONMENT",
    "DATABASE_NAME",
    "SCHEMA_NAME",
    "DB_OBJECT_TYPE",
    "DB_OBJECT_NAME",
    "DB_OBJECT_PARENT_TYPE",
    "DB_OBJECT_PARENT_NAME",
    "DDL",
    "ACTION",
    "ACTION_APPROVED_BY",
    "ACTION_APPROVED_AT",
    "KEEP_UNTIL",
    "OBJECT_DEPENDENCIES",
    "SYSTEM_CREATE_DATETIME",
    "SYSTEM_UPDATE_DATETIME"
FROM "MIGRATIONS"."SCHEMA_DISCREPANCY"
WHERE
    "ENVIRONMENT" = :environment
    AND "DATABASE_NAME" = :database_name
    AND "SCHEMA_NAME"=:schema_name
    AND "DB_OBJECT_TYPE"=:db_object_type
    AND "DB_OBJECT_NAME"=:db_object_name
"""
# Environment short name -> account placeholder.
ACCOUNT_MAP = dict(
    dev="YOUR_DEV_ACCOUNT",
    staging="YOUR_STAGING_ACCOUNT",
    prod="YOUR_PROD_ACCOUNT",
)

# Environment short name -> upper-case environment label.
ENV_MAP = {
    "dev": "DEV",
    "staging": "STAGING",
    "prod": "PROD",
}

# Filter groups keyed by name; the table lists ship empty as a template.
CUSTOM_VIEW_FILTERS = {
    "FILTER_BY_ACCOUNT": {
        "TABLE_LIST": [],
        "FILTER": (
            "AS {alias} WHERE EXISTS(\n"
            " SELECT *\n"
            " FROM SCHEMA.ACCOUNT AS A\n"
            " WHERE {alias}.ACCOUNT_ID = A.ACCOUNT_ID)"
        ),
    },
    "FILTER_OUT_TEST": {
        "TABLE_LIST": [],
        "FILTER": "WHERE NOT IS_TEST",
    },
    "NO_FILTER": {
        "VIEWS": [],
    },
    "EXCLUDE": {
        "TABLE_LIST": [],
    },
}
def
exclude_table(table_name: str) -> bool:
def exclude_table(table_name: str) -> bool:
    """
    Tell whether a table is listed for exclusion.

    :param table_name: Table name to look up
    :return: True when the name appears in the "EXCLUDE" group's TABLE_LIST
    """
    # Both lookups fall back to an empty container, so a missing "EXCLUDE"
    # group or a missing "TABLE_LIST" simply yields False.
    exclusion_cfg = CUSTOM_VIEW_FILTERS.get("EXCLUDE", {})
    excluded_tables = exclusion_cfg.get("TABLE_LIST", [])
    return table_name in excluded_tables
Check if table should be excluded from processing
def
get_filter(table_name: str) -> str:
def get_filter(table_name: str) -> str:
    """
    Look up the custom filter configured for a table.

    This is a template - customize based on your business logic.

    :param table_name: Table name searched for in each group's TABLE_LIST
    :return: The FILTER text of the first group listing the table (with
        ``{alias}`` substituted), or an empty string when none does
    """
    skipped_groups = ("EXCLUDE", "NO_FILTER")

    for group_name, group_cfg in CUSTOM_VIEW_FILTERS.items():
        if group_name in skipped_groups:
            continue
        if table_name not in group_cfg.get("TABLE_LIST", []):
            continue

        # First matching group wins.
        template = group_cfg["FILTER"]
        if "{alias}" not in template:
            return template
        # A single alias is generated and reused for every placeholder.
        return template.replace("{alias}", get_alias(table_name))

    return ""
Get custom filter for a table based on configuration. This is a template - customize based on your business logic.
def
get_alias(table_name: str) -> str:
def
get_random_string(length):