schema_sentinel.metadata_manager.model.pipe

 1import json
 2
 3import sqlalchemy as db
 4from sqlalchemy import ForeignKey, select
 5
 6from . import CommonBase
 7
 8
 9class Pipe(CommonBase):
10    __tablename__ = "pipes"
11    pipe_id = db.Column(db.String, primary_key=True)
12    schema_id = db.Column(db.String, ForeignKey("schemas.schema_id"))
13
14    pipe_name = db.Column(db.String, primary_key=True)
15    pipe_owner = db.Column(db.String)
16    pipe_definition = db.Column(db.String)
17    is_autoingest_enabled = db.Column(db.String)
18    notification_channel_name = db.Column(db.String)
19    comment = db.Column(db.String)
20    created = db.Column(db.String)
21    last_altered = db.Column(db.String)
22    pattern = db.Column(db.String)
23
24    def save(self, session) -> None:
25        if not session.execute(self.exists()).first():
26            session.add(self)
27            session.commit()
28
29    def exists(self) -> str:
30        return select(Pipe).filter_by(pipe_id=self.pipe_id)
31
32    def __get_id__(self) -> str:
33        id = json.loads(self.schema_id)
34        id["pipe_name"] = self.pipe_name_name
35        return json.dumps(id)
class Pipe(CommonBase):
    """ORM mapping for one row of the ``pipes`` table.

    Every mapped attribute is declared as a string column. ``schema_id`` is a
    foreign key to ``schemas.schema_id``.
    """

    __tablename__ = "pipes"
    pipe_id = db.Column(db.String, primary_key=True)
    schema_id = db.Column(db.String, ForeignKey("schemas.schema_id"))

    # NOTE(review): pipe_name is also marked primary_key, so the table key is
    # the composite (pipe_id, pipe_name) while exists() filters on pipe_id
    # only -- confirm this is intended.
    pipe_name = db.Column(db.String, primary_key=True)
    pipe_owner = db.Column(db.String)
    pipe_definition = db.Column(db.String)
    is_autoingest_enabled = db.Column(db.String)
    notification_channel_name = db.Column(db.String)
    comment = db.Column(db.String)
    created = db.Column(db.String)
    last_altered = db.Column(db.String)
    pattern = db.Column(db.String)

    def save(self, session) -> None:
        """Add this pipe to ``session`` and commit, unless it is already stored.

        "Already stored" means the statement built by :meth:`exists` returns
        at least one row. In that case nothing is added and nothing is
        committed.

        Args:
            session: Object providing ``execute``, ``add`` and ``commit``
                (presumably a SQLAlchemy ``Session`` -- confirm with callers).
        """
        if not session.execute(self.exists()).first():
            session.add(self)
            session.commit()

    def exists(self) -> "db.sql.expression.Select":
        """Build the SELECT statement matching rows with this ``pipe_id``.

        The statement is only constructed here, not executed.

        Returns:
            A ``select(Pipe)`` statement filtered by ``pipe_id``.
        """
        return select(Pipe).filter_by(pipe_id=self.pipe_id)

    def __get_id__(self) -> str:
        """Return the JSON identifier of this pipe.

        ``schema_id`` is decoded as a JSON object, this pipe's name is stored
        under the ``"pipe_name"`` key, and the result is re-encoded.

        Returns:
            The JSON string of the schema identifier extended with
            ``"pipe_name"``.

        Raises:
            json.JSONDecodeError: If ``schema_id`` is not valid JSON.
            TypeError: If ``schema_id`` is ``None``.
        """
        identifier = json.loads(self.schema_id)
        # Read the mapped ``pipe_name`` column; ``pipe_name_name`` is not an
        # attribute of this class and raised AttributeError.
        identifier["pipe_name"] = self.pipe_name
        return json.dumps(identifier)

The base class of the class hierarchy.

When called, it accepts no arguments and returns a new featureless instance that has no instance attributes and cannot be given any.

Pipe(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.

pipe_id
schema_id
pipe_name
pipe_owner
pipe_definition
is_autoingest_enabled
notification_channel_name
comment
created
last_altered
pattern
def save(self, session) -> None:
25    def save(self, session) -> None:
26        if not session.execute(self.exists()).first():
27            session.add(self)
28            session.commit()
def exists(self) -> str:
30    def exists(self) -> str:
31        return select(Pipe).filter_by(pipe_id=self.pipe_id)