schema_sentinel.metadata_manager.model.stream
import json

import sqlalchemy as db
from sqlalchemy import ForeignKey, select

from . import CommonBase


class Stream(CommonBase):
    """ORM model mapped to the ``streams`` table.

    Every column is stored as a string. ``stream_id`` is the primary key and
    ``schema_id`` references ``schemas.schema_id``.
    """

    __tablename__ = "streams"

    # Key columns.
    stream_id = db.Column(db.String, primary_key=True)
    schema_id = db.Column(db.String, ForeignKey("schemas.schema_id"))

    # Descriptive columns; presumably they mirror the stream metadata reported
    # by the source system -- confirm against the code that populates them.
    stream_name = db.Column(db.String)
    stream_owner = db.Column(db.String)
    comment = db.Column(db.String)
    table_name = db.Column(db.String)
    source_type = db.Column(db.String)
    base_tables = db.Column(db.String)
    type = db.Column(db.String)
    stale = db.Column(db.String)
    mode = db.Column(db.String)
    stale_after = db.Column(db.String)
    invalid_reason = db.Column(db.String)
    owner_role_type = db.Column(db.String)
    created = db.Column(db.String)

    def save(self, session) -> None:
        """Add this instance to ``session`` and commit if it is not stored yet.

        Nothing is added when the statement built by :meth:`exists` returns a
        row, i.e. when a row with the same ``stream_id`` is already present.

        Args:
            session: Object providing ``execute``, ``add`` and ``commit``
                (presumably a SQLAlchemy ``Session`` -- confirm with callers).
        """
        # NOTE(review): the extracted source lost its indentation; the commit
        # is assumed to belong inside the ``if`` -- confirm against the repo.
        if not session.execute(self.exists()).first():
            session.add(self)
            session.commit()

    def exists(self) -> db.sql.expression.Select:
        """Build the SELECT that looks this stream up by ``stream_id``.

        Returns:
            An unexecuted ``Select`` statement (not a boolean and not a
            string); the caller runs it and inspects the result.
        """
        return select(Stream).filter_by(stream_id=self.stream_id)

    def __get_id__(self) -> str:
        """Derive a JSON identifier from ``schema_id`` and ``stream_name``.

        ``schema_id`` is decoded as JSON, a ``"stream_name"`` entry is set on
        the decoded value, and the result is re-encoded.

        Returns:
            The JSON-encoded identifier.

        Raises:
            json.JSONDecodeError: If ``schema_id`` is not valid JSON.
            TypeError: If ``schema_id`` is ``None`` or does not decode to a
                mapping that accepts string keys.
        """
        # Named ``identifier`` rather than ``id`` to avoid shadowing the builtin.
        identifier = json.loads(self.schema_id)
        identifier["stream_name"] = self.stream_name
        return json.dumps(identifier)
class Stream(CommonBase):
    """ORM model mapped to the ``streams`` table.

    Every column is stored as a string. ``stream_id`` is the primary key and
    ``schema_id`` references ``schemas.schema_id``.
    """

    __tablename__ = "streams"

    # Key columns.
    stream_id = db.Column(db.String, primary_key=True)
    schema_id = db.Column(db.String, ForeignKey("schemas.schema_id"))

    # Descriptive columns; presumably they mirror the stream metadata reported
    # by the source system -- confirm against the code that populates them.
    stream_name = db.Column(db.String)
    stream_owner = db.Column(db.String)
    comment = db.Column(db.String)
    table_name = db.Column(db.String)
    source_type = db.Column(db.String)
    base_tables = db.Column(db.String)
    type = db.Column(db.String)
    stale = db.Column(db.String)
    mode = db.Column(db.String)
    stale_after = db.Column(db.String)
    invalid_reason = db.Column(db.String)
    owner_role_type = db.Column(db.String)
    created = db.Column(db.String)

    def save(self, session) -> None:
        """Add this instance to ``session`` and commit if it is not stored yet.

        Nothing is added when the statement built by :meth:`exists` returns a
        row, i.e. when a row with the same ``stream_id`` is already present.

        Args:
            session: Object providing ``execute``, ``add`` and ``commit``
                (presumably a SQLAlchemy ``Session`` -- confirm with callers).
        """
        # NOTE(review): the extracted source lost its indentation; the commit
        # is assumed to belong inside the ``if`` -- confirm against the repo.
        if not session.execute(self.exists()).first():
            session.add(self)
            session.commit()

    def exists(self) -> db.sql.expression.Select:
        """Build the SELECT that looks this stream up by ``stream_id``.

        Returns:
            An unexecuted ``Select`` statement (not a boolean and not a
            string); the caller runs it and inspects the result.
        """
        return select(Stream).filter_by(stream_id=self.stream_id)

    def __get_id__(self) -> str:
        """Derive a JSON identifier from ``schema_id`` and ``stream_name``.

        ``schema_id`` is decoded as JSON, a ``"stream_name"`` entry is set on
        the decoded value, and the result is re-encoded.

        Returns:
            The JSON-encoded identifier.

        Raises:
            json.JSONDecodeError: If ``schema_id`` is not valid JSON.
            TypeError: If ``schema_id`` is ``None`` or does not decode to a
                mapping that accepts string keys.
        """
        # Named ``identifier`` rather than ``id`` to avoid shadowing the builtin.
        identifier = json.loads(self.schema_id)
        identifier["stream_name"] = self.stream_name
        return json.dumps(identifier)
The base class of the class hierarchy.
When called, it accepts no arguments and returns a new featureless instance that has no instance attributes and cannot be given any.
Stream(**kwargs)
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and
values in kwargs.
Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.