Metadata-Version: 2.4
Name: mnemosynecore
Version: 0.1.7
Summary: Internal analytics toolkit for data pipelines
Author-email: rostilin <rostilin@ozon.ru>
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: vertica-python>=1.3
Requires-Dist: pandas>=2.0
Requires-Dist: mattermostdriver>=2.0
Requires-Dist: requests>=2.30
Requires-Dist: python-dotenv>=1.2.1
Requires-Dist: typing-extensions>=4.0.0
Provides-Extra: airflow
Requires-Dist: apache-airflow<3.1,>=3.0; extra == "airflow"
Requires-Dist: apache-airflow-providers-postgres>=5.0; extra == "airflow"
Requires-Dist: apache-airflow-providers-vertica>=2.0; extra == "airflow"
Requires-Dist: sqlalchemy<2.0; extra == "airflow"

# mnemosynecore Documentation

## Общее описание
Библиотека mnemosynecore предоставляет набор инструментов для работы с Vertica БД, 
Airflow, Mattermost и другими системами аналитики. Она предназначена для автоматизации 
работы с данными и интеграции различных сервисов.

## Установка
```bash
pip install mnemosynecore

Для Airflow:
pip install mnemosynecore[airflow]

Основные модули
1. Работа с Vertica (mnemosynecore.db.vertica)
Функции для работы с БД
vertica_conn(conn_id) - Создание подключения к Vertica
from mnemosynecore.db.vertica import vertica_conn

# Подключение к Vertica
conn = vertica_conn("VERTICA_CONN_ID")
# Использование соединения
result = vertica_sql(conn=conn, sql="SELECT * FROM table")
conn.close()

**vertica_sql(kwargs) - Выполнение SQL запросов

from mnemosynecore.db.vertica import vertica_sql

# Выполнение запроса с ID подключения
vertica_sql(
    conn_id="VERTICA_CONN_ID",
    sql="INSERT INTO table VALUES (%s, %s)",
    params=[1, "test"]
)

# Выполнение запроса с готовым соединением
vertica_sql(conn=conn, sql="UPDATE table SET col = %s WHERE id = %s", params=[100, 1])

**vertica_select(kwargs) - Выполнение SELECT запросов

from mnemosynecore.db.vertica import vertica_select

# Получение данных в DataFrame
df = vertica_select(
    conn_id="VERTICA_CONN_ID",
    sql="SELECT * FROM table WHERE id = %s",
    params=[123]
)
print(df.head())

**vertica_dedupe(table_name, unique_keys, kwargs) - Удаление дубликатов

from mnemosynecore.db.vertica import vertica_dedupe

# Удаление дубликатов по одному ключу
vertica_dedupe(
    table_name="schema.table",
    unique_keys="id",
    conn_id="VERTICA_CONN_ID"
)

# Удаление дубликатов по нескольким ключам с учетом даты
vertica_dedupe(
    table_name="schema.table",
    unique_keys=["id", "date"],
    conn_id="VERTICA_CONN_ID",
    date_col="created_at",
    keep="last"
)

**vertica_upsert(df, table_name, unique_keys, kwargs) - Upsert операции

from mnemosynecore.db.vertica import vertica_upsert
import pandas as pd

# Подготовка данных
df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'value': [100, 200, 300]
})

# Upsert данных
vertica_upsert(
    df=df,
    table_name="schema.table",
    unique_keys=["id"],
    conn_id="VERTICA_CONN_ID"
)

load_sql_tasks_from_dir(dir_sql, vertica_conn_id) - Создание задач из SQL файлов

from mnemosynecore.db.vertica import load_sql_tasks_from_dir

# Создание задач из директории SQL файлов
tasks = load_sql_tasks_from_dir("/path/to/sql/files", "VERTICA_CONN_ID")

# Использование в Airflow DAG
from airflow import DAG
from airflow.utils.task_group import TaskGroup

with DAG("my_dag") as dag:
    with TaskGroup("vertica_tasks") as vertica_group:
        for task in tasks.values():
            task

read_sql_file(file_path) - Чтение SQL файлов
from mnemosynecore.db.vertica import read_sql_file

# Чтение SQL файла
sql_content = read_sql_file("/path/to/query.sql")
if sql_content:
    print("SQL загружен успешно")

2. Работа с секретами (mnemosynecore.vault)
get_secret(conn_id) - Получение секретов
from mnemosynecore.vault import get_secret

# Получение секрета из Vault
secret = get_secret("SECRET_ID")
print(secret["host"])  # Хост базы данных
print(secret["password"])  # Пароль

3. Интеграция с Mattermost (mnemosynecore.mattermost)
send_message(channel_id, bot_id, text, silent=False) - Отправка сообщений

from mnemosynecore.mattermost import send_message

# Отправка сообщения в канал
send_message(
    channel_id="s5c11srqkf8j3pbdwfbn9imrde",
    bot_id="MATTERMOST_BOT_ID",
    text="Привет! Это тестовое сообщение"
)

# Отправка Markdown сообщения
send_message(
    channel_id="s5c11srqkf8j3pbdwfbn9imrde",
    bot_id="MATTERMOST_BOT_ID",
    text="""
    📊 **Отчет по данным** 📊
    
    - Данные успешно загружены
    - Обработано записей: 1000
    - Время выполнения: 2 минуты
    
    [Подробнее](https://your-dashboard.com)
    """
)

4. Интеграция с Superset (mnemosynecore.superset)
superset_request(endpoint, method="GET", payload=None, vault_conn_id) - Запросы к Superset
from mnemosynecore.superset import superset_request

# Получение информации о дашборде
response = superset_request(
    endpoint="/api/v1/dashboard/123",
    method="GET",
    vault_conn_id="SUPERSET_CONN_ID"
)

# Создание нового дашборда
new_dashboard = superset_request(
    endpoint="/api/v1/dashboard/",
    method="POST",
    payload={"name": "New Dashboard"},
    vault_conn_id="SUPERSET_CONN_ID"
)

Конфигурация секретов
Формат секретов для Vault
Для Vertica:
{
    "host": "vertica-host.com",
    "port": "5433",
    "login": "username",
    "password": "password",
    "schema": "database_schema"
}

Для Mattermost:
{
    "host": "https://mattermost.company.com",
    "password": "bot_token_here",
    "scheme": "https",
    "port": 443,
    "basepath": "/api/v4"
}

Для Superset:
{
    "host": "https://superset.company.com",
    "password": "access_token_here",
    "scheme": "https",
    "port": 443,
    "basepath": "/api/v1"
}

Примеры использования
Пример 1: Полный пайплайн работы с Vertica
from mnemosynecore.db.vertica import vertica_conn, vertica_sql, vertica_select, vertica_upsert
from mnemosynecore.mattermost import send_message
import pandas as pd

def process_data_pipeline():
    # Подключение к Vertica
    conn = vertica_conn("VERTICA_CONN_ID")
    
    try:
        # Выполнение запроса
        df = vertica_select(
            conn=conn,
            sql="SELECT * FROM source_table WHERE date > %s",
            params=['2023-01-01']
        )
        
        # Обработка данных
        processed_df = df.groupby('category').sum()
        
        # Upsert в целевую таблицу
        vertica_upsert(
            df=processed_df,
            table_name="analytics.summary",
            unique_keys=["category"],
            conn=conn
        )
        
        # Отправка уведомления
        send_message(
            channel_id="s5c11srqkf8j3pbdwfbn9imrde",
            bot_id="MATTERMOST_BOT_ID",
            text="✅ Пайплайн выполнен успешно"
        )
        
    except Exception as e:
        send_message(
            channel_id="s5c11srqkf8j3pbdwfbn9imrde",
            bot_id="MATTERMOST_BOT_ID",
            text=f"❌ Ошибка в пайплайне: {str(e)}"
        )
        raise
    finally:
        conn.close()

Пример 2: Автоматическая очистка дубликатов

from mnemosynecore.db.vertica import vertica_dedupe

def remove_duplicates():
    vertica_dedupe(
        table_name="analytics.user_events",
        unique_keys=["user_id", "event_time"],
        conn_id="VERTICA_CONN_ID",
        date_col="event_time",
        keep="last"
    )

