"""
Generated DI resolver module.

Generated by: diwire._internal.resolvers.assembly.renderer.ResolversAssemblyRenderer.get_providers_code
diwire version used for generation: 0.1.0.post109.dev0+cab17f0

Generation configuration:
- root scope level: 1
- managed scopes: app:1, session:2, request:3, action:4, step:5
- graph has async specs: False
- cleanup support enabled in graph: False
- provider count: 2
- cached provider count: 2
- thread lock count: 2
- async lock count: 0
- provider slots: 1, 2

Examples:
>>> root = build_root_resolver(registrations)
>>> service = root.resolve(SomeService)
>>> async_service = await root.aresolve(SomeAsyncService)
>>> request_scope = root.enter_scope()
>>> scoped_service = request_scope.resolve(RequestScopedService)
"""

from __future__ import annotations


import threading


from types import TracebackType
from typing import Any, NoReturn

from diwire.exceptions import (
    DIWireAsyncDependencyInSyncContextError,
    DIWireDependencyNotRegisteredError,
    DIWireScopeMismatchError,
)
from diwire._internal.markers import (
    is_async_provider_annotation,
    is_all_annotation,
    component_base_key,
    is_from_context_annotation,
    is_maybe_annotation,
    is_provider_annotation,
    strip_all_annotation,
    strip_from_context_annotation,
    strip_maybe_annotation,
    strip_provider_annotation,
)
from diwire._internal.providers import ProvidersRegistrations

# Unique sentinel objects; the resolver classes compare against them by identity.
# Default for an ancestor-resolver constructor argument that was not supplied.
_MISSING_RESOLVER: Any = object()
# Marks a per-resolver ``_cache_<slot>`` attribute that has not been filled yet.
_MISSING_CACHE: Any = object()
# Initial value of the per-slot ``_dep_<slot>_type`` / ``_provider_<slot>`` globals below.
_MISSING_PROVIDER: Any = object()
# Slot numbers to resolve for an "all" annotation, keyed by the stripped annotation key.
_all_slots_by_key: dict[Any, tuple[int, ...]] = {}
# Keys for which the resolvers' ``_is_registered_dependency`` answers True.
_dep_registered_keys: set[Any] = set()

# Scope tokens matched by ``enter_scope`` (identity first, then equality with the level number).
# NOTE(review): presumably rebound to the real scope objects by build_root_resolver - confirm.
_scope_obj_1: Any = 1
_scope_obj_2: Any = 2
_scope_obj_3: Any = 3
_scope_obj_4: Any = 4
_scope_obj_5: Any = 5
# Per-slot dependency key (matched with ``is`` in resolve/aresolve) and provider callable.
# NOTE(review): presumably bound to real values by build_root_resolver - confirm.
_dep_1_type: Any = _MISSING_PROVIDER
_provider_1: Any = _MISSING_PROVIDER
_dep_2_type: Any = _MISSING_PROVIDER
_provider_2: Any = _MISSING_PROVIDER


# Held by ``_SessionResolver.resolve_1`` while it creates and stores slot 1's cached value.
_dep_1_thread_lock = threading.Lock()
# NOTE(review): presumably guards slot 2 the same way in the request-scope resolver - confirm.
_dep_2_thread_lock = threading.Lock()

class RootResolver:
    """
    Generated resolver for scope 'app' (level 1).

    This class is generated and optimized for direct slot-based dependency resolution.
    All visible provider slots: 1, 2.
    Providers declared in this exact scope: none.
    """

    # Listing "__dict__" keeps a per-instance dict, so attributes other than the
    # named slots can still be assigned on the root resolver.
    # NOTE(review): the reason the root needs this is not visible here - confirm.
    __slots__ = (
        "_root_resolver",
        "_context",
        "_parent_context_resolver",
        "__dict__",
        "_owned_scope_resolvers",
    )

    def __init__(
        self,
        context: Any | None = None,
        parent_context_resolver: Any = None,
    ) -> None:
        """
        Initialize resolver state for the current scope.

        The constructor wires scope ancestry references, cache slots, and optional cleanup state.
        Root scope class: RootResolver.
        Stateless scope reuse enabled: False.
        """
        # The root resolver is its own root.
        self._root_resolver = self
        self._context = context
        self._parent_context_resolver = parent_context_resolver
        self._owned_scope_resolvers: tuple[Any, ...] = ()

    def enter_scope(
        self,
        scope: Any | None = None,
        *,
        context: Any | None = None,
    ) -> RootResolver | _SessionResolver | _RequestResolver | _ActionResolver | _StepResolver:
        """
        Open a deeper scope resolver from this resolver.

        Current scope: app:1.
        Allowed explicit transitions: session:2, request:3, action:4, step:5.
        Passing None follows the default transition for the scope graph.
        Optional context mapping is attached to the opened target scope.

        Raises:
            DIWireScopeMismatchError: if the requested level is below 1 or is not
                one of the known scope levels.
        """
        # The request scope is checked first and is also the default for ``None``.
        if scope is _scope_obj_3 or scope == 3:
            return _RequestResolver(
                self,
                context,
                self,
            )
        if scope is None:
            return _RequestResolver(
                self,
                context,
                self,
            )
        target_scope_level = scope
        # Entering the current scope hands back this resolver unchanged.
        if target_scope_level is _scope_obj_1 or target_scope_level == 1:
            return self
        if target_scope_level <= 1:
            msg = f"Cannot enter scope level {target_scope_level} from level 1."
            raise DIWireScopeMismatchError(msg)
        if target_scope_level is _scope_obj_2 or target_scope_level == 2:
            return _SessionResolver(
                self,
                context,
                self,
            )
        if target_scope_level is _scope_obj_4 or target_scope_level == 4:
            # Build the intermediate request resolver without a context; only the
            # requested target scope receives ``context``.
            request_resolver = _RequestResolver(
                self,
                None,
                self,
            )
            action_resolver = _ActionResolver(
                request_resolver._root_resolver,
                context,
                request_resolver,
                request_resolver._session_resolver,
                request_resolver,
            )
            # Record the implicitly created intermediate resolver on the returned one.
            action_resolver._owned_scope_resolvers = (request_resolver,)
            return action_resolver
        if target_scope_level is _scope_obj_5 or target_scope_level == 5:
            request_resolver = _RequestResolver(
                self,
                None,
                self,
            )
            action_resolver = _ActionResolver(
                request_resolver._root_resolver,
                None,
                request_resolver,
                request_resolver._session_resolver,
                request_resolver,
            )
            step_resolver = _StepResolver(
                action_resolver._root_resolver,
                context,
                action_resolver,
                action_resolver._session_resolver,
                request_resolver,
            )
            step_resolver._owned_scope_resolvers = (request_resolver, action_resolver)
            return step_resolver
        msg = f"Scope level {target_scope_level} is not a valid next transition from level 1."
        raise DIWireScopeMismatchError(msg)

    def resolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated synchronous provider resolver method.

        Known provider slots: 1, 2.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.

        Raises:
            DIWireDependencyNotRegisteredError: if the token matches no slot and
                no annotation branch handles it.
        """
        # Fast path identity checks to avoid reflective dispatch.
        if dependency is _dep_1_type:
            return RootResolver.resolve_1(self)
        if dependency is _dep_2_type:
            return RootResolver.resolve_2(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                # Hand back a zero-argument callable that defers the lookup.
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                # A missing context value yields None here instead of an error.
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return self.resolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(self.resolve_1())
                    continue
                if slot == 2:
                    results.append(self.resolve_2())
                    continue
                # A slot number this class has no resolver method for.
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    async def aresolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated asynchronous provider resolver method.

        Known provider slots: 1, 2.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.

        Raises:
            DIWireDependencyNotRegisteredError: if the token matches no slot and
                no annotation branch handles it.
        """
        # Fast path identity checks for asynchronous resolution.
        if dependency is _dep_1_type:
            return await RootResolver.aresolve_1(self)
        if dependency is _dep_2_type:
            return await RootResolver.aresolve_2(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                # Hand back a zero-argument callable that defers the lookup.
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                # A missing context value yields None here instead of an error.
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return await self.aresolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(await self.aresolve_1())
                    continue
                if slot == 2:
                    results.append(await self.aresolve_2())
                    continue
                # A slot number this class has no resolver method for.
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    def _resolve_from_context(self, key: Any) -> Any:
        """
        Look up ``key`` in this resolver's context mapping, then in each ancestor's.

        The walk follows ``_parent_context_resolver`` links until one is ``None``;
        the first context that contains ``key`` supplies the value.

        Raises:
            DIWireDependencyNotRegisteredError: if no context in the chain contains ``key``.
        """
        context = self._context
        if context is not None and key in context:
            return context[key]
        parent_context_resolver = self._parent_context_resolver
        while parent_context_resolver is not None:
            parent_context = parent_context_resolver._context
            if parent_context is not None and key in parent_context:
                return parent_context[key]
            parent_context_resolver = parent_context_resolver._parent_context_resolver
        # Only the first fragment is an f-string. The usage hint lives in plain
        # literals so its braces are emitted verbatim; inside an f-string the
        # brace-wrapped dots would be formatted as the Ellipsis object.
        msg = (
            f"Context value for {key!r} is not provided. "
            "Pass it via `enter_scope(..., context={...})` "
            "(or `diwire_context` for injected callables)."
        )
        raise DIWireDependencyNotRegisteredError(msg)

    def _is_registered_dependency(self, dependency: Any) -> bool:
        """Return whether ``dependency`` is in the module-level ``_dep_registered_keys`` set."""
        return dependency in _dep_registered_keys

    def __enter__(self) -> RootResolver:
        """Return this resolver for use in a ``with`` block."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Do nothing on exit; returning None lets any exception propagate."""
        return None

    def close(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Explicit equivalent of leaving the ``with`` block; forwards to ``__exit__``."""
        return self.__exit__(exc_type, exc_value, traceback)

    async def __aenter__(self) -> RootResolver:
        """Return this resolver for use in an ``async with`` block."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Do nothing on exit; returning None lets any exception propagate."""
        return None

    async def aclose(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Explicit equivalent of leaving the ``async with`` block; forwards to ``__aexit__``."""
        return await self.__aexit__(exc_type, exc_value, traceback)

    def resolve_1(self) -> Any:
        """
        Provider slot 1 resolver (sync method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Declared scope: session (level 2)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 2
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: RootResolver
        Dependency wiring: none
        Behavior: This resolver cannot access the provider yet because the required scope is deeper than the current resolver scope.
        """
        msg = "Provider slot 1 requires opened scope level 2."
        raise DIWireScopeMismatchError(msg)

    async def aresolve_1(self) -> Any:
        """
        Provider slot 1 resolver (async method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Declared scope: session (level 2)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 2
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: RootResolver
        Dependency wiring: none
        Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
        """
        return self.resolve_1()

    def resolve_2(self) -> Any:
        """
        Provider slot 2 resolver (sync method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: RootResolver
        Dependency wiring: session (positional_or_keyword) -> slot 1 [tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession]
        Behavior: This resolver cannot access the provider yet because the required scope is deeper than the current resolver scope.
        """
        msg = "Provider slot 2 requires opened scope level 3."
        raise DIWireScopeMismatchError(msg)

    async def aresolve_2(self) -> Any:
        """
        Provider slot 2 resolver (async method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: RootResolver
        Dependency wiring: session (positional_or_keyword) -> slot 1 [tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession]
        Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
        """
        return self.resolve_2()


class _SessionResolver:
    """
    Generated resolver for scope 'session' (level 2).

    This class is generated and optimized for direct slot-based dependency resolution.
    All visible provider slots: 1, 2.
    Providers declared in this exact scope: 1.
    """

    __slots__ = (
        "_root_resolver",
        "_context",
        "_parent_context_resolver",
        "_owned_scope_resolvers",
        "_cache_1",
    )

    def __init__(
        self,
        root_resolver: RootResolver,
        context: Any | None = None,
        parent_context_resolver: Any = None,
    ) -> None:
        """
        Initialize resolver state for the current scope.

        The constructor wires scope ancestry references, cache slots, and optional cleanup state.
        Root scope class: RootResolver.
        Stateless scope reuse enabled: False.
        """
        self._root_resolver = root_resolver
        self._context = context
        self._parent_context_resolver = parent_context_resolver
        self._owned_scope_resolvers: tuple[Any, ...] = ()
        # Slot 1 is cached on this resolver; the sentinel means "not built yet".
        self._cache_1 = _MISSING_CACHE

    def enter_scope(
        self,
        scope: Any | None = None,
        *,
        context: Any | None = None,
    ) -> _SessionResolver | _RequestResolver | _ActionResolver | _StepResolver:
        """
        Open a deeper scope resolver from this resolver.

        Current scope: session:2.
        Allowed explicit transitions: request:3, action:4, step:5.
        Passing None follows the default transition for the scope graph.
        Optional context mapping is attached to the opened target scope.

        Raises:
            DIWireScopeMismatchError: if the requested level is below 2 or is not
                one of the known deeper scope levels.
        """
        # The request scope is checked first and is also the default for ``None``.
        if scope is _scope_obj_3 or scope == 3:
            return _RequestResolver(
                self._root_resolver,
                context,
                self,
                self,
            )
        if scope is None:
            return _RequestResolver(
                self._root_resolver,
                context,
                self,
                self,
            )
        target_scope_level = scope
        # Entering the current scope hands back this resolver unchanged.
        if target_scope_level is _scope_obj_2 or target_scope_level == 2:
            return self
        if target_scope_level <= 2:
            msg = f"Cannot enter scope level {target_scope_level} from level 2."
            raise DIWireScopeMismatchError(msg)
        if target_scope_level is _scope_obj_4 or target_scope_level == 4:
            # Build the intermediate request resolver without a context; only the
            # requested target scope receives ``context``.
            request_resolver = _RequestResolver(
                self._root_resolver,
                None,
                self,
                self,
            )
            action_resolver = _ActionResolver(
                request_resolver._root_resolver,
                context,
                request_resolver,
                request_resolver._session_resolver,
                request_resolver,
            )
            # Record the implicitly created intermediate resolver on the returned one.
            action_resolver._owned_scope_resolvers = (request_resolver,)
            return action_resolver
        if target_scope_level is _scope_obj_5 or target_scope_level == 5:
            request_resolver = _RequestResolver(
                self._root_resolver,
                None,
                self,
                self,
            )
            action_resolver = _ActionResolver(
                request_resolver._root_resolver,
                None,
                request_resolver,
                request_resolver._session_resolver,
                request_resolver,
            )
            step_resolver = _StepResolver(
                action_resolver._root_resolver,
                context,
                action_resolver,
                action_resolver._session_resolver,
                request_resolver,
            )
            step_resolver._owned_scope_resolvers = (request_resolver, action_resolver)
            return step_resolver
        msg = f"Scope level {target_scope_level} is not a valid next transition from level 2."
        raise DIWireScopeMismatchError(msg)

    def resolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated synchronous provider resolver method.

        Known provider slots: 1, 2.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.

        Raises:
            DIWireDependencyNotRegisteredError: if the token matches no slot and
                no annotation branch handles it.
        """
        # Fast path identity checks to avoid reflective dispatch.
        if dependency is _dep_1_type:
            return _SessionResolver.resolve_1(self)
        if dependency is _dep_2_type:
            return _SessionResolver.resolve_2(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                # Hand back a zero-argument callable that defers the lookup.
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                # A missing context value yields None here instead of an error.
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return self.resolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(self.resolve_1())
                    continue
                if slot == 2:
                    results.append(self.resolve_2())
                    continue
                # A slot number this class has no resolver method for.
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    async def aresolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated asynchronous provider resolver method.

        Known provider slots: 1, 2.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.

        Raises:
            DIWireDependencyNotRegisteredError: if the token matches no slot and
                no annotation branch handles it.
        """
        # Fast path identity checks for asynchronous resolution.
        if dependency is _dep_1_type:
            return await _SessionResolver.aresolve_1(self)
        if dependency is _dep_2_type:
            return await _SessionResolver.aresolve_2(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                # Hand back a zero-argument callable that defers the lookup.
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                # A missing context value yields None here instead of an error.
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return await self.aresolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(await self.aresolve_1())
                    continue
                if slot == 2:
                    results.append(await self.aresolve_2())
                    continue
                # A slot number this class has no resolver method for.
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    def _resolve_from_context(self, key: Any) -> Any:
        """
        Look up ``key`` in this resolver's context mapping, then in each ancestor's.

        The walk follows ``_parent_context_resolver`` links until one is ``None``;
        the first context that contains ``key`` supplies the value.

        Raises:
            DIWireDependencyNotRegisteredError: if no context in the chain contains ``key``.
        """
        context = self._context
        if context is not None and key in context:
            return context[key]
        parent_context_resolver = self._parent_context_resolver
        while parent_context_resolver is not None:
            parent_context = parent_context_resolver._context
            if parent_context is not None and key in parent_context:
                return parent_context[key]
            parent_context_resolver = parent_context_resolver._parent_context_resolver
        # Only the first fragment is an f-string. The usage hint lives in plain
        # literals so its braces are emitted verbatim; inside an f-string the
        # brace-wrapped dots would be formatted as the Ellipsis object.
        msg = (
            f"Context value for {key!r} is not provided. "
            "Pass it via `enter_scope(..., context={...})` "
            "(or `diwire_context` for injected callables)."
        )
        raise DIWireDependencyNotRegisteredError(msg)

    def _is_registered_dependency(self, dependency: Any) -> bool:
        """Return whether ``dependency`` is in the module-level ``_dep_registered_keys`` set."""
        return dependency in _dep_registered_keys

    def __enter__(self) -> _SessionResolver:
        """Return this resolver for use in a ``with`` block."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Do nothing on exit; returning None lets any exception propagate."""
        return None

    def close(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Explicit equivalent of leaving the ``with`` block; forwards to ``__exit__``."""
        return self.__exit__(exc_type, exc_value, traceback)

    async def __aenter__(self) -> _SessionResolver:
        """Return this resolver for use in an ``async with`` block."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Do nothing on exit; returning None lets any exception propagate."""
        return None

    async def aclose(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Explicit equivalent of leaving the ``async with`` block; forwards to ``__aexit__``."""
        return await self.__aexit__(exc_type, exc_value, traceback)

    def resolve_1(self) -> Any:
        """
        Provider slot 1 resolver (sync method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Declared scope: session (level 2)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 2
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _SessionResolver
        Dependency wiring: none
        Behavior: Builds the provider value in this resolver, enforcing scope guards and cache policy.
        """
        # Unlocked read first: an already built value is returned without taking the lock.
        cached_value = self._cache_1
        if cached_value is not _MISSING_CACHE:
            return cached_value
        with _dep_1_thread_lock:
            # Re-read under the lock in case the value was stored while waiting for it.
            if (cached_value := self._cache_1) is not _MISSING_CACHE:
                return cached_value
            value = _provider_1()
            self._cache_1 = value
            return value

    async def aresolve_1(self) -> Any:
        """
        Provider slot 1 resolver (async method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Declared scope: session (level 2)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 2
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _SessionResolver
        Dependency wiring: none
        Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
        """
        return self.resolve_1()

    def resolve_2(self) -> Any:
        """
        Provider slot 2 resolver (sync method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _SessionResolver
        Dependency wiring: session (positional_or_keyword) -> slot 1 [tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession]
        Behavior: This resolver cannot access the provider yet because the required scope is deeper than the current resolver scope.
        """
        msg = "Provider slot 2 requires opened scope level 3."
        raise DIWireScopeMismatchError(msg)

    async def aresolve_2(self) -> Any:
        """
        Provider slot 2 resolver (async method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _SessionResolver
        Dependency wiring: session (positional_or_keyword) -> slot 1 [tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession]
        Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
        """
        return self.resolve_2()


class _RequestResolver:
    """
    Generated resolver for scope 'request' (level 3).

    This class is generated and optimized for direct slot-based dependency resolution.
    All visible provider slots: 1, 2.
    Providers declared in this exact scope: 2.

    Slot 2 is built and cached on this resolver (`_cache_2`); slot 1 is delegated to the
    session resolver handed to the constructor.
    """

    __slots__ = (
        "_root_resolver",
        "_context",
        "_parent_context_resolver",
        "_owned_scope_resolvers",
        "_session_resolver",
        "_cache_2",
    )

    def __init__(
        self,
        root_resolver: RootResolver,
        context: Any | None = None,
        parent_context_resolver: Any = None,
        session_resolver: Any = _MISSING_RESOLVER,
    ) -> None:
        """
        Initialize resolver state for the current scope.

        The constructor wires scope ancestry references, cache slots, and optional cleanup state.
        Root scope class: RootResolver.
        Stateless scope reuse enabled: False.

        Args:
            root_resolver: Root resolver of the scope chain.
            context: Optional mapping consulted first by `_resolve_from_context`.
            parent_context_resolver: Resolver whose context is consulted next, or None.
            session_resolver: Resolver owning the slot 1 cache; `_MISSING_RESOLVER` when no
                session scope has been opened.
        """
        self._root_resolver = root_resolver
        self._context = context
        self._parent_context_resolver = parent_context_resolver
        self._owned_scope_resolvers: tuple[Any, ...] = ()
        self._session_resolver = session_resolver
        # Sentinel marks "slot 2 not built yet" so that None stays a cacheable value.
        self._cache_2 = _MISSING_CACHE

    def enter_scope(
        self,
        scope: Any | None = None,
        *,
        context: Any | None = None,
    ) -> _RequestResolver | _ActionResolver | _StepResolver:
        """
        Open a deeper scope resolver from this resolver.

        Current scope: request:3.
        Allowed explicit transitions: action:4, step:5.
        Passing None follows the default transition for the scope graph.
        Optional context mapping is attached to the opened target scope.

        Requesting level 3 returns this resolver unchanged. Requesting level 5 creates an
        intermediate action resolver (without context) that is recorded in the returned step
        resolver's `_owned_scope_resolvers`.

        Raises:
            DIWireScopeMismatchError: If the requested level is shallower than 3 or is not a
                reachable transition from level 3.
        """
        # Explicit action scope and the default (None) transition open the same resolver.
        if scope is _scope_obj_4 or scope == 4 or scope is None:
            return _ActionResolver(
                self._root_resolver,
                context,
                self,
                self._session_resolver,
                self,
            )
        target_scope_level = scope
        if target_scope_level is _scope_obj_3 or target_scope_level == 3:
            return self
        if target_scope_level <= 3:
            msg = f"Cannot enter scope level {target_scope_level} from level 3."
            raise DIWireScopeMismatchError(msg)
        if target_scope_level is _scope_obj_5 or target_scope_level == 5:
            # Skipping a level: build the intermediate action resolver first; the caller's
            # context is attached only to the step resolver that is actually returned.
            action_resolver = _ActionResolver(
                self._root_resolver,
                None,
                self,
                self._session_resolver,
                self,
            )
            step_resolver = _StepResolver(
                action_resolver._root_resolver,
                context,
                action_resolver,
                action_resolver._session_resolver,
                action_resolver._request_resolver,
            )
            step_resolver._owned_scope_resolvers = (action_resolver,)
            return step_resolver
        msg = f"Scope level {target_scope_level} is not a valid next transition from level 3."
        raise DIWireScopeMismatchError(msg)

    def resolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated synchronous provider resolver method.

        Known provider slots: 1, 2.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.

        After the identity fast path, Maybe / Provider / FromContext / All annotations are
        unwrapped in that order. Provider annotations yield a zero-argument callable instead
        of a resolved value.

        Raises:
            DIWireDependencyNotRegisteredError: If the dependency (or a required context key)
                is unknown.
        """
        # Fast path identity checks to avoid reflective dispatch.
        if dependency is _dep_2_type:
            return _RequestResolver.resolve_2(self)
        if dependency is _dep_1_type:
            return _RequestResolver.resolve_1(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    # Maybe[...] turns a missing context value into None.
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return self.resolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(self.resolve_1())
                    continue
                if slot == 2:
                    results.append(self.resolve_2())
                    continue
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    async def aresolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated asynchronous provider resolver method.

        Known provider slots: 1, 2.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.

        Mirrors `resolve`, awaiting the per-slot async methods. Provider annotations still
        yield a zero-argument callable rather than a resolved value.

        Raises:
            DIWireDependencyNotRegisteredError: If the dependency (or a required context key)
                is unknown.
        """
        # Fast path identity checks for asynchronous resolution.
        if dependency is _dep_2_type:
            return await _RequestResolver.aresolve_2(self)
        if dependency is _dep_1_type:
            return await _RequestResolver.aresolve_1(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    # Maybe[...] turns a missing context value into None.
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return await self.aresolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(await self.aresolve_1())
                    continue
                if slot == 2:
                    results.append(await self.aresolve_2())
                    continue
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    def _resolve_from_context(self, key: Any) -> Any:
        """
        Look up a context value, starting at this resolver and walking up the parent chain.

        Args:
            key: Context key to look up.

        Returns:
            The value from the nearest context mapping that contains `key`.

        Raises:
            DIWireDependencyNotRegisteredError: If no context in the chain contains `key`.
        """
        resolver: Any = self
        while resolver is not None:
            context = resolver._context
            if context is not None and key in context:
                return context[key]
            resolver = resolver._parent_context_resolver
        # Only the first segment is an f-string: a literal `{...}` inside an f-string would be
        # evaluated as the Ellipsis expression and rendered as "Ellipsis".
        msg = (
            f"Context value for {key!r} is not provided. "
            "Pass it via `enter_scope(..., context={...})` "
            "(or `diwire_context` for injected callables)."
        )
        raise DIWireDependencyNotRegisteredError(msg)

    def _is_registered_dependency(self, dependency: Any) -> bool:
        """Return True when `dependency` is in the module-level `_dep_registered_keys` set."""
        return dependency in _dep_registered_keys

    def __enter__(self) -> _RequestResolver:
        """Return this resolver so it can be used in a `with` statement."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Leave the scope; nothing is cleaned up and exceptions are not suppressed."""
        return None

    def close(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Explicit equivalent of `__exit__` for callers not using a `with` statement."""
        return self.__exit__(exc_type, exc_value, traceback)

    async def __aenter__(self) -> _RequestResolver:
        """Return this resolver so it can be used in an `async with` statement."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Leave the scope; nothing is cleaned up and exceptions are not suppressed."""
        return None

    async def aclose(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Explicit equivalent of `__aexit__` for callers not using `async with`."""
        return await self.__aexit__(exc_type, exc_value, traceback)

    def resolve_1(self) -> Any:
        """
        Provider slot 1 resolver (sync method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Declared scope: session (level 2)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 2
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _RequestResolver
        Dependency wiring: none
        Behavior: This resolver delegates to the cache owner resolver so scoped caching remains consistent across nested resolvers.

        Raises:
            DIWireScopeMismatchError: If no session resolver was supplied to this resolver.
        """
        owner_resolver = self._session_resolver
        if owner_resolver is _MISSING_RESOLVER:
            msg = "Provider slot 1 requires opened scope level 2."
            raise DIWireScopeMismatchError(msg)
        return owner_resolver.resolve_1()

    async def aresolve_1(self) -> Any:
        """
        Provider slot 1 resolver (async method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Declared scope: session (level 2)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 2
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _RequestResolver
        Dependency wiring: none
        Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
        """
        return self.resolve_1()

    def resolve_2(self) -> Any:
        """
        Provider slot 2 resolver (sync method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _RequestResolver
        Dependency wiring: session (positional_or_keyword) -> slot 1 [tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession]
        Behavior: Builds the provider value in this resolver, enforcing scope guards and cache policy.

        Raises:
            DIWireScopeMismatchError: Propagated from `resolve_1` when no session scope is open.
        """
        # Lock-free hit: the value has already been built for this resolver.
        cached_value = self._cache_2
        if cached_value is not _MISSING_CACHE:
            return cached_value
        with _dep_2_thread_lock:
            # Check again under the lock before building, in case the cache was filled
            # between the first check and acquiring the lock.
            if (cached_value := self._cache_2) is not _MISSING_CACHE:
                return cached_value
            value = _provider_2(
                self.resolve_1(),
            )
            self._cache_2 = value
            return value

    async def aresolve_2(self) -> Any:
        """
        Provider slot 2 resolver (async method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _RequestResolver
        Dependency wiring: session (positional_or_keyword) -> slot 1 [tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession]
        Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
        """
        return self.resolve_2()


class _ActionResolver:
    """
    Generated resolver for scope 'action' (level 4).

    This class is generated and optimized for direct slot-based dependency resolution.
    All visible provider slots: 1, 2.
    Providers declared in this exact scope: none.

    Both slots are delegated: slot 1 to the session resolver and slot 2 to the request
    resolver handed to the constructor.
    """

    __slots__ = (
        "_root_resolver",
        "_context",
        "_parent_context_resolver",
        "_owned_scope_resolvers",
        "_session_resolver",
        "_request_resolver",
    )

    def __init__(
        self,
        root_resolver: RootResolver,
        context: Any | None = None,
        parent_context_resolver: Any = None,
        session_resolver: Any = _MISSING_RESOLVER,
        request_resolver: Any = _MISSING_RESOLVER,
    ) -> None:
        """
        Initialize resolver state for the current scope.

        The constructor wires scope ancestry references, cache slots, and optional cleanup state.
        Root scope class: RootResolver.
        Stateless scope reuse enabled: False.

        Args:
            root_resolver: Root resolver of the scope chain.
            context: Optional mapping consulted first by `_resolve_from_context`.
            parent_context_resolver: Resolver whose context is consulted next, or None.
            session_resolver: Resolver owning the slot 1 cache; `_MISSING_RESOLVER` when no
                session scope has been opened.
            request_resolver: Resolver owning the slot 2 cache; `_MISSING_RESOLVER` when no
                request scope has been opened.
        """
        self._root_resolver = root_resolver
        self._context = context
        self._parent_context_resolver = parent_context_resolver
        self._owned_scope_resolvers: tuple[Any, ...] = ()
        self._session_resolver = session_resolver
        self._request_resolver = request_resolver

    def enter_scope(
        self,
        scope: Any | None = None,
        *,
        context: Any | None = None,
    ) -> _ActionResolver | _StepResolver:
        """
        Open a deeper scope resolver from this resolver.

        Current scope: action:4.
        Allowed explicit transitions: step:5.
        Passing None follows the default transition for the scope graph.
        Optional context mapping is attached to the opened target scope.

        Requesting level 4 returns this resolver unchanged.

        Raises:
            DIWireScopeMismatchError: If the requested level is shallower than 4 or is not a
                reachable transition from level 4.
        """
        # Explicit step scope and the default (None) transition open the same resolver.
        if scope is _scope_obj_5 or scope == 5 or scope is None:
            return _StepResolver(
                self._root_resolver,
                context,
                self,
                self._session_resolver,
                self._request_resolver,
            )
        target_scope_level = scope
        if target_scope_level is _scope_obj_4 or target_scope_level == 4:
            return self
        if target_scope_level <= 4:
            msg = f"Cannot enter scope level {target_scope_level} from level 4."
            raise DIWireScopeMismatchError(msg)
        msg = f"Scope level {target_scope_level} is not a valid next transition from level 4."
        raise DIWireScopeMismatchError(msg)

    def resolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated synchronous provider resolver method.

        Known provider slots: 1, 2.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.

        After the identity fast path, Maybe / Provider / FromContext / All annotations are
        unwrapped in that order. Provider annotations yield a zero-argument callable instead
        of a resolved value.

        Raises:
            DIWireDependencyNotRegisteredError: If the dependency (or a required context key)
                is unknown.
        """
        # Fast path identity checks to avoid reflective dispatch.
        if dependency is _dep_2_type:
            return _ActionResolver.resolve_2(self)
        if dependency is _dep_1_type:
            return _ActionResolver.resolve_1(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    # Maybe[...] turns a missing context value into None.
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return self.resolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(self.resolve_1())
                    continue
                if slot == 2:
                    results.append(self.resolve_2())
                    continue
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    async def aresolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated asynchronous provider resolver method.

        Known provider slots: 1, 2.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.

        Mirrors `resolve`, awaiting the per-slot async methods. Provider annotations still
        yield a zero-argument callable rather than a resolved value.

        Raises:
            DIWireDependencyNotRegisteredError: If the dependency (or a required context key)
                is unknown.
        """
        # Fast path identity checks for asynchronous resolution.
        if dependency is _dep_2_type:
            return await _ActionResolver.aresolve_2(self)
        if dependency is _dep_1_type:
            return await _ActionResolver.aresolve_1(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    # Maybe[...] turns a missing context value into None.
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return await self.aresolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(await self.aresolve_1())
                    continue
                if slot == 2:
                    results.append(await self.aresolve_2())
                    continue
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    def _resolve_from_context(self, key: Any) -> Any:
        """
        Look up a context value, starting at this resolver and walking up the parent chain.

        Args:
            key: Context key to look up.

        Returns:
            The value from the nearest context mapping that contains `key`.

        Raises:
            DIWireDependencyNotRegisteredError: If no context in the chain contains `key`.
        """
        resolver: Any = self
        while resolver is not None:
            context = resolver._context
            if context is not None and key in context:
                return context[key]
            resolver = resolver._parent_context_resolver
        # Only the first segment is an f-string: a literal `{...}` inside an f-string would be
        # evaluated as the Ellipsis expression and rendered as "Ellipsis".
        msg = (
            f"Context value for {key!r} is not provided. "
            "Pass it via `enter_scope(..., context={...})` "
            "(or `diwire_context` for injected callables)."
        )
        raise DIWireDependencyNotRegisteredError(msg)

    def _is_registered_dependency(self, dependency: Any) -> bool:
        """Return True when `dependency` is in the module-level `_dep_registered_keys` set."""
        return dependency in _dep_registered_keys

    def __enter__(self) -> _ActionResolver:
        """Return this resolver so it can be used in a `with` statement."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Leave the scope; nothing is cleaned up and exceptions are not suppressed."""
        return None

    def close(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Explicit equivalent of `__exit__` for callers not using a `with` statement."""
        return self.__exit__(exc_type, exc_value, traceback)

    async def __aenter__(self) -> _ActionResolver:
        """Return this resolver so it can be used in an `async with` statement."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Leave the scope; nothing is cleaned up and exceptions are not suppressed."""
        return None

    async def aclose(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Explicit equivalent of `__aexit__` for callers not using `async with`."""
        return await self.__aexit__(exc_type, exc_value, traceback)

    def resolve_1(self) -> Any:
        """
        Provider slot 1 resolver (sync method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Declared scope: session (level 2)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 2
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _ActionResolver
        Dependency wiring: none
        Behavior: This resolver delegates to the cache owner resolver so scoped caching remains consistent across nested resolvers.

        Raises:
            DIWireScopeMismatchError: If no session resolver was supplied to this resolver.
        """
        owner_resolver = self._session_resolver
        if owner_resolver is _MISSING_RESOLVER:
            msg = "Provider slot 1 requires opened scope level 2."
            raise DIWireScopeMismatchError(msg)
        return owner_resolver.resolve_1()

    async def aresolve_1(self) -> Any:
        """
        Provider slot 1 resolver (async method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Declared scope: session (level 2)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 2
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _ActionResolver
        Dependency wiring: none
        Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
        """
        return self.resolve_1()

    def resolve_2(self) -> Any:
        """
        Provider slot 2 resolver (sync method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _ActionResolver
        Dependency wiring: session (positional_or_keyword) -> slot 1 [tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession]
        Behavior: This resolver delegates to the cache owner resolver so scoped caching remains consistent across nested resolvers.

        Raises:
            DIWireScopeMismatchError: If no request resolver was supplied to this resolver.
        """
        owner_resolver = self._request_resolver
        if owner_resolver is _MISSING_RESOLVER:
            msg = "Provider slot 2 requires opened scope level 3."
            raise DIWireScopeMismatchError(msg)
        return owner_resolver.resolve_2()

    async def aresolve_2(self) -> Any:
        """
        Provider slot 2 resolver (async method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _ActionResolver
        Dependency wiring: session (positional_or_keyword) -> slot 1 [tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession]
        Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
        """
        return self.resolve_2()


class _StepResolver:

    """
    Generated resolver for scope 'step' (level 5).

    This class is generated and optimized for direct slot-based dependency resolution.
    All visible provider slots: 1, 2.
    Providers declared in this exact scope: none.
    """



    __slots__ = (
        "_root_resolver",
        "_context",
        "_parent_context_resolver",
        "_owned_scope_resolvers",
        "_session_resolver",
        "_request_resolver",
    )


    def __init__(
        self,
        root_resolver: RootResolver,
        context: Any | None = None,
        parent_context_resolver: Any = None,
        session_resolver: Any = _MISSING_RESOLVER,
        request_resolver: Any = _MISSING_RESOLVER,
    ) -> None:

        """
        Initialize resolver state for the current scope.

        The constructor wires scope ancestry references, cache slots, and optional cleanup state.
        Root scope class: RootResolver.
        Stateless scope reuse enabled: False.
        """

        self._root_resolver = root_resolver
        self._context = context
        self._parent_context_resolver = parent_context_resolver
        self._owned_scope_resolvers: tuple[Any, ...] = ()
        self._session_resolver = session_resolver
        self._request_resolver = request_resolver

    def enter_scope(
        self,
        scope: Any | None = None,
        *,
        context: Any | None = None,
    ) -> NoReturn:

        """
        Open a deeper scope resolver from this resolver.

        Current scope: step:5.
        Allowed explicit transitions: none.
        Passing None follows the default transition for the scope graph.
        Optional context mapping is attached to the opened target scope.
        """

        msg = "Cannot enter deeper scope from level 5."
        raise DIWireScopeMismatchError(msg)

    def resolve(self, dependency: Any) -> Any:

        """
        Route a dependency token to a generated synchronous provider resolver method.

        Known provider slots: 1, 2.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
        """

        # Fast path identity checks to avoid reflective dispatch.
        if dependency is _dep_2_type:
            return _StepResolver.resolve_2(self)
        if dependency is _dep_1_type:
            return _StepResolver.resolve_1(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return self.resolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(self.resolve_1())
                    continue
                if slot == 2:
                    results.append(self.resolve_2())
                    continue
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    async def aresolve(self, dependency: Any) -> Any:

        """
        Route a dependency token to a generated asynchronous provider resolver method.

        Known provider slots: 1, 2.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
        """

        # Fast path identity checks for asynchronous resolution.
        if dependency is _dep_2_type:
            return await _StepResolver.aresolve_2(self)
        if dependency is _dep_1_type:
            return await _StepResolver.aresolve_1(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return await self.aresolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(await self.aresolve_1())
                    continue
                if slot == 2:
                    results.append(await self.aresolve_2())
                    continue
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    def _resolve_from_context(self, key: Any) -> Any:
        context = self._context
        if context is not None and key in context:
            return context[key]
        parent_context_resolver = self._parent_context_resolver
        while parent_context_resolver is not None:
            parent_context = parent_context_resolver._context
            if parent_context is not None and key in parent_context:
                return parent_context[key]
            parent_context_resolver = parent_context_resolver._parent_context_resolver
        msg = (f"Context value for {key!r} is not provided. Pass it via `enter_scope(..., context={...})` (or `diwire_context` for injected callables).")
        raise DIWireDependencyNotRegisteredError(msg)

    def _is_registered_dependency(self, dependency: Any) -> bool:
        return dependency in _dep_registered_keys

    def __enter__(self) -> _StepResolver:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        return None

    def close(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        return self.__exit__(exc_type, exc_value, traceback)

    async def __aenter__(self) -> _StepResolver:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        return None

    async def aclose(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        return await self.__aexit__(exc_type, exc_value, traceback)


    def resolve_1(self) -> Any:

        """
        Provider slot 1 resolver (sync method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Declared scope: session (level 2)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 2
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _StepResolver
        Dependency wiring: none
        Behavior: This resolver delegates to the cache owner resolver so scoped caching remains consistent across nested resolvers.
        """

        owner_resolver = self._session_resolver
        if owner_resolver is _MISSING_RESOLVER:
            msg = "Provider slot 1 requires opened scope level 2."
            raise DIWireScopeMismatchError(msg)
        return owner_resolver.resolve_1()

    async def aresolve_1(self) -> Any:

        """
        Provider slot 1 resolver (async method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession
        Declared scope: session (level 2)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 2
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _StepResolver
        Dependency wiring: none
        Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
        """

        return self.resolve_1()

    def resolve_2(self) -> Any:

        """
        Provider slot 2 resolver (sync method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _StepResolver
        Dependency wiring: session (positional_or_keyword) -> slot 1 [tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession]
        Behavior: This resolver delegates to the cache owner resolver so scoped caching remains consistent across nested resolvers.
        """

        owner_resolver = self._request_resolver
        if owner_resolver is _MISSING_RESOLVER:
            msg = "Provider slot 2 requires opened scope level 3."
            raise DIWireScopeMismatchError(msg)
        return owner_resolver.resolve_2()

    async def aresolve_2(self) -> Any:

        """
        Provider slot 2 resolver (async method).

        Returns: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Provider spec kind: concrete_type
        Provider target: tests.unit.internal.test_resolver_assembly_expected_output._SnapshotRequest
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: False
        Resolver class handling this call: _StepResolver
        Dependency wiring: session (positional_or_keyword) -> slot 1 [tests.unit.internal.test_resolver_assembly_expected_output._SnapshotSession]
        Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
        """

        return self.resolve_2()

def build_root_resolver(
    registrations: ProvidersRegistrations,
    *,
    cleanup_enabled: bool = True,
) -> RootResolver:
    """
    Build and return the generated root resolver instance.

    Rebinds the module-level provider globals (`_dep_<slot>_type`, `_provider_<slot>`,
    `_dep_registered_keys`, `_all_slots_by_key`) from the supplied registrations so
    that `resolve_<slot>` methods run without registration lookups.
    Provider slots configured during bootstrap: 1, 2.
    Root resolver class: RootResolver.

    `cleanup_enabled` is accepted as a keyword argument but is not read by this function.

    Examples:
    >>> root = build_root_resolver(registrations)
    >>> root.resolve(SomeService)
    >>> await root.aresolve(SomeAsyncService)
    >>> scoped = root.enter_scope()
    """
    global _all_slots_by_key, _dep_registered_keys
    global _dep_1_type, _provider_1
    global _dep_2_type, _provider_2

    # Start from empty indexes so nothing from a previous build survives.
    _all_slots_by_key = {}
    _dep_registered_keys = set()
    slots_index: dict[Any, list[int]] = {}

    def _index_for_all(dep_type: Any, slot: int) -> None:
        # Record `slot` under the key used for All[...] dependency dispatch.
        # When `component_base_key` yields None, the dependency token itself is
        # the key unless it carries `__metadata__`; otherwise it is left unindexed.
        base_key = component_base_key(dep_type)
        if base_key is None and not hasattr(dep_type, '__metadata__'):
            base_key = dep_type
        if base_key is not None:
            slots_index.setdefault(base_key, []).append(slot)

    # --- Provider slot 1 ---
    spec_1 = registrations.get_by_slot(1)
    _dep_1_type = spec_1.provides  # identity token for `resolve`/`aresolve` dispatch
    _dep_registered_keys.add(_dep_1_type)
    _provider_1 = spec_1.concrete_type
    _index_for_all(_dep_1_type, 1)

    # --- Provider slot 2 ---
    spec_2 = registrations.get_by_slot(2)
    _dep_2_type = spec_2.provides  # identity token for `resolve`/`aresolve` dispatch
    _dep_registered_keys.add(_dep_2_type)
    _provider_2 = spec_2.concrete_type
    _index_for_all(_dep_2_type, 2)

    # Publish the index with tuple values for stable repr and to discourage mutation.
    frozen_index: dict[Any, tuple[int, ...]] = {}
    for base_key, slot_list in slots_index.items():
        frozen_index[base_key] = tuple(slot_list)
    _all_slots_by_key = frozen_index

    return RootResolver()