Learn QuickHooks through interactive examples and live code demonstrations
Get started with QuickHooks by installing it with UV (recommended) or pip. UV resolves and installs dependencies far faster than traditional pip; the uv project reports 10-100x speedups.
# Install UV (fast package manager)
curl -LsSf https://astral.sh/uv/install.sh | sh
# Install QuickHooks with all features
uv add "quickhooks[all]"
# Set up your Groq API key for agent analysis
export GROQ_API_KEY=your_groq_api_key_here
# Verify installation
quickhooks version
Create a simple hook that processes user input and returns a greeting. This example demonstrates the basic structure of all QuickHooks.
from quickhooks.models import (
    BaseHook, HookInput, HookOutput,
    HookStatus, HookError
)


class HelloHook(BaseHook):
    """A simple hook that greets users with personalized messages."""

    async def run(self, input_data: HookInput) -> HookOutput:
        """Execute the hook logic with proper error handling."""
        try:
            # Extract data from input
            name = input_data.data.get("name", "World")
            message_type = input_data.data.get("type", "greeting")

            # Process the data
            if message_type == "greeting":
                greeting = f"Hello, {name}! 👋"
            elif message_type == "farewell":
                greeting = f"Goodbye, {name}! 👋"
            else:
                greeting = f"Hi, {name}!"

            # Return successful result
            return HookOutput(
                status=HookStatus.SUCCESS,
                data={
                    "greeting": greeting,
                    "name": name,
                    "type": message_type,
                    "timestamp": input_data.timestamp.isoformat()
                },
                message=f"Greeting generated for {name}"
            )
        except Exception as e:
            # Handle any errors gracefully
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="GREETING_ERROR",
                    message=f"Failed to generate greeting: {str(e)}",
                    details={"input_data": input_data.data}
                )
            )
Save this code as hello_hook.py and run it:
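A minimal driver can exercise the hook directly. The HookInput fields used here (event_type, data) mirror the test examples later on this page, so treat the exact constructor signature, and whether timestamp defaults automatically, as assumptions:

import asyncio

from hello_hook import HelloHook
from quickhooks.models import HookInput


async def main():
    # Constructor fields assumed from the test examples below;
    # if your HookInput version requires an explicit timestamp, pass one here
    payload = HookInput(event_type="greeting", data={"name": "Ada", "type": "greeting"})
    result = await HelloHook().run(payload)
    print(result.message)            # Greeting generated for Ada
    print(result.data["greeting"])   # Hello, Ada! 👋


if __name__ == "__main__":
    asyncio.run(main())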
Create a hook that validates user data according to business rules. This demonstrates input validation, error handling, and structured responses.
import re
from quickhooks.models import (
    BaseHook, HookInput, HookOutput,
    HookStatus, HookError
)


class DataValidatorHook(BaseHook):
    """Validates user data with comprehensive business rules."""

    def __init__(self):
        super().__init__()
        self.email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
        self.phone_pattern = re.compile(r'^\+?1?-?\.?\s?\(?(\d{3})\)?[-.\s]?(\d{3})[-.\s]?(\d{4})$')

    async def run(self, input_data: HookInput) -> HookOutput:
        """Validate user data according to business rules."""
        user_data = input_data.data.get("user_data", {})
        validation_rules = input_data.data.get("validation_rules", {})

        errors = []
        warnings = []
        processed_data = {}

        # Validate email (required field)
        email = user_data.get("email", "").strip()
        if not email:
            errors.append("Email is required")
        elif not self.email_pattern.match(email):
            errors.append("Invalid email format")
        else:
            processed_data["email"] = email.lower()

        # Validate name (optional but recommended)
        name = user_data.get("name", "").strip()
        if name:
            if len(name) < 2:
                errors.append("Name must be at least 2 characters long")
            elif len(name) > 100:
                warnings.append("Name is very long (over 100 characters)")
            else:
                processed_data["name"] = name.title()
        else:
            warnings.append("Name is recommended but not required")

        # Validate phone (optional)
        phone = user_data.get("phone", "").strip()
        if phone:
            if not self.phone_pattern.match(phone):
                errors.append("Invalid phone number format (use: +1-XXX-XXX-XXXX)")
            else:
                processed_data["phone"] = self._normalize_phone(phone)

        # Validate age (optional)
        age = user_data.get("age")
        if age is not None:
            try:
                age_int = int(age)
                if age_int < 0:
                    errors.append("Age cannot be negative")
                elif age_int > 150:
                    warnings.append("Age seems unrealistic (over 150)")
                elif age_int < 13:
                    warnings.append("User may be under minimum age requirement")
                processed_data["age"] = age_int
            except (ValueError, TypeError):
                errors.append("Age must be a valid number")

        # Validate preferences (optional dict)
        preferences = user_data.get("preferences", {})
        if isinstance(preferences, dict):
            valid_preferences = {}
            for key, value in preferences.items():
                if key in ["theme", "language", "timezone"]:
                    valid_preferences[key] = value
                else:
                    warnings.append(f"Unknown preference: {key}")
            processed_data["preferences"] = valid_preferences

        # Determine overall status
        if errors:
            status = HookStatus.FAILED
            message = f"Validation failed with {len(errors)} error(s)"
        elif warnings:
            status = HookStatus.SUCCESS
            message = f"Validation passed with {len(warnings)} warning(s)"
        else:
            status = HookStatus.SUCCESS
            message = "Validation completed successfully"

        return HookOutput(
            status=status,
            data={
                "is_valid": len(errors) == 0,
                "processed_data": processed_data,
                "validation_errors": errors,
                "validation_warnings": warnings,
                "validation_summary": {
                    "total_fields_checked": len(user_data),
                    "errors_count": len(errors),
                    "warnings_count": len(warnings)
                }
            },
            message=message
        )

    def _normalize_phone(self, phone: str) -> str:
        """Normalize phone number to standard format."""
        # Remove all non-digit characters
        digits = re.sub(r'\D', '', phone)

        # Ensure it starts with a country code
        if len(digits) == 10:  # US number without country code
            digits = '1' + digits

        # Format as +1-XXX-XXX-XXXX
        if len(digits) == 11 and digits.startswith('1'):
            return f"+1-{digits[1:4]}-{digits[4:7]}-{digits[7:]}"

        return phone  # Return original if it can't be normalized
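A quick invocation sketch for the validator, under the same hedged HookInput assumption as the hello-hook driver above:

import asyncio

from quickhooks.models import HookInput


async def main():
    hook = DataValidatorHook()
    result = await hook.run(HookInput(
        event_type="validation",  # assumed constructor fields
        data={"user_data": {"email": "Ada@Example.com", "name": "ada lovelace", "age": "36"}}
    ))
    print(result.data["is_valid"])        # True
    print(result.data["processed_data"])  # lowercased email, title-cased name, int age


asyncio.run(main())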
Process multiple data sources concurrently for improved performance. This hook demonstrates async/await patterns and concurrent execution.
import asyncio
import aiohttp
import time
from typing import Dict, Any, List
from quickhooks.models import (
    BaseHook, HookInput, HookOutput,
    HookStatus, HookError
)


class ParallelProcessorHook(BaseHook):
    """Processes multiple data sources in parallel for maximum efficiency."""

    def __init__(self):
        super().__init__()
        self.timeout = aiohttp.ClientTimeout(total=30)

    async def run(self, input_data: HookInput) -> HookOutput:
        """Process multiple data sources concurrently."""
        sources = input_data.data.get("sources", [])
        if not sources:
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="NO_SOURCES",
                    message="No data sources provided"
                )
            )

        start_time = time.time()
        try:
            # Process all sources concurrently
            tasks = [
                self.process_source(source, i)
                for i, source in enumerate(sources)
            ]
            results = await asyncio.gather(
                *tasks,
                return_exceptions=True
            )

            # Process results
            processed_results = []
            successful_count = 0
            failed_count = 0

            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    processed_results.append({
                        "source_index": i,
                        "status": "error",
                        "error": str(result),
                        "source": sources[i]
                    })
                    failed_count += 1
                else:
                    processed_results.append(result)
                    if result["status"] == "success":
                        successful_count += 1
                    else:
                        failed_count += 1

            execution_time = time.time() - start_time

            # Aggregate results
            aggregated_data = self._aggregate_results(processed_results)

            return HookOutput(
                status=HookStatus.SUCCESS if failed_count == 0 else HookStatus.FAILED,
                data={
                    "aggregated_data": aggregated_data,
                    "processing_summary": {
                        "total_sources": len(sources),
                        "successful": successful_count,
                        "failed": failed_count,
                        "execution_time": execution_time,
                        "parallel_efficiency": len(sources) / execution_time if execution_time > 0 else 0
                    },
                    "detailed_results": processed_results
                },
                message=f"Processed {len(sources)} sources in {execution_time:.2f}s"
            )
        except Exception as e:
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="PARALLEL_PROCESSING_ERROR",
                    message=f"Failed to process sources in parallel: {str(e)}"
                )
            )

    async def process_source(self, source: Dict[str, Any], index: int) -> Dict[str, Any]:
        """Process a single data source."""
        source_type = source.get("type", "unknown")
        source_url = source.get("url")

        result = {
            "source_index": index,
            "source_type": source_type,
            "source_url": source_url,
            "status": "processing"
        }

        try:
            if source_type == "api":
                data = await self._fetch_api_data(source_url, source.get("headers", {}))
                result["data"] = data
                result["status"] = "success"
            elif source_type == "file":
                data = await self._read_file_data(source.get("path"))
                result["data"] = data
                result["status"] = "success"
            elif source_type == "database":
                data = await self._query_database(source.get("query"), source.get("connection"))
                result["data"] = data
                result["status"] = "success"
            else:
                result["status"] = "error"
                result["error"] = f"Unsupported source type: {source_type}"
        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)

        result["processed_at"] = time.time()
        return result

    async def _fetch_api_data(self, url: str, headers: Dict[str, str]) -> Dict[str, Any]:
        """Fetch data from an API endpoint."""
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            async with session.get(url, headers=headers) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    raise Exception(f"API request failed with status {response.status}")

    async def _read_file_data(self, file_path: str) -> Dict[str, Any]:
        """Read data from a file (simulated)."""
        # In a real implementation, you'd read the actual file
        await asyncio.sleep(0.1)  # Simulate file I/O
        return {"file": file_path, "content": "Sample file content"}

    async def _query_database(self, query: str, connection: str) -> Dict[str, Any]:
        """Query a database (simulated)."""
        # In a real implementation, you'd connect to an actual database
        await asyncio.sleep(0.2)  # Simulate database query
        return {"query": query, "connection": connection, "results": []}

    def _aggregate_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate results from all sources."""
        aggregated = {
            "total_records": 0,
            "data_types": {},
            "sources_by_status": {
                "success": [],
                "error": []
            },
            "combined_data": []
        }

        for result in results:
            # Count by status
            if result["status"] == "success":
                aggregated["sources_by_status"]["success"].append(result["source_index"])
                if "data" in result:
                    aggregated["combined_data"].append(result["data"])
                    aggregated["total_records"] += len(result.get("data", []))
            else:
                aggregated["sources_by_status"]["error"].append(result["source_index"])

            # Count by type
            source_type = result.get("source_type", "unknown")
            if source_type not in aggregated["data_types"]:
                aggregated["data_types"][source_type] = 0
            aggregated["data_types"][source_type] += 1

        return aggregated
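A usage sketch with a mixed batch of sources; the paths, query, and connection string are placeholders:

import asyncio

from quickhooks.models import HookInput


async def main():
    sources = [
        {"type": "file", "path": "data/users.json"},
        {"type": "database", "query": "SELECT * FROM orders", "connection": "postgresql://localhost/shop"},
    ]
    hook = ParallelProcessorHook()
    result = await hook.run(HookInput(event_type="batch", data={"sources": sources}))
    print(result.data["processing_summary"])  # totals, timing, parallel efficiency


asyncio.run(main())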
Learn the TDD workflow with QuickHooks. Write tests first, then implement the hook to make the tests pass. This ensures your hooks are reliable and well-tested.
import pytest
from quickhooks.models import HookInput, HookOutput, HookStatus, HookError
from calculator_hook import CalculatorHook


class TestCalculatorHook:
    """Test suite for CalculatorHook using the TDD approach."""

    @pytest.fixture
    def hook(self):
        """Create a hook instance for testing."""
        return CalculatorHook()

    @pytest.mark.asyncio
    async def test_addition_success(self, hook):
        """Test successful addition operation."""
        input_data = HookInput(
            event_type="calculation",
            data={
                "operation": "add",
                "operands": [5, 3, 2]
            }
        )

        result = await hook.run(input_data)

        assert result.status == HookStatus.SUCCESS
        assert result.data["result"] == 10
        assert result.data["operation"] == "add"
        assert result.data["operands"] == [5, 3, 2]

    @pytest.mark.asyncio
    async def test_multiplication_success(self, hook):
        """Test successful multiplication operation."""
        input_data = HookInput(
            event_type="calculation",
            data={
                "operation": "multiply",
                "operands": [4, 5]
            }
        )

        result = await hook.run(input_data)

        assert result.status == HookStatus.SUCCESS
        assert result.data["result"] == 20

    @pytest.mark.asyncio
    async def test_division_by_zero(self, hook):
        """Test division-by-zero error handling."""
        input_data = HookInput(
            event_type="calculation",
            data={
                "operation": "divide",
                "operands": [10, 0]
            }
        )

        result = await hook.run(input_data)

        assert result.status == HookStatus.FAILED
        assert result.error.code == "DIVISION_BY_ZERO"
        assert "division by zero" in result.error.message.lower()

    @pytest.mark.asyncio
    async def test_invalid_operation(self, hook):
        """Test handling of invalid operations."""
        input_data = HookInput(
            event_type="calculation",
            data={
                "operation": "invalid_op",
                "operands": [1, 2]
            }
        )

        result = await hook.run(input_data)

        assert result.status == HookStatus.FAILED
        assert result.error.code == "INVALID_OPERATION"

    @pytest.mark.asyncio
    async def test_missing_operands(self, hook):
        """Test handling of missing operands."""
        input_data = HookInput(
            event_type="calculation",
            data={
                "operation": "add"
                # Missing operands
            }
        )

        result = await hook.run(input_data)

        assert result.status == HookStatus.FAILED
        assert result.error.code == "MISSING_OPERANDS"

    @pytest.mark.asyncio
    async def test_empty_operands_list(self, hook):
        """Test handling of an empty operands list."""
        input_data = HookInput(
            event_type="calculation",
            data={
                "operation": "add",
                "operands": []
            }
        )

        result = await hook.run(input_data)

        assert result.status == HookStatus.FAILED
        assert result.error.code == "NO_OPERANDS"

    @pytest.mark.asyncio
    async def test_complex_calculation(self, hook):
        """Test a complex expression calculation."""
        input_data = HookInput(
            event_type="calculation",
            data={
                "operation": "complex",
                "expression": "5 + 3 * 2 - 4"
            }
        )

        result = await hook.run(input_data)

        assert result.status == HookStatus.SUCCESS
        assert result.data["result"] == 7  # 5 + (3 * 2) - 4 = 7
        assert result.data["expression"] == "5 + 3 * 2 - 4"

    @pytest.mark.asyncio
    async def test_calculation_with_context(self, hook):
        """Test calculation with additional context."""
        input_data = HookInput(
            event_type="calculation",
            data={
                "operation": "add",
                "operands": [10, 5]
            },
            context={
                "user_id": "user123",
                "session_id": "session456",
                "calculation_purpose": "shopping_cart_total"
            }
        )

        result = await hook.run(input_data)

        assert result.status == HookStatus.SUCCESS
        assert result.data["result"] == 15
        # Hook should preserve context information
        assert "context" in result.data or hasattr(result, 'metadata')
With the tests written, implement CalculatorHook so they pass. Save it as calculator_hook.py, since the test file imports it by that name:
from quickhooks.models import (
    BaseHook, HookInput, HookOutput,
    HookStatus, HookError
)
import operator
import re


class CalculatorHook(BaseHook):
    """A calculator hook that performs various mathematical operations."""

    def __init__(self):
        super().__init__()
        self.operations = {
            'add': operator.add,
            'subtract': operator.sub,
            'multiply': operator.mul,
            'divide': operator.truediv,
            'power': operator.pow,
            'modulo': operator.mod
        }

    async def run(self, input_data: HookInput) -> HookOutput:
        """Perform mathematical calculations."""
        try:
            operation = input_data.data.get("operation")

            if operation == "complex":
                return await self._handle_complex_expression(input_data)
            else:
                return await self._handle_simple_operation(input_data)
        except Exception as e:
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="CALCULATION_ERROR",
                    message=f"Calculation failed: {str(e)}",
                    details={"input_data": input_data.data}
                )
            )

    async def _handle_simple_operation(self, input_data: HookInput) -> HookOutput:
        """Handle simple arithmetic operations."""
        operation = input_data.data.get("operation")

        # Distinguish a missing operands key from an empty list, matching
        # the MISSING_OPERANDS and NO_OPERANDS test cases above
        if "operands" not in input_data.data:
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="MISSING_OPERANDS",
                    message="Operands are required for calculation"
                )
            )

        operands = input_data.data["operands"]
        if not operands:
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="NO_OPERANDS",
                    message="No operands provided for calculation"
                )
            )

        if operation not in self.operations:
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="INVALID_OPERATION",
                    message=f"Invalid operation: {operation}. Valid operations: {list(self.operations.keys())}"
                )
            )

        # Check for division by zero
        if operation == "divide" and 0 in operands[1:]:
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="DIVISION_BY_ZERO",
                    message="Division by zero is not allowed"
                )
            )

        # Perform the calculation by folding the operator over the operands
        op_func = self.operations[operation]
        result = operands[0]
        for operand in operands[1:]:
            result = op_func(result, operand)

        return HookOutput(
            status=HookStatus.SUCCESS,
            data={
                "operation": operation,
                "operands": operands,
                "result": result,
                "calculation_type": "simple"
            },
            message=f"Successfully calculated {operation} of {operands}"
        )

    async def _handle_complex_expression(self, input_data: HookInput) -> HookOutput:
        """Handle complex mathematical expressions."""
        expression = input_data.data.get("expression", "")

        if not expression:
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="NO_EXPRESSION",
                    message="No expression provided for complex calculation"
                )
            )

        # Validate expression (only allow numbers and basic operators)
        if not re.match(r'^[\d\s\+\-\*/\(\)\.]+$', expression):
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="INVALID_EXPRESSION",
                    message="Expression contains invalid characters"
                )
            )

        try:
            # Evaluate the expression
            result = eval(expression)  # Note: in production, use a safer expression parser

            return HookOutput(
                status=HookStatus.SUCCESS,
                data={
                    "expression": expression,
                    "result": result,
                    "calculation_type": "complex"
                },
                message=f"Successfully evaluated expression: {expression}"
            )
        except ZeroDivisionError:
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="DIVISION_BY_ZERO",
                    message="Expression results in division by zero"
                )
            )
        except Exception as e:
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="EXPRESSION_ERROR",
                    message=f"Failed to evaluate expression: {str(e)}"
                )
            )
Execute the test suite to verify your implementation:
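Assuming the suite above is saved as test_calculator_hook.py next to calculator_hook.py, and pytest plus pytest-asyncio are installed:

# Run the TDD suite with verbose output
pytest test_calculator_hook.py -v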
Use QuickHooks' AI-powered agent analysis to get intelligent recommendations for your prompts. The system analyzes your prompts and suggests the best Claude Code agents.
import asyncio
from quickhooks.agent_analysis.analyzer import AgentAnalyzer
from quickhooks.agent_analysis.types import AgentAnalysisRequest


async def demonstrate_agent_analysis():
    """Demonstrate agent analysis capabilities."""
    # Initialize the analyzer with your Groq API key
    analyzer = AgentAnalyzer(
        groq_api_key="your_groq_api_key_here",
        model_name="llama-3.3-70b-versatile",
        enable_agent_discovery=True  # Enable local agent discovery
    )

    # Example 1: Basic prompt analysis
    print("=== Basic Prompt Analysis ===")
    request1 = AgentAnalysisRequest(
        prompt="Write a Python function that processes CSV files and validates data structure",
        confidence_threshold=0.7
    )

    response1 = await analyzer.analyze_prompt(request1)
    print(f"Analysis Summary: {response1.analysis_summary}")
    print(f"Recommended Agents: {[agent.agent_type for agent in response1.recommended_agents]}")
    print(f"Modified Prompt: {response1.claude_code_prompt_modification}")
    print()

    # Example 2: Analysis with context
    print("=== Analysis with Context ===")
    request2 = AgentAnalysisRequest(
        prompt="Debug authentication issues in Flask application",
        context="Working on a Flask web application with JWT authentication, getting 401 errors",
        confidence_threshold=0.8
    )

    response2 = await analyzer.analyze_prompt(request2)
    print(f"Discovered Local Agents: {len(response2.discovered_agents)}")
    for agent in response2.discovered_agents:
        print(f"  - {agent.name}: {agent.description} (similarity: {agent.similarity_score:.2f})")
    print()

    # Example 3: Complex multi-domain prompt
    print("=== Complex Multi-domain Analysis ===")
    request3 = AgentAnalysisRequest(
        prompt="Create a complete e-commerce checkout system with payment processing, inventory management, and email notifications",
        context="Building a Python web application using FastAPI with PostgreSQL database",
        confidence_threshold=0.6
    )

    response3 = await analyzer.analyze_prompt(request3)
    print(f"Multiple Agents Recommended: {response3.multiple_agents_recommended}")
    print(f"Total Tokens Used: {response3.total_tokens_used}")
    for agent in response3.recommended_agents:
        print(f"  - {agent.agent_type}: confidence {agent.confidence:.2f}, priority {agent.priority}")
        print(f"    Reasoning: {agent.reasoning}")


# Run the demonstration
if __name__ == "__main__":
    asyncio.run(demonstrate_agent_analysis())
Use the QuickHooks CLI to analyze prompts directly from your terminal. This is perfect for quick analysis during development.
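The exact subcommand name is version-dependent, so confirm it with quickhooks --help first; a typical invocation might look like this (hypothetical):

# Hypothetical invocation; confirm the subcommand with: quickhooks --help
quickhooks analyze "Write a Python function that processes CSV files"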
Create extensible plugins that add new capabilities to QuickHooks. This example shows how to build a plugin system for analytics and monitoring.
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import time
import json
import asyncio


class BasePlugin(ABC):
    """Base class for all QuickHooks plugins."""

    def __init__(self):
        self.name = self.__class__.__name__
        self.version = "1.0.0"
        self.enabled = True

    @abstractmethod
    async def initialize(self, config: Dict[str, Any]) -> None:
        """Initialize the plugin with configuration."""
        pass

    @abstractmethod
    async def cleanup(self) -> None:
        """Clean up plugin resources."""
        pass


class AnalyticsPlugin(BasePlugin):
    """Plugin for collecting and analyzing hook execution metrics."""

    def __init__(self):
        super().__init__()
        self.name = "analytics"
        self.metrics_store = {}
        self.aggregation_interval = 60  # seconds
        self._aggregation_task = None

    async def initialize(self, config: Dict[str, Any]) -> None:
        """Initialize the analytics plugin."""
        self.metrics_store = {
            "hook_executions": [],
            "performance_metrics": {},
            "error_rates": {},
            "usage_patterns": {}
        }

        # Start the background aggregation task
        self._aggregation_task = asyncio.create_task(self._aggregate_metrics())

    async def cleanup(self) -> None:
        """Clean up the analytics plugin."""
        if self._aggregation_task:
            self._aggregation_task.cancel()
            try:
                await self._aggregation_task
            except asyncio.CancelledError:
                pass
    async def on_hook_execution_start(self, hook_id: str, input_data: Dict[str, Any]) -> None:
        """Called when hook execution starts."""
        execution_record = {
            "hook_id": hook_id,
            "start_time": time.time(),
            "input_size": len(json.dumps(input_data, default=str)),
            "event_type": input_data.get("event_type", "unknown")
        }

        # Track in-flight executions until they complete
        if "active_executions" not in self.metrics_store:
            self.metrics_store["active_executions"] = {}
        self.metrics_store["active_executions"][hook_id] = execution_record

    async def on_hook_execution_complete(self, hook_id: str, output_data: Dict[str, Any]) -> None:
        """Called when hook execution completes."""
        if "active_executions" not in self.metrics_store or hook_id not in self.metrics_store["active_executions"]:
            return

        execution_record = self.metrics_store["active_executions"][hook_id]
        execution_time = time.time() - execution_record["start_time"]

        # Create the completed execution record
        completed_record = {
            "hook_id": hook_id,
            "execution_time": execution_time,
            "input_size": execution_record["input_size"],
            "output_size": len(json.dumps(output_data, default=str)),
            "status": output_data.get("status", "unknown"),
            "event_type": execution_record["event_type"],
            "timestamp": datetime.now().isoformat(),
            "success": output_data.get("status") == "success"
        }

        # Store in metrics
        self.metrics_store["hook_executions"].append(completed_record)

        # Update per-hook performance metrics
        if hook_id not in self.metrics_store["performance_metrics"]:
            self.metrics_store["performance_metrics"][hook_id] = {
                "total_executions": 0,
                "total_time": 0,
                "min_time": float('inf'),
                "max_time": 0,
                "success_count": 0,
                "error_count": 0
            }

        metrics = self.metrics_store["performance_metrics"][hook_id]
        metrics["total_executions"] += 1
        metrics["total_time"] += execution_time
        metrics["min_time"] = min(metrics["min_time"], execution_time)
        metrics["max_time"] = max(metrics["max_time"], execution_time)

        if completed_record["success"]:
            metrics["success_count"] += 1
        else:
            metrics["error_count"] += 1

        # Clean up the active-executions entry
        del self.metrics_store["active_executions"][hook_id]
    async def get_analytics_report(self, time_range: str = "1h") -> Dict[str, Any]:
        """Generate a comprehensive analytics report."""
        now = datetime.now()
        if time_range == "1h":
            cutoff = now - timedelta(hours=1)
        elif time_range == "24h":
            cutoff = now - timedelta(days=1)
        elif time_range == "7d":
            cutoff = now - timedelta(days=7)
        else:
            cutoff = now - timedelta(hours=1)  # Default to 1 hour

        # Filter executions by time range
        recent_executions = [
            record for record in self.metrics_store["hook_executions"]
            if datetime.fromisoformat(record["timestamp"]) >= cutoff
        ]

        if not recent_executions:
            return {"message": "No data available for specified time range"}

        # Calculate summary metrics
        total_executions = len(recent_executions)
        successful_executions = sum(1 for record in recent_executions if record["success"])
        success_rate = (successful_executions / total_executions) * 100
        avg_execution_time = sum(record["execution_time"] for record in recent_executions) / total_executions

        # Top-performing hooks
        hook_performance = {}
        for record in recent_executions:
            hook_id = record["hook_id"]
            if hook_id not in hook_performance:
                hook_performance[hook_id] = {
                    "executions": 0,
                    "total_time": 0,
                    "successes": 0
                }
            hook_performance[hook_id]["executions"] += 1
            hook_performance[hook_id]["total_time"] += record["execution_time"]
            if record["success"]:
                hook_performance[hook_id]["successes"] += 1

        top_hooks = sorted(
            [(hook_id,
              metrics["executions"],
              metrics["total_time"] / metrics["executions"],
              (metrics["successes"] / metrics["executions"]) * 100)
             for hook_id, metrics in hook_performance.items()],
            key=lambda x: x[1],
            reverse=True
        )[:10]

        return {
            "time_range": time_range,
            "summary": {
                "total_executions": total_executions,
                "successful_executions": successful_executions,
                "success_rate": round(success_rate, 2),
                "average_execution_time": round(avg_execution_time, 3)
            },
            "top_hooks": [
                {
                    "hook_id": hook_id,
                    "executions": executions,
                    "avg_time": round(avg_time, 3),
                    "success_rate": round(hook_success_rate, 2)
                }
                for hook_id, executions, avg_time, hook_success_rate in top_hooks
            ],
            "performance_trends": self._calculate_performance_trends(recent_executions),
            "error_analysis": self._analyze_errors(recent_executions)
        }
    async def _aggregate_metrics(self) -> None:
        """Background task that aggregates metrics periodically."""
        while True:
            try:
                await asyncio.sleep(self.aggregation_interval)
                # Perform aggregation; this would typically persist
                # to a database or external monitoring system
                print(f"[{datetime.now()}] Aggregated {len(self.metrics_store['hook_executions'])} hook executions")
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Error in metrics aggregation: {e}")

    def _calculate_performance_trends(self, executions: List[Dict]) -> Dict[str, Any]:
        """Calculate performance trends from execution data."""
        if len(executions) < 2:
            return {"trend": "insufficient_data"}

        # Group by hour
        hourly_data = {}
        for record in executions:
            hour = datetime.fromisoformat(record["timestamp"]).strftime("%Y-%m-%d %H:00")
            if hour not in hourly_data:
                hourly_data[hour] = {
                    "executions": 0,
                    "total_time": 0,
                    "successes": 0
                }
            hourly_data[hour]["executions"] += 1
            hourly_data[hour]["total_time"] += record["execution_time"]
            if record["success"]:
                hourly_data[hour]["successes"] += 1

        # Compare the two most recent hours
        hours = sorted(hourly_data.keys())
        if len(hours) < 2:
            return {"trend": "insufficient_data"}

        recent_metrics = hourly_data[hours[-1]]
        previous_metrics = hourly_data[hours[-2]]

        execution_trend = ((recent_metrics["executions"] - previous_metrics["executions"]) /
                           max(previous_metrics["executions"], 1)) * 100

        return {
            "trend": "increasing" if execution_trend > 0 else "decreasing",
            "execution_trend_percent": round(execution_trend, 2),
            "hourly_breakdown": hourly_data
        }

    def _analyze_errors(self, executions: List[Dict]) -> Dict[str, Any]:
        """Analyze error patterns."""
        error_executions = [record for record in executions if not record["success"]]

        if not error_executions:
            return {"message": "No errors in specified time range"}

        # Group errors by hook_id
        errors_by_hook = {}
        for record in error_executions:
            hook_id = record["hook_id"]
            if hook_id not in errors_by_hook:
                errors_by_hook[hook_id] = 0
            errors_by_hook[hook_id] += 1

        # Sort by error count
        top_error_hooks = sorted(errors_by_hook.items(), key=lambda x: x[1], reverse=True)[:5]

        return {
            "total_errors": len(error_executions),
            "error_rate": round((len(error_executions) / len(executions)) * 100, 2),
            "top_error_hooks": [
                {"hook_id": hook_id, "error_count": count}
                for hook_id, count in top_error_hooks
            ]
        }
# Plugin Registry
class PluginRegistry:
    """Registry for managing plugins."""

    def __init__(self):
        self.plugins: Dict[str, BasePlugin] = {}

    def register_plugin(self, plugin: BasePlugin) -> None:
        """Register a plugin."""
        self.plugins[plugin.name] = plugin

    def get_plugin(self, name: str) -> Optional[BasePlugin]:
        """Get a plugin by name."""
        return self.plugins.get(name)

    def list_plugins(self) -> List[str]:
        """List all registered plugins."""
        return list(self.plugins.keys())


# Global plugin registry instance
plugin_registry = PluginRegistry()


# Hook decorator for plugin integration
def with_plugins(plugin_names: List[str]):
    """Decorator that adds plugin callbacks around a hook's run method."""
    def decorator(hook_class):
        original_run = hook_class.run

        async def run_with_plugins(self, input_data):
            # Resolve enabled plugins by name
            plugins = []
            for plugin_name in plugin_names:
                plugin = plugin_registry.get_plugin(plugin_name)
                if plugin and plugin.enabled:
                    plugins.append(plugin)

            # Notify plugins that execution is starting
            for plugin in plugins:
                if hasattr(plugin, 'on_hook_execution_start'):
                    await plugin.on_hook_execution_start(self.__class__.__name__, input_data.dict())

            # Execute the original hook
            result = await original_run(self, input_data)

            # Notify plugins that execution has completed
            for plugin in plugins:
                if hasattr(plugin, 'on_hook_execution_complete'):
                    await plugin.on_hook_execution_complete(self.__class__.__name__, result.dict())

            return result

        hook_class.run = run_with_plugins
        return hook_class
    return decorator
Apply the decorator to a hook to get automatic monitoring:
import asyncio
import random

from quickhooks.models import BaseHook, HookError, HookInput, HookOutput, HookStatus
from plugins.analytics_plugin import with_plugins


@with_plugins(["analytics"])
class MonitoredDataProcessor(BaseHook):
    """Hook with automatic analytics monitoring."""

    async def run(self, input_data: HookInput) -> HookOutput:
        """Process data with automatic monitoring."""
        # Simulate some processing work
        processing_time = random.uniform(0.1, 2.0)  # Random processing time
        await asyncio.sleep(processing_time)

        # Simulate occasional errors
        if random.random() < 0.1:  # 10% error rate
            return HookOutput(
                status=HookStatus.FAILED,
                error=HookError(
                    code="SIMULATED_ERROR",
                    message="This is a simulated error for testing"
                )
            )

        # Successful processing
        return HookOutput(
            status=HookStatus.SUCCESS,
            data={
                "processed_items": random.randint(1, 100),
                "processing_time": processing_time
            },
            message="Data processed successfully"
        )
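Finally, a driver sketch that wires the pieces together. The monitored_processor module name is hypothetical, and the HookInput constructor follows the same assumption as earlier examples:

import asyncio

from quickhooks.models import HookInput
from plugins.analytics_plugin import AnalyticsPlugin, plugin_registry
from monitored_processor import MonitoredDataProcessor  # hypothetical module name


async def main():
    # Register and initialize the analytics plugin
    analytics = AnalyticsPlugin()
    plugin_registry.register_plugin(analytics)
    await analytics.initialize({})

    # Run the monitored hook a few times to generate metrics
    hook = MonitoredDataProcessor()
    for _ in range(5):
        await hook.run(HookInput(event_type="processing", data={}))

    # Pull an analytics report for the last hour
    report = await analytics.get_analytics_report("1h")
    print(report)

    await analytics.cleanup()


asyncio.run(main())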
Instant feedback with automatic file watching and server restart during development.
Intelligent agent recommendations using Groq and Pydantic AI for optimal task assignment.
High-performance async/await patterns for concurrent execution and scalability.
Built-in testing framework with parallel execution, multiple report formats, and TDD support.
Full Pydantic models and type annotations for reliable, self-documenting code.
Extensible architecture with custom plugins for analytics, monitoring, and more.