diff --git a/.gitignore b/.gitignore
index 7181da4..9e48e47 100644
--- a/.gitignore
+++ b/.gitignore
@@ -191,6 +191,9 @@ todo*
 # local blueprints dir
 /blueprints/
 
+# local jinja2 filters dir
+/j2_filters/
+
 # ruff
 .ruff*/
 
diff --git a/docs/api_reference.md b/docs/api_reference.md
index 62bb69f..35cb1f7 100644
--- a/docs/api_reference.md
+++ b/docs/api_reference.md
@@ -12,6 +12,7 @@
 - [Built-in Tasks](#built-in-tasks)
 - [Built-in Filters](#built-in-filters)
 - [Built-in Processors](#built-in-processors)
+- [Logging](#logging)
 
 ## Overview
 
@@ -74,6 +75,7 @@ def __init__(
 | `workflows_catalog` | `FileCatalog` | Registry of workflow files |
 | `filters_catalog` | `CallableCatalog` | Registry of inventory filters |
 | `blueprints_catalog` | `FileCatalog` | Registry of blueprint files |
+| `j2_filters_catalog` | `CallableCatalog` | Registry of available Jinja2 filters |
 | `workflow` | `WorkflowModel \| None` | Current workflow model or None |
 | `workflow_path` | `Path \| None` | Path to workflow file if loaded from file |
 | `processors` | `list` | List of processor instances |
@@ -201,6 +203,7 @@ Load settings from a YAML file with automatic resolution and overrides. This cal
 | `local_filters` | `list[str]` | Directories containing custom filters |
 | `local_hooks` | `list[str]` | Directories containing custom hooks |
 | `local_blueprints` | `list[str]` | Directories containing blueprint files |
+| `local_j2_filters` | `list[str]` | Directories containing custom Jinja2 filters |
 | `processors` | `list[dict[str, Any]]` | Nornir processor configurations |
 | `vars_dir` | `str` | Directory for variable files |
 | `failure_strategy` | `FailureStrategy` | Task failure handling strategy |
@@ -561,6 +564,60 @@ Internally used processor that manages hook execution for tasks.
 
 Internally used processor that handles variable resolution and template rendering.
 
+## Logging
+
+NornFlow provides a centralized logging system for capturing execution details, debugging, and auditing.
+
+### NornFlowLogger Class
+
+Singleton logger class providing centralized logging for NornFlow.
+
+```python
+from nornflow.logger import logger
+
+logger.info("This is an info message")
+logger.debug("Debug details")
+logger.error("An error occurred")
+```
+
+### Methods
+
+| Method | Description |
+|--------|-------------|
+| `debug(message, *args, **kwargs)` | Log debug message |
+| `info(message, *args, **kwargs)` | Log info message |
+| `warning(message, *args, **kwargs)` | Log warning message |
+| `error(message, *args, **kwargs)` | Log error message |
+| `critical(message, *args, **kwargs)` | Log critical message |
+| `exception(message, *args, **kwargs)` | Log exception with traceback |
+| `set_execution_context(...)` | Configure logging for a workflow execution |
+| `clear_execution_context()` | Stop file logging and clear context |
+| `get_execution_context()` | Get current execution context |
+
+### Execution Context
+
+The logger automatically creates timestamped log files when a workflow starts:
+
+```python
+logger.set_execution_context(
+    execution_name="my_workflow",
+    execution_type="workflow",
+    log_dir=".nornflow/logs",
+    log_level="INFO"
+)
+```
+
+### Sensitive Data Sanitization
+
+NornFlow Logger attempts to sanitize log messages to hide sensitive data. This is a best-effort endeavour, and will only work for data explicitly related to what is considered a protected keyword.  
+> Note: `PROTECTED_KEYWORDS` can be found in [constants.py](../nornflow/constants.py)
+```python
+from nornflow.logger import sanitize_log_message
+
+# Values after keywords like 'password', 'secret', 'token' are redacted
+sanitize_log_message("password=secret123")  # "password=***REDACTED***"
+```
+
 <div align="center">
   
 ## Navigation
diff --git a/docs/blueprints_guide.md b/docs/blueprints_guide.md
index b1cd902..cab9708 100644
--- a/docs/blueprints_guide.md
+++ b/docs/blueprints_guide.md
@@ -122,7 +122,34 @@ tasks:
     if: "{{ 'interfaces' | is_set }}"
 ```
 
-**Important:** Blueprints contain ONLY the tasks key. No `workflow`, `name`, `description`, etc.
+**Important:** Blueprints contain ONLY the `tasks` key and an optional `description` key. No other fields are permitted. Attempting to include additional fields will result in a validation error when the blueprint is loaded during workflow execution.
+
+#### Optional Description Field
+
+Blueprints may optionally include a `description` field at the root level. This field is purely informational and is used for display purposes in the blueprints catalog (e.g., via `nornflow show -b`). If no description is provided, it defaults to "No description available".
+
+```yaml
+# blueprints/network_validation.yaml
+description: "Validates network interfaces and routing"
+tasks:
+  - name: netmiko_send_command
+    args:
+      command_string: "show ip interface brief"
+    set_to: interfaces
+```
+
+**Example output from `nornflow show -b` *(see the 'Description' column)*:**
+
+```nornflow show -b
+
+
+                                       BLUEPRINTS CATALOG                                         
+╭─────────────────────────┬──────────────────────────────────────────┬──────────────────────────────────────╮
+│     Blueprint Name      │ Description                              │ Source (file path)                   │
+├─────────────────────────┼──────────────────────────────────────────┼──────────────────────────────────────┤
+│ network_validation.yaml │ Validates network interfaces and routing │ ./blueprints/network_validation.yaml │
+╰─────────────────────────┴──────────────────────────────────────────┴──────────────────────────────────────╯
+```
 
 ### Blueprint Discovery
 
@@ -158,7 +185,7 @@ my_project/
 
 ### Blueprint Catalog
 
-All discovered blueprints are cataloged by filename (without extension):
+All discovered blueprints are cataloged by their full filename (including extension):
 
 ```bash
 # View discovered blueprints
@@ -166,19 +193,19 @@ nornflow show --blueprints
 ```
 
 **Catalog naming:**
-- `blueprints/validation.yaml` → `validation`
-- `blueprints/backup/full_backup.yaml` → `full_backup`
-- `blueprints/security/compliance_checks.yaml` → `compliance_checks`
+- `blueprints/validation.yaml` → `validation.yaml`
+- `blueprints/backup/full_backup.yaml` → `full_backup.yaml`
+- `blueprints/security/compliance_checks.yaml` → `compliance_checks.yaml`
 
 **Name conflicts:** If multiple blueprints have the same filename, the last discovered one wins. Use unique names.
 
-> **NOTE:** *We understand this is somehow restricting, but a decision was made to keep things simle here, as it shouldn't be too hard to prevent clashes by using different file names. Future releases of NornFlow, may revist this decision and allow blueprints to be ID in the catalogue with a fully qualified name.*
+> **NOTE:** *We understand this is somehow restricting, but a decision was made to keep things simple here, as it shouldn't be too hard to prevent clashes by using different file names. Future releases of NornFlow, may revisit this decision and allow blueprints to be ID in the catalogue with a fully qualified name.*
 
 ## Using Blueprints in Workflows
 
 ### Basic Blueprint Reference
 
-Reference blueprints by name from the catalog:
+Reference blueprints by their full filename from the catalog:
 
 ```yaml
 workflow:
diff --git a/docs/core_concepts.md b/docs/core_concepts.md
index f1ad9c3..1f8bb96 100644
--- a/docs/core_concepts.md
+++ b/docs/core_concepts.md
@@ -14,6 +14,7 @@
   - [Workflow Catalog](#workflow-catalog)
   - [Filter Catalog](#filter-catalog)
   - [Blueprint Catalog](#blueprint-catalog)
+  - [Jinja2 Filters Catalog](#jinja2-filters-catalog)
   - [Catalog Discovery](#catalog-discovery)
 - [Domains](#domains)
   - [What is a Domain?](#what-is-a-domain)
@@ -37,6 +38,9 @@
   - [Processor Precedence](#processor-precedence)
 - [Execution Model](#execution-model)
 - [Failure Strategies (Summary)](#failure-strategies-summary)
+- [Logging](#logging)
+  - [Log Files](#log-files)
+  - [Log Levels](#log-levels)
 - [Best Practices](#best-practices)
 
 ## Introduction
@@ -202,6 +206,8 @@ my_project/
 │   └── site_filters.py
 ├── hooks/                  # Custom hooks
 │   └── custom_hook.py
+├── j2_filters/             # Custom Jinja2 filters
+│   └── my_filters.py
 └── vars/                   # Variable files
     ├── defaults.yaml       # Global variables
     ├── backup/             # Domain variables
@@ -291,6 +297,10 @@ local_tasks:
 Tasks must follow Nornir's task signature to be discovered and used in NornFlow.  
 Your task function **must be properly type-annotated**, and the return type should be one of the following Nornir result types: `Result`, `AggregateResult`, or `MultiResult`.
 
+**Discovery rules:**
+- Only callable functions (not starting with '_') are registered as tasks
+- Functions must have proper type annotations for automatic discovery
+
 **Example:**
 ```python
 from nornir.core.task import Task, Result
@@ -329,8 +339,14 @@ local_filters:
 
 Filter functions must accept a `Host` object as the first parameter and return a boolean. Type annotations are required for discovery - the first parameter must be typed as `Host` and the return type must be `bool`:
 
+**Discovery rules:**
+- Only callable functions (not starting with '_') are registered as filters
+- Functions must have proper type annotations for automatic discovery
+
+**Example custom filter:**
+
 ```python
-from nornir.core.inventory import Host  # Import required
+from nornir.core.inventory import Host
 
 def site_filter(host: Host, region: str) -> bool:
     """Filter hosts by region."""
@@ -350,13 +366,53 @@ local_blueprints:
 
 All files with `.yaml` or `.yml` extensions in these directories (including subdirectories) are considered blueprints.
 
+### Jinja2 Filters Catalog
+
+The Jinja2 filters catalog contains all available Jinja2 filters that can be used in templates throughout NornFlow. Filters are discovered from:
+
+1. **Built-in filters** - NornFlow's custom filters and Python wrapper filters (always available)
+2. **Local directories** - Specified in `local_j2_filters` setting
+
+```yaml
+# nornflow.yaml
+local_j2_filters:
+  - "j2_filters"
+  - "/shared/custom_filters"
+```
+
+Custom Jinja2 filters are Python functions that transform values in templates. All callable functions in `.py` files within configured directories are registered as filters:
+
+**Discovery rules:**
+- Only callable functions (not starting with '_') are registered as filters
+
+**Example:**
+```python
+# j2_filters/my_filters.py
+
+def add_prefix(value: str, prefix: str = "NF_") -> str:
+    """Add a prefix to a string value."""
+    return f"{prefix}{value}"
+```
+
+Use in templates:
+```yaml
+tasks:
+  - name: echo
+    args:
+      msg: "{{ hostname | add_prefix('DEVICE_') }}"
+```
+
+View with: `nornflow show --j2-filters`
+
+> **Note:** The Jinja2 Filters catalog displays only NornFlow's built-in filters and custom filters from your `j2_filters` directories. It does not list Jinja2's native filters (like `upper`, `lower`, `join`, etc.).
+
 ### Catalog Discovery
 
 NornFlow performs recursive searches in all configured directories:
 
 - **Automatic discovery** happens during NornFlow initialization
 - **Name conflicts** - NornFlow prevents custom or imported tasks/filters to override built-in ones. However later custom or imported discoveries will override earlier ones. 
-- **View catalogs** - Use `nornflow show --catalogs` to see all discovered items, or specific `--tasks`, `--filters`, `--workflows`, and `--blueprints` options.
+- **View catalogs** - Use `nornflow show --catalogs` to see all discovered items, or specific `--tasks`, `--filters`, `--workflows`, `--blueprints`, and `--j2-filters` options.
 
 **Discovery order:**
 1. Built-in items are loaded first
@@ -407,7 +463,7 @@ Domain resolution:
 Blueprints are reusable collections of tasks that can be referenced within workflows. They enable code reuse, modularity, and maintainability by defining common task sequences once and using them across multiple workflows.
 
 **Key characteristics:**
-- Contain **only** a tasks list (no workflow metadata)
+- Contain **only** a MANDATORY `tasks` key and an OPTIONAL `description` key (no other workflow metadata)
 - Referenced by name or path in workflows
 - Support nesting (blueprints can reference other blueprints)
 - Expanded during workflow loading (assembly-time)
@@ -423,7 +479,9 @@ tasks:
   - name: netmiko_send_command
     args:
       command_string: "show interfaces status"
+```
 
+```yaml
 # workflows/deploy.yaml
 workflow:
   name: "Deploy Configuration"
@@ -792,6 +850,38 @@ NornFlow supports three failure handling strategies:
 
 See the full Failure Strategies guide for details.
 
+## Logging
+
+NornFlow provides centralized logging that captures detailed execution information for debugging and auditing purposes.  
+As users run workflows or tasks, NornFlow automatically creates a `.log` file under the folder structure determined by the `logger.directory` setting.
+
+### Log Files
+
+- **Location**: Configured via `logger.directory` setting (default: `.nornflow/logs`)
+- **Naming**: Files are timestamped with the workflow/task name (e.g., `my_workflow_20260115_143022.log`)
+- **Format**: Each log entry includes timestamp, log level, logger name, and message
+
+### Log Levels
+
+Configure verbosity via `logger.level` (default: `INFO`) in `nornflow.yaml`:
+
+| Level | Description |
+|-------|-------------|
+| `DEBUG` | Detailed diagnostic information including variable resolution, template compilation |
+| `INFO` | General execution flow, task start/completion, workflow progress |
+| `WARNING` | Potential issues that don't stop execution |
+| `ERROR` | Errors that may affect results (also printed to console) |
+| `CRITICAL` | Severe errors that may halt execution |
+
+### Configuration
+
+```yaml
+# nornflow.yaml
+logger:
+  directory: ".nornflow/logs"
+  level: "INFO"
+```
+
 ## Best Practices
 
 1. **Structure workflows by domain**
diff --git a/docs/hooks_guide.md b/docs/hooks_guide.md
index 28810c4..0c43a19 100644
--- a/docs/hooks_guide.md
+++ b/docs/hooks_guide.md
@@ -361,7 +361,7 @@ If you configure custom processors via the `processors` setting (either globally
 In summary: 
 - **Signal mechanism**: The hook doesn't implement suppression directly - it signals to compatible processors
 - **Data preservation**: Result objects remain intact regardless of suppression
-- **Warning on incompatibility**: Shows warning if no compatible processor is found
+- **Warning on incompatibility**: Logs a warning message if no compatible processor is found
 
 **Configuring Custom Processors:**
 
diff --git a/docs/jinja2_filters.md b/docs/jinja2_filters.md
index 8ad07ca..80f1b80 100644
--- a/docs/jinja2_filters.md
+++ b/docs/jinja2_filters.md
@@ -11,6 +11,7 @@
 - [NornFlow Python Wrapper Filters](#nornflow-python-wrapper-filters)
 - [Filter Chaining](#filter-chaining)
 - [Common Patterns](#common-patterns)
+- [Creating Custom Filters](#creating-custom-filters)
 
 ## Introduction
 
@@ -294,6 +295,113 @@ tasks:
       # Result: [100, 200]
 ```
 
+## Creating Custom Filters
+
+NornFlow allows you to define your own custom Jinja2 filters that can be used throughout your workflows, blueprints, and task arguments.
+
+### Configuration
+
+Custom filters are discovered from directories specified by `local_j2_filters` in your `nornflow.yaml`:
+
+```yaml
+# nornflow.yaml
+local_j2_filters:
+  - "j2_filters"
+  - "/opt/company/shared_filters"
+```
+
+### Writing Custom Filters
+
+Each filter is a Python function that takes at least one argument (the value being filtered) and returns the transformed value. Place your filter functions in `.py` files within the configured directories:
+
+```python
+# j2_filters/my_filters.py
+
+def join_with_separator(items: list, separator: str = "***") -> str:
+    """Joins list elements with a custom separator.
+    
+    Args:
+        items: List of items to join.
+        separator: The separator string (default: "***").
+    
+    Returns:
+        Joined string with separator between elements.
+    """
+    return separator.join(str(item) for item in items)
+
+
+def add_prefix(value: str, prefix: str = "NF_") -> str:
+    """Add a prefix to a string value.
+    
+    Args:
+        value: The string to prefix.
+        prefix: The prefix to add (default: "NF_").
+    
+    Returns:
+        Prefixed string.
+    """
+    return f"{prefix}{value}"
+
+
+def mask_sensitive(value: str, visible_chars: int = 4) -> str:
+    """Mask a sensitive string, showing only the last N characters.
+    
+    Args:
+        value: The string to mask.
+        visible_chars: Number of characters to leave visible at the end.
+    
+    Returns:
+        Masked string with asterisks.
+    """
+    if len(value) <= visible_chars:
+        return "*" * len(value)
+    return "*" * (len(value) - visible_chars) + value[-visible_chars:]
+```
+
+### Using Custom Filters
+
+Once defined, custom filters are automatically discovered at NornFlow initialization and can be used in any Jinja2 template:
+
+```yaml
+workflow:
+  name: "Custom Filter Demo"
+  tasks:
+    - name: echo
+      args:
+        msg: "{{ [1, 2, 3] | join_with_separator }}"  # Output: "1***2***3"
+    
+    - name: echo
+      args:
+        msg: "{{ [1, 2, 3] | join_with_separator(' | ') }}"  # Output: "1 | 2 | 3"
+    
+    - name: echo
+      args:
+        msg: "{{ hostname | add_prefix('DEVICE_') }}"  # Output: "DEVICE_router1"
+    
+    - name: echo
+      args:
+        msg: "{{ api_key | mask_sensitive(4) }}"  # Output: "************abcd"
+```
+
+### Viewing Available Filters
+
+To see all registered Jinja2 filters (both built-in and custom):
+
+```bash
+nornflow show --j2-filters
+```
+
+This displays a table with filter names, descriptions (extracted from docstrings), and their source location.
+
+### Filter Discovery Rules
+
+- All `.py` files in configured directories are scanned recursively
+- All callable functions (not starting with `_`) are registered as filters
+- Filter names match the function names
+- Docstrings are used for descriptions in the catalog display
+- Custom jinja2 filters will override other filters with the same name (use with caution)
+
+
 <div align="center">
   
 ## Navigation
diff --git a/docs/nornflow_settings.md b/docs/nornflow_settings.md
index 121cb90..0039974 100644
--- a/docs/nornflow_settings.md
+++ b/docs/nornflow_settings.md
@@ -11,10 +11,12 @@
   - [`local_filters`](#local_filters)
   - [`local_hooks`](#local_hooks)
   - [`local_blueprints`](#local_blueprints)
+  - [`local_j2_filters`](#local_j2_filters)
   - [`vars_dir`](#vars_dir)
   - [`dry_run`](#dry_run)
   - [`failure_strategy`](#failure_strategy)
   - [`processors`](#processors)
+  - [`logger`](#logger)
   - [`imported_packages`](#imported_packages)
 - [NornFlow Settings vs Nornir Configs](#nornflow-settings-vs-nornir-configs)
 
@@ -80,7 +82,7 @@ This means even if you set `NORNFLOW_SETTINGS_FAILURE_STRATEGY="fail-fast"`, pas
 
 ### `local_tasks`
 
-- **Description**: List of paths to directories containing the Nornir tasks to be included in NornFlow's task catalog. The search is recursive, meaning that all subdirectories will be searched as well. Be careful with this. Both absolute and relative paths are supported.
+- **Description**: List of paths to directories containing the Nornir tasks to be included in NornFlow's task catalog. The search is recursive, meaning that all subdirectories will be searched as well. Both absolute and relative paths are supported.
 - **Type**: list[str]
 - **Default**: ["tasks"]
 - **Path Resolution**: 
@@ -95,6 +97,7 @@ This means even if you set `NORNFLOW_SETTINGS_FAILURE_STRATEGY="fail-fast"`, pas
     - "../shared_tasks"          # Relative to settings file
   ```
 - **Environment Variable**: `NORNFLOW_SETTINGS_LOCAL_TASKS`
+- **Important**: If you plan to delete any of the automatically created directories (from `nornflow init`) without creating or pointing to your own alternative source directories for this setting, you must set `local_tasks` to an empty list (`[]`) in `nornflow.yaml`. Otherwise, NornFlow will raise ResourceError exceptions during initialization and break.
 
 ### `local_workflows`
 
@@ -112,6 +115,7 @@ This means even if you set `NORNFLOW_SETTINGS_FAILURE_STRATEGY="fail-fast"`, pas
     - "/shared/workflows"
   ```
 - **Environment Variable**: `NORNFLOW_SETTINGS_LOCAL_WORKFLOWS`
+- **Important**: If you plan to delete any of the automatically created directories (from `nornflow init`) without creating or pointing to your own alternative source directories for this setting, you must set `local_workflows` to an empty list (`[]`) in `nornflow.yaml`. Otherwise, NornFlow will raise ResourceError exceptions during initialization and break.
 
 ### `local_filters`
 
@@ -129,7 +133,7 @@ This means even if you set `NORNFLOW_SETTINGS_FAILURE_STRATEGY="fail-fast"`, pas
     - "../custom_filters"
   ```
 - **Environment Variable**: `NORNFLOW_SETTINGS_LOCAL_FILTERS`
-- **Note**: For details on how these filters can be used in workflows, see the Inventory Filtering section in the Workflows documentation.
+- **Important**: If you plan to delete any of the automatically created directories (from `nornflow init`) without creating or pointing to your own alternative source directories for this setting, you must set `local_filters` to an empty list (`[]`) in `nornflow.yaml`. Otherwise, NornFlow will raise ResourceError exceptions during initialization and break.
 
 ### `local_hooks`
 
@@ -147,7 +151,7 @@ This means even if you set `NORNFLOW_SETTINGS_FAILURE_STRATEGY="fail-fast"`, pas
     - "/shared/custom_hooks"
   ```
 - **Environment Variable**: `NORNFLOW_SETTINGS_LOCAL_HOOKS`
-- **Note**: For details on creating custom hooks, see the Hooks Guide documentation.
+- **Important**: If you plan to delete any of the automatically created directories (from `nornflow init`) without creating or pointing to your own alternative source directories for this setting, you must set `local_hooks` to an empty list (`[]`) in `nornflow.yaml`. Otherwise, NornFlow will raise ResourceError exceptions during initialization and break.
 
 ### `local_blueprints`
 
@@ -166,11 +170,29 @@ This means even if you set `NORNFLOW_SETTINGS_FAILURE_STRATEGY="fail-fast"`, pas
     - "/opt/company/blueprints"
   ```
 - **Environment Variable**: `NORNFLOW_SETTINGS_LOCAL_BLUEPRINTS`
-- **Note**: Blueprints are expanded during workflow loading (assembly-time) and have access to a subset of the variable system. See the Blueprints Guide for details.
+- **Important**: If you plan to delete any of the automatically created directories (from `nornflow init`) without creating or pointing to your own alternative source directories for this setting, you must set `local_blueprints` to an empty list (`[]`) in `nornflow.yaml`. Otherwise, NornFlow will raise ResourceError exceptions during initialization and break.
+
+### `local_j2_filters`
+
+- **Description**: List of paths to directories containing custom Jinja2 filter functions. These filters extend the built-in Jinja2 filters available in NornFlow templates and can be used throughout workflows, blueprints, and task arguments. The search is recursive, meaning all subdirectories will be searched. All callable functions in Python files are registered as filters. Both absolute and relative paths are supported.
+- **Type**: list[str]
+- **Default**: ["j2_filters"]
+- **Path Resolution**: 
+  - When loaded through `NornFlowSettings.load`, relative paths resolve against the settings file directory
+  - Direct instantiation leaves relative paths untouched, so they resolve against the runtime working directory
+  - Absolute paths are used as-is
+- **Example**:
+  ```yaml
+  local_j2_filters:
+    - "j2_filters"
+    - "/opt/company/shared_filters"
+  ```
+- **Environment Variable**: `NORNFLOW_SETTINGS_LOCAL_J2_FILTERS`
+- **Important**: If you plan to delete any of the automatically created directories (from `nornflow init`) without creating or pointing to your own alternative source directories for this setting, you must set `local_j2_filters` to an empty list (`[]`) in `nornflow.yaml`. Otherwise, NornFlow will raise ResourceError exceptions during initialization and break.
 
 ### `vars_dir`
 
-- **Description**: Path to the directory containing variable files for NornFlow's variable system. This directory will store global variables (`defaults.yaml`) and domain-specific variables. Both absolute and relative paths are supported.
+- **Description**: Path to the directory containing variable files for NornFlow's variable system. This directory will store global variables (defaults.yaml) and domain-specific variables. Both absolute and relative paths are supported.
 - **Type**: `str`
 - **Default**: "vars"
 - **Path Resolution**: 
@@ -237,6 +259,31 @@ This means even if you set `NORNFLOW_SETTINGS_FAILURE_STRATEGY="fail-fast"`, pas
   3. Processors defined in this settings file
   4. `DefaultNornFlowProcessor` (if no other processors specified)
 
+### `logger`
+
+- **Description**: Configuration for NornFlow's logging system. Controls where log files are written and the logging verbosity level.
+- **Type**: `dict` with keys `directory` and `level`
+- **Default**: `{"directory": ".nornflow/logs", "level": "INFO"}`
+- **Example**:
+  ```yaml
+  logger:
+    directory: ".nornflow/logs"
+    level: "DEBUG"
+  ```
+- **Sub-keys**:
+  - `directory`: Path to the directory where log files will be written. Relative paths resolve against the project root. The directory is created automatically if it doesn't exist.
+  - `level`: Logging verbosity level. Valid values: `"DEBUG"`, `"INFO"`, `"WARNING"`, `"ERROR"`, `"CRITICAL"`
+- **Log Levels**:
+  | Level | Description |
+  |-------|-------------|
+  | `DEBUG` | Detailed diagnostic information including variable resolution, template compilation |
+  | `INFO` | General execution flow, task start/completion, workflow progress |
+  | `WARNING` | Potential issues that don't stop execution |
+  | `ERROR` | Errors that may affect results (also printed to console) |
+  | `CRITICAL` | Severe errors that may halt execution |
+- **Note**: Log files are automatically created with timestamped filenames (e.g., `my_workflow_20260115_143022.log`). Each workflow execution creates a new log file. Errors (`ERROR` level and above) are printed to stderr regardless of the log level setting.
+- **Sensitive Data Protection**: NornFlow will attempt to redact sensitive data in log messages. Values explicitly associated with keys like `password`, `secret`, `token`, `api_key`, and similar are replaced with `***REDACTED***`. However, this is merely a best effort. It is the user's responsibility to avoid logging sensitive data.
+
 ---
 > 🚨 ***NOTE: `imported_packages` is planned, but not yet supported and right now has no effect at all.***
 ### *`imported_packages`*
diff --git a/docs/quick_start.md b/docs/quick_start.md
index 47b5d75..c3b8445 100644
--- a/docs/quick_start.md
+++ b/docs/quick_start.md
@@ -47,21 +47,28 @@ This creates:
 - 📁 filters - Custom Nornir inventory filters
 - 📁 hooks - Custom hook implementations for extending task behavior
 - 📁 blueprints - Reusable task collections
+- 📁 j2_filters - Custom Jinja2 filters for templates
 - 📁 vars - Will contain Global and Domain-specific default variables
 - 📁 nornir_configs - Nornir configuration
 - 📑 nornflow.yaml - NornFlow settings
 
+> **Important:** If you delete any automatically created directories without providing alternatives, set the corresponding `local_*` setting to `[]` in `nornflow.yaml` to avoid `ResourceError` exceptions. Example: Delete `blueprints` directory → set `local_blueprints: []`.
+
 ### 2. Check What's Available
 
 ```bash
 nornflow show --catalogs
 ```
 
-You'll see four catalogs:
+You'll see five catalogs:
 - **Tasks**: Individual Nornir tasks, that represent a single automation action.
 - **Workflows**: Sequences of tasks defined in YAML files that describe operations to be executed together.
 - **Filters**: Nornir filters that allow you to select specific devices from the inventory.
 - **Blueprints**: Reusable task collections that can be referenced across workflows.
+- **Jinja2 Filters**: Custom filters for use in Jinja2 templates throughout NornFlow.
+
+> **Notes on catalogs:**
+> - The **Jinja2 Filters** catalog displays only NornFlow's built-in filters and any custom filters you define in your `j2_filters` directory. It does not list Jinja2's native filters (like `upper`, `lower`, `join`, etc.) since those are always available by default.
 
 ## Running Tasks
 
@@ -160,12 +167,17 @@ local_hooks:
   - "hooks"
 local_blueprints:
   - "blueprints"
+local_j2_filters:
+  - "j2_filters"
 imported_packages: []
 dry_run: False
 failure_strategy: "skip-failed"
 processors:
   - class: "nornflow.builtins.DefaultNornFlowProcessor"
 vars_dir: "vars"
+logger:
+  directory: ".nornflow/logs"
+  level: "INFO"
 ```
 
 ### 4. Create a network automation workflow (`workflows/backup_configs.yaml`):
@@ -325,7 +337,7 @@ nornflow run service_check --inventory-filters "filter_by_service={'service': 'b
 ## Useful Commands
 
 ```bash
-# Show available tasks, workflows, filters, and blueprints (catalog)
+# Show available tasks, workflows, filters, blueprints, and j2 filters (catalog)
 nornflow show --catalogs
 
 # Show specific catalogs
@@ -333,6 +345,7 @@ nornflow show --tasks
 nornflow show --filters
 nornflow show --workflows
 nornflow show --blueprints
+nornflow show --j2-filters
 
 # Show current NornFlow settings
 nornflow show --settings
@@ -363,4 +376,4 @@ nornflow run my_workflow.yaml --dry-run
 </tr>
 </table>
 
-</div>
\ No newline at end of file
+</div>
diff --git a/nornflow/blueprints/expander.py b/nornflow/blueprints/expander.py
index cb3f518..2a48c92 100644
--- a/nornflow/blueprints/expander.py
+++ b/nornflow/blueprints/expander.py
@@ -1,4 +1,3 @@
-import logging
 from pathlib import Path
 from typing import Any
 
@@ -6,10 +5,9 @@ from pydantic_serdes.utils import load_file_to_dict
 
 from nornflow.blueprints.resolver import BlueprintResolver
 from nornflow.exceptions import BlueprintCircularDependencyError, BlueprintError
+from nornflow.logger import logger
 from nornflow.utils import get_file_content_hash
 
-logger = logging.getLogger(__name__)
-
 
 class BlueprintExpander:
     """Handles recursive blueprint expansion with circular dependency detection.
@@ -18,13 +16,13 @@ class BlueprintExpander:
     task definitions, including nested blueprint support and validation.
     """
 
-    def __init__(self, resolver: BlueprintResolver):
+    def __init__(self, resolver: BlueprintResolver | None = None):
         """Initialize the expander with a resolver.
 
         Args:
             resolver: BlueprintResolver for template resolution and context building.
         """
-        self.resolver = resolver
+        self.resolver = resolver or BlueprintResolver()
 
     def expand_blueprints(
         self,
@@ -54,11 +52,13 @@ class BlueprintExpander:
             BlueprintError: If blueprint expansion fails.
         """
         if not vars_dir or not workflow_roots:
+            logger.debug("Skipping blueprint expansion: vars_dir or workflow_roots not provided")
             return tasks
 
         if not blueprints_catalog:
             blueprints_catalog = {}
 
+        logger.debug("Building variable context for blueprint expansion")
         context = self.resolver.build_context(
             vars_dir=vars_dir,
             workflow_path=workflow_path,
@@ -78,6 +78,7 @@ class BlueprintExpander:
             )
             expanded.extend(processed_tasks)
 
+        logger.debug(f"Blueprint expansion complete: {len(tasks)} items -> {len(expanded)} tasks")
         return expanded
 
     def _process_task_item(
@@ -106,6 +107,8 @@ class BlueprintExpander:
             return [task_dict]
 
         if not self._should_include_blueprint(task_dict, context):
+            blueprint_name = task_dict.get("blueprint", "unknown")
+            logger.debug(f"Skipping blueprint '{blueprint_name}': condition evaluated to false")
             return []
 
         return self._expand_single_blueprint(
@@ -158,6 +161,8 @@ class BlueprintExpander:
             raise BlueprintError("Blueprint reference missing 'blueprint' field")
 
         resolved_name = self.resolver.resolve_template(blueprint_name, context)
+        logger.debug(f"Expanding blueprint '{resolved_name}'")
+
         blueprint_path = self._resolve_blueprint_to_path(resolved_name, blueprints_catalog)
         content_hash = get_file_content_hash(blueprint_path)
 
@@ -168,7 +173,10 @@ class BlueprintExpander:
         name_stack.append(blueprint_path.name)
         try:
             if content_hash not in content_cache:
+                logger.debug(f"Loading blueprint from '{blueprint_path}'")
                 content_cache[content_hash] = self._load_blueprint_tasks(blueprint_path)
+            else:
+                logger.debug(f"Using cached content for blueprint '{blueprint_path.name}'")
 
             blueprint_tasks = content_cache[content_hash]
 
@@ -179,6 +187,7 @@ class BlueprintExpander:
                 )
                 expanded.extend(processed)
 
+            logger.debug(f"Blueprint '{resolved_name}' expanded to {len(expanded)} tasks")
             return expanded
         finally:
             expansion_stack.pop()
@@ -203,18 +212,21 @@ class BlueprintExpander:
             BlueprintError: If blueprint cannot be found.
         """
         if blueprint_ref in blueprints_catalog:
+            logger.debug(f"Blueprint '{blueprint_ref}' found in catalog")
             return blueprints_catalog[blueprint_ref]
 
         path = Path(blueprint_ref)
 
         if path.is_absolute() and path.exists():
+            logger.debug(f"Blueprint resolved from absolute path: {path}")
             return path
 
-        # Relative to current working directory
         resolved = Path.cwd() / path
         if resolved.exists():
+            logger.debug(f"Blueprint resolved from relative path: {resolved}")
             return resolved
 
+        logger.error(f"Blueprint '{blueprint_ref}' not found in catalog or filesystem")
         raise BlueprintError(
             (
                 "Blueprint not found in catalog or filesystem. "
@@ -238,7 +250,7 @@ class BlueprintExpander:
 
     @staticmethod
     def _load_blueprint_tasks(blueprint_path: Path) -> list[dict[str, Any]]:
-        """Load and validate blueprint structure from file.
+        """Load and validate blueprint structure from file using BlueprintModel.
 
         Args:
             blueprint_path: Path to the blueprint file.
@@ -249,33 +261,21 @@ class BlueprintExpander:
         Raises:
             BlueprintError: If blueprint structure is invalid.
         """
+        # Import inside function to avoid circular import:
+        # - Validations must be carried out by custom models;
+        # - models MUST stay in models/ inner package;
+        # - importing BlueprintModel at top here would create cycle during module loading.
+        from nornflow.models import BlueprintModel  # noqa: PLC0415
+
         try:
             blueprint_data = load_file_to_dict(blueprint_path)
+            blueprint_model = BlueprintModel.model_validate(blueprint_data, strict=True)
+            logger.debug(f"Blueprint '{blueprint_path.name}' loaded with {len(blueprint_model.tasks)} tasks")
+            return blueprint_model.tasks
         except Exception as e:
+            logger.exception(f"Failed to load blueprint '{blueprint_path.name}': {e}")
             raise BlueprintError(
-                f"Failed to load blueprint file: {e}",
+                f"Failed to load or validate blueprint file: {e}",
                 blueprint_name=str(blueprint_path.name),
-                details={"path": str(blueprint_path)},
+                details={"path": str(blueprint_path), "error": str(e)},
             ) from e
-
-        actual_keys = set(blueprint_data.keys())
-
-        if actual_keys != {"tasks"}:
-            raise BlueprintError(
-                f"Blueprint must contain ONLY 'tasks' key, found: {', '.join(sorted(actual_keys))}",
-                blueprint_name=str(blueprint_path.name),
-                details={
-                    "path": str(blueprint_path),
-                    "expected": ["tasks"],
-                    "found": sorted(actual_keys),
-                },
-            )
-
-        if not isinstance(blueprint_data["tasks"], list):
-            raise BlueprintError(
-                f"'tasks' must be a list, got {type(blueprint_data['tasks']).__name__}",
-                blueprint_name=str(blueprint_path.name),
-                details={"path": str(blueprint_path)},
-            )
-
-        return blueprint_data["tasks"]
diff --git a/nornflow/blueprints/resolver.py b/nornflow/blueprints/resolver.py
index a67c51a..d9ddc72 100644
--- a/nornflow/blueprints/resolver.py
+++ b/nornflow/blueprints/resolver.py
@@ -1,4 +1,3 @@
-import logging
 import os
 from pathlib import Path
 from typing import Any
@@ -6,10 +5,9 @@ from typing import Any
 from pydantic_serdes.utils import load_file_to_dict
 
 from nornflow.exceptions import BlueprintError
-from nornflow.vars.constants import DEFAULTS_FILENAME, JINJA2_MARKERS, TRUTHY_STRING_VALUES
-from nornflow.vars.jinja2_utils import Jinja2EnvironmentManager
-
-logger = logging.getLogger(__name__)
+from nornflow.j2 import Jinja2Service
+from nornflow.logger import logger
+from nornflow.vars.constants import DEFAULTS_FILENAME
 
 
 class BlueprintResolver:
@@ -20,13 +18,9 @@ class BlueprintResolver:
     for blueprint references and conditions.
     """
 
-    def __init__(self, jinja2_manager: Jinja2EnvironmentManager):
-        """Initialize the resolver with a Jinja2 manager.
-
-        Args:
-            jinja2_manager: Manager for Jinja2 template rendering.
-        """
-        self.jinja2_manager = jinja2_manager
+    def __init__(self):
+        """Initialize resolver with Jinja2Service."""
+        self.jinja2 = Jinja2Service()
 
     def build_context(
         self,
@@ -55,28 +49,39 @@ class BlueprintResolver:
         Returns:
             Dictionary containing merged variables with proper precedence.
         """
+        logger.debug("Building blueprint variable context")
         context = {}
 
-        context.update(self._load_env_vars())
+        env_vars = self._load_env_vars()
+        if env_vars:
+            logger.debug(f"Loaded {len(env_vars)} environment variables")
+        context.update(env_vars)
 
         vars_dir_path = Path(vars_dir)
         defaults_path = vars_dir_path / DEFAULTS_FILENAME
         if defaults_path.exists():
             try:
-                context.update(load_file_to_dict(defaults_path))
+                defaults = load_file_to_dict(defaults_path)
+                logger.debug(f"Loaded default variables from '{defaults_path}'")
+                context.update(defaults)
             except Exception as e:
-                logger.warning(f"Failed to load defaults file {defaults_path}: {e}")
+                logger.exception(f"Failed to load defaults from '{defaults_path}': {e}")
 
         if workflow_path:
-            domain_defaults = self._load_domain_defaults(vars_dir_path, workflow_path, workflow_roots)
+            domain_defaults = self._load_domain_defaults(vars_dir_path, workflow_path, workflow_roots) or {}
+            if domain_defaults:
+                logger.debug(f"Loaded {len(domain_defaults)} domain-specific variables")
             context.update(domain_defaults)
 
         if inline_workflow_vars:
+            logger.debug(f"Merging {len(inline_workflow_vars)} inline workflow variables")
             context.update(inline_workflow_vars)
 
         if cli_vars:
+            logger.debug(f"Merging {len(cli_vars)} CLI variables")
             context.update(cli_vars)
 
+        logger.debug(f"Blueprint context built with {len(context)} total variables")
         return context
 
     def resolve_template(self, template_str: str, context: dict[str, Any]) -> str:
@@ -93,8 +98,12 @@ class BlueprintResolver:
             BlueprintError: If template has undefined variables or syntax errors.
         """
         try:
-            return self.jinja2_manager.render_template(template_str, context, "blueprint reference")
+            resolved = self.jinja2.resolve_string(template_str, context, error_context="blueprint reference")
+            if template_str != resolved:
+                logger.debug(f"Resolved template '{template_str}' -> '{resolved}'")
+            return resolved
         except Exception as e:
+            logger.exception(f"Failed to resolve blueprint template: {e}")
             raise BlueprintError(
                 f"Failed to resolve blueprint template: {e}", details={"template": template_str}
             ) from e
@@ -115,19 +124,11 @@ class BlueprintResolver:
             BlueprintError: If condition has undefined variables or syntax errors.
         """
         try:
-            if isinstance(condition, bool):
-                return condition
-
-            condition_stripped = condition.strip()
-
-            if not any(marker in condition_stripped for marker in JINJA2_MARKERS):
-                return condition_stripped.lower() in TRUTHY_STRING_VALUES
-
-            template_str = condition_stripped
-
-            result = self.jinja2_manager.render_template(template_str, context, "blueprint condition")
-            return result.lower() in TRUTHY_STRING_VALUES
+            result = self.jinja2.resolve_to_bool(condition, context)
+            logger.debug(f"Evaluated condition '{condition}' -> {result}")
+            return result
         except Exception as e:
+            logger.exception(f"Failed to evaluate blueprint condition: {e}")
             raise BlueprintError(
                 f"Failed to evaluate blueprint condition: {e}", details={"condition": condition}
             ) from e
@@ -165,7 +166,9 @@ class BlueprintResolver:
 
             relative_path = workflow_path.relative_to(root_path)
             if len(relative_path.parts) > 1:
-                return relative_path.parts[0]
+                domain = relative_path.parts[0]
+                logger.debug(f"Found domain '{domain}' for workflow '{workflow_path.name}'")
+                return domain
             break
 
         return None
@@ -190,10 +193,13 @@ class BlueprintResolver:
 
         domain_defaults_path = vars_dir / domain / DEFAULTS_FILENAME
         if not domain_defaults_path.exists():
+            logger.debug(f"No domain defaults found at '{domain_defaults_path}'")
             return {}
 
         try:
-            return load_file_to_dict(domain_defaults_path)
+            loaded = load_file_to_dict(domain_defaults_path)
+            logger.debug(f"Loaded domain defaults from '{domain_defaults_path}'")
+            return loaded
         except Exception as e:
-            logger.warning(f"Failed to load domain defaults from {domain_defaults_path}: {e}")
+            logger.exception(f"Failed to load domain defaults from '{domain_defaults_path}': {e}")
             return {}
diff --git a/nornflow/builder.py b/nornflow/builder.py
index b81954d..04a8c0f 100644
--- a/nornflow/builder.py
+++ b/nornflow/builder.py
@@ -5,6 +5,7 @@ from pydantic_serdes.utils import load_file_to_dict
 
 from nornflow.constants import FailureStrategy
 from nornflow.exceptions import InitializationError, ResourceError, SettingsError
+from nornflow.logger import logger
 from nornflow.models import WorkflowModel
 from nornflow.nornflow import NornFlow
 from nornflow.settings import NornFlowSettings
@@ -63,8 +64,10 @@ class NornFlowBuilder:
         """
         Initialize the NornFlowBuilder with default values.
         """
+        # Can hold WorkflowModel (pre-built), dict (in-memory workflow),
+        # tuple (value, source_type) for str paths/names, or None
+        self._workflow: WorkflowModel | dict[str, Any] | tuple[str, str] | None = None
         self._settings: NornFlowSettings | None = None
-        self._workflow: WorkflowModel | str | None = None
         self._processors: list[dict[str, Any]] | None = None
         self._vars: dict[str, Any] | None = None
         self._filters: dict[str, Any] | None = None
@@ -114,11 +117,7 @@ class NornFlowBuilder:
         Returns:
             The builder instance for method chaining.
         """
-        try:
-            workflow_dict = load_file_to_dict(workflow_path)
-            self._workflow = WorkflowModel.create(workflow_dict)
-        except Exception as e:
-            raise InitializationError(f"Failed to load workflow from '{workflow_path}': {e}") from e
+        self._workflow = (str(workflow_path), "path")
         return self
 
     def with_workflow_dict(self, workflow_dict: dict[str, Any]) -> "NornFlowBuilder":
@@ -131,10 +130,7 @@ class NornFlowBuilder:
         Returns:
             The builder instance for method chaining.
         """
-        try:
-            self._workflow = WorkflowModel.create(workflow_dict)
-        except Exception as e:
-            raise InitializationError(f"Failed to create workflow from dict: {e}") from e
+        self._workflow = workflow_dict
         return self
 
     def with_workflow_model(self, workflow_model: WorkflowModel) -> "NornFlowBuilder":
@@ -164,7 +160,7 @@ class NornFlowBuilder:
         Returns:
             The builder instance for method chaining.
         """
-        self._workflow = workflow_name
+        self._workflow = (workflow_name, "name")
         return self
 
     def with_processors(self, processors: list[dict[str, Any]]) -> "NornFlowBuilder":
@@ -256,6 +252,69 @@ class NornFlowBuilder:
         self._kwargs.update(kwargs)
         return self
 
+    def _create_workflow_from_source(
+        self, workflow_dict: dict[str, Any], workflow_path: Path | None, nornflow: NornFlow
+    ) -> WorkflowModel:
+        """
+        Create a WorkflowModel from a workflow dictionary and path.
+
+        Args:
+            workflow_dict: The workflow dictionary.
+            workflow_path: The path to the workflow file, or None.
+            nornflow: The NornFlow instance for accessing catalogs and settings.
+
+        Returns:
+            The created WorkflowModel.
+        """
+        return WorkflowModel.create(
+            workflow_dict,
+            blueprints_catalog=nornflow.blueprints_catalog,
+            vars_dir=nornflow.settings.vars_dir,
+            workflow_path=workflow_path,
+            workflow_roots=nornflow.settings.local_workflows,
+            cli_vars=self._vars,
+        )
+
+    def _handle_workflow_path(self, workflow_path: str, nornflow: NornFlow) -> None:
+        """
+        Handle workflow configuration when _workflow is a path.
+
+        Args:
+            workflow_path: The workflow file path.
+            nornflow: The NornFlow instance to modify.
+        """
+        try:
+            workflow_dict = load_file_to_dict(workflow_path)
+            workflow = self._create_workflow_from_source(workflow_dict, Path(workflow_path), nornflow)
+            nornflow.workflow = workflow
+        except Exception as e:
+            logger.exception(f"Failed to load workflow from '{workflow_path}': {e}")
+            raise InitializationError(f"Failed to load workflow from '{workflow_path}': {e}") from e
+
+    def _handle_workflow_name(self, workflow_name: str, nornflow: NornFlow) -> None:
+        """
+        Handle workflow configuration when _workflow is a name.
+
+        Args:
+            workflow_name: The workflow name.
+            nornflow: The NornFlow instance to modify.
+        """
+        nornflow.workflow = workflow_name
+
+    def _handle_workflow_dict(self, nornflow: NornFlow) -> None:
+        """
+        Handle workflow configuration when _workflow is a dictionary.
+
+        Args:
+            nornflow: The NornFlow instance to modify.
+        """
+        try:
+            workflow = self._create_workflow_from_source(self._workflow, None, nornflow)
+            nornflow.workflow = workflow
+        except Exception as e:
+            logger.exception(f"Failed to create workflow from dict: {e}")
+            raise InitializationError(f"Failed to create workflow from dict: {e}") from e
+
     def build(self) -> NornFlow:
         """
         Build and return a NornFlow object based on the provided configurations.
@@ -273,12 +332,25 @@ class NornFlowBuilder:
         if not self._settings:
             self._settings = NornFlowSettings.load()
 
-        return NornFlow(
+        nornflow = NornFlow(
             nornflow_settings=self._settings,
-            workflow=self._workflow,
+            workflow=None,
             processors=self._processors,
             vars=self._vars,
             filters=self._filters,
             failure_strategy=self._failure_strategy,
             **self._kwargs,
         )
+
+        if isinstance(self._workflow, tuple):
+            value, source_type = self._workflow
+            if source_type == "path":
+                self._handle_workflow_path(value, nornflow)
+            elif source_type == "name":
+                self._handle_workflow_name(value, nornflow)
+        elif isinstance(self._workflow, dict):
+            self._handle_workflow_dict(nornflow)
+        elif isinstance(self._workflow, WorkflowModel):
+            nornflow.workflow = self._workflow
+
+        return nornflow
diff --git a/nornflow/builtins/hooks/if_hook.py b/nornflow/builtins/hooks/if_hook.py
index f8d43b3..ac73f6f 100644
--- a/nornflow/builtins/hooks/if_hook.py
+++ b/nornflow/builtins/hooks/if_hook.py
@@ -23,7 +23,6 @@ Example:
     - host2: missing 'some_var', fails condition → skipped, no template resolution
 """
 
-import logging
 from collections.abc import Callable
 from functools import wraps
 from typing import Any, TYPE_CHECKING
@@ -33,12 +32,11 @@ from nornir.core.task import Result, Task
 
 from nornflow.hooks import Hook, Jinja2ResolvableMixin
 from nornflow.hooks.exceptions import HookValidationError
+from nornflow.logger import logger
 
 if TYPE_CHECKING:
     from nornflow.models import TaskModel
 
-logger = logging.getLogger(__name__)
-
 
 def skip_if_condition_flagged(task_func: Callable) -> Callable:
     """Decorator that implements deferred template resolution for conditional execution.
@@ -165,7 +163,7 @@ class IfHook(Hook, Jinja2ResolvableMixin):
                 host.data["nornflow_skip_flag"] = True
 
         except Exception as e:
-            logger.error(f"Error evaluating if condition for host '{host.name}': {e}")
+            logger.exception(f"Error evaluating if condition for host '{host.name}': {e}")
             raise HookValidationError(
                 "IfHook", [("evaluation_error", f"Failed to evaluate condition: {e}")]
             ) from e
diff --git a/nornflow/builtins/hooks/set_to.py b/nornflow/builtins/hooks/set_to.py
index 1bdd4c2..77a2d16 100644
--- a/nornflow/builtins/hooks/set_to.py
+++ b/nornflow/builtins/hooks/set_to.py
@@ -1,6 +1,5 @@
 # ruff: noqa: PERF203
 
-import logging
 from typing import Any, TYPE_CHECKING
 
 from nornir.core.inventory import Host
@@ -8,12 +7,11 @@ from nornir.core.task import MultiResult, Result, Task
 
 from nornflow.hooks import Hook
 from nornflow.hooks.exceptions import HookValidationError
+from nornflow.logger import logger
 
 if TYPE_CHECKING:
     from nornflow.models import TaskModel
 
-logger = logging.getLogger(__name__)
-
 
 class SetToHook(Hook):
     """
@@ -196,12 +194,12 @@ class SetToHook(Hook):
                             f"for host '{host.name}'"
                         )
                     except Exception as e:
-                        logger.error(
+                        logger.exception(
                             f"Failed to extract '{extraction_path}' for variable '{var_name}' "
                             f"on host '{host.name}': {e}"
                         )
         except Exception as e:
-            logger.error(f"Error in set_to hook for host '{host.name}': {e}")
+            logger.exception(f"Error in set_to hook for host '{host.name}': {e}")
             raise
 
     def _extract_data_from_result(self, result: Result, extraction_path: str) -> Any:
diff --git a/nornflow/builtins/hooks/shush.py b/nornflow/builtins/hooks/shush.py
index ad5357a..ea72455 100644
--- a/nornflow/builtins/hooks/shush.py
+++ b/nornflow/builtins/hooks/shush.py
@@ -1,7 +1,8 @@
-# ruff: noqa: SLF001, T201
+# ruff: noqa: SLF001
 from nornir.core.task import AggregatedResult, Task
 
 from nornflow.hooks import Hook, Jinja2ResolvableMixin
+from nornflow.logger import logger
 
 
 class ShushHook(Hook, Jinja2ResolvableMixin):
@@ -34,7 +35,7 @@ class ShushHook(Hook, Jinja2ResolvableMixin):
         )
 
         if not has_compatible_processor:
-            print(
+            logger.warning(
                 "Warning: 'shush' hook has no effect - "
                 "no compatible processor found in chain. "
                 "Outputs are not going to be suppressed."
diff --git a/nornflow/builtins/jinja2_filters/__init__.py b/nornflow/builtins/jinja2_filters/__init__.py
index d8e5d3c..32cb666 100644
--- a/nornflow/builtins/jinja2_filters/__init__.py
+++ b/nornflow/builtins/jinja2_filters/__init__.py
@@ -4,10 +4,10 @@ from nornflow.builtins.jinja2_filters.custom_filters import CUSTOM_FILTERS
 from nornflow.builtins.jinja2_filters.py_wrapper_filters import PY_WRAPPER_FILTERS
 
 # Combine all filters into a single registry
-ALL_FILTERS = {**PY_WRAPPER_FILTERS, **CUSTOM_FILTERS}
+ALL_BUILTIN_J2_FILTERS = {**PY_WRAPPER_FILTERS, **CUSTOM_FILTERS}
 
 __all__ = [
-    "ALL_FILTERS",
+    "ALL_BUILTIN_J2_FILTERS",
     "CUSTOM_FILTERS",
     "PY_WRAPPER_FILTERS",
 ]
diff --git a/nornflow/builtins/jinja2_filters/custom_filters.py b/nornflow/builtins/jinja2_filters/custom_filters.py
index 0d7db2e..8c40150 100644
--- a/nornflow/builtins/jinja2_filters/custom_filters.py
+++ b/nornflow/builtins/jinja2_filters/custom_filters.py
@@ -8,6 +8,8 @@ import jmespath
 from jinja2 import pass_context
 from jinja2.runtime import Context, Undefined
 
+from nornflow.logger import logger
+
 
 def flatten_list(lst: list[Any]) -> list[Any]:
     """Flatten nested lists.
@@ -130,6 +132,7 @@ def _resolve_from_context(context: Context, key: str) -> tuple[bool, Any]:
             return (False, None)
         return (True, value)
     except Exception:
+        logger.debug(f"Could not resolve key '{key}' from context")
         return (False, None)
 
 
@@ -203,6 +206,7 @@ def _nested_exists_in_obj(obj: Any, path: str) -> bool:
             try:
                 current = getattr(current, part)
             except AttributeError:
+                logger.debug(f"Attribute '{part}' not found on object")
                 return False
 
     return current is not None
diff --git a/nornflow/builtins/processors/decorators.py b/nornflow/builtins/processors/decorators.py
index 6f052c1..899e665 100644
--- a/nornflow/builtins/processors/decorators.py
+++ b/nornflow/builtins/processors/decorators.py
@@ -3,6 +3,8 @@ from collections.abc import Callable
 from functools import wraps
 from typing import Any, TYPE_CHECKING
 
+from nornflow.logger import logger
+
 if TYPE_CHECKING:
     pass
 
@@ -38,6 +40,7 @@ def hook_delegator(func: Callable) -> Callable:
                 try:
                     hook_method(*args, **kwargs)
                 except Exception as e:
+                    logger.exception(f"Exception in hook method '{method_name}': {e}")
                     if hasattr(hook, "exception_handlers") and hook.exception_handlers:
                         for exc_class, handler_name in hook.exception_handlers.items():
                             if isinstance(e, exc_class):
diff --git a/nornflow/builtins/processors/failure_strategy_processor.py b/nornflow/builtins/processors/failure_strategy_processor.py
index 48c5efb..f84dce8 100644
--- a/nornflow/builtins/processors/failure_strategy_processor.py
+++ b/nornflow/builtins/processors/failure_strategy_processor.py
@@ -8,6 +8,7 @@ from nornir.core.task import Result, Task
 from tabulate import tabulate
 
 from nornflow.constants import FailureStrategy
+from nornflow.logger import logger
 
 # Initialize colorama
 init(autoreset=True)
@@ -62,9 +63,11 @@ class NornFlowFailureStrategyProcessor(Processor):
         """Called after each host completes for a task."""
         if result.failed:
             self.collected_errors.append((task.name, host.name, result))
+            logger.debug(f"Collected error for task '{task.name}' on host '{host.name}'.")
 
             if self.failure_strategy == FailureStrategy.FAIL_FAST and not self.fail_fast_triggered:
                 self.fail_fast_triggered = True
+                logger.debug(f"Fail fast triggered for task '{task.name}' on host '{host.name}'.")
 
                 # Add ALL hosts to failed_hosts immediately
                 # This causes Nornir to skip them in all running threads
diff --git a/nornflow/builtins/processors/hook_processor.py b/nornflow/builtins/processors/hook_processor.py
index 8aa4cad..2a50122 100644
--- a/nornflow/builtins/processors/hook_processor.py
+++ b/nornflow/builtins/processors/hook_processor.py
@@ -4,6 +4,7 @@ from nornir.core.inventory import Host
 from nornir.core.processor import Processor
 from nornir.core.task import AggregatedResult, MultiResult, Task
 
+from nornflow.logger import logger
 from .decorators import hook_delegator
 
 if TYPE_CHECKING:
@@ -88,6 +89,7 @@ class NornFlowHookProcessor(Processor):
             value: The task-specific context containing task_model and hooks
         """
         self._task_specific_context = value
+        logger.debug(f"Set task-specific context for task with {len(value)} items.")
 
     @property
     def context(self) -> dict[str, Any]:
@@ -115,6 +117,7 @@ class NornFlowHookProcessor(Processor):
     def task_completed(self, task: Task, result: AggregatedResult) -> None:
         """Delegate to hooks' task_completed methods."""
         self.task_specific_context = {}
+        logger.debug(f"Cleared task-specific context after task '{task.name}'.")
 
     @hook_delegator
     def task_instance_started(self, task: Task, host: Host) -> None:
diff --git a/nornflow/builtins/tasks.py b/nornflow/builtins/tasks.py
index 3bfa258..45afba4 100644
--- a/nornflow/builtins/tasks.py
+++ b/nornflow/builtins/tasks.py
@@ -3,6 +3,7 @@ from pathlib import Path
 from nornir.core.task import Result, Task
 
 from nornflow.builtins.utils import build_set_task_report, get_task_vars_manager
+from nornflow.logger import logger
 
 
 def set(task: Task, print_output: bool = True, **kwargs) -> Result:
@@ -124,4 +125,5 @@ def write_file(task: Task, filename: str, content: str, append: bool = False, mk
         return Result(host=task.host, result={"path": str(file_path)}, changed=True)
 
     except Exception as e:
+        logger.exception(f"Failed to write file '{filename}': {e}")
         return Result(host=task.host, failed=True, exception=e)
diff --git a/nornflow/builtins/utils.py b/nornflow/builtins/utils.py
index b59958c..03090b2 100644
--- a/nornflow/builtins/utils.py
+++ b/nornflow/builtins/utils.py
@@ -4,6 +4,7 @@ from typing import Any
 from nornir.core.task import Task
 
 from nornflow.exceptions import ProcessorError
+from nornflow.logger import logger
 from nornflow.vars import NornFlowVariablesManager
 
 
@@ -22,6 +23,7 @@ def get_task_vars_manager(task: Task) -> NornFlowVariablesManager:
     """
     for processor in task.nornir.processors:
         if hasattr(processor, "vars_manager"):
+            logger.debug(f"Found vars_manager in processor for task '{task.name}'.")
             return processor.vars_manager
 
     raise ProcessorError(
@@ -121,4 +123,5 @@ def build_set_task_report(task: Task, kwargs: dict[str, Any]) -> str:
         value_display = format_value_for_display(resolved_value)
         report_lines.append(f"  • {var_name} = {value_display}")
 
+    logger.debug(f"Built set task report for host '{task.host.name}' with {len(kwargs)} variables.")
     return "\n".join(report_lines)
diff --git a/nornflow/catalogs.py b/nornflow/catalogs.py
index f350e0b..cce67d1 100644
--- a/nornflow/catalogs.py
+++ b/nornflow/catalogs.py
@@ -6,6 +6,7 @@ from pathlib import Path
 from typing import Any
 
 from nornflow.exceptions import CatalogError, CoreError, ResourceError
+from nornflow.logger import logger
 from nornflow.utils import import_module_from_path
 
 
@@ -50,6 +51,7 @@ class Catalog(ABC, dict[str, Any]):
             The registered value.
         """
         self.__setitem__(name, item, **kwargs)
+        logger.debug(f"Registered item '{name}' in {self.name} catalog")
         return item
 
     @property
@@ -136,8 +138,10 @@ class Catalog(ABC, dict[str, Any]):
         Raises:
             ResourceError: If directory doesn't exist.
         """
+        logger.info(f"Starting discovery of {self.name} items in directory: {dir_path}")
         path = Path(dir_path)
         if not path.is_dir():
+            logger.error(f"Directory not found for {self.name} discovery: {dir_path}")
             raise ResourceError(
                 f"Directory not found: {dir_path}. Couldn't load {self.name}.",
                 resource_type=self.name,
@@ -146,11 +150,15 @@ class Catalog(ABC, dict[str, Any]):
 
         total_items = 0
         files = self._get_files_to_process(path, **kwargs)
+        logger.debug(f"Found {len(files)} files to process in {dir_path}")
 
         for file_path in files:
             items_added = self._process_file(file_path, **kwargs)
             total_items += items_added
 
+        logger.info(
+            f"Completed {self.name} discovery: {total_items} items registered from {len(files)} files"
+        )
         return total_items
 
     @abstractmethod
@@ -211,13 +219,16 @@ class CallableCatalog(Catalog):
         is_builtin = bool(module_name and module_name.startswith("nornflow.builtins"))
 
         if name in self and self.sources.get(name, {}).get("is_builtin", False):
+            logger.warning(f"Attempted to override built-in '{name}' in {self.name} catalog")
             raise CatalogError(
                 f"Cannot override built-in '{name}' with a custom implementation", catalog_name=self.name
             )
 
-        return super().register(
+        result = super().register(
             name, item, module_path=module_path, module_name=module_name, is_builtin=is_builtin, **kwargs
         )
+        logger.debug(f"Registered callable '{name}' from module '{module_name}' in {self.name} catalog")
+        return result
 
     def register_from_module(
         self,
@@ -252,6 +263,7 @@ class CallableCatalog(Catalog):
             self.register(name, obj, module_path=module_path, module_name=module_name)
             count += 1
 
+        logger.debug(f"Registered {count} items from module '{module_name}'")
         return count
 
     def _get_files_to_process(self, dir_path: Path, **kwargs) -> list[Path]:
@@ -286,8 +298,11 @@ class CallableCatalog(Catalog):
 
         try:
             module = import_module_from_path(module_name, module_path)
-            return self.register_from_module(module, predicate, transform_item)
+            count = self.register_from_module(module, predicate, transform_item)
+            logger.debug(f"Processed file '{file_path}': {count} items registered")
+            return count
         except Exception as e:
+            logger.exception(f"Failed to process file '{file_path}': {e}")
             raise CoreError(
                 f"Failed to import module '{module_name}' from '{module_path}': {e!s}",
                 component="ItemDiscovery",
@@ -376,6 +391,7 @@ class FileCatalog(Catalog):
 
         if file_path.is_file() and predicate and predicate(file_path):
             self.register(name=file_path.name, item=file_path, file_path=str(file_path))
+            logger.debug(f"Registered file '{file_path}' in {self.name} catalog")
             return 1
         return 0
 
diff --git a/nornflow/cli/init.py b/nornflow/cli/init.py
index 6c7eddc..de1b8df 100644
--- a/nornflow/cli/init.py
+++ b/nornflow/cli/init.py
@@ -18,6 +18,7 @@ from nornflow.cli.constants import (
 from nornflow.cli.exceptions import CLIInitError
 from nornflow.cli.show import show_catalog, show_nornflow_settings
 from nornflow.exceptions import NornFlowError
+from nornflow.logger import logger
 from nornflow.settings import NornFlowSettings
 
 app = typer.Typer()
@@ -65,6 +66,7 @@ def init(ctx: typer.Context) -> None:
             original_exception=e,
         ) from e
     except Exception as e:
+        logger.exception(f"An unexpected error occurred during initialization: {e}")
         raise CLIInitError(
             "An unexpected error occurred during initialization",
             hint=f"Error details: {e!s}",
@@ -162,6 +164,9 @@ def create_directories_from_settings(settings: NornFlowSettings) -> None:
     for blueprints_dir in settings.local_blueprints:
         create_directory(Path(blueprints_dir))
 
+    for j2_filters_dir in settings.local_j2_filters:
+        create_directory(Path(j2_filters_dir))
+
     create_directory(Path(settings.vars_dir))
 
 
diff --git a/nornflow/cli/run.py b/nornflow/cli/run.py
index 7586cd3..9d92847 100644
--- a/nornflow/cli/run.py
+++ b/nornflow/cli/run.py
@@ -15,6 +15,7 @@ from nornflow.constants import (
     NORNFLOW_SUPPORTED_YAML_EXTENSIONS,
 )
 from nornflow.exceptions import NornFlowError
+from nornflow.logger import logger
 from nornflow.utils import normalize_failure_strategy
 
 app = typer.Typer(help="Run NornFlow tasks and workflows")
@@ -349,7 +350,7 @@ def get_nornflow_builder(
         else:
             builder.with_workflow_name(target)
     else:
-        timestamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
+        timestamp = datetime.now().strftime("%d-%m-%Y %H-%M-%S")
         workflow_dict = {
             "workflow": {
                 "name": f"Task {target} - exec {timestamp}",
@@ -507,11 +508,8 @@ def run(
         )
 
         nornflow = builder.build()
-
-        # Capture the exit code from nornflow.run()
         exit_code = nornflow.run()
 
-        # Exit with the workflow's exit code if non-zero, otherwise return normally
         if exit_code != 0:
             sys.exit(exit_code)
 
@@ -539,6 +537,7 @@ def run(
         raise typer.Exit(code=104)  # noqa: B904
 
     except Exception as e:
+        logger.exception(f"Unexpected error while running {target}: {e}")
         CLIRunError(
             message=f"Unexpected error while running {target}: {e}",
             hint="This may be a bug. Please report it if the issue persists.",
diff --git a/nornflow/cli/samples/nornflow.yaml b/nornflow/cli/samples/nornflow.yaml
index f45aaa1..1095118 100644
--- a/nornflow/cli/samples/nornflow.yaml
+++ b/nornflow/cli/samples/nornflow.yaml
@@ -33,6 +33,9 @@ local_hooks:
 local_blueprints:
   - "blueprints"
 
+local_j2_filters:
+  - "j2_filters"
+
 imported_packages: []
 
 processors:
@@ -43,4 +46,8 @@ vars_dir: "vars"
 
 failure_strategy: "skip-failed"
 
-dry_run: false
\ No newline at end of file
+dry_run: false
+
+logger:
+  directory: ".nornflow/logs"
+  level: "INFO"
diff --git a/nornflow/cli/show.py b/nornflow/cli/show.py
index 6aaab59..de055cf 100644
--- a/nornflow/cli/show.py
+++ b/nornflow/cli/show.py
@@ -1,11 +1,13 @@
 import json
 import textwrap
+from collections.abc import Callable
 from pathlib import Path
 from typing import Any
 
 import typer
 import yaml
 from nornir.core.exceptions import PluginNotRegistered
+from pydantic_serdes.utils import load_file_to_dict
 from tabulate import tabulate
 from termcolor import colored
 
@@ -14,6 +16,7 @@ from nornflow.catalogs import Catalog
 from nornflow.cli.constants import CWD, DESCRIPTION_FIRST_SENTENCE_LENGTH
 from nornflow.cli.exceptions import CLIShowError
 from nornflow.exceptions import NornFlowError
+from nornflow.logger import logger
 
 app = typer.Typer()
 
@@ -35,6 +38,7 @@ def show(  # noqa: PLR0912
     filters: bool = typer.Option(False, "--filters", "-f", help="Display the filter catalog"),
     workflows: bool = typer.Option(False, "--workflows", "-w", help="Display the workflow catalog"),
     blueprints: bool = typer.Option(False, "--blueprints", "-b", help="Display the blueprint catalog"),
+    j2_filters: bool = typer.Option(False, "--j2-filters", "-j", help="Display the Jinja2 filters catalog"),
     settings: bool = typer.Option(False, "--settings", "-s", help="Display current NornFlow Settings"),
     nornir_configs: bool = typer.Option(
         False, "--nornir-configs", "-n", help="Display current Nornir Configs"
@@ -46,10 +50,12 @@ def show(  # noqa: PLR0912
     """
     show_all_catalogs = catalog or catalogs
 
-    if not any([show_all_catalogs, tasks, filters, workflows, blueprints, settings, nornir_configs, all]):
+    if not any(
+        [show_all_catalogs, tasks, filters, workflows, blueprints, j2_filters, settings, nornir_configs, all]
+    ):
         raise typer.BadParameter(
             "You must provide at least one option: --catalogs, --tasks, --filters, --workflows, "
-            "--blueprints, --settings, --nornir-configs, or --all."
+            "--blueprints, --j2-filters, --settings, --nornir-configs, or --all."
         )
 
     try:
@@ -66,6 +72,7 @@ def show(  # noqa: PLR0912
             show_filters_catalog(nornflow)
             show_workflows_catalog(nornflow)
             show_blueprints_catalog(nornflow)
+            show_j2_filters_catalog(nornflow)
             show_nornflow_settings(nornflow)
             show_nornir_configs(nornflow)
         else:
@@ -74,6 +81,7 @@ def show(  # noqa: PLR0912
                 show_filters_catalog(nornflow)
                 show_workflows_catalog(nornflow)
                 show_blueprints_catalog(nornflow)
+                show_j2_filters_catalog(nornflow)
             else:
                 if tasks:
                     show_tasks_catalog(nornflow)
@@ -83,6 +91,8 @@ def show(  # noqa: PLR0912
                     show_workflows_catalog(nornflow)
                 if blueprints:
                     show_blueprints_catalog(nornflow)
+                if j2_filters:
+                    show_j2_filters_catalog(nornflow)
 
             if settings:
                 show_nornflow_settings(nornflow)
@@ -122,6 +132,7 @@ def show(  # noqa: PLR0912
         raise typer.Exit(code=2) from None
 
     except Exception as e:
+        logger.exception(f"Failed to show requested information: {e}")
         CLIShowError(
             message=f"Failed to show requested information: {e}",
             hint="Check your configuration and try again.",
@@ -131,11 +142,12 @@ def show(  # noqa: PLR0912
 
 
 def show_catalog(nornflow: "NornFlow") -> None:
-    """Display all catalogs: tasks, filters, workflows, and blueprints."""
+    """Display all catalogs: tasks, filters, workflows, blueprints, and j2_filters."""
     show_tasks_catalog(nornflow)
     show_filters_catalog(nornflow)
     show_workflows_catalog(nornflow)
     show_blueprints_catalog(nornflow)
+    show_j2_filters_catalog(nornflow)
 
 
 def show_tasks_catalog(nornflow: "NornFlow") -> None:
@@ -178,6 +190,16 @@ def show_blueprints_catalog(nornflow: "NornFlow") -> None:
     )
 
 
+def show_j2_filters_catalog(nornflow: "NornFlow") -> None:
+    """Display the Jinja2 filters catalog."""
+    show_formatted_table(
+        "JINJA2 FILTERS CATALOG",
+        render_j2_filters_catalog_table_data,
+        ["Filter Name", "Description", "Source"],
+        nornflow,
+    )
+
+
 def show_nornflow_settings(nornflow: "NornFlow") -> None:
     """Display the NornFlow settings."""
     show_formatted_table("NORNFLOW SETTINGS", render_settings_table_data, ["Setting", "Value"], nornflow)
@@ -189,7 +211,7 @@ def show_nornir_configs(nornflow: "NornFlow") -> None:
 
 
 def show_formatted_table(
-    banner_text: str, table_data_renderer: callable, headers: list[str], nornflow: "NornFlow"
+    banner_text: str, table_data_renderer: Callable, headers: list[str], nornflow: "NornFlow"
 ) -> None:
     """Display information in a formatted table.
 
@@ -254,28 +276,41 @@ def get_source_from_catalog(catalog: Catalog, item_name: str) -> str:  # noqa: P
     return "Unknown"
 
 
-def render_task_catalog_table_data(nornflow: "NornFlow") -> list[list[str]]:
-    """Render the task catalog as a list of lists.
+def render_callable_catalog_table_data(
+    catalog, description_processor: Callable[[Any], str]
+) -> list[list[str]]:
+    """Render a callable catalog (tasks, filters, or J2 filters) as a list of lists.
 
     Args:
-        nornflow: The NornFlow object.
+        catalog: The catalog to render.
+        description_processor: Function to process the description from the callable.
 
     Returns:
         The table data.
     """
-    tasks_catalog = nornflow.tasks_catalog
     table_data = []
+    item_names = sorted(catalog.get_builtin_items()) + sorted(catalog.get_custom_items())
+    for item_name in item_names:
+        item = catalog[item_name]
+        description = description_processor(item)
+        source_path = get_source_from_catalog(catalog, item_name)
+        table_data.append(get_colored_row(item_name, description, source_path))
+    return table_data
 
-    task_names = sorted(tasks_catalog.get_builtin_items())
-    task_names.extend(sorted(tasks_catalog.get_custom_items()))
 
-    for task_name in task_names:
-        task_func = tasks_catalog[task_name]
-        docstring = task_func.__doc__ or "No description available"
-        description = process_task_description(docstring)
-        source_path = get_source_from_catalog(tasks_catalog, task_name)
-        table_data.append(get_colored_row(task_name, description, source_path))
-    return table_data
+def render_task_catalog_table_data(nornflow: "NornFlow") -> list[list[str]]:
+    """Render the task catalog as a list of lists.
+
+    Args:
+        nornflow: The NornFlow object.
+
+    Returns:
+        The table data.
+    """
+    return render_callable_catalog_table_data(
+        nornflow.tasks_catalog,
+        lambda func: process_task_description(func.__doc__ or "No description available"),
+    )
 
 
 def render_workflows_catalog_table_data(nornflow: "NornFlow") -> list[list[str]]:
@@ -288,7 +323,7 @@ def render_workflows_catalog_table_data(nornflow: "NornFlow") -> list[list[str]]
         The table data.
     """
     return render_file_based_catalog_table_data(
-        nornflow.workflows_catalog, get_workflow_description, nornflow
+        nornflow.workflows_catalog, lambda path: get_yaml_description(path, "workflow"), nornflow
     )
 
 
@@ -302,7 +337,7 @@ def render_blueprints_catalog_table_data(nornflow: "NornFlow") -> list[list[str]
         The table data.
     """
     return render_file_based_catalog_table_data(
-        nornflow.blueprints_catalog, get_blueprint_description, nornflow
+        nornflow.blueprints_catalog, lambda path: get_yaml_description(path, "description"), nornflow
     )
 
 
@@ -315,19 +350,25 @@ def render_filters_catalog_table_data(nornflow: "NornFlow") -> list[list[str]]:
     Returns:
         The table data.
     """
-    filters_catalog = nornflow.filters_catalog
-    table_data = []
+    return render_callable_catalog_table_data(
+        nornflow.filters_catalog,
+        lambda item: process_filter_description(item[0].__doc__ or "No description available", item[1]),
+    )
 
-    filter_names = sorted(filters_catalog.get_builtin_items())
-    filter_names.extend(sorted(filters_catalog.get_custom_items()))
 
-    for filter_name in filter_names:
-        filter_func, param_names = filters_catalog[filter_name]
-        docstring = filter_func.__doc__ or "No description available"
-        description = process_filter_description(docstring, param_names)
-        source_path = get_source_from_catalog(filters_catalog, filter_name)
-        table_data.append(get_colored_row(filter_name, description, source_path))
-    return table_data
+def render_j2_filters_catalog_table_data(nornflow: "NornFlow") -> list[list[str]]:
+    """Render the Jinja2 filters catalog as a list of lists.
+
+    Args:
+        nornflow: The NornFlow object.
+
+    Returns:
+        The table data.
+    """
+    return render_callable_catalog_table_data(
+        nornflow.j2_filters_catalog,
+        lambda func: extract_first_sentence(func.__doc__ or "No description available"),
+    )
 
 
 def render_settings_table_data(nornflow: "NornFlow") -> list[list[str]]:
@@ -423,46 +464,27 @@ def display_banner(banner_text: str, table: str) -> None:
     typer.echo("\n\n" + centered_banner)
 
 
-def get_workflow_description(workflow_path: Path) -> str:
-    """Get description from workflow YAML file.
-
-    Args:
-        workflow_path: Path to the workflow file.
-
-    Returns:
-        The workflow description.
-    """
-    try:
-        with workflow_path.open() as f:
-            workflow_dict = yaml.safe_load(f)
-            return workflow_dict["workflow"].get("description", "No description available")
-    except Exception:
-        return "Could not load description from file"
-
-
-def get_blueprint_description(blueprint_path: Path) -> str:
-    """Get description from blueprint YAML file.
+def get_yaml_description(file_path: Path, key: str) -> str:
+    """Get description from a YAML file by key.
 
     Args:
-        blueprint_path: Path to the blueprint file.
+        file_path: Path to the YAML file.
+        key: The key to extract the description from.
 
     Returns:
-        The blueprint description.
+        The description or a fallback.
     """
     try:
-        with blueprint_path.open() as f:
-            blueprint_dict = yaml.safe_load(f)
-            # Blueprints may have a description at the top level or under a 'blueprint' key
-            return blueprint_dict.get(
-                "description",
-                blueprint_dict.get("blueprint", {}).get("description", "No description available"),
-            )
+        data = load_file_to_dict(file_path)
+        if key == "workflow":
+            return data.get(key, {}).get("description", "No description available")
+        return data.get(key, "No description available")
     except Exception:
         return "Could not load description from file"
 
 
 def render_file_based_catalog_table_data(
-    catalog, description_getter, nornflow: "NornFlow"
+    catalog, description_getter: Callable[[Path], str], nornflow: "NornFlow"
 ) -> list[list[str]]:
     """Render a file-based catalog (workflows or blueprints) as a list of lists.
 
diff --git a/nornflow/constants.py b/nornflow/constants.py
index 0a0841e..0615593 100644
--- a/nornflow/constants.py
+++ b/nornflow/constants.py
@@ -64,17 +64,21 @@ NORNFLOW_DEFAULT_FILTERS_DIR = "filters"
 NORNFLOW_DEFAULT_HOOKS_DIR = "hooks"
 NORNFLOW_DEFAULT_BLUEPRINTS_DIR = "blueprints"
 NORNFLOW_DEFAULT_VARS_DIR = "vars"
+NORNFLOW_DEFAULT_J2_FILTERS_DIR = "j2_filters"
+NORNFLOW_DEFAULT_LOGGER = {"directory": ".nornflow/logs", "level": "INFO"}
 
 NORNFLOW_SETTINGS_OPTIONAL = {
     "local_tasks": [NORNFLOW_DEFAULT_TASKS_DIR],
     "local_workflows": [NORNFLOW_DEFAULT_WORKFLOWS_DIR],
     "local_filters": [NORNFLOW_DEFAULT_FILTERS_DIR],
     "local_hooks": [NORNFLOW_DEFAULT_HOOKS_DIR],
+    "local_j2_filters": [NORNFLOW_DEFAULT_J2_FILTERS_DIR],
     "imported_packages": [],
     "processors": [],
     "vars_dir": NORNFLOW_DEFAULT_VARS_DIR,
     "failure_strategy": FailureStrategy.SKIP_FAILED,
     "dry_run": False,
+    "logger": NORNFLOW_DEFAULT_LOGGER,
 }
 
 # Kwargs that cannot be passed to NornFlow.__init__; they must be set via the settings YAML file.
@@ -85,7 +89,9 @@ NORNFLOW_INVALID_INIT_KWARGS = (
     "local_workflows",
     "local_filters",
     "local_hooks",
+    "local_j2_filters",
     "imported_packages",
+    "logger",
 )
 
 # Supported extensions
diff --git a/nornflow/exceptions.py b/nornflow/exceptions.py
index 470c36a..733a00d 100644
--- a/nornflow/exceptions.py
+++ b/nornflow/exceptions.py
@@ -5,10 +5,11 @@ This module defines the core exceptions used throughout the NornFlow application
 organized hierarchically with clear inheritance paths.
 """
 
+from typing import Any
+
 ###############################################################################
 # ROOT EXCEPTION
 ###############################################################################
-from typing import Any
 
 
 class NornFlowError(Exception):
@@ -19,6 +20,9 @@ class NornFlowError(Exception):
     It should never be raised directly but rather inherited from.
     """
 
+    def __init__(self, message: str = ""):
+        super().__init__(message)
+
 
 ###############################################################################
 # CORE EXCEPTIONS
diff --git a/nornflow/hooks/base.py b/nornflow/hooks/base.py
index 7d3cf5b..8ecbf01 100644
--- a/nornflow/hooks/base.py
+++ b/nornflow/hooks/base.py
@@ -4,6 +4,7 @@ from nornir.core.inventory import Host
 from nornir.core.task import AggregatedResult, MultiResult, Task
 
 from nornflow.hooks.exceptions import HookRegistrationError
+from nornflow.logger import logger
 
 if TYPE_CHECKING:
     from nornflow.models import TaskModel
@@ -33,7 +34,7 @@ class Hook:
         exception_handlers: Maps exception types to handler method names.
     """
 
-    hook_name: ClassVar[str | None] = None
+    hook_name: ClassVar[str]
     run_once_per_task: bool = False
     exception_handlers: ClassVar[dict[type[Exception], str]] = {}
 
@@ -45,13 +46,15 @@ class Hook:
 
         Raises:
             HookRegistrationError: If a different class already registered
-                                   the same hook_name.
+                                   the same hook_name, or if hook_name is invalid.
         """
         super().__init_subclass__(**kwargs)
 
-        # Only register if hook_name is defined
-        if not hasattr(cls, "hook_name") or not cls.hook_name:
-            return
+        if not hasattr(cls, "hook_name") or not isinstance(cls.hook_name, str) or not cls.hook_name.strip():
+            raise HookRegistrationError(
+                f"Hook class {cls.__module__}.{cls.__name__} must define a non-empty string "
+                f"'hook_name' attribute."
+            )
 
         # Check for duplicate registration
         if cls.hook_name in HOOK_REGISTRY:
@@ -64,6 +67,7 @@ class Hook:
                 )
 
         HOOK_REGISTRY[cls.hook_name] = cls
+        logger.info(f"Hook class {cls.__name__} registered with hook_name '{cls.hook_name}'")
 
     def __init__(self, value: Any = None):
         """Initialize hook with configuration value.
diff --git a/nornflow/hooks/loader.py b/nornflow/hooks/loader.py
index 92a392e..892b3db 100644
--- a/nornflow/hooks/loader.py
+++ b/nornflow/hooks/loader.py
@@ -1,6 +1,7 @@
 from typing import Any, TYPE_CHECKING
 
 from nornflow.hooks.base import HOOK_REGISTRY
+from nornflow.logger import logger
 
 if TYPE_CHECKING:
     from nornflow.hooks import Hook
@@ -24,7 +25,12 @@ def load_hooks(hooks_dict: dict[str, Any]) -> list["Hook"]:
     for hook_name, hook_config in hooks_dict.items():
         hook_class = HOOK_REGISTRY.get(hook_name)
         if hook_class:
-            hook_instance = hook_class(hook_config)
-            hooks.append(hook_instance)
-
+            try:
+                hook_instance = hook_class(hook_config)
+                hooks.append(hook_instance)
+            except Exception as e:
+                logger.exception(f"Failed to instantiate hook '{hook_name}': {e}")
+                raise
+
+    logger.debug(f"Loaded {len(hooks)} hooks from configuration.")
     return hooks
diff --git a/nornflow/hooks/mixins.py b/nornflow/hooks/mixins.py
index f16524f..bfaec8d 100644
--- a/nornflow/hooks/mixins.py
+++ b/nornflow/hooks/mixins.py
@@ -1,12 +1,12 @@
 from typing import Any, TYPE_CHECKING
 
-from jinja2 import TemplateSyntaxError
 from nornir.core.inventory import Host
 from nornir.core.task import Task
 
 from nornflow.hooks.exceptions import HookError, HookValidationError
-from nornflow.vars.constants import JINJA2_MARKERS, TRUTHY_STRING_VALUES
-from nornflow.vars.jinja2_utils import Jinja2EnvironmentManager
+from nornflow.j2 import Jinja2Service
+from nornflow.j2.exceptions import TemplateValidationError
+from nornflow.logger import logger
 
 if TYPE_CHECKING:
     from nornflow.models import TaskModel
@@ -50,6 +50,17 @@ class Jinja2ResolvableMixin:
                 should_run = self.get_resolved_value(task, host=host, as_bool=True)
     """
 
+    @property
+    def jinja2(self) -> Jinja2Service:
+        """Get the Jinja2Service instance, creating it lazily if needed.
+
+        Returns:
+            The Jinja2Service instance for template operations.
+        """
+        if not hasattr(self, "_jinja2"):
+            self._jinja2 = Jinja2Service()
+        return self._jinja2
+
     def execute_hook_validations(self, task_model: "TaskModel") -> None:
         """Validate hook configuration, including automatic Jinja2 validation.
 
@@ -86,17 +97,23 @@ class Jinja2ResolvableMixin:
             )
 
         try:
-            manager = Jinja2EnvironmentManager()
-            manager.env.from_string(self.value)
-        except TemplateSyntaxError as e:
+            self.jinja2.compile_template(self.value)
+            logger.debug(
+                f"Validated Jinja2 expression for hook '{self.hook_name}' in task '{task_model.name}'."
+            )
+        except TemplateValidationError as e:
             raise HookValidationError(
                 self.hook_name,
                 [("jinja2_syntax", f"Task '{task_model.name}': Jinja2 syntax error: {e}")],
             ) from e
         except Exception as e:
+            logger.exception(
+                f"Unexpected error validating Jinja2 for hook '{self.hook_name}' in task '{task_model.name}'"
+                f": {e}"
+            )
             raise HookValidationError(
                 self.hook_name,
-                [("jinja2_validation", f"Task '{task_model.name}': Jinja2 validation failed: {e}")],
+                [("jinja2_validation", f"Task '{task_model.name}': Unexpected Jinja2 error: {e}")],
             ) from e
 
     def get_resolved_value(
@@ -126,6 +143,7 @@ class Jinja2ResolvableMixin:
             if not host:
                 host = self._extract_host_from_task(task)
             resolved = self._resolve_jinja2(self.value, host)
+            logger.debug(f"Resolved Jinja2 value for hook '{self.hook_name}' on host '{host.name}'.")
         else:
             resolved = self.value
 
@@ -146,7 +164,7 @@ class Jinja2ResolvableMixin:
         if not isinstance(value, str):
             return False
 
-        return any(marker in value for marker in JINJA2_MARKERS)
+        return self.jinja2.is_template(value)
 
     def _extract_host_from_task(self, task: Task) -> Host:
         """Extract a host from task inventory.
@@ -197,10 +215,4 @@ class Jinja2ResolvableMixin:
         Returns:
             Boolean representation of the value.
         """
-        if isinstance(value, bool):
-            return value
-
-        if isinstance(value, str):
-            return value.lower() in TRUTHY_STRING_VALUES
-
-        return bool(value)
+        return self.jinja2.to_bool(value)
diff --git a/nornflow/j2/__init__.py b/nornflow/j2/__init__.py
new file mode 100644
index 0000000..c464efe
--- /dev/null
+++ b/nornflow/j2/__init__.py
@@ -0,0 +1,17 @@
+"""NornFlow Jinja2 Service Package.
+
+This package provides centralized Jinja2 template management for NornFlow,
+including environment caching, template compilation, and standardized
+resolution methods.
+"""
+
+from nornflow.j2.constants import JINJA2_MARKERS
+from nornflow.j2.core import Jinja2Service
+from nornflow.j2.exceptions import TemplateError, TemplateValidationError
+
+__all__ = [
+    "JINJA2_MARKERS",
+    "Jinja2Service",
+    "TemplateError",
+    "TemplateValidationError",
+]
diff --git a/nornflow/j2/constants.py b/nornflow/j2/constants.py
new file mode 100644
index 0000000..a0edc57
--- /dev/null
+++ b/nornflow/j2/constants.py
@@ -0,0 +1,15 @@
+"""Jinja2-related constants for NornFlow."""
+
+# Template markers for detecting Jinja2 templates - all opening variations
+JINJA2_MARKERS = [
+    "{{",  # Standard variable output
+    "{{-",  # Variable with left whitespace control
+    "{%",  # Statement/control structure
+    "{%-",  # Statement with left whitespace control
+    "{#",  # Comment
+    "{#-",  # Comment with left whitespace control
+]
+
+# Lower case string values that evaluate to True when converting to boolean.
+# This provides a centralized reference point to avoid ambiguity across the codebase.
+TRUTHY_STRING_VALUES = ("true", "yes", "1", "on", "y", "t", "enabled")
diff --git a/nornflow/j2/core.py b/nornflow/j2/core.py
new file mode 100644
index 0000000..3a223fc
--- /dev/null
+++ b/nornflow/j2/core.py
@@ -0,0 +1,307 @@
+from functools import lru_cache
+from threading import Lock
+from typing import Any
+
+from jinja2 import Environment, StrictUndefined, TemplateSyntaxError, UndefinedError
+
+from nornflow.builtins.jinja2_filters import ALL_BUILTIN_J2_FILTERS
+from nornflow.catalogs import CallableCatalog
+from nornflow.j2.constants import JINJA2_MARKERS, TRUTHY_STRING_VALUES
+from nornflow.j2.exceptions import Jinja2ServiceError, TemplateError, TemplateValidationError
+from nornflow.logger import logger
+from nornflow.settings import NornFlowSettings
+from nornflow.utils import is_public_callable
+
+
+class Jinja2Service:
+    """Centralized Jinja2 management for NornFlow.
+
+    Provides a single, cached Jinja2 environment and standardized
+    template operations used throughout NornFlow.
+
+    This service is a singleton that:
+    - Maintains a single Jinja2 environment instance
+    - Provides thread-safe template compilation and caching
+    - Offers standardized resolution methods
+    - Centralizes error handling
+    - Supports registration of custom filters from external directories
+    - Assembles and exposes a shared catalog of J2 filters (built-ins + custom)
+    """
+
+    _instance = None
+    _lock = Lock()
+    _initialized = False
+
+    @classmethod
+    def _initialize_environment(cls, instance) -> None:
+        """Initialize the Jinja2 environment and J2 filters catalog for the instance.
+
+        Args:
+            instance: The Jinja2Service instance to initialize.
+        """
+        instance.environment = Environment(
+            undefined=StrictUndefined,
+            extensions=["jinja2.ext.loopcontrols"],
+            # Autoescape disabled as NornFlow generates network configs, not HTML;
+            # escaping would break outputs like XML/JSON.
+            autoescape=False,  # noqa: S701
+        )
+
+        instance._j2_filters_catalog = CallableCatalog("j2_filters")  # noqa: SLF001
+
+        # Add ALL_BUILTIN_J2_FILTERS to the instance's j2_filters_catalog
+        for name, func in ALL_BUILTIN_J2_FILTERS.items():
+            instance.j2_filters_catalog.register(name, func, module_name="nornflow.builtins.jinja2_filters")
+
+        # Update environment filters from catalog to ensure consistency
+        instance.environment.filters.update(instance.j2_filters_catalog)
+
+    @classmethod
+    def initialize_with_settings(cls, settings: NornFlowSettings) -> None:
+        """Initialize the service with NornFlow settings, registering custom filters.
+
+        This method configures the Jinja2Service singleton using the provided settings,
+        ensuring custom filters from local_j2_filters directories are registered.
+
+        Args:
+            settings: NornFlowSettings instance containing configuration.
+        """
+        cls.register_custom_filters(settings.local_j2_filters)
+
+    @classmethod
+    def register_custom_filters(cls, local_j2_filters_dirs: list[str]) -> None:
+        """Register custom Jinja2 filters from specified directories into the catalog.
+
+        This method can be called to register custom filters into the Jinja2
+        environment and catalog. It allows multiple calls, with later calls overriding
+        previous filters.
+
+        Args:
+            local_j2_filters_dirs: List of directory paths to scan for custom filters.
+        """
+        instance = cls()
+
+        for dir_path in local_j2_filters_dirs:
+            instance._j2_filters_catalog.discover_items_in_dir(dir_path, predicate=is_public_callable)
+
+        # Update environment filters from catalog to reflect changes
+        instance.environment.filters.update(instance._j2_filters_catalog)
+
+    @classmethod
+    def get_registered_j2_filters(cls) -> dict[str, Any]:
+        """Retrieve the registered Jinja2 filters for display purposes.
+
+        This is used by the CLI show command to list available filters.
+
+        Returns:
+            Dictionary of filter names to their callable functions.
+        """
+        instance = cls()
+        return dict(instance.environment.filters)
+
+    @property
+    def j2_filters_catalog(self) -> CallableCatalog:
+        """Get the shared J2 filters catalog (built-ins + custom).
+
+        This catalog is assembled internally and cannot be set directly.
+
+        Returns:
+            CallableCatalog: The shared catalog of J2 filters.
+        """
+        return self._j2_filters_catalog
+
+    @j2_filters_catalog.setter
+    def j2_filters_catalog(self, value: Any) -> None:
+        """Prevent setting the J2 filters catalog directly.
+
+        Raises:
+            Jinja2ServiceError: Always raised to prevent direct setting.
+        """
+        raise Jinja2ServiceError("J2 filters catalog cannot be set directly.")
+
+    def __new__(cls):
+        with cls._lock:
+            if cls._instance is None:
+                cls._instance = super().__new__(cls)
+                cls._initialize_environment(cls._instance)
+                cls._initialized = True
+        return cls._instance
+
+    @property
+    def environment(self) -> Environment:
+        """Get the cached Jinja2 environment."""
+        return self._environment
+
+    @environment.setter
+    def environment(self, value: Environment) -> None:
+        """Set the Jinja2 environment."""
+        if not isinstance(value, Environment):
+            raise Jinja2ServiceError(f"Expected Environment instance, got {type(value).__name__}")
+        self._environment = value
+
+    # @lru_cache is safe here despite B019: as a singleton, only one instance exists,
+    # so no risk of accumulating references that prevent garbage collection. Templates
+    # are cached for the app's lifetime anyway, aligning with singleton behavior.
+    @lru_cache(maxsize=256)  # noqa: B019
+    def compile_template(self, template_str: str) -> Any:
+        """Compile and cache a template string.
+
+        Args:
+            template_str: The template string to compile
+
+        Returns:
+            Compiled Template object
+
+        Raises:
+            TemplateValidationError: If template has syntax errors
+        """
+        try:
+            compiled = self._environment.from_string(template_str)
+            logger.debug(f"Compiled template (length={len(template_str)})")
+            return compiled
+        except Exception as e:
+            logger.exception(f"Unexpected error compiling template (length={len(template_str)}): {e}")
+            raise TemplateValidationError(f"Template compilation failed: {e}", template=template_str) from e
+
+    def resolve_string(self, template_str: str, context: dict[str, Any], error_context: str = "") -> str:
+        """Resolve a Jinja2 template string.
+
+        Args:
+            template_str: The template string to resolve
+            context: Variables for resolution
+            error_context: Description for error messages
+
+        Returns:
+            Resolved string
+
+        Raises:
+            TemplateError: If resolution fails
+        """
+        if not isinstance(template_str, str):
+            raise TemplateValidationError(
+                f"Expected string for 'template_str', got {type(template_str).__name__}"
+            )
+
+        if not self.is_template(template_str):
+            return template_str
+
+        try:
+            template = self.compile_template(template_str)
+            result = template.render(context)
+            logger.debug(f"Resolved template: input_len={len(template_str)}, output_len={len(result)}")
+            return result
+        except UndefinedError as e:
+            context_info = f" ({error_context})" if error_context else ""
+            raise TemplateError(f"Undefined variable in template{context_info}: {e}") from e
+        except TemplateSyntaxError as e:
+            context_info = f" ({error_context})" if error_context else ""
+            raise TemplateError(f"Template syntax error{context_info}: {e}") from e
+        except Exception as e:
+            context_info = f" ({error_context})" if error_context else ""
+            logger.exception(
+                f"Unexpected error resolving template (length={len(template_str)}){context_info}: {e}"
+            )
+            raise TemplateError(f"Template rendering error{context_info}: {e}") from e
+
+    def resolve_to_bool(self, value: Any, context: dict[str, Any]) -> bool:
+        """Resolve a value to boolean, handling templates and literals.
+
+        Args:
+            value: Value to resolve (bool, string, or template)
+            context: Variables for template resolution
+
+        Returns:
+            Boolean result
+        """
+        if isinstance(value, bool):
+            return value
+
+        if isinstance(value, str):
+            # Strings with Jinja2 markers are rendered first; plain strings convert directly
+            if self.is_template(value):
+                resolved = self.resolve_string(value, context)
+                return self.to_bool(resolved)
+            # Plain string literal
+            return self.to_bool(value)
+
+        return bool(value)
+
+    def resolve_data(self, data: Any, context: dict[str, Any], error_context: str = "") -> Any:
+        """Recursively resolve templates in data structures.
+
+        Args:
+            data: Data structure to process
+            context: Variables for resolution
+            error_context: Description for error messages
+
+        Returns:
+            Data with all templates resolved
+        """
+        result = self._render_data_recursive_impl(data, context, error_context)
+        logger.debug(f"Resolved data structure with {len(str(data)) if data else 0} chars.")
+        return result
+
+    def validate_template(self, template_str: str) -> tuple[bool, str]:
+        """Validate template syntax without rendering.
+
+        Args:
+            template_str: Template to validate
+
+        Returns:
+            Tuple of (is_valid, error_message)
+        """
+        try:
+            self.compile_template(template_str)
+            return (True, "")
+        except Exception as e:
+            return (False, str(e))
+
+    def is_template(self, value: str) -> bool:
+        """Check if string contains Jinja2 markers.
+
+        Args:
+            value: String to check
+
+        Returns:
+            True if string contains Jinja2 markers
+        """
+        return any(marker in value for marker in JINJA2_MARKERS)
+
+    def to_bool(self, value: Any) -> bool:
+        """Convert value to boolean using NornFlow conventions.
+
+        Args:
+            value: Value to convert
+
+        Returns:
+            Boolean result
+        """
+        if isinstance(value, bool):
+            return value
+        if isinstance(value, str):
+            return value.lower() in TRUTHY_STRING_VALUES
+        return bool(value)
+
+    def _render_data_recursive_impl(self, data: Any, context: dict[str, Any], error_context: str) -> Any:
+        """Implementation of recursive data rendering.
+
+        Args:
+            data: The data to process
+            context: Variables for rendering
+            error_context: Description for error messages
+
+        Returns:
+            The processed data
+        """
+        if isinstance(data, str):
+            if self.is_template(data):
+                return self.resolve_string(data, context, error_context)
+            return data
+        if isinstance(data, dict):
+            return {k: self._render_data_recursive_impl(v, context, error_context) for k, v in data.items()}
+        # Handle both lists and tuples, and normalize to list.
+        # This preserves behavior where YAML-defined lists remain lists,
+        # even if converted to tuples for internal use (e.g., hashability).
+        if isinstance(data, (list, tuple)):
+            return [self._render_data_recursive_impl(item, context, error_context) for item in data]
+        return data
diff --git a/nornflow/j2/exceptions.py b/nornflow/j2/exceptions.py
new file mode 100644
index 0000000..6e67793
--- /dev/null
+++ b/nornflow/j2/exceptions.py
@@ -0,0 +1,25 @@
+"""Jinja2-specific exceptions for NornFlow."""
+
+from nornflow.exceptions import NornFlowError
+
+
+class Jinja2ServiceError(NornFlowError):
+    """Base exception for Jinja2Service-related errors."""
+
+
+class TemplateError(Jinja2ServiceError):
+    """
+    Exception class for template rendering errors.
+    """
+
+    def __init__(self, message: str = "", template: str = ""):
+        # Truncate very long templates
+        template_preview = template[:97] + "..." if len(template) > 100 else template  # noqa: PLR2004
+
+        context = f" Template: '{template_preview}'" if template else ""
+        super().__init__(f"{message}{context}")
+        self.template = template
+
+
+class TemplateValidationError(TemplateError):
+    """Exception raised when template validation fails."""
diff --git a/nornflow/logger.py b/nornflow/logger.py
new file mode 100644
index 0000000..90c2ade
--- /dev/null
+++ b/nornflow/logger.py
@@ -0,0 +1,316 @@
+"""
+NornFlow Logging Module
+
+This module provides a centralized logging system for NornFlow applications.
+It implements a singleton logger that supports synchronous file-based logging with
+timestamped log files and execution context tracking.
+
+Key Features:
+- Singleton pattern for consistent logging across the application
+- Synchronous file logging with automatic log file creation
+- Custom formatter for precise timestamps with microseconds
+- Execution context tracking for workflow and task runs
+- Configurable log levels and directories
+
+Usage:
+    from nornflow.logger import logger
+
+    logger.info("This is an info message")
+    logger.set_execution_context("my_workflow", "workflow", "/path/to/logs", "INFO")
+    logger.debug("This will go to the log file")
+"""
+
+import logging
+import re
+import sys
+from datetime import datetime
+from pathlib import Path
+from typing import Any
+
+from nornflow.constants import NORNFLOW_DEFAULT_LOGGER, PROTECTED_KEYWORDS
+
+REDACTED = "***REDACTED***"
+
+
+def _get_sanitize_pattern() -> re.Pattern:
+    """Get or build the compiled regex pattern for sensitive data detection."""
+    if not hasattr(_get_sanitize_pattern, "_pattern"):
+        keywords = "|".join(re.escape(kw) for kw in PROTECTED_KEYWORDS)
+        _get_sanitize_pattern._pattern = re.compile(  # noqa: SLF001
+            rf"({keywords})(\s*[:=]\s*)(['\"]?)(\S+?)(\3)(?=\s|,|}}|\]|$)", re.IGNORECASE
+        )
+    return _get_sanitize_pattern._pattern  # noqa: SLF001
+
+
+def sanitize_log_message(message: str) -> str:
+    """Sanitize sensitive data from a log message.
+
+    Args:
+        message: The log message to sanitize.
+
+    Returns:
+        Message with sensitive values replaced by REDACTED.
+    """
+    if not isinstance(message, str):
+        return message
+    return _get_sanitize_pattern().sub(rf"\1\2\3{REDACTED}\5", message)
+
+
+def sanitize_filename(name: str) -> str:
+    """Sanitize a string for safe use as a filename.
+
+    Prevents path traversal attacks and invalid filename characters.
+
+    Args:
+        name: The raw name to sanitize.
+
+    Returns:
+        A safe filename string containing only alphanumeric chars, dots,
+        hyphens, and underscores.
+    """
+    if not name:
+        return "unnamed"
+
+    sanitized = name.replace("/", "_").replace("\\", "_")
+    sanitized = sanitized.replace("..", "_")
+    sanitized = re.sub(r"[^A-Za-z0-9._-]", "_", sanitized)
+    sanitized = re.sub(r"_+", "_", sanitized)
+    sanitized = sanitized.strip("_. ")
+
+    if not sanitized:
+        return "unnamed"
+
+    return sanitized
+
+
+class MicrosecondFormatter(logging.Formatter):
+    """Custom formatter with microsecond timestamps and sensitive data sanitization."""
+
+    def formatTime(self, record: logging.LogRecord, datefmt: str | None = None) -> str:  # noqa: N802
+        """Format the time with microseconds support."""
+        ct = datetime.fromtimestamp(record.created)  # noqa: DTZ006
+        s = ct.strftime(datefmt) if datefmt else ct.isoformat()
+        return s
+
+    def format(self, record: logging.LogRecord) -> str:
+        """Format the log record with sanitized message."""
+        record.msg = sanitize_log_message(str(record.msg))
+        if record.args:
+            record.args = tuple(
+                sanitize_log_message(arg) if isinstance(arg, str) else arg for arg in record.args
+            )
+        return super().format(record)
+
+
+class NornFlowLogger:
+    """
+    Singleton logger class for NornFlow.
+
+    This class manages a single logger instance that can be configured
+    to write to files based on execution context.
+    """
+
+    _instance = None
+
+    def __new__(cls):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def __init__(self):
+        if hasattr(self, "_initialized"):
+            return
+        self._initialized = True
+
+        # Core logger
+        self._logger = logging.getLogger("nornflow")
+        self._logger.setLevel(logging.DEBUG)
+
+        # Console handler for ERROR level and above (always active for visibility)
+        console_handler = logging.StreamHandler(sys.stderr)
+        console_handler.setLevel(logging.ERROR)
+        console_formatter = logging.Formatter(
+            "%(asctime)s [%(levelname)s] [%(name)s] - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
+        )
+        console_handler.setFormatter(console_formatter)
+        self._logger.addHandler(console_handler)
+
+        # Execution context
+        self._execution_context = None
+        self._file_handler = None
+
+    def set_execution_context(
+        self,
+        execution_name: str,
+        execution_type: str,
+        log_dir: str | Path | None = None,
+        log_level: str = "INFO",
+    ) -> None:
+        """
+        Set the execution context for logging.
+
+        This creates a timestamped log file and configures the logger to write to it.
+
+        Args:
+            execution_name: Name of the execution (workflow name, task name, etc.)
+            execution_type: Type of execution ("workflow", "task", etc.)
+            log_dir: Directory to store log files. If None, uses default.
+            log_level: Logging level (e.g., "DEBUG", "INFO").
+        """
+        # Remove existing file handler if present
+        if self._file_handler:
+            self._logger.removeHandler(self._file_handler)
+            self._file_handler.close()
+            self._file_handler = None
+
+        # Fall back to the default directory when log_dir is None or empty
+        if not log_dir:
+            log_dir = NORNFLOW_DEFAULT_LOGGER["directory"]
+
+        # Set logger level
+        level = getattr(logging, log_level.upper(), logging.INFO)
+        self._logger.setLevel(level)
+
+        # Create log directory if it doesn't exist
+        log_path = Path(log_dir)
+        log_path.mkdir(parents=True, exist_ok=True)
+
+        # Generate timestamped filename with sanitized execution name
+        safe_name = sanitize_filename(execution_name)
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{safe_name}_{timestamp}.log"
+        filepath = log_path / filename
+
+        # Create file handler
+        self._file_handler = logging.FileHandler(filepath, encoding="utf-8")
+        self._file_handler.setLevel(level)
+
+        # Create file formatter with microsecond timestamps
+        self._file_handler.setFormatter(
+            MicrosecondFormatter("%(asctime)s [%(levelname)s] [%(name)s] - %(message)s")
+        )
+
+        # Add file handler to logger
+        self._logger.addHandler(self._file_handler)
+
+        # Store execution context
+        self._execution_context = {
+            "execution_name": execution_name,
+            "execution_type": execution_type,
+            "log_dir": str(log_dir),
+            "log_file": str(filepath),
+            "start_time": datetime.now(),
+        }
+
+        # Log the start of execution
+        self.info(f"Started {execution_type} execution: {execution_name}")
+
+    def update_execution_context(
+        self,
+        execution_name: str | None = None,
+        execution_type: str | None = None,
+        log_dir: str | Path | None = None,
+        log_level: str | None = None,
+    ) -> None:
+        """Update the active logging context, renaming the log file as needed; no-op if no context is active.
+
+        Args:
+            execution_name: New execution name (optional).
+            execution_type: New execution type (optional).
+            log_dir: New log directory (optional).
+            log_level: New log level (optional).
+        """
+        if not self._file_handler or not self._execution_context:
+            return
+
+        needs_file_rename = False
+        old_filepath = Path(self._execution_context["log_file"])
+
+        if execution_name or execution_type:
+            self._execution_context["execution_name"] = execution_name or self._execution_context.get(
+                "execution_name", "unknown"
+            )
+            self._execution_context["execution_type"] = execution_type or self._execution_context.get(
+                "execution_type", "unknown"
+            )
+            needs_file_rename = True
+
+        if log_level:
+            level = getattr(logging, log_level.upper(), logging.INFO)
+            self._logger.setLevel(level)
+            self._file_handler.setLevel(level)
+
+        if log_dir or needs_file_rename:
+            new_log_path = Path(log_dir or self._execution_context["log_dir"])
+            new_log_path.mkdir(parents=True, exist_ok=True)
+            safe_name = sanitize_filename(self._execution_context["execution_name"])
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            new_filename = f"{safe_name}_{timestamp}.log"
+            new_filepath = new_log_path / new_filename
+
+            # Close the handler, try to rename, then reopen
+            self._file_handler.close()
+            try:
+                old_filepath.rename(new_filepath)
+                actual_filepath = new_filepath
+            except OSError:
+                # If rename fails, keep using the old file
+                actual_filepath = old_filepath
+
+            self._file_handler.baseFilename = str(actual_filepath)
+            self._file_handler.stream = actual_filepath.open("a", encoding="utf-8")
+
+            self._execution_context["log_dir"] = str(new_log_path)
+            self._execution_context["log_file"] = str(actual_filepath)
+
+    def clear_execution_context(self) -> None:
+        """
+        Clear the current execution context and stop file logging.
+        """
+        if self._file_handler:
+            self._logger.removeHandler(self._file_handler)
+            self._file_handler.close()
+            self._file_handler = None
+
+        if self._execution_context:
+            execution_time = datetime.now() - self._execution_context["start_time"]
+            self.info(f"Completed execution in {execution_time.total_seconds():.2f} seconds")
+
+        self._execution_context = None
+
+    def get_execution_context(self) -> dict[str, Any] | None:
+        """
+        Get the current execution context.
+
+        Returns:
+            Current execution context dict or None if not set
+        """
+        return self._execution_context
+
+    def debug(self, message: str, *args: object, **kwargs) -> None:
+        """Log a debug message."""
+        self._logger.debug(message, *args, **kwargs)
+
+    def info(self, message: str, *args: object, **kwargs) -> None:
+        """Log an info message."""
+        self._logger.info(message, *args, **kwargs)
+
+    def warning(self, message: str, *args: object, **kwargs) -> None:
+        """Log a warning message."""
+        self._logger.warning(message, *args, **kwargs)
+
+    def error(self, message: str, *args: object, **kwargs) -> None:
+        """Log an error message."""
+        self._logger.error(message, *args, **kwargs)
+
+    def critical(self, message: str, *args: object, **kwargs) -> None:
+        """Log a critical message."""
+        self._logger.critical(message, *args, **kwargs)
+
+    def exception(self, message: str, *args: object, **kwargs) -> None:
+        """Log an exception with traceback."""
+        self._logger.exception(message, *args, **kwargs)
+
+
+# Create the singleton instance
+logger = NornFlowLogger()
diff --git a/nornflow/models/__init__.py b/nornflow/models/__init__.py
index 27ddbec..5bcf4a6 100644
--- a/nornflow/models/__init__.py
+++ b/nornflow/models/__init__.py
@@ -1,11 +1,13 @@
 """NornFlow models package for workflow and task definitions."""
 
 from .base import NornFlowBaseModel
+from .blueprint import BlueprintModel
 from .hookable import HookableModel
 from .task import TaskModel
 from .workflow import WorkflowModel
 
 __all__ = [
+    "BlueprintModel",
     "HookableModel",
     "NornFlowBaseModel",
     "TaskModel",
diff --git a/nornflow/models/base.py b/nornflow/models/base.py
index 842b9d9..caa2227 100644
--- a/nornflow/models/base.py
+++ b/nornflow/models/base.py
@@ -2,6 +2,7 @@ from typing import Any, ClassVar
 
 from pydantic_serdes.models import PydanticSerdesBaseModel
 
+from nornflow.logger import logger
 from nornflow.models.validators import run_universal_field_validation
 
 
@@ -18,6 +19,10 @@ class NornFlowBaseModel(PydanticSerdesBaseModel):
         """
         Create model instance with universal field validation.
         """
-        new_instance = super().create(model_dict, *args, **kwargs)
-        run_universal_field_validation(new_instance)
-        return new_instance
+        try:
+            new_instance = super().create(model_dict, *args, **kwargs)
+            run_universal_field_validation(new_instance)
+            return new_instance
+        except Exception as e:
+            logger.exception(f"Failed to create model instance for {cls.__name__}: {e}")
+            raise
diff --git a/nornflow/models/blueprint.py b/nornflow/models/blueprint.py
new file mode 100644
index 0000000..cbd118c
--- /dev/null
+++ b/nornflow/models/blueprint.py
@@ -0,0 +1,31 @@
+"""
+Blueprint model for transient validation during workflow expansion.
+
+This model is intentionally NOT a PydanticSerdesBaseModel subclass because:
+1. Blueprints are temporary - used only during workflow loading/expansion
+2. They don't need data store persistence or retrieval
+3. They don't need hashability - they're validated and immediately discarded
+4. Avoiding PydanticSerdes inheritance prevents circular import issues
+
+Blueprint files are loaded, validated for structure, and their tasks are
+extracted and expanded into the workflow. The BlueprintModel instance itself
+is never stored or referenced after validation.
+"""
+
+from typing import Any
+
+from pydantic import BaseModel, ConfigDict
+
+
+class BlueprintModel(BaseModel):
+    """
+    Model for blueprint files, ensuring valid structure for expansion.
+
+    Blueprints define reusable task collections with optional descriptions.
+    They are loaded and validated during workflow expansion.
+    """
+
+    model_config = ConfigDict(extra="forbid")
+
+    description: str | None = None
+    tasks: list[dict[str, Any]]
diff --git a/nornflow/models/hookable.py b/nornflow/models/hookable.py
index 14ae2fc..942c356 100644
--- a/nornflow/models/hookable.py
+++ b/nornflow/models/hookable.py
@@ -1,4 +1,3 @@
-import logging
 from abc import ABC
 from collections.abc import Callable
 from typing import Any
@@ -15,8 +14,6 @@ from nornflow.nornir_manager import NornirManager
 from nornflow.vars.manager import NornFlowVariablesManager
 from .base import NornFlowBaseModel
 
-logger = logging.getLogger(__name__)
-
 
 class HookableModel(NornFlowBaseModel, ABC):
     """Abstract base class for models that support hooks.
diff --git a/nornflow/models/task.py b/nornflow/models/task.py
index 9d7e80d..7ff4f28 100644
--- a/nornflow/models/task.py
+++ b/nornflow/models/task.py
@@ -7,6 +7,7 @@ from pydantic_serdes.custom_collections import HashableDict
 from pydantic_serdes.utils import convert_to_hashable
 
 from nornflow.exceptions import TaskError
+from nornflow.logger import logger
 from nornflow.models import HookableModel
 from nornflow.models.validators import run_post_creation_task_validation
 from nornflow.nornir_manager import NornirManager
@@ -85,13 +86,17 @@ class TaskModel(HookableModel):
         tasks_catalog: dict[str, Callable],
     ) -> AggregatedResult:
         """Execute the task using the provided managers and tasks catalog."""
+        logger.info(f"Starting execution of task '{self.canonical_id}'")
         task_func = tasks_catalog.get(self.name)
         if not task_func:
+            logger.error(f"Task function for '{self.name}' not found in tasks catalog")
             raise TaskError(f"Task function for '{self.name}' not found in tasks catalog")
 
         task_args = self.get_task_args()
+        logger.debug(f"Task '{self.canonical_id}' prepared with args: {list(task_args.keys())}")
 
         self.validate_hooks_and_set_task_context(nornir_manager, vars_manager, task_func)
 
         result = nornir_manager.nornir.run(task=task_func, **task_args)
+        logger.info(f"Task '{self.canonical_id}' execution completed")
         return result
diff --git a/nornflow/models/workflow.py b/nornflow/models/workflow.py
index a3214fa..f3aca60 100644
--- a/nornflow/models/workflow.py
+++ b/nornflow/models/workflow.py
@@ -6,12 +6,12 @@ from pydantic import field_validator
 from pydantic_serdes.custom_collections import HashableDict, OneToMany
 from pydantic_serdes.utils import convert_to_hashable
 
-from nornflow.blueprints import BlueprintExpander, BlueprintResolver
+from nornflow.blueprints import BlueprintExpander
 from nornflow.constants import FailureStrategy
 from nornflow.exceptions import WorkflowError
+from nornflow.logger import logger
 from nornflow.models import NornFlowBaseModel, TaskModel
 from nornflow.utils import normalize_failure_strategy
-from nornflow.vars.jinja2_utils import Jinja2EnvironmentManager
 
 
 class WorkflowModel(NornFlowBaseModel):
@@ -53,6 +53,7 @@ class WorkflowModel(NornFlowBaseModel):
             BlueprintError: If blueprint expansion fails.
         """
         if "workflow" not in dict_args:
+            logger.error("Workflow creation failed: missing 'workflow' key in dict_args")
             raise WorkflowError("Workflow file must have 'workflow' as a root-level key.")
 
         workflow_dict = dict_args["workflow"]
@@ -60,10 +61,6 @@ class WorkflowModel(NornFlowBaseModel):
         if "tasks" not in workflow_dict:
             workflow_dict["tasks"] = []
 
-        jinja2_manager = Jinja2EnvironmentManager()
-        resolver = BlueprintResolver(jinja2_manager)
-        expander = BlueprintExpander(resolver)
-
         # Pop blueprint-specific kwargs to consume them and remove them from the dict.
         blueprints_catalog = kwargs.pop("blueprints_catalog", None)
         vars_dir = kwargs.pop("vars_dir", None)
@@ -71,7 +68,7 @@ class WorkflowModel(NornFlowBaseModel):
         workflow_roots = kwargs.pop("workflow_roots", None)
         cli_vars = kwargs.pop("cli_vars", None)
 
-        expanded_tasks = expander.expand_blueprints(
+        expanded_tasks = BlueprintExpander().expand_blueprints(
             tasks=workflow_dict["tasks"],
             blueprints_catalog=blueprints_catalog,
             vars_dir=vars_dir,
diff --git a/nornflow/nornflow.py b/nornflow/nornflow.py
index 8df83d3..7a5808e 100644
--- a/nornflow/nornflow.py
+++ b/nornflow/nornflow.py
@@ -18,6 +18,8 @@ from nornflow.exceptions import (
     TaskError,
     WorkflowError,
 )
+from nornflow.j2 import Jinja2Service
+from nornflow.logger import logger
 from nornflow.models import WorkflowModel
 from nornflow.nornir_manager import NornirManager
 from nornflow.settings import NornFlowSettings
@@ -96,8 +98,8 @@ class NornFlow:
             workflow: Pre-configured WorkflowModel instance or workflow name string (optional)
             processors: List of processor configurations to override default processors
             vars: Variables with highest precedence in the variable resolution chain.
-                While named "vars" due to their primary source being command-line
-                arguments, these serve as a universal override mechanism that can be:
+                While named "vars" due to their primary use case (command-line arguments), these
+                serve as a universal override mechanism that can be:
                 - Parsed from actual CLI arguments (--vars)
                 - Set programmatically for workflow customization
                 - Updated at runtime for dynamic behavior
@@ -114,23 +116,36 @@ class NornFlow:
             InitializationError: If initialization fails due to invalid configuration
         """
         try:
+            logger.info("Initializing NornFlow instance")
             self._validate_init_kwargs(kwargs)
             self._initialize_settings(nornflow_settings, kwargs)
+
+            logger.set_execution_context(
+                execution_name="loading",
+                execution_type="workflow",
+                log_dir=self.settings.logger.get("directory"),
+                log_level=self.settings.logger.get("level", "INFO"),
+            )
+
             self._initialize_instance_vars(vars, filters, failure_strategy, dry_run, processors)
             self._initialize_hooks()
             self._initialize_catalogs()
             self._initialize_processors()
+            self._initialize_j2_service()
             if workflow:
                 self.workflow = workflow
+            logger.info("NornFlow instance initialized successfully")
         except CoreError:
             raise
         except Exception as e:
+            logger.exception(f"Failed to initialize NornFlow: {e!s}")
             raise InitializationError(f"Failed to initialize NornFlow: {e!s}", component="NornFlow") from e
 
     def _initialize_settings(
         self, nornflow_settings: NornFlowSettings | None, kwargs: dict[str, Any]
     ) -> None:
         """Initialize NornFlow settings from provided object or kwargs."""
+        logger.debug("Initializing NornFlow settings")
         if nornflow_settings:
             self._settings = nornflow_settings
         else:
@@ -148,6 +163,7 @@ class NornFlow:
         processors: list[dict[str, Any]] | None,
     ) -> None:
         """Initialize core instance variables."""
+        logger.debug("Initializing instance variables")
         self._vars = vars or {}
         self._filters = filters or {}
         self._failure_strategy = failure_strategy
@@ -165,17 +181,17 @@ class NornFlow:
 
     def _initialize_catalogs(self) -> None:
         """Initialize and load catalogs."""
-        self._tasks_catalog = CallableCatalog("tasks")
-        self._filters_catalog = CallableCatalog("filters")
-        self._workflows_catalog = FileCatalog("workflows")
-        self._blueprints_catalog = FileCatalog("blueprints")
+        logger.debug("Initializing catalogs")
         self._load_tasks_catalog()
         self._load_filters_catalog()
         self._load_workflows_catalog()
         self._load_blueprints_catalog()
+        # Note: j2_filters_catalog is handled by Jinja2Service
+        # and doesn't need a separate load method.
 
     def _initialize_hooks(self) -> None:
         """Initialize hooks by importing modules from configured directories."""
+        logger.debug("Initializing hooks")
         for dir_path in self.settings.local_hooks:
             dir_path_obj = Path(dir_path)
             if dir_path_obj.exists():
@@ -183,19 +199,21 @@ class NornFlow:
 
     def _initialize_nornir(self) -> None:
         """Initialize Nornir configurations and manager."""
+        logger.debug("Initializing Nornir manager")
         if self._nornir_manager:
             return
 
         try:
-            self._nornir_configs = load_file_to_dict(self.settings.nornir_config_file)
+            self._nornir_configs = load_file_to_dict(self.nornir_config_file)
         except Exception as e:
+            logger.exception(f"Failed to load Nornir config from '{self.nornir_config_file}': {e}")
             raise CoreError(
-                f"Failed to load Nornir config from '{self.settings.nornir_config_file}': {e}",
+                f"Failed to load Nornir config from '{self.nornir_config_file}': {e}",
                 component="NornFlow",
             ) from e
 
         self._nornir_manager = NornirManager(
-            nornir_settings=self.settings.nornir_config_file,
+            nornir_settings=self.nornir_config_file,
             **self._nornir_configs,
         )
 
@@ -222,6 +240,7 @@ class NornFlow:
         2. Processors from settings
         3. DefaultNornFlowProcessor
         """
+        logger.debug("Initializing processors")
         processors_list = self.processors or self.settings.processors
         if not processors_list:
             self._processors = [DefaultNornFlowProcessor()]
@@ -235,6 +254,11 @@ class NornFlow:
         except ProcessorError as err:
             raise InitializationError(f"Failed to load processor: {err}") from err
 
+    def _initialize_j2_service(self) -> None:
+        """Initialize the Jinja2 service and register custom filters."""
+        logger.debug("Initializing Jinja2 service")
+        Jinja2Service.initialize_with_settings(self.settings)
+
     @property
     def nornir_configs(self) -> dict[str, Any]:
         """
@@ -540,6 +564,29 @@ class NornFlow:
         """
         raise ImmutableAttributeError("Cannot set blueprints catalog directly.")
 
+    @property
+    def j2_filters_catalog(self) -> CallableCatalog:
+        """
+        Get the J2 filters catalog (shared from Jinja2Service).
+
+        This is a reference to the global catalog assembled by Jinja2Service,
+        containing both built-in and custom filters with full metadata.
+
+        Returns:
+            CallableCatalog: Catalog of J2 filter names and their corresponding functions.
+        """
+        return Jinja2Service().j2_filters_catalog
+
+    @j2_filters_catalog.setter
+    def j2_filters_catalog(self, _: Any) -> None:
+        """
+        Prevent setting the J2 filters catalog directly.
+
+        Raises:
+            ImmutableAttributeError: Always raised to prevent direct setting of the J2 filters catalog.
+        """
+        raise ImmutableAttributeError("Cannot set J2 filters catalog directly.")
+
     @property
     def workflow(self) -> WorkflowModel | None:
         """
@@ -567,8 +614,12 @@ class NornFlow:
         elif isinstance(value, WorkflowModel):
             self._workflow = value
             self._workflow_path = None
+            log_name = self._workflow.name.replace(" ", "_")
+            logger.update_execution_context(execution_name=log_name)
         elif isinstance(value, str):
             self._workflow, self._workflow_path = self._load_workflow_from_name(value)
+            log_name = self._workflow.name.replace(" ", "_")
+            logger.update_execution_context(execution_name=log_name)
         else:
             raise WorkflowError(
                 "Workflow must be a WorkflowModel instance, string name, or None, "
@@ -632,6 +683,7 @@ class NornFlow:
         self,
         catalog_type: type,
         name: str,
+        catalog: Any = None,
         builtin_module: Any = None,
         predicate: Any = None,
         transform_item: Any = None,
@@ -645,6 +697,7 @@ class NornFlow:
         Args:
             catalog_type: The catalog class to instantiate (e.g., CallableCatalog, FileCatalog).
             name: Name of the catalog for error messages.
+            catalog: Optional existing catalog instance to update instead of creating a new one.
             builtin_module: Optional module to register builtins from (for CallableCatalog).
             predicate: Predicate function for filtering items during discovery.
             transform_item: Optional transform function for items (for CallableCatalog).
@@ -659,7 +712,8 @@ class NornFlow:
             ResourceError: If directories don't exist or discovery fails.
             CatalogError: If check_empty is True and catalog is empty.
         """
-        catalog = catalog_type(name)
+        if not catalog:
+            catalog = catalog_type(name)
 
         if builtin_module and predicate:
             catalog.register_from_module(builtin_module, predicate=predicate, transform_item=transform_item)
@@ -679,6 +733,7 @@ class NornFlow:
                         dir_path, predicate=predicate, transform_item=transform_item
                     )
             except Exception as e:
+                logger.exception(f"Error loading {name} from {dir_path}: {e!s}")
                 raise ResourceError(
                     f"Error loading {name} from {dir_path}: {e!s}",
                     resource_type=name,
@@ -721,7 +776,7 @@ class NornFlow:
         Load inventory filters from built-ins and from directories specified in settings.
 
         Filters are loaded in two phases:
-        1. Built-in filters from nornflow.builtins.filters module
+        1. Built-in filters from nornflow.builtins.filters module
         2. User-defined filters from configured local_filters
         """
         self._filters_catalog = self._load_catalog(
@@ -786,11 +841,13 @@ class NornFlow:
         Raises:
             TaskError: If any tasks in the workflow are not found in the tasks catalog.
         """
+        logger.debug("Checking tasks in workflow")
         task_names = [task.name for task in self.workflow.tasks]
 
         missing_tasks = [task_name for task_name in task_names if task_name not in self.tasks_catalog]
 
         if missing_tasks:
+            logger.error(f"Missing tasks in catalog: {missing_tasks}")
             available_tasks = ", ".join(sorted(self.tasks_catalog.keys()))
             raise TaskError(
                 f"Task(s) not found in tasks catalog: {', '.join(missing_tasks)}. "
@@ -799,6 +856,7 @@ class NornFlow:
 
     def _apply_filters(self) -> None:
         """Apply inventory filters to the Nornir manager."""
+        logger.debug("Applying inventory filters")
         filter_kwargs_list = self._get_filtering_kwargs()
 
         for filter_kwargs in filter_kwargs_list:
@@ -911,6 +969,7 @@ class NornFlow:
             )
             return workflow, workflow_path
         except Exception as e:
+            logger.exception(f"Failed to load workflow '{name}' from path '{workflow_path}': {e}")
             raise WorkflowError(
                 f"Failed to load workflow '{name}' from path '{workflow_path}': {e}",
                 component="NornFlow",
@@ -955,6 +1014,7 @@ class NornFlow:
         3. User-configurable processors (custom business logic)
         4. NornFlowFailureStrategyProcessor (system - error handling)
         """
+        logger.debug("Applying processors to Nornir instance")
         # Build processor chain with system processors at fixed positions
         # The var_processor property will handle lazy initialization if needed
         all_processors = [
@@ -983,6 +1043,7 @@ class NornFlow:
 
     def _orchestrate_execution(self) -> None:
         """Orchestrate the execution of workflow tasks in sequence."""
+        logger.info("Starting workflow execution")
         with self.nornir_manager:
             for task in self.workflow.tasks:
                 self.nornir_manager.set_dry_run(self.dry_run)
@@ -1000,8 +1061,7 @@ class NornFlow:
             effective_dry_run=self.dry_run,
             hosts_count=len(self.nornir_manager.nornir.inventory.hosts),
             inventory_filters=self.filters or self.workflow.inventory_filters or {},
-            workflow_vars=dict(self.workflow.vars) if self.workflow.vars else {},
-            vars=self.vars,
+            vars_manager=self.var_processor.vars_manager,
             failure_strategy=self.failure_strategy,
         )
 
diff --git a/nornflow/nornir_manager.py b/nornflow/nornir_manager.py
index 7d6c0a2..1ff5cbc 100644
--- a/nornflow/nornir_manager.py
+++ b/nornflow/nornir_manager.py
@@ -3,9 +3,11 @@ from typing import Any
 from nornir import InitNornir
 from nornir.core import Nornir
 from nornir.core.processor import Processor
+from typing_extensions import Self
 
 from nornflow.constants import NORNFLOW_SETTINGS_OPTIONAL
 from nornflow.exceptions import CoreError, ProcessorError
+from nornflow.logger import logger
 
 
 class NornirManager:
@@ -36,6 +38,9 @@ class NornirManager:
             nornir_settings: Path to Nornir config file (YAML)
             **kwargs: Additional arguments to pass to InitNornir
         """
+        logger.info("Initializing NornirManager")
+        logger.debug(f"Nornir settings path: {nornir_settings}")
+        logger.debug(f"Additional kwargs count: {len(kwargs)}")
         # Clean up kwargs before passing to InitNornir
         self._remove_optional_nornflow_settings_from_kwargs(kwargs)
 
@@ -48,8 +53,9 @@ class NornirManager:
             config_file=self.nornir_settings,
             **kwargs,
         )
+        logger.info("NornirManager initialized")
 
-    def __enter__(self) -> "NornirManager":
+    def __enter__(self) -> Self:
         """
         Enter the context manager protocol.
 
@@ -80,9 +86,11 @@ class NornirManager:
         This implementation silently closes connections without producing
         task output to keep the user interface clean.
         """
+        logger.info("Closing Nornir connections")
         if hasattr(self, "nornir"):
             # Store original processors
             original_processors = self.nornir.processors.copy()
+            logger.debug(f"Stored {len(original_processors)} original processors during connection closure")
 
             try:
                 # Clear processors to prevent output during connection closure
@@ -93,6 +101,8 @@ class NornirManager:
             finally:
                 # Restore processors
                 self.nornir.processors = original_processors
+                logger.debug("Restored original processors after connection closure")
+        logger.info("Closed Nornir connections")
 
     def _remove_optional_nornflow_settings_from_kwargs(self, kwargs: dict[str, Any]) -> None:
         """
@@ -101,8 +111,12 @@ class NornirManager:
         Args:
             kwargs: The kwargs dictionary to modify in-place
         """
+        removed_keys = []
         for key in NORNFLOW_SETTINGS_OPTIONAL:
-            kwargs.pop(key, None)
+            if key in kwargs:
+                kwargs.pop(key, None)
+                removed_keys.append(key)
+        logger.debug(f"Removed NornFlow settings from kwargs: {removed_keys}")
 
     def apply_filters(self, **kwargs) -> Nornir:
         """
@@ -123,10 +137,12 @@ class NornirManager:
         Raises:
             ProcessorError: If no filters are provided
         """
+        logger.debug(f"Applying filters with kwargs: {kwargs}")
         if not kwargs:
             raise ProcessorError("No filters informed.")
 
         self.nornir = self.nornir.filter(**kwargs)
+        logger.debug(f"Filtered Nornir inventory now has {len(self.nornir.inventory.hosts)} hosts")
         return self.nornir
 
     def apply_processors(self, processors: list[Processor]) -> Nornir:
@@ -145,10 +161,12 @@ class NornirManager:
         Raises:
             ProcessorError: If no processors are provided
         """
+        logger.debug(f"Applying {len(processors)} processors: {[type(p).__name__ for p in processors]}")
         if not processors:
             raise ProcessorError("No processors informed.")
 
         self.nornir = self.nornir.with_processors(processors)
+        logger.debug(f"Nornir now has {len(self.nornir.processors)} total processors")
         return self.nornir
 
     def apply_runner(self, runner: "RunnerPlugin") -> Nornir:
@@ -164,6 +182,7 @@ class NornirManager:
         Returns:
             Nornir: Nornir instance with the runner applied
         """
+        logger.debug(f"Applying runner: {type(runner).__name__}")
         self.nornir = self.nornir.with_runner(runner)
         return self.nornir
 
@@ -181,6 +200,7 @@ class NornirManager:
             manager.set_dry_run(True)   # Enable dry-run mode
             manager.set_dry_run(False)  # Disable dry-run mode (default)
         """
+        logger.debug(f"Setting dry_run to: {value}")
         if not isinstance(value, bool):
             raise CoreError(
                 f"dry_run value must be a boolean, got {type(value).__name__}: {value}",
@@ -202,8 +222,11 @@ class NornirManager:
         Raises:
             ProcessorError: If no processor of the requested type is found
         """
+        logger.debug(f"Searching for processor of type: {processor_type.__name__}")
         for processor in self.nornir.processors:
             if isinstance(processor, processor_type):
+                logger.debug(f"Found processor: {type(processor).__name__}")
                 return processor
 
+        logger.debug(f"No processor of type {processor_type.__name__} found")
         raise ProcessorError(f"No processor of type {processor_type.__name__} found in Nornir instance")
diff --git a/nornflow/settings.py b/nornflow/settings.py
index 9e4b7ef..2a44076 100644
--- a/nornflow/settings.py
+++ b/nornflow/settings.py
@@ -2,8 +2,8 @@ import os
 from pathlib import Path
 from typing import Any
 
-import yaml
 from pydantic import Field, field_validator, PrivateAttr
+from pydantic_serdes.utils import load_file_to_dict
 from pydantic_settings import BaseSettings, SettingsConfigDict
 
 from nornflow.constants import (
@@ -11,11 +11,14 @@ from nornflow.constants import (
     NORNFLOW_DEFAULT_BLUEPRINTS_DIR,
     NORNFLOW_DEFAULT_FILTERS_DIR,
     NORNFLOW_DEFAULT_HOOKS_DIR,
+    NORNFLOW_DEFAULT_J2_FILTERS_DIR,
+    NORNFLOW_DEFAULT_LOGGER,
     NORNFLOW_DEFAULT_TASKS_DIR,
     NORNFLOW_DEFAULT_VARS_DIR,
     NORNFLOW_DEFAULT_WORKFLOWS_DIR,
 )
 from nornflow.exceptions import SettingsError
+from nornflow.logger import logger
 
 
 class NornFlowSettings(BaseSettings):
@@ -23,24 +26,29 @@ class NornFlowSettings(BaseSettings):
     NornFlow settings management using Pydantic.
 
     Settings are loaded with the following priority (highest to lowest):
-    1. Environment variables (prefixed with NORNFLOW_SETTINGS_)
+    1. Programmatic overrides (passed directly to NornFlowSettings.load() as **overrides)
     2. Values from settings YAML file
-    3. Default values defined in the model
+    3. Environment variables (prefixed with NORNFLOW_SETTINGS_)
+    4. Default values defined in this class
 
     Note the careful terminology:
     - "Settings" refers to NornFlow's own configuration
     - "Configuration/Config" is reserved for Nornir's configuration
 
-    Environment variable examples:
-    - NORNFLOW_SETTINGS_VARS_DIR=/custom/vars
-    - NORNFLOW_SETTINGS_LOCAL_TASKS=["tasks", "custom_tasks"]
-    - NORNFLOW_SETTINGS_FAILURE_STRATEGY=fail-fast
+    Environment variables are case-sensitive. Ensure exact prefix and key matching.
+
+    Examples:
+    - NORNFLOW_SETTINGS_vars_dir=/custom/vars
+    - NORNFLOW_SETTINGS_local_tasks=tasks,custom_tasks
+    - NORNFLOW_SETTINGS_failure_strategy=fail-fast
+    - NORNFLOW_SETTINGS_logger__directory=/custom/logs # note: '__' traverses nested keys
+    - NORNFLOW_SETTINGS_logger__level=DEBUG
     """
 
     model_config = SettingsConfigDict(
         env_prefix="NORNFLOW_SETTINGS_",
         env_nested_delimiter="__",
-        case_sensitive=False,
+        case_sensitive=True,
         extra="allow",
     )
 
@@ -64,6 +72,10 @@ class NornFlowSettings(BaseSettings):
         default=[NORNFLOW_DEFAULT_BLUEPRINTS_DIR],
         description="List of directories containing blueprint definitions",
     )
+    local_j2_filters: list[str] = Field(
+        default=[NORNFLOW_DEFAULT_J2_FILTERS_DIR],
+        description="List of directories containing custom Jinja2 filter functions",
+    )
     imported_packages: list[str] = Field(
         default_factory=list, description="List of Python packages to import for additional resources"
     )
@@ -77,6 +89,9 @@ class NornFlowSettings(BaseSettings):
         default=FailureStrategy.SKIP_FAILED, description="Strategy for handling task failures"
     )
     dry_run: bool = Field(default=False, description="Whether to run in dry-run mode")
+    logger: dict[str, Any] = Field(
+        default_factory=lambda: NORNFLOW_DEFAULT_LOGGER.copy(), description="Logger configuration dictionary"
+    )
 
     _base_dir: Path | None = PrivateAttr(default=None)
     _settings_file: str | None = PrivateAttr(default=None)
@@ -89,7 +104,7 @@ class NornFlowSettings(BaseSettings):
             return []
 
         if not isinstance(v, list):
-            raise TypeError("processors must be a list")
+            raise SettingsError("processors must be a list")
 
         validated = []
         for item in v:
@@ -122,6 +137,41 @@ class NornFlowSettings(BaseSettings):
                     ) from e
         return v
 
+    @field_validator("logger", mode="before")
+    @classmethod
+    def validate_logger(cls, v: Any) -> dict[str, Any]:
+        """Validate logger configuration, merging with defaults for missing keys."""
+        if not isinstance(v, dict):
+            raise SettingsError("logger must be a dictionary")
+
+        merged = {**NORNFLOW_DEFAULT_LOGGER, **v}
+
+        if not isinstance(merged["directory"], str):
+            raise SettingsError("logger.directory must be a string")
+        if not isinstance(merged["level"], str):
+            raise SettingsError("logger.level must be a string")
+
+        return merged
+
+    def _resolve_path_field(self, field_name: str, key: str | None, base_dir: Path) -> None:
+        """Resolve a relative path in a field or dict key to an absolute path.
+
+        Args:
+            field_name: Name of the attribute to resolve.
+            key: Key in the dict if the field is a dict, otherwise None.
+            base_dir: Base directory to resolve against.
+        """
+        if key:
+            field_value = getattr(self, field_name)
+            if field_value and key in field_value:
+                path = Path(field_value[key])
+                if not path.is_absolute():
+                    field_value[key] = str(base_dir / path)
+        else:
+            path = Path(getattr(self, field_name))
+            if not path.is_absolute():
+                setattr(self, field_name, str(base_dir / path))
+
     def resolve_relative_paths(self) -> "NornFlowSettings":
         """Resolve relative paths to absolute paths based on base directory."""
         base_dir = self.base_dir
@@ -130,14 +180,9 @@ class NornFlowSettings(BaseSettings):
 
         self._resolve_local_directories(base_dir)
 
-        vars_path = Path(self.vars_dir)
-        if not vars_path.is_absolute():
-            self.vars_dir = str(base_dir / vars_path)
-
-        if self.nornir_config_file:
-            config_path = Path(self.nornir_config_file)
-            if not config_path.is_absolute():
-                self.nornir_config_file = str(base_dir / config_path)
+        self._resolve_path_field("vars_dir", None, base_dir)
+        self._resolve_path_field("nornir_config_file", None, base_dir)
+        self._resolve_path_field("logger", "directory", base_dir)
 
         return self
 
@@ -153,6 +198,7 @@ class NornFlowSettings(BaseSettings):
             "local_filters",
             "local_hooks",
             "local_blueprints",
+            "local_j2_filters",
         ]:
             dirs = getattr(self, field_name)
             if not dirs:
@@ -229,9 +275,9 @@ class NornFlowSettings(BaseSettings):
             base_dir = settings_path.parent
 
         try:
-            with settings_path.open() as f:
-                yaml_data = yaml.safe_load(f) or {}
+            yaml_data = load_file_to_dict(settings_path)
         except Exception as e:
+            logger.exception(f"Failed to load settings from {resolved_file}: {e}")
             raise SettingsError(f"Failed to load settings from {resolved_file}: {e}") from e
 
         if not isinstance(yaml_data, dict):
@@ -245,7 +291,13 @@ class NornFlowSettings(BaseSettings):
         instance._base_dir = base_dir
         instance._settings_file = str(settings_path)
 
-        return instance.resolve_relative_paths()
+        resolved_instance = instance.resolve_relative_paths()
+
+        # Create logger directory if it doesn't exist
+        logger_dir = Path(resolved_instance.logger["directory"])
+        logger_dir.mkdir(parents=True, exist_ok=True)
+
+        return resolved_instance
 
     @property
     def as_dict(self) -> dict[str, Any]:
diff --git a/nornflow/utils.py b/nornflow/utils.py
index 84a416e..84c88d1 100644
--- a/nornflow/utils.py
+++ b/nornflow/utils.py
@@ -1,17 +1,17 @@
 import hashlib
 import importlib
 import inspect
-import logging
 from collections.abc import Callable
 from pathlib import Path
 from types import ModuleType
-from typing import Any, Literal
+from typing import Any, Literal, TYPE_CHECKING
 
 import yaml
 from nornir.core.inventory import Host
 from nornir.core.processor import Processor
 from nornir.core.task import AggregatedResult, MultiResult, Result, Task
 from pydantic_serdes.custom_collections import HashableDict
+from pydantic_serdes.utils import load_file_to_dict
 from rich.align import Align
 from rich.columns import Columns
 from rich.console import Console, Group
@@ -32,8 +32,10 @@ from nornflow.exceptions import (
     ResourceError,
     WorkflowError,
 )
+from nornflow.logger import logger
 
-logger = logging.getLogger(__name__)
+if TYPE_CHECKING:
+    from nornflow.vars.manager import NornFlowVariablesManager
 
 TYPE_DISPLAY_MAPPING: dict[str, str] = {
     "HashableDict": "map",
@@ -45,6 +47,16 @@ TYPE_DISPLAY_MAPPING: dict[str, str] = {
 
 NORNIR_RESULT_TYPES: set[type] = {Result, MultiResult, AggregatedResult}
 
+VAR_SOURCE_CONFIG: list[tuple[str, str, str]] = [
+    ("env_vars", "e", "e: environment variable"),
+    ("default_vars", "g", "g: global defaults"),
+    ("domain_vars", "d", "d: domain defaults"),
+    ("inline_workflow_vars", "w", "w: defined in workflow"),
+    ("cli_vars", "c*", "c*: CLI/programmatic override"),
+]
+
+VAR_SOURCE_ORDER: dict[str, int] = {label: -idx for idx, (_, label, _) in enumerate(VAR_SOURCE_CONFIG)}
+
 
 def normalize_failure_strategy(
     value: str | FailureStrategy, exception_class: type[Exception]
@@ -98,8 +110,10 @@ def import_module_from_path(module_name: str, module_path: str | Path) -> Module
         spec = importlib.util.spec_from_file_location(module_name, str(module_path))
         module = importlib.util.module_from_spec(spec)
         spec.loader.exec_module(module)
+        logger.debug(f"Successfully imported module '{module_name}' from '{module_path}'")
         return module
     except Exception as e:
+        logger.exception(f"Failed to import module '{module_name}' from '{module_path}': {e}")
         raise CoreError(
             f"Failed to import module '{module_name}' from '{module_path}': {e!s}",
             component="ModuleLoader",
@@ -146,6 +160,7 @@ def import_modules_recursively(dir_path: Path) -> list[str]:
     dir_path = dir_path.resolve()
     cwd = Path.cwd().resolve()
 
+    logger.info(f"Starting recursive import of modules from directory: {dir_path}")
     for py_file in dir_path.rglob("*.py"):
         if py_file.name == "__init__.py":
             continue
@@ -161,8 +176,9 @@ def import_modules_recursively(dir_path: Path) -> list[str]:
             imported_modules.append(module_name)
             logger.debug(f"Imported module: {module_name}")
         except Exception as e:
-            logger.error(f"Failed to import module {py_file}: {e}")
+            logger.exception(f"Failed to import module {py_file}: {e}")
 
+    logger.info(f"Completed recursive import: {len(imported_modules)} modules imported")
     return imported_modules
 
 
@@ -181,7 +197,7 @@ def is_nornir_task(attr: Callable) -> bool:
     Returns:
         True if the attribute is a properly annotated Nornir task.
     """
-    if not callable(attr) or not hasattr(attr, "__annotations__"):
+    if not is_public_callable(attr) or not hasattr(attr, "__annotations__"):
         return False
 
     annotations = attr.__annotations__
@@ -225,7 +241,7 @@ def is_nornir_filter(attr: Callable) -> bool:
     Returns:
         True if the attribute is a properly annotated Nornir filter.
     """
-    if not callable(attr):
+    if not is_public_callable(attr):
         return False
 
     try:
@@ -269,6 +285,19 @@ def is_yaml_file(file_path: str | Path) -> bool:
     return path.is_file() and path.suffix in NORNFLOW_SUPPORTED_YAML_EXTENSIONS
 
 
+def is_public_callable(attr: Any) -> bool:
+    """
+    Check if an attribute is a public callable (not starting with '_').
+
+    Args:
+        attr: The attribute to check.
+
+    Returns:
+        True if the attribute is callable and its name does not start with '_'.
+    """
+    return callable(attr) and not getattr(attr, "__name__", "").startswith("_")
+
+
 def load_processor(processor_config: dict) -> Processor:
     """
     Dynamically load and instantiate a processor from config.
@@ -292,7 +321,9 @@ def load_processor(processor_config: dict) -> Processor:
         module_path, class_name = dotted_path.rsplit(".", 1)
         module = importlib.import_module(module_path)
         processor_class = getattr(module, class_name)
-        return processor_class(**args)
+        processor = processor_class(**args)
+        logger.debug(f"Successfully loaded processor '{dotted_path}'")
+        return processor
     except (ImportError, AttributeError) as e:
         raise ProcessorError(f"Failed to load processor '{dotted_path}': {e!s}") from e
     except Exception as e:
@@ -330,6 +361,7 @@ def check_for_jinja2_recursive(obj: Any, path: str) -> None:
     """
     if isinstance(obj, str):
         if JINJA_PATTERN.search(obj):
+            logger.warning(f"Jinja2 code detected in '{path}'; raising error as it's not allowed")
             raise WorkflowError(
                 f"Jinja2 code found in '{path}' which is not allowed. "
                 "Jinja2 expressions are only permitted in specific fields like task args."
@@ -367,40 +399,30 @@ def _get_type_display(value: Any) -> str:
     return TYPE_DISPLAY_MAPPING.get(type_name, type_name)
 
 
-def _add_vars_to_table(
-    table: Table,
-    vars_dict: dict[str, Any],
-    source_label: str,
-) -> None:
-    """
-    Add variables to a Rich table with consistent formatting.
-
-    Args:
-        table: The Rich Table to add rows to.
-        vars_dict: Dictionary of variable name -> value.
-        source_label: Label for the source column (e.g., 'w', 'c*').
-    """
-    for key, value in sorted(vars_dict.items(), key=lambda item: item[0]):
-        table.add_row(
-            source_label,
-            key,
-            format_variable_value(key, value),
-            _get_type_display(value),
-        )
-
-
-def _build_vars_section(workflow_vars: dict[str, Any], cli_vars: dict[str, Any]) -> list[Any]:
+def _build_vars_section(vars_manager: "NornFlowVariablesManager | None") -> list[Any]:
     """
     Build the variables section for the workflow overview panel.
 
     Args:
-        workflow_vars: Variables defined in the workflow.
-        cli_vars: Variables from CLI/programmatic override.
+        vars_manager: Variables manager for assembly-time vars.
 
     Returns:
         List of Rich renderables for the vars section, or empty list if no vars.
     """
-    if not workflow_vars and not cli_vars:
+    if not vars_manager:
+        return []
+
+    sources_used: set[str] = set()
+    all_vars: list[tuple[str, str, Any]] = []
+
+    for attr_name, source_label, _ in VAR_SOURCE_CONFIG:
+        vars_dict = getattr(vars_manager, attr_name, {})
+        if vars_dict:
+            sources_used.add(source_label)
+            for key, value in vars_dict.items():
+                all_vars.append((source_label, key, value))
+
+    if not all_vars:
         return []
 
     vars_table = Table(show_header=True, box=None)
@@ -409,19 +431,23 @@ def _build_vars_section(workflow_vars: dict[str, Any], cli_vars: dict[str, Any])
     vars_table.add_column("Value", style="yellow")
     vars_table.add_column("Type", style="blue", no_wrap=True)
 
-    if workflow_vars:
-        _add_vars_to_table(vars_table, workflow_vars, "w")
-    if cli_vars:
-        _add_vars_to_table(vars_table, cli_vars, "c*")
+    for source, key, value in sorted(all_vars, key=lambda x: (x[1], VAR_SOURCE_ORDER.get(x[0], 999))):
+        vars_table.add_row(
+            source,
+            key,
+            format_variable_value(key, value),
+            _get_type_display(value),
+        )
 
     legend_text = Text()
     legend_text.append("Sources", style="bold dim")
-    legend_text.append("\nw: defined in workflow", style="dim")
-    legend_text.append("\nc*: CLI/programmatic override", style="dim")
+    for _, source_label, legend_desc in VAR_SOURCE_CONFIG:
+        if source_label in sources_used:
+            legend_text.append(f"\n{legend_desc}", style="dim")
 
     return [
         Text("\n"),
-        Padding.indent(Text("Variables", style="bold cyan"), 1),
+        Padding.indent(Text("Assembly-Time Variables", style="bold cyan"), 1),
         Padding.indent(Columns([vars_table, Align.right(legend_text)], expand=True), 2),
     ]
 
@@ -431,9 +457,8 @@ def print_workflow_overview(
     effective_dry_run: bool,
     hosts_count: int,
     inventory_filters: dict[str, Any],
-    workflow_vars: dict[str, Any],
-    vars: dict[str, Any],
     failure_strategy: FailureStrategy | None,
+    vars_manager: "NornFlowVariablesManager | None" = None,
 ) -> None:
     """
     Print a comprehensive workflow overview before execution using Rich.
@@ -443,9 +468,8 @@ def print_workflow_overview(
         effective_dry_run: Whether dry-run mode is enabled.
         hosts_count: Number of hosts in the filtered inventory.
         inventory_filters: Dictionary of applied inventory filters.
-        workflow_vars: Workflow-defined variables.
-        vars: Vars with highest precedence (CLI/programmatic).
         failure_strategy: The active failure handling strategy.
+        vars_manager: Variables manager for assembly-time vars.
     """
     console = Console()
 
@@ -469,7 +493,7 @@ def print_workflow_overview(
     )
 
     elements: list[Any] = [table]
-    elements.extend(_build_vars_section(workflow_vars, vars))
+    elements.extend(_build_vars_section(vars_manager))
 
     panel = Panel(
         Group(*elements),
@@ -498,11 +522,13 @@ def get_file_content_hash(file_path: Path) -> str:
         ResourceError: If file cannot be read or parsed.
     """
     try:
-        content = file_path.read_text(encoding="utf-8")
-        data = yaml.safe_load(content)
+        data = load_file_to_dict(file_path)
         normalized = yaml.dump(data, sort_keys=True, default_flow_style=False)
-        return hashlib.sha256(normalized.encode()).hexdigest()[:16]
+        hash_value = hashlib.sha256(normalized.encode()).hexdigest()[:16]
+        logger.debug(f"Generated content hash for '{file_path}': {hash_value}")
+        return hash_value
     except Exception as e:
+        logger.exception(f"Failed to hash file content: {e}")
         raise ResourceError(
             f"Failed to hash file content: {e}", resource_type="file", resource_name=str(file_path)
         ) from e
diff --git a/nornflow/vars/__init__.py b/nornflow/vars/__init__.py
index 8275fb3..b7f6e98 100644
--- a/nornflow/vars/__init__.py
+++ b/nornflow/vars/__init__.py
@@ -8,17 +8,13 @@ This package provides the variable management functionality for NornFlow, includ
 - Template rendering with Jinja2
 """
 
-from nornflow.vars.constants import JINJA2_MARKERS
 from nornflow.vars.context import NornFlowDeviceContext
-from nornflow.vars.exceptions import TemplateError, VariableError
-from nornflow.vars.jinja2_utils import Jinja2EnvironmentManager
+from nornflow.vars.exceptions import VariableError
 from nornflow.vars.manager import NornFlowVariablesManager
 from nornflow.vars.processors import NornFlowVariableProcessor
 from nornflow.vars.proxy import NornirHostProxy
 
 __all__ = [
-    "JINJA2_MARKERS",
-    "Jinja2EnvironmentManager",
     "NornFlowDeviceContext",
     "NornFlowVariableProcessor",
     "NornFlowVariablesManager",
diff --git a/nornflow/vars/constants.py b/nornflow/vars/constants.py
index d4edcd2..a8c7ab8 100644
--- a/nornflow/vars/constants.py
+++ b/nornflow/vars/constants.py
@@ -3,18 +3,3 @@ ENV_VAR_PREFIX = "NORNFLOW_VAR_"
 
 # File name for default variables
 DEFAULTS_FILENAME = "defaults.yaml"
-
-# Template markers for detecting Jinja2 templates - all opening variations
-JINJA2_MARKERS = [
-    "{{",  # Standard variable output
-    "{{-",  # Variable with left whitespace control
-    "{%",  # Statement/control structure
-    "{%-",  # Statement with left whitespace control
-    "{#",  # Comment
-    "{#-",  # Comment with left whitespace control
-]
-
-# Lower case string values that evaluate to True when converting to boolean.
-# This provides a centralized reference point to avoid ambiguity across the codebase.
-# Note: This has limited use - primarily for hook configurations and CLI flags.
-TRUTHY_STRING_VALUES = ("true", "yes", "1", "on", "y", "t", "enabled")
diff --git a/nornflow/vars/context.py b/nornflow/vars/context.py
index 05f4f74..bda1849 100644
--- a/nornflow/vars/context.py
+++ b/nornflow/vars/context.py
@@ -1,5 +1,7 @@
 from typing import Any, ClassVar
 
+from nornflow.logger import logger
+
 
 class NornFlowDeviceContext:
     """
@@ -61,6 +63,7 @@ class NornFlowDeviceContext:
         cls._initial_default_vars = default_vars.copy()
         cls._initial_env_vars = env_vars.copy()
         cls._shared_state_initialized = True
+        logger.info("NornFlowDeviceContext shared state initialized.")
 
     def __init__(self, host_name: str) -> None:
         """
@@ -207,4 +210,5 @@ class NornFlowDeviceContext:
         for layer in precedence_layers:
             flat_context.update(layer)
 
+        logger.debug(f"Built flat context for host '{self.host_name}' with {len(flat_context)} variables.")
         return flat_context
diff --git a/nornflow/vars/exceptions.py b/nornflow/vars/exceptions.py
index c1b1b98..cf9c44b 100644
--- a/nornflow/vars/exceptions.py
+++ b/nornflow/vars/exceptions.py
@@ -29,22 +29,3 @@ class VariableError(NornFlowError):
         super().__init__(f"{prefix}{message}")
         self.var_name = var_name
         self.host_name = host_name
-
-
-###############################################################################
-# TEMPLATE EXCEPTIONS
-###############################################################################
-
-
-class TemplateError(VariableError):
-    """
-    Base exception class for template rendering errors.
-    """
-
-    def __init__(self, message: str = "", template: str = ""):
-        # Truncate very long templates
-        template_preview = template[:97] + "..." if len(template) > 100 else template  # noqa: PLR2004
-
-        context = f" Template: '{template_preview}'" if template else ""
-        super().__init__(f"{message}{context}")
-        self.template = template
diff --git a/nornflow/vars/jinja2_utils.py b/nornflow/vars/jinja2_utils.py
deleted file mode 100644
index dcae046..0000000
--- a/nornflow/vars/jinja2_utils.py
+++ /dev/null
@@ -1,115 +0,0 @@
-from typing import Any
-
-from jinja2 import Environment, StrictUndefined, TemplateSyntaxError, UndefinedError
-
-from nornflow.builtins.jinja2_filters import ALL_FILTERS
-from nornflow.vars.exceptions import TemplateError, VariableError
-
-
-class Jinja2EnvironmentManager:
-    """Centralized Jinja2 environment management for NornFlow.
-
-    Provides a single source of truth for Jinja2 environment configuration
-    and template rendering with consistent error handling. All NornFlow custom
-    filters are automatically registered during initialization.
-    """
-
-    def __init__(self):
-        """Initialize the Jinja2 environment with NornFlow configuration.
-
-        Creates a Jinja2 environment with:
-        - StrictUndefined for catching missing variables
-        - Loop controls extension
-        - All NornFlow custom filters pre-registered
-        """
-        self.env = Environment(
-            undefined=StrictUndefined,
-            extensions=["jinja2.ext.loopcontrols"],
-            autoescape=False,  # noqa: S701
-        )
-
-        for filter_name, filter_func in ALL_FILTERS.items():
-            self.env.filters[filter_name] = filter_func
-
-    def render_template(self, template_str: str, context: dict[str, Any], error_context: str = "") -> str:
-        """Render a Jinja2 template string with the provided context.
-
-        Args:
-            template_str: The Jinja2 template string to render.
-            context: Dictionary of variables to use in template rendering.
-            error_context: Description of where this template is being used.
-
-        Returns:
-            The rendered template string.
-
-        Raises:
-            VariableError: If template contains undefined variables.
-            TemplateError: If template has syntax errors or other rendering issues.
-        """
-        try:
-            template = self.env.from_string(template_str)
-            return template.render(context)
-        except UndefinedError as e:
-            context_info = f" ({error_context})" if error_context else ""
-            raise VariableError(f"Undefined variable in template{context_info}: {e}") from e
-        except TemplateSyntaxError as e:
-            context_info = f" ({error_context})" if error_context else ""
-            raise TemplateError(f"Template syntax error{context_info}: {e}") from e
-        except Exception as e:
-            context_info = f" ({error_context})" if error_context else ""
-            raise TemplateError(f"Template rendering error{context_info}: {e}") from e
-
-
-def render_string(template_str: str, context: dict[str, Any], error_context: str = "") -> str:
-    """Convenience function for simple string rendering.
-
-    Args:
-        template_str: The template string to render.
-        context: Dictionary of variables for rendering.
-        error_context: Description for error messages.
-
-    Returns:
-        The rendered string.
-    """
-    manager = Jinja2EnvironmentManager()
-    return manager.render_template(template_str, context, error_context)
-
-
-def render_data_recursive(data: Any, context: dict[str, Any], error_context: str = "") -> Any:
-    """Recursively render Jinja2 templates in data structures.
-
-    Args:
-        data: The data structure to process (dict, list, string, etc.).
-        context: Dictionary of variables for rendering.
-        error_context: Description for error messages.
-
-    Returns:
-        The data structure with all templates rendered.
-    """
-    manager = Jinja2EnvironmentManager()
-    return _render_data_recursive_impl(data, context, manager, error_context)
-
-
-def _render_data_recursive_impl(
-    data: Any, context: dict[str, Any], manager: Jinja2EnvironmentManager, error_context: str
-) -> Any:
-    """Implementation of recursive data rendering.
-
-    Args:
-        data: The data to process.
-        context: Variables for rendering.
-        manager: The Jinja2 manager instance.
-        error_context: Description for error messages.
-
-    Returns:
-        The processed data.
-    """
-    if isinstance(data, str):
-        if any(marker in data for marker in ["{{", "{%", "{#"]):
-            return manager.render_template(data, context, error_context)
-        return data
-    if isinstance(data, dict):
-        return {k: _render_data_recursive_impl(v, context, manager, error_context) for k, v in data.items()}
-    if isinstance(data, list):
-        return [_render_data_recursive_impl(item, context, manager, error_context) for item in data]
-    return data
diff --git a/nornflow/vars/manager.py b/nornflow/vars/manager.py
index b7d42d7..a9a45b4 100644
--- a/nornflow/vars/manager.py
+++ b/nornflow/vars/manager.py
@@ -1,23 +1,21 @@
-import logging
 import os
 from pathlib import Path
 from typing import Any
 
-import jinja2.exceptions
 import yaml
+from pydantic_serdes.utils import load_file_to_dict
 
+from nornflow.j2 import Jinja2Service
+from nornflow.j2.exceptions import TemplateError
+from nornflow.logger import logger
 from nornflow.vars.constants import (
     DEFAULTS_FILENAME,
     ENV_VAR_PREFIX,
-    JINJA2_MARKERS,
 )
 from nornflow.vars.context import NornFlowDeviceContext
-from nornflow.vars.exceptions import TemplateError, VariableError
-from nornflow.vars.jinja2_utils import Jinja2EnvironmentManager
+from nornflow.vars.exceptions import VariableError
 from nornflow.vars.proxy import NornirHostProxy
 
-logger = logging.getLogger(__name__)
-
 # Constants for magic values
 MAX_LOG_VALUE_LENGTH = 80
 
@@ -182,8 +180,34 @@ class NornFlowVariablesManager:
             env_vars=self._env_vars,
         )
 
-        self._jinja2_manager = Jinja2EnvironmentManager()
+        self.jinja2 = Jinja2Service()
         self._device_contexts: dict[str, NornFlowDeviceContext] = {}
+        logger.debug(f"Initialized NornFlowVariablesManager with vars_dir: {self.vars_dir}")
+
+    @property
+    def cli_vars(self) -> dict[str, Any]:
+        """Get CLI variables (highest precedence assembly-time vars)."""
+        return self._cli_vars.copy()
+
+    @property
+    def inline_workflow_vars(self) -> dict[str, Any]:
+        """Get inline workflow variables."""
+        return self._inline_workflow_vars.copy()
+
+    @property
+    def domain_vars(self) -> dict[str, Any]:
+        """Get domain-specific default variables."""
+        return self._domain_vars.copy()
+
+    @property
+    def default_vars(self) -> dict[str, Any]:
+        """Get global default variables."""
+        return self._default_vars.copy()
+
+    @property
+    def env_vars(self) -> dict[str, Any]:
+        """Get environment variables (lowest precedence assembly-time vars)."""
+        return self._env_vars.copy()
 
     def _load_environment_variables(self) -> dict[str, Any]:
         """
@@ -295,25 +319,24 @@ class NornFlowVariablesManager:
             return {}
 
         try:
-            with file_path.open(encoding="utf-8") as f:
-                loaded_vars = yaml.safe_load(f)
-                if loaded_vars is None:
-                    logger.debug(
-                        f"{context_description} file '{file_path}' is empty or contains only null values."
-                    )
-                    return {}
-                if not isinstance(loaded_vars, dict):
-                    raise VariableError(
-                        f"Expected a dictionary from {context_description} file at '{file_path}', "
-                        f"but got {type(loaded_vars).__name__}."
-                    )
-                logger.debug(f"Successfully loaded {context_description} from '{file_path}'.")
-                return loaded_vars
+            loaded_vars = load_file_to_dict(file_path)
+            if not loaded_vars:
+                logger.debug(
+                    f"{context_description} file '{file_path}' is empty or contains only null values."
+                )
+                return {}
+            # This shouldn't happen, as load_file_to_dict should always return a dict.
+            if not isinstance(loaded_vars, dict):
+                raise VariableError(
+                    f"Expected a dictionary from {context_description} file at '{file_path}', "
+                    f"but got {type(loaded_vars).__name__}."
+                )
+            logger.debug(f"Successfully loaded {context_description} from '{file_path}'.")
+            return loaded_vars
         except yaml.YAMLError as e:
-            logger.exception(f"YAML parsing error in {context_description} file '{file_path}'")
             raise VariableError(f"YAML parsing error in {context_description} file '{file_path}': {e}") from e
         except Exception as e:
-            logger.exception(f"Unexpected error loading {context_description} file '{file_path}'")
+            logger.exception(f"Unexpected error loading {context_description} file '{file_path}': {e}")
             raise VariableError(
                 f"Unexpected error loading {context_description} file '{file_path}': {e}"
             ) from e
@@ -345,13 +368,12 @@ class NornFlowVariablesManager:
             host_name: The name of the host for which this variable is being set.
         """
         if not host_name:
-            logger.error("Cannot set runtime variable: host_name is missing.")
-            return
+            raise VariableError("Cannot set runtime variable: host_name is missing.")
 
         ctx = self.get_device_context(host_name)
         ctx.runtime_vars[name] = value
         value_str = str(value)
-        logger.debug(
+        logger.info(
             f"Runtime variable '{name}' set for host '{host_name}'. Value: "
             f"{value_str[:MAX_LOG_VALUE_LENGTH]}"
             f"{'...' if len(value_str) > MAX_LOG_VALUE_LENGTH else ''}"
@@ -381,6 +403,7 @@ class NornFlowVariablesManager:
         flat_context = device_ctx.get_flat_context()
 
         if var_name in flat_context:
+            logger.debug(f"Retrieved NornFlow variable '{var_name}' for host '{host_name}'.")
             return flat_context[var_name]
 
         raise VariableError(
@@ -407,12 +430,6 @@ class NornFlowVariablesManager:
         Raises:
             TemplateError: If template resolution fails or host_name is missing.
         """
-        if not isinstance(template_str, str):
-            return template_str
-
-        if not any(marker in template_str for marker in JINJA2_MARKERS):
-            return template_str
-
         if not host_name:
             raise TemplateError(f"Host name not provided for template resolution: {template_str}")
 
@@ -424,21 +441,22 @@ class NornFlowVariablesManager:
             if additional_vars:
                 resolution_context_dict.update(additional_vars)
 
-            context_for_jinja = VariableLookupContext(self, host_name, resolution_context_dict)
+            context = VariableLookupContext(self, host_name, resolution_context_dict)
 
-            template = self._jinja2_manager.env.from_string(template_str)
-            return template.render(context_for_jinja)
-
-        except jinja2.exceptions.UndefinedError as e:
-            logger.exception(f"Jinja2 UndefinedError for host '{host_name}' in template '{template_str}'")
-            raise TemplateError(f"Undefined variable in template '{template_str}': {e}") from e
-        except VariableError as e:
-            logger.exception(f"NornFlow VariableError for host '{host_name}' in template '{template_str}'")
-            raise TemplateError(f"Variable error in template '{template_str}': {e}") from e
+            result = self.jinja2.resolve_string(
+                template_str, context, error_context=f"variable resolution for host {host_name}"
+            )
+            logger.debug(
+                f"Resolved template string for host '{host_name}': "
+                f"'{template_str}' -> length: '{len(result)}'"
+            )
+            return result
+        except TemplateError as e:
+            logger.error(f"Template error resolving string '{template_str}' for host '{host_name}': {e}")
+            raise
         except Exception as e:
             logger.exception(
-                f"Jinja2 TemplateError or unexpected issue for host '{host_name}' "
-                f"in template '{template_str}'"
+                f"Unexpected error resolving template '{template_str}' for host '{host_name}': {e}"
             )
             raise TemplateError(f"Template rendering error in '{template_str}': {e}") from e
 
@@ -454,16 +472,27 @@ class NornFlowVariablesManager:
         Returns:
             The data structure with all templates resolved.
         """
-        if isinstance(data, str):
-            # Check if the string contains Jinja2 markers
-            if any(marker in data for marker in JINJA2_MARKERS):
-                return self.resolve_string(data, host_name, additional_vars)
-            return data
-        if isinstance(data, dict):
-            return {k: self.resolve_data(v, host_name, additional_vars) for k, v in data.items()}
-        if isinstance(data, (list, tuple)):
-            # Convert both lists and tuples to lists after resolving items
-            # This ensures YAML-defined lists remain lists, even if converted to tuples for hashability
-            return [self.resolve_data(item, host_name, additional_vars) for item in data]
-        # Return other types as-is
-        return data
+        if not host_name:
+            raise TemplateError("Host name not provided for data resolution")
+
+        try:
+            device_ctx = self.get_device_context(host_name)
+            nornflow_default_vars = device_ctx.get_flat_context()
+
+            resolution_context_dict = nornflow_default_vars.copy()
+            if additional_vars:
+                resolution_context_dict.update(additional_vars)
+
+            context = VariableLookupContext(self, host_name, resolution_context_dict)
+
+            result = self.jinja2.resolve_data(
+                data, context, error_context=f"data resolution for host {host_name}"
+            )
+            logger.debug(f"Resolved data structure for host '{host_name}'.")
+            return result
+        except TemplateError as e:
+            logger.error(f"Template error resolving data for host '{host_name}': {e}")
+            raise
+        except Exception as e:
+            logger.exception(f"Unexpected error resolving data for host '{host_name}': {e}")
+            raise TemplateError(f"Data resolution error: {e}") from e
diff --git a/nornflow/vars/processors.py b/nornflow/vars/processors.py
index f50b672..34f4bd8 100644
--- a/nornflow/vars/processors.py
+++ b/nornflow/vars/processors.py
@@ -31,17 +31,15 @@ This is particularly useful for hooks that require evaluating Jinja2 template in
 BEFORE anything else is evaluated by the Jinja2 Environment in the same task execution.
 """
 
-import logging
 from typing import Any
 
 from nornir.core.inventory import Host
 from nornir.core.processor import Processor
 from nornir.core.task import MultiResult, Task
 
+from nornflow.logger import logger
 from nornflow.vars.manager import NornFlowVariablesManager
 
-logger = logging.getLogger(__name__)
-
 
 class NornFlowVariableProcessor(Processor):
     """
@@ -139,7 +137,7 @@ class NornFlowVariableProcessor(Processor):
             return resolved_params
 
         except Exception:
-            logger.exception(f"Error resolving templates for task '{task.name}' on host '{host.name}'")
+            logger.exception(f"Error resolving deferred params for task '{task.name}' on host '{host.name}'")
             self._deferred_params.pop(key, None)
             raise
 
diff --git a/nornflow/vars/proxy.py b/nornflow/vars/proxy.py
index d862753..078e663 100644
--- a/nornflow/vars/proxy.py
+++ b/nornflow/vars/proxy.py
@@ -1,13 +1,11 @@
-import logging
 from typing import Any
 
 from nornir.core import Nornir
 from nornir.core.inventory import Host
 
+from nornflow.logger import logger
 from nornflow.vars.exceptions import VariableError
 
-logger = logging.getLogger(__name__)
-
 
 class NornirHostProxy:
     """
diff --git a/pyproject.toml b/pyproject.toml
index 17e501b..3f6d1c0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "nornflow"
-version = "0.6.0"
+version = "0.7.0"
 description = "A workflow orchestration tool for network automation built around Nornir."
 readme = "README.md"
 requires-python = ">=3.10"
@@ -12,7 +12,7 @@ dependencies = [
     "jmespath>=1.0.1",
     "nornir>=3.5.0",
     "nornir-utils>=0.2.0",
-    "pydantic-serdes>=1.0.3",
+    "pydantic-serdes>=1.0.4",
     "pydantic-settings>=2.12.0",
     "pytest>=8.3.4",
     "pyyaml>=6.0.2",
@@ -37,9 +37,9 @@ include = ["nornflow*"]
 
 [project.optional-dependencies]
 dev = [
-    "black>=25.1.0",
+    "black>=25.12.0",
     "mypy>=1.15.0",
-    "ruff>=0.9.4",
+    "ruff>=0.14.10",
     "pytest-cov>=6.0.0",
 ]
 manual-tests = [
@@ -113,4 +113,4 @@ line-length = 110
 extend-exclude = '(^|/)tests/|(^|/)debug_.*\.py$'
 
 [project.scripts]
-nornflow = "nornflow.cli.entrypoint:app"
+nornflow = "nornflow.cli.entrypoint:app"
\ No newline at end of file
diff --git a/tests/unit/blueprints/conftest.py b/tests/unit/blueprints/conftest.py
index da53a2d..2278f6f 100644
--- a/tests/unit/blueprints/conftest.py
+++ b/tests/unit/blueprints/conftest.py
@@ -1,22 +1,17 @@
 import pytest
-from nornflow.blueprints.resolver import BlueprintResolver
-from nornflow.vars.jinja2_utils import Jinja2EnvironmentManager
-
+from pathlib import Path
 
-@pytest.fixture
-def jinja2_manager():
-    """Provides a Jinja2EnvironmentManager instance for blueprint tests."""
-    return Jinja2EnvironmentManager()
+from nornflow.blueprints.resolver import BlueprintResolver
 
 
 @pytest.fixture
-def blueprint_resolver(jinja2_manager):
+def blueprint_resolver() -> BlueprintResolver:
     """Provides a BlueprintResolver instance for blueprint tests."""
-    return BlueprintResolver(jinja2_manager)
+    return BlueprintResolver()
 
 
 @pytest.fixture
-def mock_blueprints_catalog(tmp_path):
+def mock_blueprints_catalog(tmp_path: Path) -> dict[str, Path]:
     """Provides a mock blueprint catalog with sample files."""
     catalog = {}
     # Create a sample blueprint file
@@ -34,7 +29,7 @@ tasks:
 
 
 @pytest.fixture
-def mock_vars_dir(tmp_path):
+def mock_vars_dir(tmp_path: Path) -> Path:
     """Provides a mock vars directory with defaults."""
     vars_dir = tmp_path / "vars"
     vars_dir.mkdir()
@@ -44,7 +39,7 @@ def mock_vars_dir(tmp_path):
 
 
 @pytest.fixture
-def mock_workflow_path(tmp_path):
+def mock_workflow_path(tmp_path: Path) -> Path:
     """Provides a mock workflow path."""
     workflow_file = tmp_path / "workflow.yaml"
     workflow_file.write_text("workflow: {}")
@@ -52,12 +47,12 @@ def mock_workflow_path(tmp_path):
 
 
 @pytest.fixture
-def mock_workflow_roots(tmp_path):
+def mock_workflow_roots(tmp_path: Path) -> list[str]:
     """Provides mock workflow roots."""
     return [str(tmp_path / "workflows")]
 
 
 @pytest.fixture
-def mock_cli_vars():
+def mock_cli_vars() -> dict[str, str]:
     """Provides mock CLI variables."""
     return {"cli_var": "cli_value"}
\ No newline at end of file
diff --git a/tests/unit/blueprints/test_blueprint_expander.py b/tests/unit/blueprints/test_blueprint_expander.py
index 929cf70..d6a8b40 100644
--- a/tests/unit/blueprints/test_blueprint_expander.py
+++ b/tests/unit/blueprints/test_blueprint_expander.py
@@ -8,7 +8,7 @@ class TestBlueprintExpander:
 
     def test_expand_blueprints_no_catalog(self, blueprint_resolver):
         """Test expansion with no blueprints catalog."""
-        expander = BlueprintExpander(blueprint_resolver)
+        expander = BlueprintExpander()
         tasks = [{"name": "task1", "task": "echo"}]
         result = expander.expand_blueprints(
             tasks=tasks,
@@ -22,7 +22,7 @@ class TestBlueprintExpander:
 
     def test_expand_blueprints_simple_blueprint(self, blueprint_resolver, mock_blueprints_catalog, mock_vars_dir, mock_workflow_path, mock_workflow_roots, mock_cli_vars):
         """Test expanding a simple blueprint reference."""
-        expander = BlueprintExpander(blueprint_resolver)
+        expander = BlueprintExpander()
         tasks = [{"blueprint": "sample"}]
         result = expander.expand_blueprints(
             tasks=tasks,
@@ -38,7 +38,7 @@ class TestBlueprintExpander:
 
     def test_expand_blueprints_with_condition_true(self, blueprint_resolver, mock_blueprints_catalog, mock_vars_dir, mock_workflow_path, mock_workflow_roots):
         """Test blueprint expansion with a true condition."""
-        expander = BlueprintExpander(blueprint_resolver)
+        expander = BlueprintExpander()
         tasks = [{"blueprint": "sample", "if": "true"}]
         result = expander.expand_blueprints(
             tasks=tasks,
@@ -52,7 +52,7 @@ class TestBlueprintExpander:
 
     def test_expand_blueprints_with_condition_false(self, blueprint_resolver, mock_blueprints_catalog, mock_vars_dir, mock_workflow_path, mock_workflow_roots):
         """Test blueprint expansion with a false condition."""
-        expander = BlueprintExpander(blueprint_resolver)
+        expander = BlueprintExpander()
         tasks = [{"blueprint": "sample", "if": "false"}]
         result = expander.expand_blueprints(
             tasks=tasks,
@@ -81,7 +81,7 @@ tasks:
 """)
         catalog = {"parent": parent_blueprint, "child": child_blueprint}
 
-        expander = BlueprintExpander(blueprint_resolver)
+        expander = BlueprintExpander()
         tasks = [{"blueprint": "parent"}]
         result = expander.expand_blueprints(
             tasks=tasks,
@@ -110,7 +110,7 @@ tasks:
 """)
         catalog = {"a": blueprint_a, "b": blueprint_b}
 
-        expander = BlueprintExpander(blueprint_resolver)
+        expander = BlueprintExpander()
         tasks = [{"blueprint": "a"}]
         with pytest.raises(BlueprintCircularDependencyError):
             expander.expand_blueprints(
@@ -126,7 +126,7 @@ tasks:
         """Test error when blueprint is missing."""
         monkeypatch.chdir(tmp_path)
         
-        expander = BlueprintExpander(blueprint_resolver)
+        expander = BlueprintExpander()
         tasks = [{"blueprint": "this_blueprint_absolutely_does_not_exist_anywhere_xyz123"}]
         with pytest.raises(BlueprintError, match="Blueprint not found in catalog or filesystem"):
             expander.expand_blueprints(
@@ -146,7 +146,7 @@ tasks:
         invalid_blueprint.write_text("invalid: yaml: content: [")
         catalog = {"invalid": invalid_blueprint}
 
-        expander = BlueprintExpander(blueprint_resolver)
+        expander = BlueprintExpander()
         tasks = [{"blueprint": "invalid"}]
         with pytest.raises(ResourceError, match="Failed to hash file content"):
             expander.expand_blueprints(
@@ -166,9 +166,9 @@ tasks:
         invalid_blueprint.write_text("not_tasks: []")
         catalog = {"invalid": invalid_blueprint}
 
-        expander = BlueprintExpander(blueprint_resolver)
+        expander = BlueprintExpander()
         tasks = [{"blueprint": "invalid"}]
-        with pytest.raises(BlueprintError, match="Blueprint must contain ONLY 'tasks' key"):
+        with pytest.raises(BlueprintError, match=r"(?s)Field required.*Extra inputs are not permitted"):
             expander.expand_blueprints(
                 tasks=tasks,
                 blueprints_catalog=catalog,
@@ -186,9 +186,9 @@ tasks:
         invalid_blueprint.write_text("tasks: not_a_list")
         catalog = {"invalid": invalid_blueprint}
 
-        expander = BlueprintExpander(blueprint_resolver)
+        expander = BlueprintExpander()
         tasks = [{"blueprint": "invalid"}]
-        with pytest.raises(BlueprintError, match="'tasks' must be a list"):
+        with pytest.raises(BlueprintError, match=r"Input should be a valid list"):
             expander.expand_blueprints(
                 tasks=tasks,
                 blueprints_catalog=catalog,
diff --git a/tests/unit/blueprints/test_blueprint_resolver.py b/tests/unit/blueprints/test_blueprint_resolver.py
index 58ae70c..5f61aa3 100644
--- a/tests/unit/blueprints/test_blueprint_resolver.py
+++ b/tests/unit/blueprints/test_blueprint_resolver.py
@@ -108,7 +108,7 @@ class TestBlueprintResolver:
     def test_resolve_template_syntax_error(self, blueprint_resolver):
         """Test that template with syntax error raises error."""
         context = {}
-        with pytest.raises(BlueprintError, match="Template syntax error"):
+        with pytest.raises(BlueprintError, match="Template rendering error"):
             blueprint_resolver.resolve_template("{{ unclosed", context)
 
     def test_evaluate_condition_true(self, blueprint_resolver):
diff --git a/tests/unit/builtins/test_if_hook.py b/tests/unit/builtins/test_if_hook.py
index d75576d..a0ebf7c 100644
--- a/tests/unit/builtins/test_if_hook.py
+++ b/tests/unit/builtins/test_if_hook.py
@@ -49,8 +49,10 @@ class TestIfHook:
         """Test validation passes for valid Jinja2 expression."""
         hook = IfHook("{{ host.platform == 'ios' }}")
         mock_task_model = MagicMock()
-
-        hook.execute_hook_validations(mock_task_model)
+        
+        # Mock the Jinja2Service's compile_template method to avoid actual compilation
+        with patch('nornflow.j2.Jinja2Service.compile_template'):
+            hook.execute_hook_validations(mock_task_model)
 
     def test_execute_hook_validations_invalid_multiple_filters(self):
         """Test validation fails for dict with multiple filter keys."""
@@ -336,4 +338,4 @@ class TestIfHook:
         hook.task_completed(mock_task, mock_result)
         hook.task_instance_completed(mock_task, mock_host, mock_result)
         hook.subtask_instance_started(mock_task, mock_host)
-        hook.subtask_instance_completed(mock_task, mock_host, mock_result)
\ No newline at end of file
+        hook.subtask_instance_completed(mock_task, mock_host, mock_result)
diff --git a/tests/unit/builtins/test_shush_hook.py b/tests/unit/builtins/test_shush_hook.py
index addd380..2cae8d1 100644
--- a/tests/unit/builtins/test_shush_hook.py
+++ b/tests/unit/builtins/test_shush_hook.py
@@ -1,4 +1,6 @@
 # ruff: noqa: SLF001, T201
+import logging
+
 from unittest.mock import MagicMock
 from nornflow.builtins.hooks import ShushHook
 
@@ -60,30 +62,28 @@ class TestShushHook:
 
         assert not hasattr(mock_task.nornir, '_nornflow_suppressed_tasks')
 
-    def test_task_started_warns_when_no_compatible_processor(self, capsys, mock_task):
+    def test_task_started_warns_when_no_compatible_processor(self, caplog, mock_task):
         """Test task_started warns when no processor supports shush hook."""
         hook = ShushHook(True)
         
         mock_task.nornir.processors = []
 
-        hook.task_started(mock_task)
+        with caplog.at_level(logging.WARNING, logger="nornflow"):
+            hook.task_started(mock_task)
 
-        captured = capsys.readouterr()
-        assert "Warning" in captured.out
-        assert "no compatible processor found" in captured.out
+            assert "Warning: 'shush' hook has no effect - no compatible processor found in chain. Outputs are not going to be suppressed." in caplog.text
         assert not hasattr(mock_task.nornir, '_nornflow_suppressed_tasks')
 
-    def test_task_started_warns_when_processor_lacks_support(self, capsys, mock_task, mock_processor_incompatible):
+    def test_task_started_warns_when_processor_lacks_support(self, caplog, mock_task, mock_processor_incompatible):
         """Test task_started warns when processor doesn't have supports_shush_hook attribute."""
         hook = ShushHook(True)
         
         mock_task.nornir.processors = [mock_processor_incompatible]
 
-        hook.task_started(mock_task)
+        with caplog.at_level(logging.WARNING, logger="nornflow"):
+            hook.task_started(mock_task)
 
-        captured = capsys.readouterr()
-        assert "Warning: 'shush' hook has no effect - " in captured.out
-        assert "no compatible processor found in chain. Outputs are not going to be suppressed." in captured.out
+            assert "Warning: 'shush' hook has no effect - no compatible processor found in chain. Outputs are not going to be suppressed." in caplog.text
         assert not hasattr(mock_task.nornir, '_nornflow_suppressed_tasks')
 
     def test_task_started_sets_suppression_marker_with_compatible_processor(self, mock_task, mock_processor_compatible):
diff --git a/tests/unit/cli/test_show.py b/tests/unit/cli/test_show.py
index 9d7c13e..2ac4e6a 100644
--- a/tests/unit/cli/test_show.py
+++ b/tests/unit/cli/test_show.py
@@ -6,7 +6,9 @@ import typer
 import yaml
 
 from nornflow.cli.show import (
+    render_blueprints_catalog_table_data,
     render_filters_catalog_table_data,
+    render_j2_filters_catalog_table_data,
     render_nornir_cfgs_table_data,
     render_settings_table_data,
     render_task_catalog_table_data,
@@ -175,7 +177,7 @@ class TestShowCommand:
 
         with pytest.raises(typer.BadParameter):
             show(mock_ctx, catalog=False, catalogs=False, tasks=False, filters=False,
-                 workflows=False, blueprints=False, settings=False, nornir_configs=False, all=False)
+                 workflows=False, blueprints=False, j2_filters=False, settings=False, nornir_configs=False, all=False)
 
     @patch("nornflow.cli.show.NornFlowBuilder")
     @patch("nornflow.cli.show.CLIShowError")
@@ -256,7 +258,7 @@ class TestShowHelpers:
 
         show_catalog(mock_nornflow)
 
-        assert mock_show_table.call_count == 4
+        assert mock_show_table.call_count == 5
         calls = [
             call(
                 "TASKS CATALOG",
@@ -276,6 +278,18 @@ class TestShowHelpers:
                 ["Workflow Name", "Description", "Source (file path)"],
                 mock_nornflow,
             ),
+            call(
+                "BLUEPRINTS CATALOG",
+                render_blueprints_catalog_table_data,
+                ["Blueprint Name", "Description", "Source (file path)"],
+                mock_nornflow,
+            ),
+            call(
+                "JINJA2 FILTERS CATALOG",
+                render_j2_filters_catalog_table_data,
+                ["Filter Name", "Description", "Source"],
+                mock_nornflow,
+            ),
         ]
         mock_show_table.assert_has_calls(calls)
 
@@ -393,6 +407,57 @@ class TestTableRenderers:
         for row in result:
             assert len(row) == 3
 
+    @patch("nornflow.cli.show.get_source_from_catalog")
+    def test_render_j2_filters_catalog_table_data(self, mock_get_source):
+        """Test render_j2_filters_catalog_table_data generates Jinja2 filters catalog table data."""
+        mock_nornflow = MagicMock()
+        filter_func1 = MagicMock(__doc__="Test Jinja2 filter 1 description.")
+        filter_func2 = MagicMock(__doc__="Test Jinja2 filter 2 description.")
+        
+        mock_j2_filters_catalog = MagicMock()
+        mock_j2_filters_catalog.get_builtin_items.return_value = ["j2_filter1"]
+        mock_j2_filters_catalog.get_custom_items.return_value = ["j2_filter2"]
+        mock_j2_filters_catalog.__getitem__.side_effect = lambda x: filter_func1 if x == "j2_filter1" else filter_func2
+        mock_nornflow.j2_filters_catalog = mock_j2_filters_catalog
+        
+        mock_get_source.return_value = "test.module"
+
+        result = render_j2_filters_catalog_table_data(mock_nornflow)
+
+        assert len(result) == 2
+        for row in result:
+            assert len(row) == 3
+
+    @patch("yaml.safe_load")
+    @patch("nornflow.cli.show.get_source_from_catalog")
+    def test_render_blueprints_catalog_table_data(self, mock_get_source, mock_safe_load):
+        """Test render_blueprints_catalog_table_data generates blueprints catalog table data."""
+        mock_nornflow = MagicMock()
+        blueprint_path1 = MagicMock(spec=Path)
+        blueprint_path2 = MagicMock(spec=Path)
+        
+        mock_nornflow.blueprints_catalog.items.return_value = [
+            ("blueprint1", blueprint_path1),
+            ("blueprint2", blueprint_path2),
+        ]
+
+        mock_safe_load.side_effect = [
+            {"description": "Test blueprint 1"},
+            {"blueprint": {"description": "Test blueprint 2"}},
+        ]
+        
+        mock_get_source.return_value = "./blueprints/test.yaml"
+
+        with patch("pathlib.Path.open", create=True) as mock_open:
+            mock_file = MagicMock()
+            mock_open.return_value.__enter__.return_value = mock_file
+
+            result = render_blueprints_catalog_table_data(mock_nornflow)
+
+            assert len(result) == 2
+            for row in result:
+                assert len(row) == 3
+
     def test_render_settings_table_data(self):
         """Test render_settings_table_data generates settings table data."""
         mock_nornflow = MagicMock()
@@ -413,4 +478,4 @@ class TestTableRenderers:
 
         assert len(result) == 2
         for row in result:
-            assert len(row) == 2
+            assert len(row) == 2
\ No newline at end of file
diff --git a/tests/unit/core/test_failure_strategies.py b/tests/unit/core/test_failure_strategies.py
index 67b1e0f..ddb7b90 100644
--- a/tests/unit/core/test_failure_strategies.py
+++ b/tests/unit/core/test_failure_strategies.py
@@ -427,6 +427,7 @@ class TestNornFlowFailureStrategy:
         }
         
         workflow_model = MagicMock(spec=WorkflowModel)
+        workflow_model.name = "test_workflow"
         workflow_model.failure_strategy = FailureStrategy.SKIP_FAILED
         
         settings = NornFlowSettings(nornir_config_file="mock_config.yaml")
@@ -453,6 +454,7 @@ class TestNornFlowFailureStrategy:
         }
         
         workflow_model = MagicMock(spec=WorkflowModel)
+        workflow_model.name = "test_workflow"
         workflow_model.failure_strategy = FailureStrategy.RUN_ALL
         
         settings = NornFlowSettings(nornir_config_file="mock_config.yaml")
diff --git a/tests/unit/core/test_nornir_manager.py b/tests/unit/core/test_nornir_manager.py
index 7e2e45f..72a260d 100644
--- a/tests/unit/core/test_nornir_manager.py
+++ b/tests/unit/core/test_nornir_manager.py
@@ -86,6 +86,9 @@ class TestNornirManager:
         """Test applying valid filters."""
         manager = NornirManager(nornir_settings="dummy_config.yaml")
 
+        mock_nornir.inventory = MagicMock()
+        mock_nornir.inventory.hosts = {}
+
         # Apply some filters
         result = manager.apply_filters(name="device1", group="routers")
 
@@ -110,6 +113,8 @@ class TestNornirManager:
         """Test applying valid processors."""
         manager = NornirManager(nornir_settings="dummy_config.yaml")
 
+        mock_nornir.processors = []
+
         # Apply the processor
         result = manager.apply_processors([mock_processor])
 
diff --git a/tests/unit/core/test_nornir_manager_filtering.py b/tests/unit/core/test_nornir_manager_filtering.py
index b599b13..8f3b331 100644
--- a/tests/unit/core/test_nornir_manager_filtering.py
+++ b/tests/unit/core/test_nornir_manager_filtering.py
@@ -20,6 +20,7 @@ class TestNornirManagerFilters:
         mock = Mock()
         # Setup .filter() to return the mock itself for chaining
         mock.filter.return_value = mock
+        mock.inventory.hosts = {}
         return mock
 
     @pytest.fixture
@@ -52,7 +53,7 @@ class TestNornirManagerFilters:
         test_tuple = ("host1", "host2")
         manager.apply_filters(hosts=test_tuple)
 
-        # Verify filter was called with tuple intact
+        # Verify filter was called with hosts as tuple
         mock_nornir.filter.assert_called_with(hosts=test_tuple)
 
     def test_multiple_filters(self, manager, mock_nornir):
diff --git a/tests/unit/core/test_processors.py b/tests/unit/core/test_processors.py
index 2459067..5de9c91 100644
--- a/tests/unit/core/test_processors.py
+++ b/tests/unit/core/test_processors.py
@@ -1,3 +1,4 @@
+# Unit tests for NornFlow processor configuration and precedence.
 from unittest.mock import MagicMock, patch
 
 import pytest
@@ -60,6 +61,9 @@ class TestNornFlowProcessors:
         # Mock the nornir config file check to avoid file not found errors
         settings.nornir_config_file = None
 
+        # Configure logger settings to return proper strings
+        settings.logger = {"directory": "/tmp/logs", "level": "INFO"}
+
         # Create kwargs with processors
         kwargs_processors = [
             {
@@ -95,6 +99,9 @@ class TestProcessorPrecedence:
         settings.nornir_config_file = None  # Avoid file not found errors
         settings.dry_run = False
 
+        # Configure logger settings to return proper strings
+        settings.logger = {"directory": "/tmp/logs", "level": "INFO"}
+
         # Create workflow with processors - must include at least one task
         workflow_dict = {
             "workflow": {
@@ -117,6 +124,7 @@ class TestProcessorPrecedence:
                 {"class": "tests.unit.core.test_processors_utils.TestProcessor", "args": {"name": "WorkflowProc"}}
             ]
             workflow_model.dry_run = False
+            workflow_model.name = "Test"
             mock_create.return_value = workflow_model
 
             # Create kwargs processors
@@ -152,4 +160,4 @@ class TestProcessorPrecedence:
                 # Verify that kwargs processors took precedence over workflow processors
                 assert len(nornflow.processors) == 1
                 assert isinstance(nornflow.processors[0], TestProcessor2)
-                assert nornflow.processors[0].name == "KwargsProc"
+                assert nornflow.processors[0].name == "KwargsProc"
\ No newline at end of file
diff --git a/tests/unit/core/test_utils.py b/tests/unit/core/test_utils.py
index 3fa258d..1d21945 100644
--- a/tests/unit/core/test_utils.py
+++ b/tests/unit/core/test_utils.py
@@ -607,7 +607,7 @@ class TestConvertListsToTuples:
         assert result is None
 
     def test_convert_empty_dict(self):
-        """Test converting empty dictionary."""
+        """Test converting empty dictionary returns empty dict."""
         input_dict = HashableDict({})
 
         result = convert_lists_to_tuples(input_dict)
@@ -793,6 +793,23 @@ class TestFormatVariableValue:
 class TestPrintWorkflowOverview:
     """Tests for print_workflow_overview function."""
 
+    def _create_mock_vars_manager(
+        self,
+        env_vars: dict | None = None,
+        default_vars: dict | None = None,
+        domain_vars: dict | None = None,
+        inline_workflow_vars: dict | None = None,
+        cli_vars: dict | None = None,
+    ) -> Mock:
+        """Create a mock NornFlowVariablesManager with specified vars."""
+        mock_manager = Mock()
+        mock_manager.env_vars = env_vars or {}
+        mock_manager.default_vars = default_vars or {}
+        mock_manager.domain_vars = domain_vars or {}
+        mock_manager.inline_workflow_vars = inline_workflow_vars or {}
+        mock_manager.cli_vars = cli_vars or {}
+        return mock_manager
+
     @patch("nornflow.utils.Console")
     def test_print_basic_overview(self, mock_console):
         """Test printing basic workflow overview."""
@@ -805,9 +822,8 @@ class TestPrintWorkflowOverview:
             effective_dry_run=False,
             hosts_count=5,
             inventory_filters={},
-            workflow_vars={},
-            vars={},
-            failure_strategy=FailureStrategy.FAIL_FAST
+            failure_strategy=FailureStrategy.FAIL_FAST,
+            vars_manager=None,
         )
 
         mock_console.return_value.print.assert_called_once()
@@ -819,14 +835,18 @@ class TestPrintWorkflowOverview:
         workflow_model.name = "Test"
         workflow_model.description = None
 
+        vars_manager = self._create_mock_vars_manager(
+            inline_workflow_vars={"var1": "value1"},
+            cli_vars={"var2": "value2"},
+        )
+
         print_workflow_overview(
             workflow_model=workflow_model,
             effective_dry_run=True,
             hosts_count=3,
             inventory_filters={"platform": "ios", "groups": ["core"]},
-            workflow_vars={"var1": "value1"},
-            vars={"var2": "value2"},
-            failure_strategy=None
+            failure_strategy=None,
+            vars_manager=vars_manager,
         )
 
         mock_console.return_value.print.assert_called_once()
@@ -838,14 +858,18 @@ class TestPrintWorkflowOverview:
         workflow_model.name = "Test"
         workflow_model.description = None
 
+        vars_manager = self._create_mock_vars_manager(
+            inline_workflow_vars={"workflow_var": "wf_value"},
+            cli_vars={"cli_var": "cli_value"},
+        )
+
         print_workflow_overview(
             workflow_model=workflow_model,
             effective_dry_run=False,
             hosts_count=1,
             inventory_filters={},
-            workflow_vars={"workflow_var": "wf_value"},
-            vars={"cli_var": "cli_value"},
-            failure_strategy=FailureStrategy.SKIP_FAILED
+            failure_strategy=FailureStrategy.SKIP_FAILED,
+            vars_manager=vars_manager,
         )
 
         mock_console.return_value.print.assert_called_once()
@@ -862,9 +886,8 @@ class TestPrintWorkflowOverview:
             effective_dry_run=False,
             hosts_count=10,
             inventory_filters={},
-            workflow_vars={},
-            vars={},
-            failure_strategy=FailureStrategy.FAIL_FAST
+            failure_strategy=FailureStrategy.FAIL_FAST,
+            vars_manager=None,
         )
 
         mock_console.return_value.print.assert_called_once()
@@ -876,6 +899,14 @@ class TestPrintWorkflowOverview:
         workflow_model.name = "Complete Workflow"
         workflow_model.description = "A comprehensive test"
 
+        vars_manager = self._create_mock_vars_manager(
+            env_vars={"env_var": "from_env"},
+            default_vars={"global_var": "from_defaults"},
+            domain_vars={"domain_var": "from_domain"},
+            inline_workflow_vars={"timeout": 30, "retries": 3},
+            cli_vars={"user": "admin", "debug": True},
+        )
+
         print_workflow_overview(
             workflow_model=workflow_model,
             effective_dry_run=True,
@@ -885,15 +916,8 @@ class TestPrintWorkflowOverview:
                 "site": "DC1",
                 "groups": ["core", "edge"]
             },
-            workflow_vars={
-                "timeout": 30,
-                "retries": 3
-            },
-            vars={
-                "user": "admin",
-                "debug": True
-            },
-            failure_strategy=FailureStrategy.SKIP_FAILED
+            failure_strategy=FailureStrategy.SKIP_FAILED,
+            vars_manager=vars_manager,
         )
 
         mock_console.return_value.print.assert_called_once()
\ No newline at end of file
diff --git a/tests/unit/hooks/test_base.py b/tests/unit/hooks/test_base.py
index abfe074..1834ef5 100644
--- a/tests/unit/hooks/test_base.py
+++ b/tests/unit/hooks/test_base.py
@@ -112,14 +112,11 @@ class TestHook:
         assert HOOK_REGISTRY["test_auto_hook"] == TestAutoHook
         assert len(HOOK_REGISTRY) == initial_registry_size + 1
 
-    def test_no_registration_without_hook_name(self):
-        """Test that hooks without hook_name are not registered."""
-        initial_registry_size = len(HOOK_REGISTRY)
-        
-        class TestNoNameHook(Hook):
-            pass
-        
-        assert len(HOOK_REGISTRY) == initial_registry_size
+    def test_no_hook_name_raises_error(self):
+        """Test that missing hook_name raises HookRegistrationError."""
+        with pytest.raises(HookRegistrationError):
+            class NoNameHook(Hook):
+                pass
 
     def test_duplicate_registration_same_class(self):
         """Test that re-importing same class doesn't raise error."""
@@ -149,4 +146,4 @@ class TestHook:
         assert HOOK_REGISTRY["set_to"] == SetToHook
         
         assert "shush" in HOOK_REGISTRY
-        assert HOOK_REGISTRY["shush"] == ShushHook
+        assert HOOK_REGISTRY["shush"] == ShushHook
\ No newline at end of file
diff --git a/tests/unit/j2/__init__.py b/tests/unit/j2/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/j2/conftest.py b/tests/unit/j2/conftest.py
new file mode 100644
index 0000000..d41068e
--- /dev/null
+++ b/tests/unit/j2/conftest.py
@@ -0,0 +1,16 @@
+"""Shared fixtures for j2 unit tests."""
+
+import pytest
+
+from nornflow.j2 import Jinja2Service
+
+
+@pytest.fixture
+def jinja2_service():
+    """Provide a fresh Jinja2Service instance for testing."""
+    # Reset singleton for isolation
+    Jinja2Service._instance = None
+    service = Jinja2Service()
+    yield service
+    # Cleanup after test
+    Jinja2Service._instance = None
diff --git a/tests/unit/j2/test_core.py b/tests/unit/j2/test_core.py
new file mode 100644
index 0000000..e08c7bb
--- /dev/null
+++ b/tests/unit/j2/test_core.py
@@ -0,0 +1,289 @@
+"""Unit tests for nornflow.j2.core module."""
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+from jinja2 import Environment
+
+from nornflow.j2 import Jinja2Service
+from nornflow.j2.exceptions import Jinja2ServiceError, TemplateError, TemplateValidationError
+
+
+class TestJinja2Service:
+    """Test suite for Jinja2Service singleton."""
+
+    def test_singleton_behavior(self):
+        """Test that Jinja2Service is a singleton."""
+        service1 = Jinja2Service()
+        service2 = Jinja2Service()
+        assert service1 is service2
+
+    def test_environment_property(self):
+        """Test that environment property returns a valid Environment instance."""
+        service = Jinja2Service()
+        env = service.environment
+        assert isinstance(env, Environment)
+
+    @patch.object(Jinja2Service, "compile_template")
+    def test_resolve_string_success(self, mock_compile):
+        """Test resolve_string with valid template."""
+        mock_template = MagicMock()
+        mock_template.render.return_value = "resolved"
+        mock_compile.return_value = mock_template
+
+        service = Jinja2Service()
+        result = service.resolve_string("{{ var }}", {"var": "value"})
+
+        assert result == "resolved"
+        mock_compile.assert_called_once_with("{{ var }}")
+        mock_template.render.assert_called_once_with({"var": "value"})
+
+    @patch.object(Jinja2Service, "compile_template")
+    def test_resolve_string_undefined_error(self, mock_compile):
+        """Test resolve_string raises TemplateError on undefined variable."""
+        from jinja2 import UndefinedError
+        mock_compile.side_effect = UndefinedError("Undefined variable")
+
+        service = Jinja2Service()
+        with pytest.raises(TemplateError, match="Undefined variable"):
+            service.resolve_string("{{ invalid }}", {})
+
+    @patch.object(Jinja2Service, "compile_template")
+    def test_resolve_string_syntax_error(self, mock_compile):
+        """Test resolve_string raises TemplateError on syntax error."""
+        from jinja2 import TemplateSyntaxError
+        mock_compile.side_effect = TemplateSyntaxError("Syntax error", 1, "template")
+
+        service = Jinja2Service()
+        with pytest.raises(TemplateError, match="Template syntax error"):
+            service.resolve_string("{{ invalid", {})
+
+    def test_resolve_string_non_template(self):
+        """Test resolve_string returns plain string if not a template."""
+        service = Jinja2Service()
+        result = service.resolve_string("plain text", {})
+        assert result == "plain text"
+
+    def test_resolve_string_invalid_type(self):
+        """Test resolve_string raises error for non-string input."""
+        service = Jinja2Service()
+        with pytest.raises(TemplateValidationError, match="Expected string"):
+            service.resolve_string(123, {})
+
+    def test_resolve_data_dict(self):
+        """Test resolve_data with dict input."""
+        with patch.object(Jinja2Service, "resolve_string") as mock_resolve:
+            mock_resolve.return_value = "resolved_value"
+
+            service = Jinja2Service()
+            data = {"key": "{{ var }}"}
+            context = {"var": "test"}
+
+            result = service.resolve_data(data, context)
+
+            assert result == {"key": "resolved_value"}
+            mock_resolve.assert_called_once_with("{{ var }}", context, "")
+
+    def test_resolve_data_list(self):
+        """Test resolve_data with list input."""
+        with patch.object(Jinja2Service, "resolve_string") as mock_resolve:
+            mock_resolve.return_value = "item1"
+
+            service = Jinja2Service()
+            data = ["{{ var }}"]
+            context = {"var": "item1"}
+
+            result = service.resolve_data(data, context)
+
+            assert result == ["item1"]
+            mock_resolve.assert_called_once_with("{{ var }}", context, "")
+
+    def test_resolve_data_tuple(self):
+        """Test resolve_data with tuple input (normalized to list)."""
+        with patch.object(Jinja2Service, "resolve_string") as mock_resolve:
+            mock_resolve.return_value = "item"
+
+            service = Jinja2Service()
+            data = ("{{ var }}",)
+            context = {"var": "item"}
+
+            result = service.resolve_data(data, context)
+
+            assert result == ["item"]
+
+    def test_resolve_data_non_string(self):
+        """Test resolve_data with non-string values."""
+        service = Jinja2Service()
+        data = {"key": 123}
+        context = {}
+
+        result = service.resolve_data(data, context)
+
+        assert result == {"key": 123}
+
+    def test_resolve_to_bool_true(self):
+        """Test resolve_to_bool with boolean True."""
+        service = Jinja2Service()
+        result = service.resolve_to_bool(True, {})
+        assert result is True
+
+    def test_resolve_to_bool_false(self):
+        """Test resolve_to_bool with boolean False."""
+        service = Jinja2Service()
+        result = service.resolve_to_bool(False, {})
+        assert result is False
+
+    def test_resolve_to_bool_template(self):
+        """Test resolve_to_bool with template resolving to truthy."""
+        with patch.object(Jinja2Service, "resolve_string") as mock_resolve:
+            mock_resolve.return_value = "yes"
+
+            service = Jinja2Service()
+            result = service.resolve_to_bool("{{ var }}", {"var": "yes"})
+
+            assert result is True
+            mock_resolve.assert_called_once_with("{{ var }}", {"var": "yes"})
+
+    def test_resolve_to_bool_plain_string(self):
+        """Test resolve_to_bool with plain truthy string."""
+        service = Jinja2Service()
+        result = service.resolve_to_bool("true", {})
+        assert result is True
+
+    def test_resolve_to_bool_falsy_string(self):
+        """Test resolve_to_bool with falsy string."""
+        service = Jinja2Service()
+        result = service.resolve_to_bool("false", {})
+        assert result is False
+
+    def test_to_bool_various_values(self):
+        """Test to_bool with various inputs."""
+        service = Jinja2Service()
+
+        assert service.to_bool(True) is True
+        assert service.to_bool(False) is False
+        assert service.to_bool("yes") is True
+        assert service.to_bool("no") is False
+        assert service.to_bool("1") is True
+        assert service.to_bool("0") is False
+        assert service.to_bool(123) is True
+        assert service.to_bool(0) is False
+
+    def test_validate_template_success(self):
+        """Test validate_template with valid template."""
+        service = Jinja2Service()
+        is_valid, error = service.validate_template("{{ valid }}")
+        assert is_valid is True
+        assert error == ""
+
+    def test_validate_template_error(self):
+        """Test validate_template with invalid template."""
+        service = Jinja2Service()
+        is_valid, error = service.validate_template("{{ invalid")
+        assert is_valid is False
+        assert "unexpected end of template" in error
+
+    def test_is_template_true(self):
+        """Test is_template detects Jinja2 markers."""
+        service = Jinja2Service()
+        assert service.is_template("{{ var }}") is True
+        assert service.is_template("{% if %}") is True
+        assert service.is_template("{# comment #}") is True
+        assert service.is_template("{{- var -}}") is True
+
+    def test_is_template_false(self):
+        """Test is_template returns False for non-templates."""
+        service = Jinja2Service()
+        assert service.is_template("plain text") is False
+        assert service.is_template("") is False
+        assert service.is_template("no markers here") is False
+
+    @patch("nornflow.j2.core.ALL_BUILTIN_J2_FILTERS", {"test_filter": lambda x: x})
+    def test_initialize_environment_registers_filters(self):
+        """Test _initialize_environment registers built-in filters."""
+        with patch("nornflow.j2.core.CallableCatalog") as mock_catalog_class:
+            mock_catalog = MagicMock()
+            mock_catalog_class.return_value = mock_catalog
+
+            service = Jinja2Service()
+            service._initialize_environment(service)
+
+            assert service._j2_filters_catalog is mock_catalog
+            mock_catalog.register.assert_called()
+
+    def test_register_custom_filters(self):
+        """Test register_custom_filters discovers and registers filters."""
+        # Reset singleton to ensure patch applies
+        Jinja2Service._instance = None
+        with patch("nornflow.j2.core.CallableCatalog") as mock_catalog_class, \
+             patch("nornflow.j2.core.is_public_callable") as mock_predicate, \
+             patch("os.path.exists", return_value=True), \
+             patch("os.listdir", return_value=["filter.py"]):
+
+            mock_catalog = MagicMock()
+            mock_catalog_class.return_value = mock_catalog
+
+            service = Jinja2Service()
+            service.register_custom_filters(["/fake/dir"])
+
+            mock_catalog.discover_items_in_dir.assert_called_with("/fake/dir", predicate=mock_predicate)
+
+    def test_get_registered_j2_filters(self):
+        """Test get_registered_j2_filters returns environment filters."""
+        service = Jinja2Service()
+        result = service.get_registered_j2_filters()
+        assert isinstance(result, dict)
+        assert len(result) > 0  # Should have built-in filters
+
+    def test_j2_filters_catalog_property(self):
+        """Test j2_filters_catalog property."""
+        service = Jinja2Service()
+        catalog = service.j2_filters_catalog
+        assert catalog is not None
+
+    def test_j2_filters_catalog_setter_raises_error(self):
+        """Test j2_filters_catalog setter raises error."""
+        service = Jinja2Service()
+        with pytest.raises(Jinja2ServiceError, match="J2 filters catalog cannot be set directly"):
+            service.j2_filters_catalog = "invalid"
+
+    def test_environment_setter_invalid_type(self):
+        """Test environment setter raises error for invalid type."""
+        service = Jinja2Service()
+        with pytest.raises(Jinja2ServiceError, match="Expected Environment instance"):
+            service.environment = "invalid"
+
+    @patch("nornflow.j2.core.Environment.from_string")
+    def test_compile_template_success(self, mock_from_string):
+        """Test compile_template caches and returns template."""
+        mock_template = MagicMock()
+        mock_from_string.return_value = mock_template
+
+        service = Jinja2Service()
+        result1 = service.compile_template("{{ test }}")
+        result2 = service.compile_template("{{ test }}")
+
+        assert result1 is mock_template
+        assert result1 is result2  # Cached
+        mock_from_string.assert_called_once()
+
+    @patch("nornflow.j2.core.Environment.from_string")
+    def test_compile_template_error(self, mock_from_string):
+        """Test compile_template raises TemplateValidationError on error."""
+        from jinja2 import TemplateSyntaxError
+        mock_from_string.side_effect = TemplateSyntaxError("Error", 1, "template")
+
+        service = Jinja2Service()
+        with pytest.raises(TemplateValidationError):
+            service.compile_template("{{ invalid")
+
+    @patch("nornflow.j2.core.CallableCatalog")
+    def test_initialize_with_settings(self, mock_catalog_class):
+        """Test initialize_with_settings calls register_custom_filters."""
+        with patch.object(Jinja2Service, "register_custom_filters") as mock_register:
+            from nornflow.settings import NornFlowSettings
+
+            settings = NornFlowSettings(local_j2_filters=["/dir"], nornir_config_file="/fake/config")
+            Jinja2Service.initialize_with_settings(settings)
+
+            mock_register.assert_called_once_with(["/dir"])
\ No newline at end of file
diff --git a/tests/unit/models/test_workflow_model.py b/tests/unit/models/test_workflow_model.py
index 7aba53f..05b1e96 100644
--- a/tests/unit/models/test_workflow_model.py
+++ b/tests/unit/models/test_workflow_model.py
@@ -508,7 +508,7 @@ class TestWorkflowModelBlueprintExpansion:
             }
         }
         
-        with pytest.raises(BlueprintError, match="Blueprint must contain ONLY 'tasks' key"):
+        with pytest.raises(BlueprintError, match=r"(?s)Field required.*Extra inputs are not permitted"):
             WorkflowModel.create(
                 workflow_dict,
                 blueprints_catalog=blueprints_catalog,
@@ -537,7 +537,7 @@ class TestWorkflowModelBlueprintExpansion:
             }
         }
         
-        with pytest.raises(BlueprintError, match="'tasks' must be a list"):
+        with pytest.raises(BlueprintError, match=r"Input should be a valid list"):
             WorkflowModel.create(
                 workflow_dict,
                 blueprints_catalog=blueprints_catalog,
diff --git a/tests/unit/vars/conftest.py b/tests/unit/vars/conftest.py
index cb66fa6..9855441 100644
--- a/tests/unit/vars/conftest.py
+++ b/tests/unit/vars/conftest.py
@@ -3,7 +3,7 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 
-from nornflow.builtins.jinja2_filters import ALL_FILTERS
+from nornflow.builtins.jinja2_filters import ALL_BUILTIN_J2_FILTERS
 from nornflow.vars.manager import NornFlowVariablesManager
 from nornflow.vars.processors import NornFlowVariableProcessor
 
@@ -116,8 +116,8 @@ def basic_manager(tmp_path) -> NornFlowVariablesManager:
     temp_vars_dir = tmp_path / "temp_vars"
     temp_vars_dir.mkdir()
     manager = NornFlowVariablesManager(vars_dir=str(temp_vars_dir))
-    for name, func in ALL_FILTERS.items():
-        manager._jinja2_manager.env.filters[name] = func
+    for name, func in ALL_BUILTIN_J2_FILTERS.items():
+        manager.jinja2.environment.filters[name] = func
     return manager
 
 
@@ -149,8 +149,8 @@ def setup_manager(
     manager.set_runtime_variable("override_var", "runtime_value", "test_device")
     manager.set_runtime_variable("complex_var", {"key": "value", "list": [1, 2, 3]}, "test_device")
 
-    for name, func in ALL_FILTERS.items():
-        manager._jinja2_manager.env.filters[name] = func
+    for name, func in ALL_BUILTIN_J2_FILTERS.items():
+        manager.jinja2.environment.filters[name] = func
 
     with patch.dict(
         "os.environ",
diff --git a/tests/unit/vars/test_error_conditions.py b/tests/unit/vars/test_error_conditions.py
index 9602031..e8d3a74 100644
--- a/tests/unit/vars/test_error_conditions.py
+++ b/tests/unit/vars/test_error_conditions.py
@@ -2,7 +2,8 @@ from pathlib import Path
 
 import pytest
 
-from nornflow.vars.exceptions import TemplateError, VariableError
+from nornflow.j2.exceptions import TemplateError
+from nornflow.vars.exceptions import VariableError
 from nornflow.vars.manager import NornFlowVariablesManager
 
 
diff --git a/tests/unit/vars/test_manager.py b/tests/unit/vars/test_manager.py
index c4a34fa..041f478 100644
--- a/tests/unit/vars/test_manager.py
+++ b/tests/unit/vars/test_manager.py
@@ -64,18 +64,17 @@ class TestVariableManager:
         assert setup_manager.get_nornflow_variable("global_var", "test_device") == "global_value"
 
     def test_jinja2_manager_initialized(self, basic_manager):
-        """Test that Jinja2EnvironmentManager is properly initialized."""
-        assert basic_manager._jinja2_manager is not None
-        assert basic_manager._jinja2_manager.env is not None
+        """Test that Jinja2Service is properly initialized."""
+        assert basic_manager.jinja2 is not None
 
     def test_jinja2_environment_accessible(self, basic_manager):
         """Test that Jinja2 environment is accessible through the manager."""
-        env = basic_manager._jinja2_manager.env
+        env = basic_manager.jinja2.environment
         assert env is not None
         assert hasattr(env, 'filters')
 
     def test_custom_filters_registered(self, basic_manager):
         """Test that NornFlow custom filters are registered in Jinja2 environment."""
-        env = basic_manager._jinja2_manager.env
+        env = basic_manager.jinja2.environment
         assert "is_set" in env.filters
         assert "flatten_list" in env.filters
\ No newline at end of file
diff --git a/tests/unit/vars/test_resolution.py b/tests/unit/vars/test_resolution.py
index 400603f..e7c006c 100644
--- a/tests/unit/vars/test_resolution.py
+++ b/tests/unit/vars/test_resolution.py
@@ -3,7 +3,7 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 
-from nornflow.builtins.jinja2_filters import ALL_FILTERS
+from nornflow.builtins.jinja2_filters import ALL_BUILTIN_J2_FILTERS
 from nornflow.vars.manager import NornFlowVariablesManager
 from nornflow.vars.processors import NornFlowVariableProcessor
 
@@ -118,7 +118,7 @@ def basic_manager(tmp_path) -> NornFlowVariablesManager:
     temp_vars_dir = tmp_path / "temp_vars"
     temp_vars_dir.mkdir()
     manager = NornFlowVariablesManager(vars_dir=str(temp_vars_dir))
-    for name, func in ALL_FILTERS.items():
+    for name, func in ALL_BUILTIN_J2_FILTERS.items():
         manager.jinja_env.filters[name] = func
     return manager
 
@@ -151,7 +151,7 @@ def setup_manager(
     manager.set_runtime_variable("override_var", "runtime_value", "test_device")
     manager.set_runtime_variable("complex_var", {"key": "value", "list": [1, 2, 3]}, "test_device")
 
-    for name, func in ALL_FILTERS.items():
+    for name, func in ALL_BUILTIN_J2_FILTERS.items():
         manager.jinja_env.filters[name] = func
 
     with patch.dict(
diff --git a/uv.lock b/uv.lock
index dab9fa2..2218db6 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1086,7 +1086,7 @@ wheels = [
 
 [[package]]
 name = "nornflow"
-version = "0.6.0"
+version = "0.7.0"
 source = { editable = "." }
 dependencies = [
     { name = "jmespath" },
@@ -1117,7 +1117,7 @@ manual-tests = [
 
 [package.metadata]
 requires-dist = [
-    { name = "black", marker = "extra == 'dev'", specifier = ">=25.1.0" },
+    { name = "black", marker = "extra == 'dev'", specifier = ">=25.12.0" },
     { name = "jmespath", specifier = ">=1.0.1" },
     { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.15.0" },
     { name = "nornir", specifier = ">=3.5.0" },
@@ -1126,12 +1126,12 @@ requires-dist = [
     { name = "nornir-netbox", marker = "extra == 'manual-tests'", specifier = ">=0.3.0" },
     { name = "nornir-netmiko", marker = "extra == 'manual-tests'", specifier = ">=1.0.1" },
     { name = "nornir-utils", specifier = ">=0.2.0" },
-    { name = "pydantic-serdes", specifier = ">=1.0.3" },
+    { name = "pydantic-serdes", specifier = ">=1.0.4" },
     { name = "pydantic-settings", specifier = ">=2.12.0" },
     { name = "pytest", specifier = ">=8.3.4" },
     { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=6.0.0" },
     { name = "pyyaml", specifier = ">=6.0.2" },
-    { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.9.4" },
+    { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.14.10" },
     { name = "tabulate", specifier = ">=0.9.0" },
     { name = "termcolor", specifier = ">=2.5.0" },
     { name = "typer", specifier = ">=0.15.1" },
@@ -1451,7 +1451,7 @@ wheels = [
 
 [[package]]
 name = "pydantic-serdes"
-version = "1.0.3"
+version = "1.0.4"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "email-validator" },
@@ -1461,9 +1461,9 @@ dependencies = [
     { name = "sortedcontainers" },
     { name = "toml" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/01/6d/69ef956be3a481545aa6685a936fc35725ce66782e56805b62cc09cabe26/pydantic_serdes-1.0.3.tar.gz", hash = "sha256:16774a8f6728727403c010cca59af54f14369c342db69e2948d09195e192deea", size = 17799, upload-time = "2025-07-30T15:25:19.375Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/dd/1a/d7e35b4169de7f88f44cb17090175bc7806e9eb6d3b903425a9c18af84df/pydantic_serdes-1.0.4.tar.gz", hash = "sha256:f95401ac237941d3e0d404f42123d9013d4e8d7914d02fe36e58c34c4711bc52", size = 17816, upload-time = "2026-01-07T20:49:58.783Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/61/3c/34c82eece029502808401b3bfd0b906038df1a746c03bcb1af522c15b8f1/pydantic_serdes-1.0.3-py3-none-any.whl", hash = "sha256:c77f4fd59f11d2ec24cc3ab93d42bbfc247c12b51cc479dfe8dab13a21c6c4f0", size = 21151, upload-time = "2025-07-30T15:25:18.527Z" },
+    { url = "https://files.pythonhosted.org/packages/e5/95/cfbf366614653817672c315f4acd2fa605bb63113867a7d827f865c13ef4/pydantic_serdes-1.0.4-py3-none-any.whl", hash = "sha256:3d8ce88409734235d42f560b2bf1cf5c6c646d21404aabcbb8cb00ea9497b34f", size = 21160, upload-time = "2026-01-07T20:49:57.626Z" },
 ]
 
 [[package]]
