schema_sentinel.markdown_utils.markdown

  1import datetime
  2import getpass
  3import logging as log
  4import os
  5
  6import pandas as pd
  7from snakemd import Document
  8from sqlalchemy.sql.elements import and_
  9
 10from ..metadata_manager.model.column import Column
 11from ..metadata_manager.model.comparison import Comparison
 12from ..metadata_manager.model.database import Database
 13from ..metadata_manager.model.function import Function
 14from ..metadata_manager.model.pipe import Pipe
 15from ..metadata_manager.model.procedure import Procedure
 16from ..metadata_manager.model.schema import Schema
 17from ..metadata_manager.model.stage import Stage
 18from ..metadata_manager.model.stream import Stream
 19from ..metadata_manager.model.table import Table
 20from ..metadata_manager.model.table_constraint import TableConstraint
 21from ..metadata_manager.model.task import Task
 22from ..metadata_manager.model.view import View
 23
 24
 25def comparison_to_markdown(src_database: Database, trg_database: Database, session) -> Document:
 26    header_level = 1
 27    header = f"{src_database.__get_name__()} -> {trg_database.__get_name__()})"
 28    log.info(f"Writing {header} comparison report to Markdown")
 29    doc: Document = Document()
 30
 31    # Get author from environment or use system username
 32    author = os.getenv("REPORT_AUTHOR", getpass.getuser())
 33
 34    doc.add_raw(f"""
 35    ---
 36    author: {author}
 37    date:   {datetime.datetime.now().strftime("%a %m %y")}
 38    """)
 39
 40    doc.add_heading(f"{header} comparison report")
 41    doc.add_horizontal_rule()
 42    doc.add_heading("Compared databases", level=header_level)
 43    doc.add_block(src_database.__side_by_side__(trg_database).to_markdown())
 44    header_level += 1
 45
 46    doc.add_heading(
 47        "DB Objects existing in the LEFT database and not present in the RIGHT database", level=header_level
 48    )
 49    comparisons = (
 50        session.query(Comparison)
 51        .filter(
 52            and_(
 53                Comparison.source_database_id == src_database.database_id,
 54                Comparison.target_database_id == trg_database.database_id,
 55                Comparison.comparison_value.like('%"comparison": {"left": "%", "right": null}%'),
 56            )
 57        )
 58        .all()
 59    )
 60    differences = None
 61
 62    for comparison in comparisons:
 63        differences = (
 64            pd.concat([differences, comparison.one_diff()]) if differences is not None else comparison.one_diff()
 65        )
 66
 67    doc.add_block(differences.to_markdown())
 68    differences = None
 69
 70    doc.add_heading("Both sides are different", level=header_level)
 71
 72    comparisons = (
 73        session.query(Comparison)
 74        .filter(
 75            and_(
 76                Comparison.source_database_id == src_database.database_id,
 77                Comparison.target_database_id == trg_database.database_id,
 78                Comparison.comparison_value.not_like('%"comparison": {"left": "%", "right": null}%'),
 79            )
 80        )
 81        .all()
 82    )
 83
 84    for comparison in comparisons:
 85        differences = (
 86            pd.concat([differences, comparison.both_diffs()]) if differences is not None else comparison.both_diffs()
 87        )
 88
 89    doc.add_block(differences.to_markdown())
 90
 91    return doc
 92
 93
 94def db_to_markdown(database: Database, session) -> Document:
 95    header = f"{database.__get_name__()}"
 96    log.info(f"Writing {header} database to Markdown")
 97    doc: Document = Document()
 98
 99    # Get author from environment or use system username
100    author = os.getenv("REPORT_AUTHOR", getpass.getuser())
101
102    doc.add_raw(f"""
103    ---
104    author: {author}
105    date:   {datetime.datetime.now().strftime("%a %m %y")}
106    """)
107
108    doc.add_heading(f"{header} database documentation")
109    doc.add_horizontal_rule()
110    doc.add_block(database.__get_df__().to_markdown())
111
112    schemas = session.query(Schema).filter(and_(Schema.database_id == database.database_id)).all()
113    header_level = 1
114    doc.add_heading(f"{database.database_name} schemas", level=header_level)
115    doc.add_block(
116        Schema.__to_df__(schemas, columns=["schema_name", "created", "last_altered", "comment"]).to_markdown()
117    )
118
119    header_level = 2
120    for schema in schemas:
121        schema_md = schema.schema_name.replace("_", "\\_")
122        doc.add_heading(f"Schema: {schema.schema_name}", level=header_level)
123        doc.add_block(schema.__get_df__().to_markdown())
124
125        tables = session.query(Table).filter(Table.schema_id == schema.schema_id).all()
126        if tables is not None:
127            header_level += 1
128            df = Table.__to_df__(tables, ["table_name", "created", "last_altered", "comment"])
129            if df.size:
130                doc.add_heading(f"{schema_md} Tables", level=header_level)
131                doc.add_block(df.to_markdown())
132            header_level -= 1
133
134            header_level += 1
135            for table in tables:
136                table_md = table.table_name.replace("_", "\\_")
137                doc.add_heading(f"Table  {schema.schema_name}.{table_md} ", level=header_level)
138                doc.add_block(table.__get_df__().to_markdown())
139
140                get_object_doc(
141                    data=session.query(Column).filter(Column.table_id == table.table_id).all(),
142                    klass=Column,
143                    columns=[
144                        "column_name",
145                        "ordinal_position",
146                        "is_nullable",
147                        "character_maximum_length",
148                        "numeric_precision",
149                        "numeric_scale",
150                        "datetime_precision",
151                    ],
152                    header=f"Table {schema.schema_name}.{table_md} columns",
153                    doc=doc,
154                    header_level=header_level,
155                )
156
157                get_object_doc(
158                    data=session.query(TableConstraint).filter(TableConstraint.table_id == table.table_id).all(),
159                    klass=TableConstraint,
160                    columns=["table_constraint_name", "constraint_type", "is_deferrable", "created", "last_altered"],
161                    header=f"Table {schema.schema_name}.{table_md} constraints",
162                    doc=doc,
163                    header_level=header_level,
164                )
165
166            header_level -= 1
167
168        get_object_doc(
169            data=session.query(View).filter(View.schema_id == schema.schema_id).all(),
170            klass=View,
171            columns=["view_name", "created", "is_secure", "is_materialized", "enable_schema_evolution", "comment"],
172            header=f"{schema_md} Views",
173            doc=doc,
174            header_level=header_level,
175        )
176
177        get_object_doc(
178            data=session.query(Procedure).filter(Procedure.schema_id == schema.schema_id).all(),
179            klass=Procedure,
180            columns=["procedure_name", "data_type", "argument_signature", "created", "last_altered", "comment"],
181            header=f"{schema_md} Procedures",
182            doc=doc,
183            header_level=header_level,
184        )
185
186        get_object_doc(
187            data=session.query(Function).filter(Function.schema_id == schema.schema_id).all(),
188            klass=Function,
189            columns=["function_name", "data_type", "argument_signature", "created", "last_altered", "comment"],
190            header=f"{schema_md} Functions",
191            doc=doc,
192            header_level=header_level,
193        )
194
195        get_object_doc(
196            data=session.query(Stage).filter(Stage.schema_id == schema.schema_id).all(),
197            klass=Stage,
198            columns=["stage_name", "stage_url", "stage_type", "storage_integration", "created", "comment"],
199            header=f"{schema_md} Stages",
200            doc=doc,
201            header_level=header_level,
202        )
203
204        get_object_doc(
205            data=session.query(Pipe).filter(Pipe.schema_id == schema.schema_id).all(),
206            klass=Pipe,
207            columns=["pipe_name", "pipe_definition", "notification_channel_name", "pattern", "created", "last_altered"],
208            header=f"{schema_md} Pipes",
209            doc=doc,
210            header_level=header_level,
211        )
212
213        get_object_doc(
214            data=session.query(Stream).filter(Stream.schema_id == schema.schema_id).all(),
215            klass=Stream,
216            columns=[
217                "stream_name",
218                "table_name",
219                "source_type",
220                "base_tables",
221                "type",
222                "stale",
223                "invalid_reason",
224                "created",
225                "comment",
226            ],
227            header=f"{schema_md} Streams",
228            doc=doc,
229            header_level=header_level,
230        )
231
232        get_object_doc(
233            data=session.query(Task).filter(Task.schema_id == schema.schema_id).all(),
234            klass=Stream,
235            columns=[
236                "id",
237                "task_name",
238                "warehouse",
239                "schedule",
240                "state",
241                "condition",
242                "error_integration",
243                "config",
244                "last_committed",
245                "last_suspended",
246                "created",
247                "comment",
248            ],
249            header=f"{schema_md} Streams",
250            doc=doc,
251            header_level=header_level,
252        )
253
254    return doc
255
256
def get_object_doc(data, klass, columns: list, header: str, doc: Document, header_level: int = 0):
    """Append a heading and a Markdown table for a list of model objects.

    Nothing is added when *data* is empty; the table (but not the heading)
    is skipped when the resulting DataFrame has no cells.

    Args:
        data: Model instances to render (may be an empty list).
        klass: Model class providing ``__to_df__`` for *data*.
        columns: Column names forwarded to ``klass.__to_df__``.
        header: Heading text for the section.
        doc: Document the section is appended to.
        header_level: Level of the enclosing section; the heading is
            written one level deeper.

    The original mutated the local ``header_level`` (``+= 1`` followed by a
    dead trailing ``-= 1``); the offset is now applied inline.
    """
    if not data:
        return
    # One level below the caller's current section.
    doc.add_heading(header, level=header_level + 1)
    df: pd.DataFrame = klass.__to_df__(data, columns)
    if df.size:
        doc.add_block(df.to_markdown())
def comparison_to_markdown( src_database: schema_sentinel.metadata_manager.model.database.Database, trg_database: schema_sentinel.metadata_manager.model.database.Database, session) -> snakemd.document.Document:
26def comparison_to_markdown(src_database: Database, trg_database: Database, session) -> Document:
27    header_level = 1
28    header = f"{src_database.__get_name__()} -> {trg_database.__get_name__()})"
29    log.info(f"Writing {header} comparison report to Markdown")
30    doc: Document = Document()
31
32    # Get author from environment or use system username
33    author = os.getenv("REPORT_AUTHOR", getpass.getuser())
34
35    doc.add_raw(f"""
36    ---
37    author: {author}
38    date:   {datetime.datetime.now().strftime("%a %m %y")}
39    """)
40
41    doc.add_heading(f"{header} comparison report")
42    doc.add_horizontal_rule()
43    doc.add_heading("Compared databases", level=header_level)
44    doc.add_block(src_database.__side_by_side__(trg_database).to_markdown())
45    header_level += 1
46
47    doc.add_heading(
48        "DB Objects existing in the LEFT database and not present in the RIGHT database", level=header_level
49    )
50    comparisons = (
51        session.query(Comparison)
52        .filter(
53            and_(
54                Comparison.source_database_id == src_database.database_id,
55                Comparison.target_database_id == trg_database.database_id,
56                Comparison.comparison_value.like('%"comparison": {"left": "%", "right": null}%'),
57            )
58        )
59        .all()
60    )
61    differences = None
62
63    for comparison in comparisons:
64        differences = (
65            pd.concat([differences, comparison.one_diff()]) if differences is not None else comparison.one_diff()
66        )
67
68    doc.add_block(differences.to_markdown())
69    differences = None
70
71    doc.add_heading("Both sides are different", level=header_level)
72
73    comparisons = (
74        session.query(Comparison)
75        .filter(
76            and_(
77                Comparison.source_database_id == src_database.database_id,
78                Comparison.target_database_id == trg_database.database_id,
79                Comparison.comparison_value.not_like('%"comparison": {"left": "%", "right": null}%'),
80            )
81        )
82        .all()
83    )
84
85    for comparison in comparisons:
86        differences = (
87            pd.concat([differences, comparison.both_diffs()]) if differences is not None else comparison.both_diffs()
88        )
89
90    doc.add_block(differences.to_markdown())
91
92    return doc
def db_to_markdown( database: schema_sentinel.metadata_manager.model.database.Database, session) -> snakemd.document.Document:
 95def db_to_markdown(database: Database, session) -> Document:
 96    header = f"{database.__get_name__()}"
 97    log.info(f"Writing {header} database to Markdown")
 98    doc: Document = Document()
 99
100    # Get author from environment or use system username
101    author = os.getenv("REPORT_AUTHOR", getpass.getuser())
102
103    doc.add_raw(f"""
104    ---
105    author: {author}
106    date:   {datetime.datetime.now().strftime("%a %m %y")}
107    """)
108
109    doc.add_heading(f"{header} database documentation")
110    doc.add_horizontal_rule()
111    doc.add_block(database.__get_df__().to_markdown())
112
113    schemas = session.query(Schema).filter(and_(Schema.database_id == database.database_id)).all()
114    header_level = 1
115    doc.add_heading(f"{database.database_name} schemas", level=header_level)
116    doc.add_block(
117        Schema.__to_df__(schemas, columns=["schema_name", "created", "last_altered", "comment"]).to_markdown()
118    )
119
120    header_level = 2
121    for schema in schemas:
122        schema_md = schema.schema_name.replace("_", "\\_")
123        doc.add_heading(f"Schema: {schema.schema_name}", level=header_level)
124        doc.add_block(schema.__get_df__().to_markdown())
125
126        tables = session.query(Table).filter(Table.schema_id == schema.schema_id).all()
127        if tables is not None:
128            header_level += 1
129            df = Table.__to_df__(tables, ["table_name", "created", "last_altered", "comment"])
130            if df.size:
131                doc.add_heading(f"{schema_md} Tables", level=header_level)
132                doc.add_block(df.to_markdown())
133            header_level -= 1
134
135            header_level += 1
136            for table in tables:
137                table_md = table.table_name.replace("_", "\\_")
138                doc.add_heading(f"Table  {schema.schema_name}.{table_md} ", level=header_level)
139                doc.add_block(table.__get_df__().to_markdown())
140
141                get_object_doc(
142                    data=session.query(Column).filter(Column.table_id == table.table_id).all(),
143                    klass=Column,
144                    columns=[
145                        "column_name",
146                        "ordinal_position",
147                        "is_nullable",
148                        "character_maximum_length",
149                        "numeric_precision",
150                        "numeric_scale",
151                        "datetime_precision",
152                    ],
153                    header=f"Table {schema.schema_name}.{table_md} columns",
154                    doc=doc,
155                    header_level=header_level,
156                )
157
158                get_object_doc(
159                    data=session.query(TableConstraint).filter(TableConstraint.table_id == table.table_id).all(),
160                    klass=TableConstraint,
161                    columns=["table_constraint_name", "constraint_type", "is_deferrable", "created", "last_altered"],
162                    header=f"Table {schema.schema_name}.{table_md} constraints",
163                    doc=doc,
164                    header_level=header_level,
165                )
166
167            header_level -= 1
168
169        get_object_doc(
170            data=session.query(View).filter(View.schema_id == schema.schema_id).all(),
171            klass=View,
172            columns=["view_name", "created", "is_secure", "is_materialized", "enable_schema_evolution", "comment"],
173            header=f"{schema_md} Views",
174            doc=doc,
175            header_level=header_level,
176        )
177
178        get_object_doc(
179            data=session.query(Procedure).filter(Procedure.schema_id == schema.schema_id).all(),
180            klass=Procedure,
181            columns=["procedure_name", "data_type", "argument_signature", "created", "last_altered", "comment"],
182            header=f"{schema_md} Procedures",
183            doc=doc,
184            header_level=header_level,
185        )
186
187        get_object_doc(
188            data=session.query(Function).filter(Function.schema_id == schema.schema_id).all(),
189            klass=Function,
190            columns=["function_name", "data_type", "argument_signature", "created", "last_altered", "comment"],
191            header=f"{schema_md} Functions",
192            doc=doc,
193            header_level=header_level,
194        )
195
196        get_object_doc(
197            data=session.query(Stage).filter(Stage.schema_id == schema.schema_id).all(),
198            klass=Stage,
199            columns=["stage_name", "stage_url", "stage_type", "storage_integration", "created", "comment"],
200            header=f"{schema_md} Stages",
201            doc=doc,
202            header_level=header_level,
203        )
204
205        get_object_doc(
206            data=session.query(Pipe).filter(Pipe.schema_id == schema.schema_id).all(),
207            klass=Pipe,
208            columns=["pipe_name", "pipe_definition", "notification_channel_name", "pattern", "created", "last_altered"],
209            header=f"{schema_md} Pipes",
210            doc=doc,
211            header_level=header_level,
212        )
213
214        get_object_doc(
215            data=session.query(Stream).filter(Stream.schema_id == schema.schema_id).all(),
216            klass=Stream,
217            columns=[
218                "stream_name",
219                "table_name",
220                "source_type",
221                "base_tables",
222                "type",
223                "stale",
224                "invalid_reason",
225                "created",
226                "comment",
227            ],
228            header=f"{schema_md} Streams",
229            doc=doc,
230            header_level=header_level,
231        )
232
233        get_object_doc(
234            data=session.query(Task).filter(Task.schema_id == schema.schema_id).all(),
235            klass=Stream,
236            columns=[
237                "id",
238                "task_name",
239                "warehouse",
240                "schedule",
241                "state",
242                "condition",
243                "error_integration",
244                "config",
245                "last_committed",
246                "last_suspended",
247                "created",
248                "comment",
249            ],
250            header=f"{schema_md} Streams",
251            doc=doc,
252            header_level=header_level,
253        )
254
255    return doc
def get_object_doc( data, klass, columns: list, header: str, doc: snakemd.document.Document, header_level: int = 0):
258def get_object_doc(data, klass, columns: list, header: str, doc: Document, header_level: int = 0):
259    header_level += 1
260    if data:
261        doc.add_heading(header, level=header_level)
262        df: pd.DataFrame = klass.__to_df__(data, columns)
263        if df.size:
264            doc.add_block(df.to_markdown())
265    header_level -= 1