Metadata-Version: 2.4
Name: kelvin-python-sdk
Version: 0.4.4b1
Summary: Framework for Kelvin application development
Author-email: Kelvin Inc <engineering@kelvininc.com>
Project-URL: Homepage, https://kelvin.ai
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Requires-Python: <3.14,>=3.9
Description-Content-Type: text/markdown
Requires-Dist: pydantic==2.11.*
Requires-Dist: pydantic-settings==2.*
Requires-Dist: structlog==23.*
Requires-Dist: PyYAML==6.*
Requires-Dist: croniter<4.0,>=2.0
Requires-Dist: kelvin-krn<0.2,>=0.1.2b1
Requires-Dist: kelvin-python-api-client<2.0,>=1.1.3b1
Provides-Extra: publisher
Requires-Dist: click==8.*; extra == "publisher"
Requires-Dist: arrow==1.*; extra == "publisher"
Provides-Extra: testing
Requires-Dist: arrow==1.*; extra == "testing"
Requires-Dist: pandas==2.*; extra == "testing"
Provides-Extra: ai
Requires-Dist: pandas==2.*; extra == "ai"

# Kelvin Python SDK

A framework to build Kelvin applications in Python. This package helps you connect and interact with the Kelvin platform.

It provides tools for building, publishing, receiving, and filtering Kelvin messages (the primary means of communicating with the platform), and it simplifies access to the resources (Assets) that the application is deployed to.

## Quickstart

Here's a simple example that receives a data message and publishes another.

```python
from kelvin.application import KelvinApp
from kelvin.message import Number
from kelvin.krn import KRNAssetDataStream

app = KelvinApp()

@app.stream()
async def handle_input(msg):
    """Process all incoming messages"""
    print(f"Received: {msg.payload} from {msg.resource}")
    
    # Publish a response
    await app.publish(
        Number(
            resource=KRNAssetDataStream(msg.resource.asset, "output"),
            payload=msg.payload * 2
        )
    )

# Run the application
app.run()
```

## Table of Contents

- [Installation](#installation)
- [Core Concepts](#core-concepts)
  - [Kelvin Messages](#kelvin-messages)
  - [Kelvin Resource Names (KRN)](#kelvin-resource-names-krn)
- [Application Deployment Context](#application-deployment-context)
- [Message Handling](#message-handling)
  - [Stream Decorators](#stream-decorators)
  - [Filters](#filters)
  - [Callbacks](#callbacks)
- [Background Tasks](#background-tasks)
  - [Tasks](#tasks)
  - [Timers](#timers)
  - [Schedules](#schedules)
- [Data Windows](#data-windows)
  - [Tumbling Window](#tumbling-window)
  - [Hopping Window](#hopping-window)
  - [Rolling Window](#rolling-window)
- [Publishing Messages](#publishing-messages)
- [Testing with KelvinPublisher](#testing-with-kelvinpublisher)
- [Testing with KelvinAppTest](#testing-with-kelvinapptest)
- [Complete Example](#complete-example)

## Installation

```bash
pip install kelvin-python-sdk
```

For AI/Data Science features (pandas support):
```bash
pip install "kelvin-python-sdk[ai]"  # quotes keep shells such as zsh from expanding the brackets
```

For development and testing with the publisher tool:
```bash
pip install "kelvin-python-sdk[publisher]"  # quotes keep shells such as zsh from expanding the brackets
```

Run `kelvin-publisher --help` to see available testing options.

## Core Concepts

### Kelvin Messages

Messages are the primary interface for exchanging data with the Kelvin platform. Applications send and receive messages to communicate with assets and other components.

#### Message Types

The SDK provides pre-built message types for common data:

```python
from kelvin.message import Number, String, Boolean
from kelvin.krn import KRNAssetDataStream

# Create primitive data messages
number_msg = Number(
    resource=KRNAssetDataStream("asset-name", "datastream-name"),
    payload=42.5
)

string_msg = String(
    resource=KRNAssetDataStream("asset-name", "datastream-name"),
    payload="Hello Kelvin"
)

boolean_msg = Boolean(
    resource=KRNAssetDataStream("asset-name", "datastream-name"),
    payload=True
)
```

#### Message Builders

For more complex messages, use the provided `MessageBuilder` helpers:

```python
from kelvin.message import (
    ControlChange,
    ControlAck,
    Recommendation,
    DataTag,
    AssetParameter,
    AssetParameters,
    StateEnum
)
from kelvin.krn import KRNAsset, KRNAssetDataStream, KRNAssetParameter
from datetime import datetime, timedelta

# Control Change - request a change in asset behavior
control_change = ControlChange(
    resource=KRNAssetDataStream("asset-name", "setpoint"),
    payload=75.0,
    expiration_date=timedelta(minutes=10),
    timeout=60,
    retries=3
)

# Control Acknowledgement - respond to control changes
ack = ControlAck(
    resource=KRNAssetDataStream("asset-name", "setpoint"),
    state=StateEnum.applied,
    message="Control change successfully applied"
)

# Recommendation - suggest multiple control changes
recommendation = Recommendation(
    resource=KRNAsset("asset-name"),
    type="optimization",
    control_changes=[control_change],
    expiration_date=timedelta(hours=1),
    auto_accepted=False
)

# Data Tag - add metadata to data
data_tag = DataTag(
    resource=KRNAsset("asset-name"),
    start_date=datetime.now(),
    tag_name="anomaly_detected",
    description="Temperature spike detected"
)

# Asset Parameters - update asset configuration
param = AssetParameter(
    resource=KRNAssetParameter("asset-name", "threshold"),
    value=100
)
params = AssetParameters(parameters=[param])
```

#### Evidences

Evidences provide context to recommendations by including supporting data and visual information. They help users understand the reasoning behind a recommendation, presenting analysis results, charts, and explanations that justify the suggested actions.

```python
from kelvin.message import Recommendation
from kelvin.message.evidences import DataExplorer, DataExplorerSelector, Markdown
from kelvin.krn import KRNAsset, KRNAssetDataStream
from datetime import datetime, timedelta

# Markdown evidence - provide textual explanations
markdown_evidence = Markdown(
    title="Analysis Summary",
    markdown="""
## Temperature Anomaly Detected

The system detected unusually high temperature readings:
- Average temperature: 85°C
- Normal range: 60-75°C
- Duration: 2 hours

**Recommended action**: Reduce speed to allow cooling.
"""
)

# Data Explorer evidence - visualize time-series data
now = datetime.now()
data_explorer = DataExplorer(
    title="Temperature Trend Analysis",
    start_time=now - timedelta(hours=6),
    end_time=now,
    selectors=[
        DataExplorerSelector(
            resource=KRNAssetDataStream("asset-name", "temperature")
        ),
        DataExplorerSelector(
            resource=KRNAssetDataStream("asset-name", "pressure"),
            agg="mean",
            time_bucket="5m"
        )
    ]
)
 
# Include evidences in recommendation
recommendation = Recommendation(
    resource=KRNAsset("asset-name"),
    type="temperature_optimization",
    evidences=[markdown_evidence, data_explorer]
)
```

The SDK supports multiple evidence types including `Markdown`, `DataExplorer`, `LineChart`, `BarChart`, `Image`, and `IFrame`. For a complete list of available evidence types and their configurations, see `kelvin.message.evidences`.

### Kelvin Resource Names (KRN)

KRNs are used to uniquely identify Kelvin resources. They follow a structured format to reference assets, datastreams, parameters, and other entities.

```python
from kelvin.krn import KRN, KRNAsset, KRNAssetDataStream, KRNAssetParameter

# Generic KRN (parse from string)
generic_krn = KRN.from_string("krn:asset:my-asset")

# Asset KRN
asset_krn = KRNAsset("my-asset")

# Asset DataStream KRN
datastream_krn = KRNAssetDataStream("my-asset", "temperature")

# Asset Parameter KRN
parameter_krn = KRNAssetParameter("my-asset", "threshold")
```
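
To make the structured format concrete, here is a minimal, standard-library-only sketch that splits a KRN string of the `krn:<type>:<id>` shape used in the `krn:asset:my-asset` example. `ParsedKRN` and `split_krn` are hypothetical names for illustration only; they are not part of the SDK, and other KRN types may structure the identifier part differently. In application code, prefer `KRN.from_string`.

```python
from typing import NamedTuple


class ParsedKRN(NamedTuple):
    """Hypothetical container used only in this illustration."""
    resource_type: str
    resource_id: str


def split_krn(text: str) -> ParsedKRN:
    """Split a 'krn:<type>:<id>' string into its type and identifier."""
    prefix, sep, rest = text.partition(":")
    if prefix != "krn" or not sep:
        raise ValueError(f"not a KRN: {text!r}")
    # Split only on the first remaining colon so the identifier is kept whole
    resource_type, sep, resource_id = rest.partition(":")
    if not sep or not resource_type or not resource_id:
        raise ValueError(f"malformed KRN: {text!r}")
    return ParsedKRN(resource_type, resource_id)


parsed = split_krn("krn:asset:my-asset")
print(parsed.resource_type, parsed.resource_id)
```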

## Application Deployment Context

When your application connects to Kelvin, it receives runtime information about its deployment environment. The `KelvinApp` instance provides access to:

- **`app.app_configuration`**: Application-specific configuration settings
- **`app.inputs`**: Input datastreams configured for your application
- **`app.outputs`**: Output datastreams configured for your application
- **`app.assets`**: The assets your application is deployed to (most important)

### Working with Assets

The `app.assets` dictionary contains all assets available to your application. Each asset provides access to its properties, parameters, and datastreams:

```python
from kelvin.application import KelvinApp

app = KelvinApp()

async def on_connect():
    # Access all available assets
    for asset_name, asset in app.assets.items():
        print(f"Asset: {asset_name}")
        
        # Access asset properties
        print(f"  Properties: {asset.properties}")
        
        # Access asset parameters
        for param_name, param_value in asset.parameters.items():
            print(f"  Parameter {param_name}: {param_value}")
        
        # Access asset datastreams
        for ds_name, datastream in asset.datastreams.items():
            print(f"  Datastream {ds_name}: {datastream}")

app.on_connect = on_connect
app.run()
```

Assets are the core resources in Kelvin, representing physical or logical entities (machines, sensors, systems, etc.). Your application interacts with assets by reading from their datastreams and publishing data or control changes back to them.


## Message Handling

### Stream Decorators

Stream decorators are the recommended way to handle incoming messages. They let you process messages based on specific criteria, such as the source asset or the input datastream:

```python
from kelvin.application import KelvinApp
from kelvin.message.typing import AssetDataMessage

app = KelvinApp()

# Process all messages
@app.stream()
async def handle_all(msg: AssetDataMessage):
    print(f"All messages: {msg.payload}")

# Filter by asset
@app.stream(assets=["asset-1", "asset-2"])
async def handle_specific_assets(msg: AssetDataMessage):
    print(f"From specific assets: {msg.payload}")

# Filter by input datastream
@app.stream(inputs=["temperature", "pressure"])
async def handle_specific_inputs(msg: AssetDataMessage):
    print(f"From specific inputs: {msg.payload}")

# Can also register functions directly
def my_handler(msg: AssetDataMessage):
    print(f"Handler: {msg.payload}")

app.stream(my_handler, inputs=["humidity"])

app.run()
```

### Filters

Filters allow you to selectively process messages using queues or async streams:

```python
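import asyncio

from kelvin.application import KelvinApp, filters

app = KelvinApp()

async def consume_queue():
    # Using a queue with filters
    queue = app.filter(filters.input_equals("temperature"))
    while True:
        msg = await queue.get()
        print(f"Temperature: {msg.payload}")

async def consume_stream():
    # Using an async stream with filters
    stream = app.stream_filter(filters.asset_equals("asset-1"))
    async for msg in stream:
        print(f"Asset 1 data: {msg.payload}")

async def main():
    await app.connect()
    # Run both consumers concurrently; placed one after the other,
    # the first infinite loop would keep the second from ever starting
    await asyncio.gather(consume_queue(), consume_stream())

if __name__ == "__main__":
    asyncio.run(main())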
```

#### Built-in Filters

```python
from kelvin.application import filters

# Filter by input datastream name(s)
filters.input_equals("temperature")
filters.input_equals(["temperature", "pressure"])

# Filter by asset name(s)
filters.asset_equals("my-asset")
filters.asset_equals(["asset-1", "asset-2"])

# Filter by resource KRN
filters.resource_equals(krn_instance)
filters.resource_equals([krn1, krn2])

# Check the message type (each function takes a message)
filters.is_data_message(msg)
filters.is_asset_data_message(msg)
filters.is_control_status_message(msg)
filters.is_custom_action(msg)
filters.is_data_quality_message(msg)
```

#### Custom Filters

You can create custom filter functions:

```python
from kelvin.message import Message

def custom_filter(msg: Message) -> bool:
    """Filter for high-value readings"""
    return msg.payload > 100

queue = app.filter(custom_filter)
```
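
A predicate such as `msg.payload > 100` raises a `TypeError` if a string payload reaches it, so it can help to combine a type check with the value check. The following is a minimal sketch of that idea using only the standard library. `FakeMessage`, `is_numeric`, `above`, and `all_of` are hypothetical helpers written for this illustration; they are not SDK functions.

```python
from dataclasses import dataclass
from numbers import Real
from typing import Any, Callable


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a Kelvin message; only `payload` is modelled."""
    payload: Any


Predicate = Callable[[FakeMessage], bool]


def is_numeric(msg: FakeMessage) -> bool:
    """True for int/float payloads, excluding bool (a subclass of int)."""
    return isinstance(msg.payload, Real) and not isinstance(msg.payload, bool)


def above(threshold: float) -> Predicate:
    """Build a predicate that accepts payloads greater than `threshold`."""
    return lambda msg: msg.payload > threshold


def all_of(*predicates: Predicate) -> Predicate:
    """Combine predicates; evaluation stops at the first one that fails."""
    return lambda msg: all(p(msg) for p in predicates)


# The type check runs first, so the comparison never sees a string payload
high_value = all_of(is_numeric, above(100))
print(high_value(FakeMessage(150)), high_value(FakeMessage("hot")))
```

Because the combined predicate is an ordinary function from a message to a `bool`, it has the same shape as `custom_filter` above.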

### Callbacks

For advanced scenarios, you can define callbacks for specific lifecycle events. However, stream decorators are generally preferred for message processing.

Callbacks can be registered using **decorators** or **direct assignment**. Both sync and async functions are supported; sync functions are automatically wrapped to run in a thread pool.
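
As a rough illustration of what wrapping a sync function to run in a thread pool can look like, here is a minimal sketch built on `asyncio.to_thread` from the standard library (Python 3.9+). `ensure_async` is a hypothetical helper for this example; the SDK's internal wrapping may differ.

```python
import asyncio
import functools
import inspect
from typing import Any, Awaitable, Callable


def ensure_async(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Return `func` unchanged if it is async, else wrap it to run in a worker thread."""
    if inspect.iscoroutinefunction(func):
        return func

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # to_thread runs the blocking call in the default thread pool,
        # so the event loop stays free to process other messages
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


def sync_double(value: int) -> int:
    return value * 2


async def async_double(value: int) -> int:
    return value * 2


print(asyncio.run(ensure_async(sync_double)(21)))
```

The practical consequence is that a sync callback may run on a different thread from the event loop, so any state it shares with async handlers should be accessed with that in mind.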

#### Using Decorators

```python
from kelvin.application import KelvinApp, AssetInfo
from kelvin.message.typing import AssetDataMessage
from typing import Optional

app = KelvinApp()

@app.on_connect
async def handle_connect():
    """Called when the app connects to Kelvin"""
    print("Connected to Kelvin platform")
    print(f"Configuration: {app.app_configuration}")
    print(f"Assets: {app.assets}")

@app.on_asset_input
async def handle_asset_input(msg: AssetDataMessage):
    """Called for data messages from asset inputs"""
    print(f"Data from {msg.resource}: {msg.payload}")

@app.on_control_change
async def handle_control_change(msg: AssetDataMessage):
    """Called when control changes are received"""
    print(f"Control change for {msg.resource}: {msg.payload}")

@app.on_asset_change
async def handle_asset_change(new_asset: Optional[AssetInfo], old_asset: Optional[AssetInfo]):
    """Called when assets are added, removed, or modified"""
    if new_asset is None:
        print(f"Asset removed: {old_asset.name}")
    else:
        print(f"Asset changed: {new_asset.name}")

@app.on_app_configuration
async def handle_app_configuration(config: dict):
    """Called when app configuration changes"""
    print(f"New configuration: {config}")

app.run()
```

#### Using Direct Assignment

```python
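from kelvin.application import KelvinApp

app = KelvinApp()

async def on_connect():
    print("Connected to Kelvin platform")

async def on_asset_input(msg):
    print(f"Data from {msg.resource}: {msg.payload}")

async def on_control_change(msg):
    print(f"Control change for {msg.resource}: {msg.payload}")

async def on_asset_change(new_asset, old_asset):
    print(f"Asset changed: {new_asset} (was {old_asset})")

async def on_app_configuration(config):
    print(f"New configuration: {config}")

app.on_connect = on_connect
app.on_asset_input = on_asset_input
app.on_control_change = on_control_change
app.on_asset_change = on_asset_change
app.on_app_configuration = on_app_configuration

app.run()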
```

#### Available Callbacks

| Callback | Arguments | Description |
|----------|-----------|-------------|
| `on_connect` | none | Connection established |
| `on_disconnect` | none | Connection closed |
| `on_message` | `msg: Message` | Any message received |
| `on_asset_input` | `msg: AssetDataMessage` | Asset data message received |
| `on_control_change` | `msg: AssetDataMessage` | Control change received |
| `on_control_status` | `msg: ControlChangeStatus` | Control status received |
| `on_custom_action` | `msg: CustomAction` | Custom action received |
| `on_data_tag` | `msg: DataTag` | Data tag message received |
| `on_asset_change` | `new: Optional[AssetInfo], old: Optional[AssetInfo]` | Asset added, removed, or modified |
| `on_app_configuration` | `config: dict[str, Any]` | App configuration changed |

## Background Tasks

### Tasks

Tasks are functions that run in the background. They're started when the application connects:

```python
from kelvin.application import KelvinApp
import asyncio

app = KelvinApp()

@app.task
async def background_task():
    """Runs once when the app starts"""
    print("Task started")
    # Perform initialization or one-time operations

@app.task
async def continuous_task():
    """Runs continuously in the background"""
    while True:
        print("Processing...")
        await asyncio.sleep(10)

# Can also register functions directly
async def another_task():
    print("Another task")

app.task(another_task, name="my_task")

app.run()
```

### Timers

Timers execute functions at regular intervals:

```python
from kelvin.application import KelvinApp

app = KelvinApp()

@app.timer(interval=30)  # seconds
async def periodic_check():
    """Runs every 30 seconds"""
    print("Periodic check executed")

@app.timer(interval=60)
async def publish_metrics():
    """Runs every 60 seconds"""
    # Publish periodic metrics
    await app.publish(...)

# Register timers directly
def sync_timer():
    print("Sync timer")

app.timer(sync_timer, interval=10, name="sync_timer")

app.run()
```

### Schedules

Schedules execute functions on cron-like schedules with timezone support. They are ideal for tasks that need to run at specific times of day or on specific days of the week.

#### Human-readable schedules

Use `every` and `at` parameters for readable schedule definitions:

```python
from kelvin.application import KelvinApp

app = KelvinApp()

@app.schedule(every="day", at="19:00", timezone="Australia/Sydney")
async def daily_report():
    """Runs every day at 7pm Sydney time"""
    print("Daily report")

@app.schedule(every="monday", at="09:00")
async def weekly_standup():
    """Runs every Monday at 09:00 UTC"""
    print("Weekly standup reminder")

@app.schedule(every="weekday", at="08:00", timezone="Europe/London")
async def weekday_morning():
    """Runs Monday-Friday at 08:00 London time"""
    print("Good morning!")

# Multiple times per day
@app.schedule(every="day", at=["09:00", "17:00"])
async def twice_daily():
    """Runs at 09:00 and 17:00 UTC"""
    print("Running twice daily")

app.run()
```

The `every` parameter accepts: `"day"`, `"monday"` through `"sunday"`, `"weekday"`, or `"weekend"`.

#### Cron expressions

For more complex schedules, use standard 5-field cron expressions:

```python
@app.schedule(cron="*/5 * * * *")
def every_five_minutes():
    """Runs every 5 minutes (sync functions are also supported)"""
    print("Running every 5 minutes")

@app.schedule(cron="0 9 * * MON-FRI", timezone="Europe/London")
async def weekday_job():
    """Runs at 09:00 London time on weekdays"""
    print("Weekday job")
```
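
For readers less familiar with cron syntax, the five fields are, in order: minute, hour, day of month, month, and day of week. The sketch below simply labels the fields of an expression so it is easier to read. `describe_cron` is a hypothetical helper for illustration; it does not validate field values and is not part of the SDK.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Label each field of a standard 5-field cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


# "0 9 * * MON-FRI": minute 0, hour 9, any day of month, any month, Monday to Friday
for name, value in describe_cron("0 9 * * MON-FRI").items():
    print(f"{name:>12}: {value}")
```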

#### Interval scheduling

Use `interval` to fire every Nth occurrence. Combined with `start_time`, the count alignment is deterministic:

```python
from datetime import datetime, timezone

@app.schedule(
    every="monday",
    at="09:00",
    interval=2,
    start_time=datetime(2026, 4, 6, tzinfo=timezone.utc),
)
async def biweekly_sync():
    """Runs every other Monday at 09:00 UTC, starting from April 6, 2026"""
    print("Biweekly sync")
```
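
One way to picture the alignment is to number the matches counted from `start_time` and fire only on every Nth one. The sketch below does this for a weekly schedule with standard-library datetimes. It assumes that the first match at or after `start_time` counts as occurrence zero and that matches are exactly one `period` apart; both are assumptions made for this illustration, and `fires` is a hypothetical function, not the SDK's scheduler.

```python
from datetime import datetime, timedelta, timezone


def fires(
    occurrence: datetime,
    start_time: datetime,
    interval: int,
    period: timedelta = timedelta(weeks=1),
) -> bool:
    """True if `occurrence` is match number 0, N, 2N, ... counted from `start_time`."""
    index = (occurrence - start_time) // period  # whole periods elapsed since start_time
    return index >= 0 and index % interval == 0


start = datetime(2026, 4, 6, tzinfo=timezone.utc)          # a Monday
first_monday = datetime(2026, 4, 6, 9, 0, tzinfo=timezone.utc)

# Walk four consecutive Mondays at 09:00 UTC with interval=2
for week in range(4):
    monday = first_monday + timedelta(weeks=week)
    print(monday.date(), fires(monday, start, interval=2))
```

Because the count is anchored to `start_time` rather than to when the process started, restarting the application would not shift which Mondays fire under this model.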

#### Parameters

| Parameter | Description |
|-----------|-------------|
| `every` | Human-readable recurrence (`"day"`, `"monday"`-`"sunday"`, `"weekday"`, `"weekend"`) |
| `at` | Time(s) of day in `"HH:MM"` format. Required when using `every`. Can be a string or list of strings |
| `cron` | Standard 5-field cron expression. Mutually exclusive with `every`/`at` |
| `timezone` | IANA timezone name (default `"UTC"`) |
| `start_time` | Reference datetime for deterministic schedule alignment |
| `interval` | Fire every Nth cron match (e.g., `2` for every other occurrence) |
| `name` | Optional task name for logging |

Schedules can also be registered directly without the decorator syntax:

```python
async def my_scheduled_task():
    print("Scheduled task")

app.schedule(my_scheduled_task, every="day", at="12:00", name="noon_task")
```

## Data Windows

Data windows aggregate incoming data over time or count, returning pandas DataFrames for analysis. All window operations require the `ai` optional dependency.

### Tumbling Window

Non-overlapping, fixed-size time windows:

```python
import asyncio
from datetime import datetime, timedelta
from kelvin.application import KelvinApp

app = KelvinApp()

async def main():
    await app.connect()
    
    # Process data in 10-second non-overlapping windows
    window_start = datetime.now()
    async for asset_name, df in app.tumbling_window(
        window_size=timedelta(seconds=10)
    ).stream(window_start):
        print(f"Asset: {asset_name}")
        print(df)  # pandas DataFrame with all data in window

if __name__ == "__main__":
    asyncio.run(main())
```

**Use case**: Aggregate data every N seconds for batch processing (e.g., computing averages every 10 seconds).
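
The defining property of a tumbling window is that every timestamp falls into exactly one window. A minimal sketch of that bucketing, using only the standard library, is shown here. `tumbling_index` and `tumbling_bounds` are hypothetical helpers, and treating each window as half-open (start included, end excluded) is an assumption of this illustration rather than documented SDK behaviour.

```python
from datetime import datetime, timedelta
from typing import Tuple


def tumbling_index(ts: datetime, window_start: datetime, window_size: timedelta) -> int:
    """Index of the single window that contains `ts` (negative if before window_start)."""
    return (ts - window_start) // window_size


def tumbling_bounds(
    index: int, window_start: datetime, window_size: timedelta
) -> Tuple[datetime, datetime]:
    """Half-open bounds [begin, end) of the window with the given index."""
    begin = window_start + index * window_size
    return begin, begin + window_size


start = datetime(2026, 1, 1)
size = timedelta(seconds=10)

# Readings at +3 s and +9 s share window 0; the reading at +10 s opens window 1
for seconds in (3, 9, 10, 25):
    ts = start + timedelta(seconds=seconds)
    print(seconds, tumbling_index(ts, start, size))
```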

### Hopping Window

Overlapping, fixed-size windows with a configurable hop interval:

```python
import asyncio
from datetime import datetime, timedelta
from kelvin.application import KelvinApp

app = KelvinApp()

async def main():
    await app.connect()
    
    # 10-second windows, moving forward by 5 seconds each time
    window_start = datetime.now()
    async for asset_name, df in app.hopping_window(
        window_size=timedelta(seconds=10),
        hop_size=timedelta(seconds=5)
    ).stream(window_start=window_start):
        print(f"Asset: {asset_name}")
        print(df)  # pandas DataFrame with overlapping data

if __name__ == "__main__":
    asyncio.run(main())
```

**Use case**: Sliding window analysis where you want overlapping data (e.g., moving averages with 50% overlap).
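
Unlike tumbling windows, hopping windows overlap, so a single reading can belong to several windows. The sketch below computes which windows contain a given timestamp when window `k` covers `[start + k*hop, start + k*hop + size)`. `hopping_indices` is a hypothetical helper, and the half-open boundaries are an assumption of this illustration.

```python
from datetime import datetime, timedelta
from typing import List


def hopping_indices(
    ts: datetime,
    window_start: datetime,
    window_size: timedelta,
    hop_size: timedelta,
) -> List[int]:
    """Indices k of every window [start + k*hop, start + k*hop + size) containing ts."""
    offset = ts - window_start
    if offset < timedelta(0):
        return []  # before the first window opens
    last = offset // hop_size  # newest window already open at ts
    # Oldest window that has not yet closed at ts (never below window 0)
    first = max(0, (offset - window_size) // hop_size + 1)
    return list(range(first, last + 1))


start = datetime(2026, 1, 1)
size = timedelta(seconds=10)
hop = timedelta(seconds=5)

# With a 10 s window and a 5 s hop, most readings fall into two windows
for seconds in (3, 7, 12):
    ts = start + timedelta(seconds=seconds)
    print(seconds, hopping_indices(ts, start, size, hop))
```

With `hop_size` equal to `window_size`, each timestamp maps to exactly one index, which is the tumbling case.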

### Rolling Window

Count-based windows that slide with each new message:

```python
import asyncio
from kelvin.application import KelvinApp

app = KelvinApp()

async def main():
    await app.connect()
    
    # Window of last 5 messages, slides by 2 messages
    async for asset_name, df in app.rolling_window(
        count_size=5,
        slide=2
    ).stream():
        print(f"Asset: {asset_name}")
        print(df)  # pandas DataFrame with last 5 messages

if __name__ == "__main__":
    asyncio.run(main())
```

**Use case**: Process the last N data points (e.g., calculate trend over the last 10 readings).
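
The interaction between `count_size` and `slide` can be sketched with a bounded `deque` from the standard library. This illustration assumes the first window is emitted once `count_size` items have arrived and a new one after every `slide` further items; the SDK's exact emission rule may differ, and `rolling_windows` is a hypothetical helper.

```python
from collections import deque
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def rolling_windows(items: Iterable[T], count_size: int, slide: int) -> Iterator[List[T]]:
    """Yield the last `count_size` items, first when full, then every `slide` items."""
    if count_size < 1 or slide < 1:
        raise ValueError("count_size and slide must be positive")
    buffer = deque(maxlen=count_size)  # oldest items drop off automatically
    countdown = count_size             # items still needed before the next window
    for item in items:
        buffer.append(item)
        countdown -= 1
        if countdown == 0:
            yield list(buffer)
            countdown = slide


# Nine readings, windows of 5 that slide by 2
for window in rolling_windows(range(1, 10), count_size=5, slide=2):
    print(window)
```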

## Publishing Messages

Use `app.publish()` to send messages to the Kelvin platform:

```python
from kelvin.application import KelvinApp
from kelvin.message import Number, ControlChange, Recommendation
from kelvin.krn import KRNAssetDataStream, KRNAsset
from datetime import timedelta

app = KelvinApp()

@app.timer(interval=10)
async def publish_data():
    # Publish data
    await app.publish(
        Number(
            resource=KRNAssetDataStream("asset-1", "output"),
            payload=42.0
        )
    )
    
    # Publish control change
    await app.publish(
        ControlChange(
            resource=KRNAssetDataStream("asset-1", "setpoint"),
            payload=75.0,
            expiration_date=timedelta(minutes=5)
        )
    )
    
    # Publish recommendation
    await app.publish(
        Recommendation(
            resource=KRNAsset("asset-1"),
            type="optimization",
            control_changes=[...],
            expiration_date=timedelta(hours=1)
        )
    )

app.run()
```

## Testing with KelvinPublisher

`kelvin-publisher` is a CLI tool for testing applications during development. It's not meant to be imported in your code, but used as a standalone testing utility.

### Installation

```bash
pip install "kelvin-python-sdk[publisher]"  # quotes keep shells such as zsh from expanding the brackets
```

Run `kelvin-publisher --help` to see all available commands and options.

### Usage

The publisher has three modes for simulating data:

#### 1. Simulator - Random Data

Generate random data to your application's inputs:

```bash
kelvin-publisher simulator --help
kelvin-publisher simulator
```

This automatically discovers your application's inputs and publishes random data.

#### 2. CSV - File-based Data

Publish data from a CSV file:

```bash
kelvin-publisher csv --help
kelvin-publisher csv --csv data.csv
```

This mode replays test data from a CSV file.

#### 3. Generator - Custom Data

Use a custom Python class to generate data:

```bash
kelvin-publisher generator --help
kelvin-publisher generator --entrypoint action_generator.py:CustomActionGenerator
```

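Example generator. The `--entrypoint` value has the form `<file>.py:<ClassName>`, so the class below would be referenced by its own file and class name (for instance `my_generators.py:OtherGenerator` if it were saved in a file named `my_generators.py`):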

```python
import asyncio
from typing import AsyncGenerator

from kelvin.message import Number
from kelvin.krn import KRNAssetDataStream
from kelvin.publisher import DataGenerator

class OtherGenerator(DataGenerator):
    def __init__(self) -> None:
        print("Hello from OtherGenerator")

    async def run(self) -> AsyncGenerator[Number, None]:
        print("Running OtherGenerator")
        for i in range(20, 30):
            yield Number(
                resource=KRNAssetDataStream("test-asset-1", "input-number"),
                payload=i,
            )
            await asyncio.sleep(1)
```

This mode allows you to implement sophisticated test scenarios with custom logic.

## Testing with KelvinAppTest

The `kelvin.testing` module provides a deterministic, in-memory test harness for `KelvinApp` instances. Tests run without network dependencies and use a virtual clock for time control.

### Quick Start

Add a `pytest.ini` alongside your tests:

```ini
[pytest]
pythonpath = .
```

Write tests against your app singleton:

```python
import pytest
from main import app

from kelvin.krn import KRNAssetDataStream
from kelvin.message import Number
from kelvin.testing import KelvinAppTest, ManifestBuilder


def _build_manifest():
    return (
        ManifestBuilder()
        .add_input("temperature")
        .add_output("alert")
        .add_asset("pump-001")
    )


class TestMyApp:
    @pytest.mark.asyncio
    async def test_processes_input(self):
        harness = KelvinAppTest(app, manifest=_build_manifest().build())

        async with harness:
            await harness.publish(
                Number(
                    resource=KRNAssetDataStream("pump-001", "temperature"),
                    payload=42.0,
                )
            )
            await harness.run_until_idle()

        outputs = harness.outputs
        assert len(outputs) > 0
```

The pattern is always:

1. Build a manifest with `ManifestBuilder` (assets, datastreams, configuration)
2. Create `KelvinAppTest` wrapping your app
3. Use `async with` to connect/disconnect
4. Publish inputs and/or advance time with `run_until_idle()`
5. Assert on `harness.outputs`

### Testing Timers

Timers fire automatically when virtual time advances. Set the `timeout` parameter past the timer interval:

```python
# Method of a test class such as TestMyApp above
@pytest.mark.asyncio
async def test_timer_fires(self, capsys):
    harness = KelvinAppTest(app, manifest=ManifestBuilder().build())

    async with harness:
        # Timer interval is 5s - advance past the first firing
        await harness.run_until_idle(timeout=6.0)

    captured = capsys.readouterr().out
    assert "timer fired" in captured
```

### Inspecting Outputs

Filter outputs by message type:

```python
from kelvin.message.base_messages import RecommendationMsg, ControlChangeMsg

recs = [m for m in harness.outputs if isinstance(m, RecommendationMsg)]
ccs = [m for m in harness.outputs if isinstance(m, ControlChangeMsg)]
```

### Data Sources

Automate input generation with built-in data sources:

```python
from datetime import timedelta
from kelvin.testing import RandomSource, CSVSource, SyntheticSource, SineWave

# Random values
source = RandomSource(datastreams=["temperature"], min_value=0, max_value=100).with_asset("pump-001")

# Replay from CSV
source = CSVSource("data.csv", playback=True).with_asset("pump-001")

# Synthetic wave patterns
source = SyntheticSource(
    pattern=SineWave(amplitude=10, period=timedelta(minutes=1)),
    datastream="temperature",
).with_asset("pump-001")

harness.add_source(source)  # add before connecting
```

For the full testing guide including `ManifestBuilder` options, all data source types, and detailed testing patterns, see [docs/modules/testing.rst](docs/modules/testing.rst).

## Complete Example

Here's a complete example that demonstrates multiple features working together:

```python
import asyncio
from datetime import timedelta
from kelvin.application import KelvinApp, filters
from kelvin.message import Number, ControlChange, ControlAck, StateEnum
from kelvin.krn import KRNAssetDataStream

app = KelvinApp()

# Lifecycle callbacks
@app.on_connect
async def handle_connect():
    print("Connected to Kelvin platform")
    print(f"Configuration: {app.app_configuration}")
    print(f"Available assets: {list(app.assets.keys())}")

@app.on_control_change
async def handle_control_change(msg):
    print(f"Control change received for {msg.resource}")
    # Acknowledge the control change
    await app.publish(
        ControlAck(
            resource=msg.resource,
            state=StateEnum.applied,
            message="Control change successfully applied"
        )
    )

# Stream decorators for processing inputs
@app.stream(inputs=["temperature"])
async def process_temperature(msg):
    """Process temperature readings"""
    temp = msg.payload
    print(f"Temperature from {msg.resource.asset}: {temp}°C")
    
    # Publish processed data
    await app.publish(
        Number(
            resource=KRNAssetDataStream(msg.resource.asset, "temp_fahrenheit"),
            payload=temp * 9/5 + 32
        )
    )

@app.stream(inputs=["pressure"])
async def process_pressure(msg):
    """Process pressure readings"""
    print(f"Pressure from {msg.resource.asset}: {msg.payload} Pa")

# Background task
@app.task
async def monitor_system():
    """Continuous monitoring task"""
    await asyncio.sleep(5)  # Wait for initialization
    
    while True:
        print("Monitoring system health...")
        # Add your monitoring logic here
        await asyncio.sleep(30)

# Periodic timer
@app.timer(interval=60)
async def publish_heartbeat():
    """Publish heartbeat every minute"""
    for asset_name in app.assets.keys():
        await app.publish(
            Number(
                resource=KRNAssetDataStream(asset_name, "heartbeat"),
                payload=1
            )
        )

# Run the application
if __name__ == "__main__":
    app.run()
```
