schema_sentinel.metadata_manager.model.task

 1import json
 2
 3import sqlalchemy as db
 4from sqlalchemy import ForeignKey, select
 5
 6from . import CommonBase
 7
 8
 9class Task(CommonBase):
10    __tablename__ = "tasks"
11    id = db.Column(db.String, primary_key=True)
12    task_id = db.Column(db.String, primary_key=True)
13    schema_id = db.Column(db.String, ForeignKey("schemas.schema_id"))
14    task_name = db.Column(db.String, primary_key=True)
15    task_owner = db.Column(db.String)
16    warehouse = db.Column(db.String)
17    schedule = db.Column(db.String)
18    predecessors = db.Column(db.String)
19    state = db.Column(db.String)
20    definition = db.Column(db.String)
21    condition = db.Column(db.String)
22    allow_overlapping_execution = db.Column(db.String)
23    error_integration = db.Column(db.String)
24    comment = db.Column(db.String)
25    last_committed = db.Column(db.String)
26    last_suspended = db.Column(db.String)
27    owner_role_type = db.Column(db.String)
28    config = db.Column(db.String)
29    created = db.Column(db.String)
30
31    def save(self, session) -> None:
32        if not session.execute(self.exists()).first():
33            session.add(self)
34            session.commit()
35
36    def exists(self) -> str:
37        return select(Task).filter_by(id=self.id, task_id=self.task_id)
38
39    def __get_id__(self) -> str:
40        id = json.loads(self.schema_id)
41        id["task_name"] = self.task_name
42        return json.dumps(id)
10class Task(CommonBase):
11    __tablename__ = "tasks"
12    id = db.Column(db.String, primary_key=True)
13    task_id = db.Column(db.String, primary_key=True)
14    schema_id = db.Column(db.String, ForeignKey("schemas.schema_id"))
15    task_name = db.Column(db.String, primary_key=True)
16    task_owner = db.Column(db.String)
17    warehouse = db.Column(db.String)
18    schedule = db.Column(db.String)
19    predecessors = db.Column(db.String)
20    state = db.Column(db.String)
21    definition = db.Column(db.String)
22    condition = db.Column(db.String)
23    allow_overlapping_execution = db.Column(db.String)
24    error_integration = db.Column(db.String)
25    comment = db.Column(db.String)
26    last_committed = db.Column(db.String)
27    last_suspended = db.Column(db.String)
28    owner_role_type = db.Column(db.String)
29    config = db.Column(db.String)
30    created = db.Column(db.String)
31
32    def save(self, session) -> None:
33        if not session.execute(self.exists()).first():
34            session.add(self)
35            session.commit()
36
37    def exists(self) -> str:
38        return select(Task).filter_by(id=self.id, task_id=self.task_id)
39
40    def __get_id__(self) -> str:
41        id = json.loads(self.schema_id)
42        id["task_name"] = self.task_name
43        return json.dumps(id)

The base class of the class hierarchy.

When called, it accepts no arguments and returns a new featureless instance that has no instance attributes and cannot be given any.

Task(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.

id
task_id
schema_id
task_name
task_owner
warehouse
schedule
predecessors
state
definition
condition
allow_overlapping_execution
error_integration
comment
last_committed
last_suspended
owner_role_type
config
created
def save(self, session) -> None:
32    def save(self, session) -> None:
33        if not session.execute(self.exists()).first():
34            session.add(self)
35            session.commit()
def exists(self) -> str:
37    def exists(self) -> str:
38        return select(Task).filter_by(id=self.id, task_id=self.task_id)