Advanced Usage¶
This guide covers advanced PIPolars usage patterns for complex scenarios.
Bulk Operations¶
For large-scale data extraction, PIPolars optimizes bulk operations:
from pipolars import PIClient

with PIClient("my-pi-server") as client:
    # Extract many tags efficiently
    tags = client.search_tags("PLANT1.*")  # May return hundreds of tags

    # Bulk extraction with parallelization
    df = client.recorded_values(
        tags[:100],  # First 100 tags
        start="*-1d",
        end="*",
        pivot=True,
    )
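If a search returns more tags than you want in one request, a simple batching loop keeps each call bounded. This is a minimal sketch built from the calls shown above plus polars.concat; the batch size of 100 is an arbitrary choice here, not a library limit:

import polars as pl

def extract_in_batches(client, tags, batch_size=100):
    """Fetch tags in fixed-size batches and align the pivoted results on timestamp."""
    frames = []
    for i in range(0, len(tags), batch_size):
        batch = tags[i : i + batch_size]
        frames.append(
            client.recorded_values(batch, start="*-1d", end="*", pivot=True)
        )
    # Each pivoted batch has a timestamp column plus one column per tag
    return pl.concat(frames, how="align")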
Parallel requests are configured via QueryConfig:
# PIConfig and PIServerConfig are assumed importable alongside QueryConfig
from pipolars.core.config import PIConfig, PIServerConfig, QueryConfig

config = PIConfig(
    server=PIServerConfig(host="my-server"),
    query=QueryConfig(
        parallel_requests=8,  # 8 concurrent requests
    ),
)
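Once built, the config object is passed to the client. The exact constructor signature depends on your PIPolars version, so the config= keyword below is an assumption for illustration only:

# Hypothetical usage: assumes PIClient accepts the configuration object via config=
with PIClient(config=config) as client:
    df = client.recorded_values("SINUSOID", start="*-1d", end="*")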
Lazy Evaluation¶
Use LazyFrame for query optimization on large datasets:
import polars as pl

with PIClient("my-pi-server") as client:
    # Get a LazyFrame instead of a DataFrame
    lf = (
        client.query("SINUSOID")
        .last(days=30)
        .recorded()
        .to_lazy_frame()
    )

    # Build the query (not executed yet)
    result = (
        lf.filter(pl.col("value") > 50)
        .with_columns(
            pl.col("value").rolling_mean(window_size=100).alias("rolling")
        )
        .group_by(pl.col("timestamp").dt.date())
        .agg(pl.col("value").mean())
        .sort("timestamp")
    )

    # Execute the optimized query
    df = result.collect()
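Because nothing executes until collect(), you can also inspect the plan Polars will run. LazyFrame.explain() is standard Polars and is a quick way to confirm that filters are pushed down before a large pull:

# Print the optimized query plan without executing it (standard Polars API)
print(result.explain())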
Error Handling¶
Comprehensive error handling for production systems:
from pipolars import PIClient
from pipolars.core.exceptions import (
    PIPolarsError,
    PIConnectionError,
    PIAuthenticationError,
    PIDataError,
    PIPointNotFoundError,
    PIQueryError,
    PITimeParseError,
    PIBulkOperationError,
)

def safe_query(client, tag, start, end):
    """Query with comprehensive error handling."""
    try:
        return client.recorded_values(tag, start, end)
    except PIPointNotFoundError as e:
        print(f"Tag not found: {e.tag}")
        return None
    except PITimeParseError as e:
        print(f"Invalid time expression: {e}")
        return None
    except PIConnectionError as e:
        print(f"Connection failed to {e.server}: {e.message}")
        raise
    except PIPolarsError as e:
        print(f"PI error: {e.message}")
        print(f"Details: {e.details}")
        raise
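Using the helper is then straightforward; bad tags and time strings degrade gracefully, while connection problems still propagate:

with PIClient("my-pi-server") as client:
    df = safe_query(client, "SINUSOID", "*-1h", "*")
    if df is not None:
        print(df.head())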
Handling Bulk Operation Failures¶
from pipolars.core.exceptions import PIBulkOperationError

try:
    df = client.recorded_values(
        ["TAG1", "TAG2", "INVALID_TAG", "TAG3"],
        start="*-1h",
        end="*",
    )
except PIBulkOperationError as e:
    print(f"Succeeded: {e.succeeded}")
    print(f"Failed: {e.failed}")

    # Process successful results
    for tag in e.succeeded:
        print(f"Got data for {tag}")
Streaming Large Datasets¶
For very large time ranges, process data in chunks:
from datetime import datetime, timedelta

def stream_data(client, tag, start, end, chunk_days=7):
    """Stream data in chunks to avoid memory issues."""
    current = start
    while current < end:
        chunk_end = min(current + timedelta(days=chunk_days), end)
        df = client.recorded_values(
            tag,
            start=current.isoformat(),
            end=chunk_end.isoformat(),
        )
        yield df
        current = chunk_end

# Usage
with PIClient("my-pi-server") as client:
    start = datetime(2024, 1, 1)
    end = datetime(2024, 12, 31)
    for chunk_df in stream_data(client, "SINUSOID", start, end):
        # Process each chunk
        process(chunk_df)
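If the chunks do fit in memory once processed, they can also be stacked into a single DataFrame with polars.concat, which accepts the generator directly:

import polars as pl

with PIClient("my-pi-server") as client:
    chunks = stream_data(client, "SINUSOID", datetime(2024, 1, 1), datetime(2024, 12, 31))
    df = pl.concat(chunks)  # vertical concatenation of the per-chunk DataFrames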
Custom Data Processing Pipeline¶
Build reusable processing pipelines:
import polars as pl

class PIDataPipeline:
    """Reusable data processing pipeline."""

    def __init__(self, client):
        self.client = client
        self.steps = []

    def add_step(self, step_fn):
        self.steps.append(step_fn)
        return self

    def execute(self, tags, start, end, **kwargs):
        df = self.client.recorded_values(tags, start, end, **kwargs)
        for step in self.steps:
            df = step(df)
        return df
# Define processing steps
def add_rolling_stats(df):
    return df.with_columns([
        pl.col("value").rolling_mean(window_size=12).alias("rolling_avg"),
        pl.col("value").rolling_std(window_size=12).alias("rolling_std"),
    ])

def filter_outliers(df, z_threshold=3):
    mean = df["value"].mean()
    std = df["value"].std()
    return df.filter(
        ((pl.col("value") - mean) / std).abs() < z_threshold
    )

def add_time_features(df):
    return df.with_columns([
        pl.col("timestamp").dt.hour().alias("hour"),
        pl.col("timestamp").dt.weekday().alias("day_of_week"),
    ])
# Build and execute pipeline
with PIClient("my-pi-server") as client:
    pipeline = (
        PIDataPipeline(client)
        .add_step(add_rolling_stats)
        .add_step(filter_outliers)
        .add_step(add_time_features)
    )
    df = pipeline.execute("SINUSOID", "*-7d", "*")
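Because each step is a plain callable that takes and returns a DataFrame, functools.partial is a convenient way to bind parameters such as the outlier threshold without changing the pipeline class:

from functools import partial

pipeline = (
    PIDataPipeline(client)
    .add_step(add_rolling_stats)
    .add_step(partial(filter_outliers, z_threshold=2.5))  # tighter threshold
    .add_step(add_time_features)
)
df = pipeline.execute("SINUSOID", "*-7d", "*")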
Logging and Monitoring¶
Configure logging for debugging:
import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger("pipolars")
logger.setLevel(logging.DEBUG)

# Or via configuration
config = PIConfig(
    server=PIServerConfig(host="my-server"),
    debug=True,
    log_level="DEBUG",
)
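For long-running jobs it is often more useful to capture the library's log output in a file rather than the console; this uses only the standard logging module:

import logging

handler = logging.FileHandler("pipolars.log")
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logging.getLogger("pipolars").addHandler(handler)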
Query Performance Monitoring¶
import time

class TimingClient:
    """Client wrapper with timing instrumentation."""

    def __init__(self, client):
        self.client = client
        self.timings = []

    def recorded_values(self, *args, **kwargs):
        start = time.perf_counter()
        result = self.client.recorded_values(*args, **kwargs)
        elapsed = time.perf_counter() - start
        self.timings.append({
            "method": "recorded_values",
            "elapsed": elapsed,
            "rows": len(result),
        })
        return result

    def report(self):
        import polars as pl
        return pl.DataFrame(self.timings).describe()
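Wrapping a connected client and printing the timing summary afterwards:

with PIClient("my-pi-server") as client:
    timed = TimingClient(client)
    for _ in range(5):
        timed.recorded_values("SINUSOID", "*-1h", "*")
    print(timed.report())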
Connection Pooling Pattern¶
For long-running applications:
import threading
from queue import Queue

class PIConnectionPool:
    """Simple connection pool for PIPolars."""

    def __init__(self, server, size=4):
        self.server = server
        self.size = size
        self.pool = Queue(maxsize=size)
        self._lock = threading.Lock()

        # Pre-create connections
        for _ in range(size):
            client = PIClient(server)
            client.connect()
            self.pool.put(client)

    def acquire(self):
        return self.pool.get()

    def release(self, client):
        self.pool.put(client)

    def close_all(self):
        while not self.pool.empty():
            client = self.pool.get()
            client.disconnect()
# Usage
pool = PIConnectionPool("my-pi-server", size=4)
try:
    client = pool.acquire()
    try:
        df = client.recorded_values("SINUSOID", "*-1h", "*")
    finally:
        pool.release(client)  # always return the client, even if the query fails
finally:
    pool.close_all()
Context manager version:
from contextlib import contextmanager

class PIConnectionPool:
    # ... previous implementation ...

    @contextmanager
    def connection(self):
        client = self.acquire()
        try:
            yield client
        finally:
            self.release(client)
# Usage
pool = PIConnectionPool("my-pi-server")
with pool.connection() as client:
    df = client.recorded_values("SINUSOID", "*-1h", "*")
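The pool pays off when several worker threads share a small set of connections. A sketch using the standard ThreadPoolExecutor and the connection() context manager defined above (the tag names are the usual PI demo points):

from concurrent.futures import ThreadPoolExecutor

pool = PIConnectionPool("my-pi-server", size=4)

def fetch(tag):
    with pool.connection() as client:
        return tag, client.recorded_values(tag, "*-1h", "*")

with ThreadPoolExecutor(max_workers=4) as executor:
    results = dict(executor.map(fetch, ["SINUSOID", "SINUSOIDU", "CDT158"]))

pool.close_all()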
Working with AF Data¶
Access AF elements and attributes:
from pipolars.connection.af_database import AFDatabaseConnection

# Connect to AF Database
af_conn = AFDatabaseConnection(
    server="my-af-server",
    database="MyDatabase",
)
af_conn.connect()

# Navigate AF hierarchy
root = af_conn.get_root_element()
elements = af_conn.search_elements("Plant1|*")

for element in elements:
    attributes = element.Attributes
    for attr in attributes:
        print(f"{element.Name}.{attr.Name}")
uv Script Dependencies¶
Run standalone scripts with uv inline dependencies:
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = ["pipolars"]
# ///
from pipolars import PIClient, SummaryType

def main():
    with PIClient("my-pi-server") as client:
        df = (
            client.query("SINUSOID")
            .last(days=7)
            .summary(SummaryType.AVERAGE, SummaryType.MAXIMUM)
            .to_dataframe()
        )
        print(df)

if __name__ == "__main__":
    main()
Run with:
uv run my_script.py
Integration with Other Libraries¶
With pandas¶
# Convert to pandas
pandas_df = df.to_pandas()
# Use pandas functionality
pandas_df.to_excel("output.xlsx")
With NumPy¶
import numpy as np
# Get numpy array
values = df["value"].to_numpy()
# Perform numpy operations
fft_result = np.fft.fft(values)
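To interpret the FFT, pair the amplitudes with frequencies from np.fft.fftfreq. This assumes the values are uniformly sampled (for example interpolated data at a known interval), which raw recorded values generally are not:

half = len(values) // 2
sample_seconds = 60.0  # assumption: uniform 1-minute sampling
freqs = np.fft.fftfreq(len(values), d=sample_seconds)[:half]
amplitude = np.abs(fft_result)[:half]

# Frequency (Hz) of the strongest non-DC component
dominant_hz = freqs[1:][np.argmax(amplitude[1:])]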
With scikit-learn¶
import polars as pl
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
# Prepare features
X = df.select(["value", "rolling_avg"]).to_numpy()
# Scale and cluster
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
kmeans = KMeans(n_clusters=3)
clusters = kmeans.fit_predict(X_scaled)
# Add back to DataFrame
df = df.with_columns(pl.Series("cluster", clusters))
With Plotly¶
import plotly.express as px
# Convert to pandas for Plotly
pandas_df = df.to_pandas()
fig = px.line(
pandas_df,
x="timestamp",
y="value",
title="PI Data"
)
fig.show()
Next Steps¶
API Reference - Complete API reference
Caching - Caching configuration
Configuration - Full configuration options