schema_sentinel.markdown_utils.markdown
import datetime
import getpass
import logging as log
import os

import pandas as pd
from snakemd import Document
from sqlalchemy.sql.elements import and_

from ..metadata_manager.model.column import Column
from ..metadata_manager.model.comparison import Comparison
from ..metadata_manager.model.database import Database
from ..metadata_manager.model.function import Function
from ..metadata_manager.model.pipe import Pipe
from ..metadata_manager.model.procedure import Procedure
from ..metadata_manager.model.schema import Schema
from ..metadata_manager.model.stage import Stage
from ..metadata_manager.model.stream import Stream
from ..metadata_manager.model.table import Table
from ..metadata_manager.model.table_constraint import TableConstraint
from ..metadata_manager.model.task import Task
from ..metadata_manager.model.view import View


def comparison_to_markdown(src_database: Database, trg_database: Database, session) -> Document:
    """Build a Markdown report of the differences between two compared databases.

    Args:
        src_database: the LEFT database of the comparison.
        trg_database: the RIGHT database of the comparison.
        session: SQLAlchemy session used to query stored ``Comparison`` rows.

    Returns:
        A snakemd ``Document`` with one section for objects that exist only in
        the LEFT database and one for objects that differ on both sides.
    """
    header_level = 1
    # BUG FIX: the original header ended with a stray ")".
    header = f"{src_database.__get_name__()} -> {trg_database.__get_name__()}"
    log.info(f"Writing {header} comparison report to Markdown")
    doc: Document = Document()

    # Get author from environment or use system username
    author = os.getenv("REPORT_AUTHOR", getpass.getuser())

    doc.add_raw(f"""
---
author: {author}
date: {datetime.datetime.now().strftime("%a %m %y")}
""")

    doc.add_heading(f"{header} comparison report")
    doc.add_horizontal_rule()
    doc.add_heading("Compared databases", level=header_level)
    doc.add_block(src_database.__side_by_side__(trg_database).to_markdown())
    header_level += 1

    doc.add_heading(
        "DB Objects existing in the LEFT database and not present in the RIGHT database", level=header_level
    )
    # Rows whose JSON payload marks the object as present on the left side only.
    comparisons = (
        session.query(Comparison)
        .filter(
            and_(
                Comparison.source_database_id == src_database.database_id,
                Comparison.target_database_id == trg_database.database_id,
                Comparison.comparison_value.like('%"comparison": {"left": "%", "right": null}%'),
            )
        )
        .all()
    )
    differences = None

    for comparison in comparisons:
        differences = (
            pd.concat([differences, comparison.one_diff()]) if differences is not None else comparison.one_diff()
        )

    # BUG FIX: guard the empty result set — differences stays None when no rows match.
    if differences is not None:
        doc.add_block(differences.to_markdown())
    differences = None

    doc.add_heading("Both sides are different", level=header_level)

    comparisons = (
        session.query(Comparison)
        .filter(
            and_(
                Comparison.source_database_id == src_database.database_id,
                Comparison.target_database_id == trg_database.database_id,
                Comparison.comparison_value.not_like('%"comparison": {"left": "%", "right": null}%'),
            )
        )
        .all()
    )

    for comparison in comparisons:
        differences = (
            pd.concat([differences, comparison.both_diffs()]) if differences is not None else comparison.both_diffs()
        )

    # BUG FIX: same empty-result guard as above.
    if differences is not None:
        doc.add_block(differences.to_markdown())

    return doc


def db_to_markdown(database: Database, session) -> Document:
    """Build Markdown documentation for *database* and all of its schemas.

    For every schema the report lists tables (with their columns and
    constraints), views, procedures, functions, stages, pipes, streams and
    tasks, each rendered as a Markdown table.

    Args:
        database: the database whose stored metadata is documented.
        session: SQLAlchemy session used to query the metadata model.

    Returns:
        A snakemd ``Document`` with the full database documentation.
    """
    header = f"{database.__get_name__()}"
    log.info(f"Writing {header} database to Markdown")
    doc: Document = Document()

    # Get author from environment or use system username
    author = os.getenv("REPORT_AUTHOR", getpass.getuser())

    doc.add_raw(f"""
---
author: {author}
date: {datetime.datetime.now().strftime("%a %m %y")}
""")

    doc.add_heading(f"{header} database documentation")
    doc.add_horizontal_rule()
    doc.add_block(database.__get_df__().to_markdown())

    # NOTE: single-argument and_() wrapper removed — it added nothing.
    schemas = session.query(Schema).filter(Schema.database_id == database.database_id).all()
    header_level = 1
    doc.add_heading(f"{database.database_name} schemas", level=header_level)
    doc.add_block(
        Schema.__to_df__(schemas, columns=["schema_name", "created", "last_altered", "comment"]).to_markdown()
    )

    header_level = 2
    for schema in schemas:
        # Escape underscores so names render literally in Markdown headings.
        schema_md = schema.schema_name.replace("_", "\\_")
        doc.add_heading(f"Schema: {schema.schema_name}", level=header_level)
        doc.add_block(schema.__get_df__().to_markdown())

        tables = session.query(Table).filter(Table.schema_id == schema.schema_id).all()
        # BUG FIX: Query.all() returns a list, never None — test emptiness instead.
        if tables:
            header_level += 1
            df = Table.__to_df__(tables, ["table_name", "created", "last_altered", "comment"])
            if df.size:
                doc.add_heading(f"{schema_md} Tables", level=header_level)
                doc.add_block(df.to_markdown())
            header_level -= 1

            header_level += 1
            for table in tables:
                table_md = table.table_name.replace("_", "\\_")
                doc.add_heading(f"Table {schema.schema_name}.{table_md} ", level=header_level)
                doc.add_block(table.__get_df__().to_markdown())

                get_object_doc(
                    data=session.query(Column).filter(Column.table_id == table.table_id).all(),
                    klass=Column,
                    columns=[
                        "column_name",
                        "ordinal_position",
                        "is_nullable",
                        "character_maximum_length",
                        "numeric_precision",
                        "numeric_scale",
                        "datetime_precision",
                    ],
                    header=f"Table {schema.schema_name}.{table_md} columns",
                    doc=doc,
                    header_level=header_level,
                )

                get_object_doc(
                    data=session.query(TableConstraint).filter(TableConstraint.table_id == table.table_id).all(),
                    klass=TableConstraint,
                    columns=["table_constraint_name", "constraint_type", "is_deferrable", "created", "last_altered"],
                    header=f"Table {schema.schema_name}.{table_md} constraints",
                    doc=doc,
                    header_level=header_level,
                )

            header_level -= 1

        get_object_doc(
            data=session.query(View).filter(View.schema_id == schema.schema_id).all(),
            klass=View,
            columns=["view_name", "created", "is_secure", "is_materialized", "enable_schema_evolution", "comment"],
            header=f"{schema_md} Views",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Procedure).filter(Procedure.schema_id == schema.schema_id).all(),
            klass=Procedure,
            columns=["procedure_name", "data_type", "argument_signature", "created", "last_altered", "comment"],
            header=f"{schema_md} Procedures",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Function).filter(Function.schema_id == schema.schema_id).all(),
            klass=Function,
            columns=["function_name", "data_type", "argument_signature", "created", "last_altered", "comment"],
            header=f"{schema_md} Functions",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Stage).filter(Stage.schema_id == schema.schema_id).all(),
            klass=Stage,
            columns=["stage_name", "stage_url", "stage_type", "storage_integration", "created", "comment"],
            header=f"{schema_md} Stages",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Pipe).filter(Pipe.schema_id == schema.schema_id).all(),
            klass=Pipe,
            columns=["pipe_name", "pipe_definition", "notification_channel_name", "pattern", "created", "last_altered"],
            header=f"{schema_md} Pipes",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Stream).filter(Stream.schema_id == schema.schema_id).all(),
            klass=Stream,
            columns=[
                "stream_name",
                "table_name",
                "source_type",
                "base_tables",
                "type",
                "stale",
                "invalid_reason",
                "created",
                "comment",
            ],
            header=f"{schema_md} Streams",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Task).filter(Task.schema_id == schema.schema_id).all(),
            # BUG FIX: was klass=Stream with a "... Streams" header — copy-paste error;
            # Task rows must be rendered with the Task model under a Tasks heading.
            klass=Task,
            columns=[
                "id",
                "task_name",
                "warehouse",
                "schedule",
                "state",
                "condition",
                "error_integration",
                "config",
                "last_committed",
                "last_suspended",
                "created",
                "comment",
            ],
            header=f"{schema_md} Tasks",
            doc=doc,
            header_level=header_level,
        )

    return doc


def get_object_doc(data, klass, columns: list, header: str, doc: Document, header_level: int = 0):
    """Append a heading plus a Markdown table describing *data* to *doc*.

    Args:
        data: list of model instances to document (may be empty).
        klass: model class providing ``__to_df__(data, columns)``.
        columns: column names to include in the rendered table.
        header: heading text for this section.
        doc: the snakemd Document being built (mutated in place).
        header_level: heading level of the ENCLOSING section; the section
            added here is nested one level deeper.

    Nothing is added when *data* is empty or the resulting frame has no cells.
    """
    header_level += 1
    if data:
        doc.add_heading(header, level=header_level)
        df: pd.DataFrame = klass.__to_df__(data, columns)
        if df.size:
            doc.add_block(df.to_markdown())
    # NOTE: the original ended with a dead "header_level -= 1" on a local — removed.
def comparison_to_markdown(src_database: Database, trg_database: Database, session) -> Document:
    """Build a Markdown report of the differences between two compared databases.

    Args:
        src_database: the LEFT database of the comparison.
        trg_database: the RIGHT database of the comparison.
        session: SQLAlchemy session used to query stored ``Comparison`` rows.

    Returns:
        A snakemd ``Document`` with one section for objects that exist only in
        the LEFT database and one for objects that differ on both sides.
    """
    header_level = 1
    # BUG FIX: the original header ended with a stray ")".
    header = f"{src_database.__get_name__()} -> {trg_database.__get_name__()}"
    log.info(f"Writing {header} comparison report to Markdown")
    doc: Document = Document()

    # Get author from environment or use system username
    author = os.getenv("REPORT_AUTHOR", getpass.getuser())

    doc.add_raw(f"""
---
author: {author}
date: {datetime.datetime.now().strftime("%a %m %y")}
""")

    doc.add_heading(f"{header} comparison report")
    doc.add_horizontal_rule()
    doc.add_heading("Compared databases", level=header_level)
    doc.add_block(src_database.__side_by_side__(trg_database).to_markdown())
    header_level += 1

    doc.add_heading(
        "DB Objects existing in the LEFT database and not present in the RIGHT database", level=header_level
    )
    # Rows whose JSON payload marks the object as present on the left side only.
    comparisons = (
        session.query(Comparison)
        .filter(
            and_(
                Comparison.source_database_id == src_database.database_id,
                Comparison.target_database_id == trg_database.database_id,
                Comparison.comparison_value.like('%"comparison": {"left": "%", "right": null}%'),
            )
        )
        .all()
    )
    differences = None

    for comparison in comparisons:
        differences = (
            pd.concat([differences, comparison.one_diff()]) if differences is not None else comparison.one_diff()
        )

    # BUG FIX: guard the empty result set — differences stays None when no rows match.
    if differences is not None:
        doc.add_block(differences.to_markdown())
    differences = None

    doc.add_heading("Both sides are different", level=header_level)

    comparisons = (
        session.query(Comparison)
        .filter(
            and_(
                Comparison.source_database_id == src_database.database_id,
                Comparison.target_database_id == trg_database.database_id,
                Comparison.comparison_value.not_like('%"comparison": {"left": "%", "right": null}%'),
            )
        )
        .all()
    )

    for comparison in comparisons:
        differences = (
            pd.concat([differences, comparison.both_diffs()]) if differences is not None else comparison.both_diffs()
        )

    # BUG FIX: same empty-result guard as above.
    if differences is not None:
        doc.add_block(differences.to_markdown())

    return doc
def db_to_markdown(database: Database, session) -> Document:
    """Build Markdown documentation for *database* and all of its schemas.

    For every schema the report lists tables (with their columns and
    constraints), views, procedures, functions, stages, pipes, streams and
    tasks, each rendered as a Markdown table.

    Args:
        database: the database whose stored metadata is documented.
        session: SQLAlchemy session used to query the metadata model.

    Returns:
        A snakemd ``Document`` with the full database documentation.
    """
    header = f"{database.__get_name__()}"
    log.info(f"Writing {header} database to Markdown")
    doc: Document = Document()

    # Get author from environment or use system username
    author = os.getenv("REPORT_AUTHOR", getpass.getuser())

    doc.add_raw(f"""
---
author: {author}
date: {datetime.datetime.now().strftime("%a %m %y")}
""")

    doc.add_heading(f"{header} database documentation")
    doc.add_horizontal_rule()
    doc.add_block(database.__get_df__().to_markdown())

    # NOTE: single-argument and_() wrapper removed — it added nothing.
    schemas = session.query(Schema).filter(Schema.database_id == database.database_id).all()
    header_level = 1
    doc.add_heading(f"{database.database_name} schemas", level=header_level)
    doc.add_block(
        Schema.__to_df__(schemas, columns=["schema_name", "created", "last_altered", "comment"]).to_markdown()
    )

    header_level = 2
    for schema in schemas:
        # Escape underscores so names render literally in Markdown headings.
        schema_md = schema.schema_name.replace("_", "\\_")
        doc.add_heading(f"Schema: {schema.schema_name}", level=header_level)
        doc.add_block(schema.__get_df__().to_markdown())

        tables = session.query(Table).filter(Table.schema_id == schema.schema_id).all()
        # BUG FIX: Query.all() returns a list, never None — test emptiness instead.
        if tables:
            header_level += 1
            df = Table.__to_df__(tables, ["table_name", "created", "last_altered", "comment"])
            if df.size:
                doc.add_heading(f"{schema_md} Tables", level=header_level)
                doc.add_block(df.to_markdown())
            header_level -= 1

            header_level += 1
            for table in tables:
                table_md = table.table_name.replace("_", "\\_")
                doc.add_heading(f"Table {schema.schema_name}.{table_md} ", level=header_level)
                doc.add_block(table.__get_df__().to_markdown())

                get_object_doc(
                    data=session.query(Column).filter(Column.table_id == table.table_id).all(),
                    klass=Column,
                    columns=[
                        "column_name",
                        "ordinal_position",
                        "is_nullable",
                        "character_maximum_length",
                        "numeric_precision",
                        "numeric_scale",
                        "datetime_precision",
                    ],
                    header=f"Table {schema.schema_name}.{table_md} columns",
                    doc=doc,
                    header_level=header_level,
                )

                get_object_doc(
                    data=session.query(TableConstraint).filter(TableConstraint.table_id == table.table_id).all(),
                    klass=TableConstraint,
                    columns=["table_constraint_name", "constraint_type", "is_deferrable", "created", "last_altered"],
                    header=f"Table {schema.schema_name}.{table_md} constraints",
                    doc=doc,
                    header_level=header_level,
                )

            header_level -= 1

        get_object_doc(
            data=session.query(View).filter(View.schema_id == schema.schema_id).all(),
            klass=View,
            columns=["view_name", "created", "is_secure", "is_materialized", "enable_schema_evolution", "comment"],
            header=f"{schema_md} Views",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Procedure).filter(Procedure.schema_id == schema.schema_id).all(),
            klass=Procedure,
            columns=["procedure_name", "data_type", "argument_signature", "created", "last_altered", "comment"],
            header=f"{schema_md} Procedures",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Function).filter(Function.schema_id == schema.schema_id).all(),
            klass=Function,
            columns=["function_name", "data_type", "argument_signature", "created", "last_altered", "comment"],
            header=f"{schema_md} Functions",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Stage).filter(Stage.schema_id == schema.schema_id).all(),
            klass=Stage,
            columns=["stage_name", "stage_url", "stage_type", "storage_integration", "created", "comment"],
            header=f"{schema_md} Stages",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Pipe).filter(Pipe.schema_id == schema.schema_id).all(),
            klass=Pipe,
            columns=["pipe_name", "pipe_definition", "notification_channel_name", "pattern", "created", "last_altered"],
            header=f"{schema_md} Pipes",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Stream).filter(Stream.schema_id == schema.schema_id).all(),
            klass=Stream,
            columns=[
                "stream_name",
                "table_name",
                "source_type",
                "base_tables",
                "type",
                "stale",
                "invalid_reason",
                "created",
                "comment",
            ],
            header=f"{schema_md} Streams",
            doc=doc,
            header_level=header_level,
        )

        get_object_doc(
            data=session.query(Task).filter(Task.schema_id == schema.schema_id).all(),
            # BUG FIX: was klass=Stream with a "... Streams" header — copy-paste error;
            # Task rows must be rendered with the Task model under a Tasks heading.
            klass=Task,
            columns=[
                "id",
                "task_name",
                "warehouse",
                "schedule",
                "state",
                "condition",
                "error_integration",
                "config",
                "last_committed",
                "last_suspended",
                "created",
                "comment",
            ],
            header=f"{schema_md} Tasks",
            doc=doc,
            header_level=header_level,
        )

    return doc
def get_object_doc(data, klass, columns: list, header: str, doc: "Document", header_level: int = 0):
    """Append a heading plus a Markdown table describing *data* to *doc*.

    Args:
        data: list of model instances to document (may be empty).
        klass: model class providing ``__to_df__(data, columns)``.
        columns: column names to include in the rendered table.
        header: heading text for this section.
        doc: the snakemd Document being built (mutated in place).
        header_level: heading level of the ENCLOSING section; the section
            added here is nested one level deeper.

    Nothing is added when *data* is empty or the resulting frame has no cells.
    """
    header_level += 1
    if data:
        doc.add_heading(header, level=header_level)
        df: pd.DataFrame = klass.__to_df__(data, columns)
        if df.size:
            doc.add_block(df.to_markdown())
    # NOTE: the original ended with a dead "header_level -= 1" on a local — removed.