Metadata-Version: 2.4
Name: kissml
Version: 0.4.11
Summary: Keep It Simple Stupid Tools for Machine Learning
Project-URL: Homepage, https://github.com/lou-k/kissml
Author-email: Lou Kratz <kissml@lou.dev>
License-Expression: CC-BY-NC-ND-4.0
License-File: LICENSE
Requires-Python: >=3.12
Requires-Dist: diskcache>=5.6.3
Requires-Dist: numpy>=2.3.5
Requires-Dist: pandas>=2.3.3
Requires-Dist: pydantic-settings>=2.0.0
Requires-Dist: pydantic>=2.12.5
Provides-Extra: dev
Requires-Dist: coverage>=7.0.0; extra == 'dev'
Requires-Dist: ipython>=8.37.0; extra == 'dev'
Requires-Dist: isort>=7.0.0; extra == 'dev'
Requires-Dist: pandas-stubs>=2.3.3.251219; extra == 'dev'
Requires-Dist: pandas>=2.3.3; extra == 'dev'
Requires-Dist: pkginfo>=1.12.1.2; extra == 'dev'
Requires-Dist: pre-commit>=3.6.0; extra == 'dev'
Requires-Dist: pyarrow>=22.0.0; extra == 'dev'
Requires-Dist: pytest-cov>=7.0.0; extra == 'dev'
Requires-Dist: pytest>=8.4.1; extra == 'dev'
Requires-Dist: ruff>=0.1.0; extra == 'dev'
Requires-Dist: ty>=0.0.8; extra == 'dev'
Description-Content-Type: text/markdown

# kissml

Keep It Simple Stupid Tools for Machine Learning

A Python library providing simple, powerful tools for ML workflows with minimal boilerplate.

I made this because:
* Most data science services are notebook based, but notebooks are difficult to debug
* Most frameworks (flyte, metaflow) focus on extending to the cloud. This is great, but for local iteration all we really need is reproducible pipeline steps.

## Installation

```bash
pip install kissml
```

## Steps

The `@step` decorator provides:
* execution tracking
* persistent disk-based caching for your functions
* post-run execution (i.e., after effects) for the return value -- useful to visualize data or log stats.

### Basic Usage

```python
from kissml import step, CacheConfig
import logging

# Simple execution time logging
@step(log_level=logging.INFO)
def process_data(data):
    # Your processing logic here
    return result

# With persistent caching
@step(
    log_level=logging.INFO,
    cache=CacheConfig(version=1)
)
def expensive_computation(data):
    # This will only run once per unique input
    # Subsequent calls return cached results
    return result
```

### Key Features

**Execution Time Tracking**: Log how long your functions take to run
```python
@step(log_level=logging.INFO)
def train_model(X, y):
    # Logs: "train_model completed in 45.2341 seconds"
    return model
```

**Persistent Disk Caching**: Cache results to disk and reuse them across runs
```python
@step(cache=CacheConfig(version=1))
def load_and_preprocess(filepath):
    # Expensive preprocessing runs once
    # Subsequent calls load from cache in milliseconds
    return processed_data
```

**Version-Based Invalidation**: Bump the version to invalidate old cache
```python
# Old implementation
@step(cache=CacheConfig(version=1))
def feature_engineering(df):
    return old_features(df)

# Updated implementation - cache automatically invalidated
@step(cache=CacheConfig(version=2))
def feature_engineering(df):
    return new_improved_features(df)
```

**Smart Serialization**: Efficient storage for pandas DataFrames and nested collections
```python
import pandas as pd

@step(cache=CacheConfig(version=1))
def analyze_data(df: pd.DataFrame) -> pd.DataFrame:
    # DataFrames cached as Parquet files (requires pyarrow)
    # Much more efficient than pickle
    return processed_df

@step(cache=CacheConfig(version=1))
def complex_pipeline(data) -> dict:
    # Returns dict with DataFrames, lists, etc.
    # Each type uses optimal serialization
    return {
        "results": some_dataframe,
        "metrics": [metric1, metric2],
        "metadata": {"key": "value"}
    }
```

### Cache Configuration

Control cache behavior with `CacheConfig`:

```python
from kissml import step, CacheConfig, EvictionPolicy

# No eviction (default) - cache grows forever
@step(cache=CacheConfig(version=1, eviction_policy=EvictionPolicy.NONE))
def permanent_cache(x):
    return x

# Least Recently Used - evicts oldest accessed items
@step(cache=CacheConfig(version=1, eviction_policy=EvictionPolicy.LEAST_RECENTLY_USED))
def lru_cache(x):
    return x

# Least Recently Stored - evicts oldest stored items
@step(cache=CacheConfig(version=1, eviction_policy=EvictionPolicy.LEAST_RECENTLY_STORED))
def lrs_cache(x):
    return x

# Least Frequently Used - evicts least accessed items
@step(cache=CacheConfig(version=1, eviction_policy=EvictionPolicy.LEAST_FREQUENTLY_USED))
def lfu_cache(x):
    return x
```

### AfterEffects

AfterEffects allow you to automatically execute side effects (like visualization, logging, or validation) after a step completes, whether the result was cached or freshly computed.

```python
from typing import Annotated
from kissml import step, AfterEffect, CacheConfig
import mlflow

# Define a custom AfterEffect
class HTMLVisualizer(AfterEffect):
    def __init__(self, max_rows=100):
        self.max_rows = max_rows
    
    def __call__(self, result, was_cached, func_name, execution_time):
        # Create HTML preview
        html = result.head(self.max_rows).to_html()
        html = f"<h3>{func_name} - {execution_time:.2f}s {'(cached)' if was_cached else ''}</h3>" + html
        
        # Log to MLflow
        with open(f"{func_name}.html", "w") as f:
            f.write(html)
        mlflow.log_artifact(f"{func_name}.html")

# Use it with type annotations
@step(cache=CacheConfig(version=1))
def load_data() -> Annotated[pd.DataFrame, HTMLVisualizer(max_rows=200)]:
    return pd.read_csv("data.csv")

# Multiple effects run left-to-right
class DatasetLogger(AfterEffect):
    def __call__(self, result, was_cached, func_name, execution_time):
        if not was_cached:  # Only log once
            mlflow.log_metric(f"{func_name}_rows", len(result))

@step(cache=CacheConfig(version=1))
def process() -> Annotated[pd.DataFrame, DatasetLogger(), HTMLVisualizer()]:
    # Both effects run automatically after the function completes
    return load_data()
```

**Error Handling**: Control whether AfterEffect failures stop execution:
```python
# Default: errors are logged but don't stop execution
@step(cache=CacheConfig(version=1))
def safe_pipeline() -> Annotated[pd.DataFrame, MyVisualizer()]:
    return data

# Strict mode: effect errors raise exceptions
@step(cache=CacheConfig(version=1), error_on_affect_failure=True)
def strict_pipeline() -> Annotated[pd.DataFrame, MyVisualizer()]:
    return data
```

**Global AfterEffects**: Register an AfterEffect once and have it fire after every `@step` call — no per-step annotation required. Useful for cross-cutting concerns like logging, persistence, or experiment tracking.

```python
import logging
from kissml import settings, step, AfterEffect

class StepTimingLogger(AfterEffect):
    """Log every step's name, runtime, and cache status."""

    def __call__(self, result, was_cached, func_name, execution_time):
        status = "cached" if was_cached else "fresh"
        logging.info(
            f"{func_name} finished in {execution_time:.3f}s ({status})"
        )

# Register once — fires for every @step call from now on
settings.global_after_effects.append(StepTimingLogger())

@step()
def load_data() -> pd.DataFrame:
    return pd.read_csv("data.csv")  # StepTimingLogger runs after this returns

@step()
def transform(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna()              # StepTimingLogger runs here too
```

Per-step effects (declared in the return annotation) fire first, then global effects. Both honor the `error_on_effect_failure` flag on the step.

### Configuration

Configure the cache directory via environment variable or settings:

```python
from kissml import settings
from pathlib import Path

# Set cache directory
settings.cache_directory = Path("/path/to/cache")

# Or use environment variable
# export KISSML_CACHE_DIRECTORY=/path/to/cache
```

### Custom Serialization

Register custom serializers for your types:

```python
from kissml.settings import settings
from kissml.types import Serializer
from typing import Any, BinaryIO

class MyCustomSerializer(Serializer):
    def serialize(self, value: Any, out: BinaryIO) -> None:
        # Your serialization logic
        pass

    def deserialize(self, input: BinaryIO) -> Any:
        # Your deserialization logic
        pass

# Register the serializer
settings.serialize_by_type[MyCustomType] = MyCustomSerializer()

# Register a hash function for cache keys
settings.hash_by_type[MyCustomType] = lambda obj: str(hash(obj))
```

## License

Licensed under CC BY-NC-ND 4.0 (Attribution-NonCommercial-NoDerivatives). This is a **non-commercial license** - see the [LICENSE](LICENSE) file for full details.

For commercial use, please contact the author.
