schema_sentinel.metadata_manager.model.stream

 1import json
 2
 3import sqlalchemy as db
 4from sqlalchemy import ForeignKey, select
 5
 6from . import CommonBase
 7
 8
 9class Stream(CommonBase):
10    __tablename__ = "streams"
11    stream_id = db.Column(db.String, primary_key=True)
12    schema_id = db.Column(db.String, ForeignKey("schemas.schema_id"))
13    stream_name = db.Column(db.String)
14    stream_owner = db.Column(db.String)
15    comment = db.Column(db.String)
16    table_name = db.Column(db.String)
17    source_type = db.Column(db.String)
18    base_tables = db.Column(db.String)
19    type = db.Column(db.String)
20    stale = db.Column(db.String)
21    mode = db.Column(db.String)
22    stale_after = db.Column(db.String)
23    invalid_reason = db.Column(db.String)
24    owner_role_type = db.Column(db.String)
25    created = db.Column(db.String)
26
27    def save(self, session) -> None:
28        if not session.execute(self.exists()).first():
29            session.add(self)
30            session.commit()
31
32    def exists(self) -> str:
33        return select(Stream).filter_by(stream_id=self.stream_id)
34
35    def __get_id__(self) -> str:
36        id = json.loads(self.schema_id)
37        id["stream_name"] = self.stream_name
38        return json.dumps(id)
class Stream(CommonBase):
    """ORM mapping for one row of the ``streams`` table.

    Every column is declared as a string column. ``stream_id`` is the
    primary key and ``schema_id`` is a foreign key to ``schemas.schema_id``.
    """

    __tablename__ = "streams"
    stream_id = db.Column(db.String, primary_key=True)
    schema_id = db.Column(db.String, ForeignKey("schemas.schema_id"))
    stream_name = db.Column(db.String)
    stream_owner = db.Column(db.String)
    comment = db.Column(db.String)
    table_name = db.Column(db.String)
    source_type = db.Column(db.String)
    base_tables = db.Column(db.String)
    type = db.Column(db.String)
    stale = db.Column(db.String)
    mode = db.Column(db.String)
    stale_after = db.Column(db.String)
    invalid_reason = db.Column(db.String)
    owner_role_type = db.Column(db.String)
    created = db.Column(db.String)

    def save(self, session) -> None:
        """Add this stream to *session* and commit, unless it already exists.

        The lookup statement built by ``exists()`` is executed first; the
        instance is only added and committed when that lookup yields no row.

        Args:
            session: Object providing ``execute``, ``add`` and ``commit``
                (presumably a SQLAlchemy ``Session`` -- confirm with callers).
        """
        if not session.execute(self.exists()).first():
            session.add(self)
            session.commit()

    def exists(self) -> "db.sql.expression.Select":
        """Build the SELECT that looks this stream up by ``stream_id``.

        The statement is only constructed here, not executed; the caller is
        responsible for running it against a session.

        Returns:
            A ``select(Stream)`` statement filtered on this instance's
            ``stream_id``.
        """
        return select(Stream).filter_by(stream_id=self.stream_id)

    def __get_id__(self) -> str:
        """Derive a JSON identifier from ``schema_id`` and ``stream_name``.

        ``schema_id`` is parsed as JSON, a ``"stream_name"`` entry is set on
        the parsed object, and the result is serialized back to a string.

        Returns:
            The JSON-encoded identifier.

        Raises:
            json.JSONDecodeError: If ``schema_id`` is not valid JSON.
        """
        # Named ``identifier`` rather than ``id`` to avoid shadowing the builtin.
        identifier = json.loads(self.schema_id)
        identifier["stream_name"] = self.stream_name
        return json.dumps(identifier)

The base class of the class hierarchy.

When called, it accepts no arguments and returns a new featureless instance that has no instance attributes and cannot be given any.

Stream(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.

stream_id
schema_id
stream_name
stream_owner
comment
table_name
source_type
base_tables
type
stale
mode
stale_after
invalid_reason
owner_role_type
created
def save(self, session) -> None:
28    def save(self, session) -> None:
29        if not session.execute(self.exists()).first():
30            session.add(self)
31            session.commit()
def exists(self) -> str:
33    def exists(self) -> str:
34        return select(Stream).filter_by(stream_id=self.stream_id)