schema_sentinel.metadata_manager.utils

  1import os
  2import random
  3import string
  4from configparser import ConfigParser
  5
  6
  7def get_html(file_name):
  8    with open(file_name) as file:
  9        html = file.read()
 10    return html
 11
 12
 13def get_config(env: str, resources_path: str) -> ConfigParser:
 14    """
 15    Reads a config file for and environment and returns a ConfigParser instance
 16    :param env: Environment: dev, nonprod, cert or prod
 17    :param resources_path: Resource path, where the config files are stored
 18    :return: ConfigParser instance
 19    """
 20    config = ConfigParser()
 21    config.read_file(open(os.path.join(resources_path, f"db-{env}.properties")), "r")
 22    return config
 23
 24
 25def snake_case_split(s) -> str:
 26    """
 27    The function would split a word in snake_case to separate words with first capital letter.
 28    This was done to sve some space in page width when creating pdf
 29
 30    :param s: snake_case string like user_access_report
 31    :return: split header as "User Access Report"
 32    """
 33    return " ".join(x.capitalize() for x in s.split("_"))
 34
 35
 36def camel_case_split(s):
 37    """
 38    The function would split a word in camelCase to separate words.
 39    This was done to sve some space in page width when creating pdf
 40
 41    :param s: camelCase string like userAccessReport
 42    :return: split header as "user Access Report"
 43    """
 44    idx = list(map(str.isupper, s))
 45    # mark change of case
 46    indices = [0]
 47    # Sliding window pattern: pair each element with next (strict=False is correct as idx[1:] is shorter)
 48    for i, (x, y) in enumerate(zip(idx, idx[1:], strict=False)):
 49        if x and not y:  # "Ul"
 50            indices.append(i)
 51        elif not x and y:  # "lU"
 52            indices.append(i + 1)
 53    indices.append(len(s))
 54    # for "lUl", index of "U" will pop twice, have to filer it
 55    # Sliding window pattern: pair consecutive indices (strict=False is correct as indices[1:] is shorter)
 56    return " ".join([s[x:y] for x, y in zip(indices, indices[1:], strict=False) if x < y])
 57
 58
 59GET_SCHEMA_DISCREPANCY_SQL = """
 60SELECT
 61    "SCHEMA_DISCREPANCY_ID",
 62    "ENVIRONMENT",
 63    "DATABASE_NAME",
 64    "SCHEMA_NAME",
 65    "DB_OBJECT_TYPE",
 66    "DB_OBJECT_NAME",
 67    "DB_OBJECT_PARENT_TYPE",
 68    "DB_OBJECT_PARENT_NAME",
 69    "DDL",
 70    "ACTION",
 71    "ACTION_APPROVED_BY",
 72    "ACTION_APPROVED_AT",
 73    "KEEP_UNTIL",
 74    "OBJECT_DEPENDENCIES",
 75    "SYSTEM_CREATE_DATETIME",
 76    "SYSTEM_UPDATE_DATETIME"
 77FROM "MIGRATIONS"."SCHEMA_DISCREPANCY"
 78WHERE
 79    "ENVIRONMENT" = :environment
 80    AND "database_name" = :database_name
 81    AND "SCHEMA_NAME"=:schema_name
 82    AND "db_object_type"=:db_object_type
 83    AND "db_object_name"=:db_object_name
 84"""
 85
 86ACCOUNT_MAP = {
 87    "dev": "YOUR_DEV_ACCOUNT",
 88    "staging": "YOUR_STAGING_ACCOUNT",
 89    "prod": "YOUR_PROD_ACCOUNT",
 90}
 91
 92ENV_MAP = {"dev": "DEV", "staging": "STAGING", "prod": "PROD"}
 93
# Example: Custom view filters for specific business logic
# This is a template - customize based on your data model
# Each key names one filter configuration. get_filter() walks the entries in
# order and uses the "FILTER" text of the first entry whose "TABLE_LIST"
# contains the table; exclude_table() reads "EXCLUDE" -> "TABLE_LIST".
CUSTOM_VIEW_FILTERS = {
    # Example: Filter views that need specific row-level security
    "FILTER_BY_ACCOUNT": {
        "TABLE_LIST": [
            # Add your table names here
            # "ORDERS",
            # "TRANSACTIONS",
        ],
        # get_filter() replaces every "{alias}" with an alias generated for the table
        "FILTER": """AS {alias} WHERE EXISTS(
            SELECT *
            FROM SCHEMA.ACCOUNT AS A
            WHERE {alias}.ACCOUNT_ID = A.ACCOUNT_ID)""",
    },
    # Example: Exclude test data
    "FILTER_OUT_TEST": {
        "TABLE_LIST": [
            # "ACCOUNT",
        ],
        "FILTER": "WHERE NOT IS_TEST",
    },
    # Example: Tables/views that don't need filtering
    # (this entry is skipped by get_filter())
    "NO_FILTER": {
        "VIEWS": [
            # "REFERENCE_DATA",
            # "LOOKUP_TABLES",
        ]
    },
    # Example: Tables to exclude from comparison
    # (this entry is skipped by get_filter(); exclude_table() checks its TABLE_LIST)
    "EXCLUDE": {
        "TABLE_LIST": [
            # "TEMP_TABLE",
            # "STAGING_TABLE",
        ]
    },
}
131
132
def exclude_table(table_name: str) -> bool:
    """
    Check if table should be excluded from processing.

    :param table_name: table name to look up
    :return: True when the name is listed under CUSTOM_VIEW_FILTERS["EXCLUDE"]["TABLE_LIST"]
    """
    exclude_config = CUSTOM_VIEW_FILTERS.get("EXCLUDE", {})
    excluded_tables = exclude_config.get("TABLE_LIST", [])
    return table_name in excluded_tables
136
137
def get_filter(table_name: str) -> str:
    """
    Get custom filter for a table based on configuration.
    This is a template - customize based on your business logic.

    :param table_name: table name to look up in each filter's TABLE_LIST
    :return: filter text of the first matching configuration, with "{alias}"
             replaced by a generated alias; "" when no configuration matches
    """
    # These entries carry no filter text and are never applied here.
    skipped = ("EXCLUDE", "NO_FILTER")

    for name, settings in CUSTOM_VIEW_FILTERS.items():
        if name in skipped:
            continue
        if table_name not in settings.get("TABLE_LIST", []):
            continue

        # First matching configuration wins.
        template = settings["FILTER"]
        if "{alias}" not in template:
            return template
        # One alias is generated and used for every placeholder occurrence.
        return template.replace("{alias}", get_alias(table_name))

    return ""
157
158
def get_alias(table_name: str) -> str:
    """
    Build a short alias for a table: the upper-cased first character of every
    underscore-separated word, followed by one random lowercase letter.

    Empty words (leading, trailing or doubled underscores, or an empty name)
    are skipped; indexing them used to raise IndexError.

    :param table_name: table name such as USER_ACCESS_REPORT
    :return: alias such as "UARx", where the last letter is random
    """
    initials = "".join(word[0].upper() for word in table_name.split("_") if word)
    return initials + get_random_string(1)
163
164
def get_random_string(length):
    """
    Build a random string of lowercase ASCII letters.

    :param length: number of characters to generate
    :return: string of ``length`` letters, each drawn with ``random.choice``
    """
    alphabet = string.ascii_lowercase
    picked = [random.choice(alphabet) for _ in range(length)]
    return "".join(picked)
def get_html(file_name):
 8def get_html(file_name):
 9    with open(file_name) as file:
10        html = file.read()
11    return html
def get_config(env: str, resources_path: str) -> configparser.ConfigParser:
def get_config(env: str, resources_path: str) -> ConfigParser:
    """
    Reads the config file for an environment and returns a ConfigParser instance.

    The file is looked up as ``<resources_path>/db-<env>.properties``.

    :param env: Environment name used in the file name, e.g. dev, nonprod, cert or prod
    :param resources_path: Resource path, where the config files are stored
    :return: ConfigParser instance populated from the properties file
    :raises FileNotFoundError: if the properties file does not exist
    """
    config = ConfigParser()
    config_path = os.path.join(resources_path, f"db-{env}.properties")
    # The context manager closes the handle once parsing is done. The former
    # stray "r" was not open()'s mode: it was passed as read_file()'s `source`
    # label, so parse errors reported the file as 'r' instead of its real path.
    with open(config_path) as config_file:
        config.read_file(config_file)
    return config

Reads the config file for an environment and returns a ConfigParser instance.

Parameters
  • env: Environment: dev, nonprod, cert or prod
  • resources_path: Resource path, where the config files are stored
Returns

ConfigParser instance

def snake_case_split(s) -> str:
def snake_case_split(s) -> str:
    """
    Split a snake_case word into separate words, each with a capital first letter.
    This was done to save some space in page width when creating pdf.

    :param s: snake_case string like user_access_report
    :return: split header as "User Access Report"
    """
    words = s.split("_")
    return " ".join(map(str.capitalize, words))

The function splits a word in snake_case into separate words, each with a capital first letter. This was done to save some space in page width when creating a PDF.

Parameters
  • s: snake_case string like user_access_report
Returns

split header as "User Access Report"

def camel_case_split(s):
def camel_case_split(s):
    """
    Split a camelCase word into separate words at every change of letter case.
    This was done to save some space in page width when creating pdf.

    :param s: camelCase string like userAccessReport
    :return: split header as "user Access Report"
    """
    upper_flags = [ch.isupper() for ch in s]
    # Candidate cut positions; the start of the string is always one.
    boundaries = [0]
    for pos in range(len(s) - 1):
        here_upper, next_upper = upper_flags[pos], upper_flags[pos + 1]
        if here_upper == next_upper:
            continue
        # "Ul" cuts before the upper-case letter, "lU" cuts after the lower-case one.
        boundaries.append(pos if here_upper else pos + 1)
    boundaries.append(len(s))
    # In "lUl" the index of "U" is recorded twice; the start < stop test
    # drops the resulting empty slice.
    words = []
    for start, stop in zip(boundaries, boundaries[1:]):
        if start < stop:
            words.append(s[start:stop])
    return " ".join(words)

The function splits a word in camelCase into separate words. This was done to save some space in page width when creating a PDF.

Parameters
  • s: camelCase string like userAccessReport
Returns

split header as "user Access Report"

# Rendered value of the discrepancy lookup query. Quoted identifiers are
# case-sensitive, so the WHERE clause spells the column names in the same
# upper case as the SELECT list.
GET_SCHEMA_DISCREPANCY_SQL = '\nSELECT\n "SCHEMA_DISCREPANCY_ID",\n "ENVIRONMENT",\n "DATABASE_NAME",\n "SCHEMA_NAME",\n "DB_OBJECT_TYPE",\n "DB_OBJECT_NAME",\n "DB_OBJECT_PARENT_TYPE",\n "DB_OBJECT_PARENT_NAME",\n "DDL",\n "ACTION",\n "ACTION_APPROVED_BY",\n "ACTION_APPROVED_AT",\n "KEEP_UNTIL",\n "OBJECT_DEPENDENCIES",\n "SYSTEM_CREATE_DATETIME",\n "SYSTEM_UPDATE_DATETIME"\nFROM "MIGRATIONS"."SCHEMA_DISCREPANCY"\nWHERE\n "ENVIRONMENT" = :environment\n AND "DATABASE_NAME" = :database_name\n AND "SCHEMA_NAME"=:schema_name\n AND "DB_OBJECT_TYPE"=:db_object_type\n AND "DB_OBJECT_NAME"=:db_object_name\n'
# Placeholder account identifier for each environment key.
ACCOUNT_MAP = dict(
    dev="YOUR_DEV_ACCOUNT",
    staging="YOUR_STAGING_ACCOUNT",
    prod="YOUR_PROD_ACCOUNT",
)
# Upper-case label for each environment key.
ENV_MAP = dict(dev="DEV", staging="STAGING", prod="PROD")
# Rendered default value of the filter template: every TABLE_LIST / VIEWS list is
# empty, so no table matches a filter or an exclusion until the lists are filled in.
CUSTOM_VIEW_FILTERS = {'FILTER_BY_ACCOUNT': {'TABLE_LIST': [], 'FILTER': 'AS {alias} WHERE EXISTS(\n SELECT *\n FROM SCHEMA.ACCOUNT AS A\n WHERE {alias}.ACCOUNT_ID = A.ACCOUNT_ID)'}, 'FILTER_OUT_TEST': {'TABLE_LIST': [], 'FILTER': 'WHERE NOT IS_TEST'}, 'NO_FILTER': {'VIEWS': []}, 'EXCLUDE': {'TABLE_LIST': []}}
def exclude_table(table_name: str) -> bool:
def exclude_table(table_name: str) -> bool:
    """
    Check if table should be excluded from processing.

    :param table_name: table name to look up
    :return: True when the name is listed under CUSTOM_VIEW_FILTERS["EXCLUDE"]["TABLE_LIST"]
    """
    exclude_config = CUSTOM_VIEW_FILTERS.get("EXCLUDE", {})
    excluded_tables = exclude_config.get("TABLE_LIST", [])
    return table_name in excluded_tables

Check if table should be excluded from processing

def get_filter(table_name: str) -> str:
def get_filter(table_name: str) -> str:
    """
    Get custom filter for a table based on configuration.
    This is a template - customize based on your business logic.

    :param table_name: table name to look up in each filter's TABLE_LIST
    :return: filter text of the first matching configuration, with "{alias}"
             replaced by a generated alias; "" when no configuration matches
    """
    # These entries carry no filter text and are never applied here.
    skipped = ("EXCLUDE", "NO_FILTER")

    for name, settings in CUSTOM_VIEW_FILTERS.items():
        if name in skipped:
            continue
        if table_name not in settings.get("TABLE_LIST", []):
            continue

        # First matching configuration wins.
        template = settings["FILTER"]
        if "{alias}" not in template:
            return template
        # One alias is generated and used for every placeholder occurrence.
        return template.replace("{alias}", get_alias(table_name))

    return ""

Get custom filter for a table based on configuration. This is a template - customize based on your business logic.

def get_alias(table_name: str) -> str:
def get_alias(table_name: str) -> str:
    """
    Build a short alias for a table: the upper-cased first character of every
    underscore-separated word, followed by one random lowercase letter.

    Empty words (leading, trailing or doubled underscores, or an empty name)
    are skipped; indexing them used to raise IndexError.

    :param table_name: table name such as USER_ACCESS_REPORT
    :return: alias such as "UARx", where the last letter is random
    """
    initials = "".join(word[0].upper() for word in table_name.split("_") if word)
    return initials + get_random_string(1)
def get_random_string(length):
def get_random_string(length):
    """
    Build a random string of lowercase ASCII letters.

    :param length: number of characters to generate
    :return: string of ``length`` letters, each drawn with ``random.choice``
    """
    alphabet = string.ascii_lowercase
    picked = [random.choice(alphabet) for _ in range(length)]
    return "".join(picked)