Metadata-Version: 2.3
Name: kicker
Version: 1.0.0
Summary: Lightweight job runner framework built on FastAPI and APScheduler with a simple web UI.
Author: Andrey Morozov
Author-email: Andrey Morozov <andrey@morozov.lv>
Requires-Dist: apscheduler>=3.11.2
Requires-Dist: dominate>=2.9.1
Requires-Dist: fastapi>=0.136.1
Requires-Dist: pydantic>=2.13.3
Requires-Dist: python-multipart>=0.0.28
Requires-Dist: uvicorn>=0.46.0
Requires-Python: >=3.14
Description-Content-Type: text/markdown

# kicker

Lightweight job runner built on FastAPI and APScheduler with a simple web UI.

## Features

* **FastAPI-style developer experience**
  * `app = Kicker()`
  * `@app.kick(...)`
  * standard `uvicorn module:app`
* Schedule jobs with APScheduler using decorators
* Run jobs manually from UI
* Pause/resume scheduler and individual jobs
* Supports both sync and async functions
* Simple HTML interface
* Use multiple log outputs
* Save execution state between job runs

---

## Installation

```bash
uv add kicker
```

---

## Usage

Create a project and define your jobs:

```python
# main.py
from kicker import Kicker, JobContext

app = Kicker(
  logger_fmt="%(prefix)s[%(levelname)s] %(message)s at %(asctime)s"
)

@app.kick(day_of_week='mon-fri', hour='9-20/2', minute=0)
async def job_echo(ctx: JobContext):
    ctx.logger.info("job_echo executed")


# run the app
# uv run uvicorn package.module:app --reload
```

To see logs in the UI, declare a `logger` parameter — kicker injects it automatically. Jobs without it work fine but won't produce UI logs.

## Splitting jobs across files

Like FastAPI's `APIRouter`, `Coworker` lets you define jobs in separate files and include them in the main app — no circular imports, no shared `app` instance.

```python
# jobs/weekly_report.py
from kicker import Coworker, JobContext

worker = Coworker()

@worker.kick(hour=8, minute=0)
async def weekly_report(ctx: JobContext):
  ctx.logger.info("weekly_report executed")
```

```python
# main.py
from kicker import Kicker
from jobs.weekly_report import worker

app = Kicker()
app.include_coworker(worker)
```

## Adding multiple log outputs

It is possible to have multiple visual log output "containers". By default, the `default` output container is used.
To see logs in a different output container, use the standatd `extra` keyword argument of the logger method you use (`error` in this example)
with a `output` key and some string value as a new output container name:

```python
from kicker import Kicker, JobContext

app = Kicker()

@app.kick(second="*/5")
def sync_job(ctx: JobContext):
    try:
      ...
    except:
        ctx.logger.error("error in a sync job executed in the thread pool", extra={"output": "errors"})
```

or, cleaner with `partial`:
```python
from functools import partial

from kicker import Kicker, JobContext

app = Kicker()

@app.kick(second="*/5")
def sync_job(ctx: JobContext):
    log_error = partial(ctx.logger.error, extra={"output": "errors"})
    try:
      ...
    except:
        log_error("error in a sync job executed in the thread pool")
```

## Getting access to the scheduler and the job

Along with the `logger` parameter, kicker also injects the `scheduler` object and a `job_id` of the current job object.
This is useful for managing jobs — for example, we can change the next run time of the current job:

```python
from datetime import datetime, timedelta
from functools import partial

from kicker import Kicker, JobContext

app = Kicker()

@app.kick(second="*/5")
def sync_job(ctx: JobContext):
    log_debug = partial(ctx.logger.debug, extra={"output": "debug"})
    slowdown_interval_minutes = 10
    try:
      ...
    except Exception as e:
        ctx.scheduler.modify_job(
          ctx.job_id, 
          next_run_time=datetime.now() + timedelta(minutes = slowdown_interval_minutes)
        )
        log_debug(f"error: {e} - slowing down for {slowdown_interval_minutes} minutes")
```

## Save execution state between job runs

You can save arbitrary data in `ctx.storage` between job runs.

```python
from kicker import Coworker, JobContext

worker = Coworker()

@worker.kick(second="*/6")
async def very_important_job_with_runs_counter(ctx: JobContext):
    counter = ctx.storage.get("counter", 0)
    ctx.logger.info(f"important job executed ({counter}) times")
    ctx.storage["counter"] = counter + 1
```

---

## Notes

* Scheduler runs in-process (not distributed)
* Running with multiple workers will duplicate job execution
* Designed for simple internal tools and automation

---

## License

MIT
