This file is a merged representation of a subset of the codebase, containing files not matching ignore patterns, combined into a single document by Repomix. The content has been processed where empty lines have been removed.

================================================================
File Summary
================================================================

Purpose:
--------
This file contains a packed representation of the entire repository's contents.
It is designed to be easily consumable by AI systems for analysis, code review,
or other automated processes.

File Format:
------------
The content is organized as follows:
1. This summary section
2. Repository information
3. Directory structure
4. Multiple file entries, each consisting of:
  a. A separator line (================)
  b. The file path (File: path/to/file)
  c. Another separator line
  d. The full contents of the file
  e. A blank line

Usage Guidelines:
-----------------
- This file should be treated as read-only. Any changes should be made to the
  original repository files, not this packed version.
- When processing this file, use the file path to distinguish
  between different files in the repository.
- Be aware that this file may contain sensitive information. Handle it with
  the same level of security as you would the original repository.

Notes:
------
- Some files may have been excluded based on .gitignore rules and Repomix's configuration
- Binary files are not included in this packed representation. Please refer to the Repository Structure section for a complete list of file paths, including binary files
- Files matching these patterns are excluded: .specstory/**/*.md, .venv/**, _private/**, CLEANUP.txt, **/*.json, *.lock
- Files matching patterns in .gitignore are excluded
- Files matching default ignore patterns are excluded
- Empty lines have been removed from all files

Additional Info:
----------------

================================================================
Directory Structure
================================================================
.cursor/
  rules/
    filetree.mdc
.github/
  workflows/
    push.yml
    release.yml
src/
  opero/
    __init__.py
    _version.py
    concurrency.py
    core.py
    decorators.py
    exceptions.py
    opero.py
    rate_limit.py
    retry.py
    utils.py
tests/
  test_core.py
  test_decorators.py
  test_package.py
.gitignore
.pre-commit-config.yaml
CHANGELOG.md
cleanup.py
LICENSE
package.toml
pyproject.toml
README.md
TODO.md

================================================================
Files
================================================================

================
File: .cursor/rules/filetree.mdc
================
---
description: File tree of the project
globs:
---
[ 864]  .
├── [  64]  .benchmarks
├── [  96]  .cursor
│   └── [  96]  rules
│       └── [1.6K]  filetree.mdc
├── [  96]  .github
│   └── [ 128]  workflows
│       ├── [2.7K]  push.yml
│       └── [1.4K]  release.yml
├── [3.5K]  .gitignore
├── [ 532]  .pre-commit-config.yaml
├── [  96]  .specstory
│   └── [ 128]  history
│       ├── [2.6K]  .what-is-this.md
│       └── [574K]  2025-03-04_03-16-comprehensive-plan-for-opero-package-implementation.md
├── [2.4K]  CHANGELOG.md
├── [ 160]  CLEANUP.txt
├── [1.0K]  LICENSE
├── [9.5K]  README.md
├── [2.6K]  TODO.md
├── [ 13K]  cleanup.py
├── [  96]  dist
│   └── [   0]  .gitkeep
├── [ 426]  package.toml
├── [6.1K]  pyproject.toml
├── [ 160]  src
│   ├── [  64]  .benchmarks
│   └── [ 448]  opero
│       ├── [1.1K]  __init__.py
│       ├── [ 130]  _version.py
│       ├── [9.9K]  concurrency.py
│       ├── [9.5K]  core.py
│       ├── [3.1K]  decorators.py
│       ├── [ 563]  exceptions.py
│       ├── [1.6K]  opero.py
│       ├── [4.0K]  rate_limit.py
│       ├── [7.8K]  retry.py
│       └── [1.9K]  utils.py
├── [ 192]  tests
│   ├── [4.2K]  test_core.py
│   ├── [4.1K]  test_decorators.py
│   └── [ 139]  test_package.py
├── [ 54K]  twat_search.txt
└── [109K]  uv.lock

13 directories, 31 files

================
File: .github/workflows/push.yml
================
name: Build & Test
on:
  push:
    branches: [main]
    tags-ignore: ["v*"]
  pull_request:
    branches: [main]
  workflow_dispatch:
permissions:
  contents: write
  id-token: write
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true
jobs:
  quality:
    name: Code Quality
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Run Ruff lint
        uses: astral-sh/ruff-action@v3
        with:
          version: "latest"
          args: "check --output-format=github"
      - name: Run Ruff Format
        uses: astral-sh/ruff-action@v3
        with:
          version: "latest"
          args: "format --check --respect-gitignore"
  test:
    name: Run Tests
    needs: quality
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12"]
        os: [ubuntu-latest]
      fail-fast: true
    runs-on: ${{ matrix.os }}
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install UV
        uses: astral-sh/setup-uv@v5
        with:
          version: "latest"
          python-version: ${{ matrix.python-version }}
          enable-cache: true
          cache-suffix: ${{ matrix.os }}-${{ matrix.python-version }}
      - name: Install test dependencies
        run: |
          uv pip install --system --upgrade pip
          uv pip install --system ".[test]"
      - name: Run tests with Pytest
        run: uv run pytest -n auto --maxfail=1 --disable-warnings --cov-report=xml --cov-config=pyproject.toml --cov=src/opero --cov=tests tests/
      - name: Upload coverage report
        uses: actions/upload-artifact@v4
        with:
          name: coverage-${{ matrix.python-version }}-${{ matrix.os }}
          path: coverage.xml
  build:
    name: Build Distribution
    needs: test
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install UV
        uses: astral-sh/setup-uv@v5
        with:
          version: "latest"
          python-version: "3.12"
          enable-cache: true
      - name: Install build tools
        run: uv pip install build hatchling hatch-vcs
      - name: Build distributions
        run: uv run python -m build --outdir dist
      - name: Upload distribution artifacts
        uses: actions/upload-artifact@v4
        with:
          name: dist-files
          path: dist/
          retention-days: 5

================
File: .github/workflows/release.yml
================
name: Release
on:
  push:
    tags: ["v*"]
permissions:
  contents: write
  id-token: write
jobs:
  release:
    name: Release to PyPI
    runs-on: ubuntu-latest
    environment:
      name: pypi
      url: https://pypi.org/p/opero
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install UV
        uses: astral-sh/setup-uv@v5
        with:
          version: "latest"
          python-version: "3.12"
          enable-cache: true
      - name: Install build tools
        run: uv pip install build hatchling hatch-vcs
      - name: Build distributions
        run: uv run python -m build --outdir dist
      - name: Verify distribution files
        run: |
          ls -la dist/
          test -n "$(find dist -name '*.whl')" || (echo "Wheel file missing" && exit 1)
          test -n "$(find dist -name '*.tar.gz')" || (echo "Source distribution missing" && exit 1)
      - name: Publish to PyPI
        uses: pypa/gh-action-pypi-publish@release/v1
        with:
          password: ${{ secrets.PYPI_TOKEN }}
      - name: Create GitHub Release
        uses: softprops/action-gh-release@v1
        with:
          files: dist/*
          generate_release_notes: true
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

================
File: src/opero/__init__.py
================


================
File: src/opero/_version.py
================


================
File: src/opero/concurrency.py
================
T = TypeVar("T")  # Input type
R = TypeVar("R")  # Return type
logger = logging.getLogger(__name__)
class MultiprocessConfig:
class ConcurrencyConfig:
class ProcessPoolWrapper:
    def __init__(self, max_workers: int | None = None, backend: str | None = None):
        self.max_workers = max_workers or multiprocessing.cpu_count()
    async def __aenter__(self) -> "ProcessPoolWrapper":
                logger.warning("Pathos not available, falling back to multiprocessing")
                self.pool = pathos.multiprocessing.ProcessPool(nodes=self.max_workers)
            self.pool = multiprocessing.Pool(processes=self.max_workers)
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
            self.pool.close()
            self.pool.join()
    async def map(self, func: Callable[[T], R], items: Iterable[T]) -> list[R]:
            raise RuntimeError(msg)
        loop = asyncio.get_event_loop()
        items_list = list(items)
            return await loop.run_in_executor(
                None, lambda: self.pool.map(func, items_list)
class ThreadPoolWrapper:
    def __init__(self, max_workers: int | None = None):
    async def __aenter__(self) -> "ThreadPoolWrapper":
            self.pool = pathos.threading.ThreadPool(nodes=self.max_workers)
            self.pool = ThreadPoolExecutor(max_workers=self.max_workers)
            if hasattr(self.pool, "close"):
                self.pool.shutdown()
                None, lambda: list(self.pool.map(func, items_list))
class AsyncPoolWrapper:
    async def __aenter__(self) -> "AsyncPoolWrapper":
            raise ImportError(msg)
        self.pool = aiomultiprocess.Pool(processes=self.max_workers)
            await self.pool.close()
            await self.pool.join()
        return await self.pool.map(func, items_list)
async def get_pool(
        return await ProcessPoolWrapper(
        ).__aenter__()
        return await ThreadPoolWrapper(max_workers=max_workers).__aenter__()
        return await AsyncPoolWrapper(max_workers=max_workers).__aenter__()
        raise ValueError(msg)
async def create_pool(
            pool = ProcessPoolWrapper(max_workers=max_workers, backend=backend)
            await pool.__aenter__()
            pool = ThreadPoolWrapper(max_workers=max_workers)
            pool = AsyncPoolWrapper(max_workers=max_workers)
            await pool.__aexit__(None, None, None)

================
File: src/opero/core.py
================
T = TypeVar("T")  # Input type
R = TypeVar("R")  # Return type
logger = logging.getLogger(__name__)
class OrchestratorConfig:
    fallbacks: list[Callable[..., Any]] = field(default_factory=list)
class FallbackChain:
    def __init__(
        elif callable(fallbacks):
    async def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return await self.execute(*args, **kwargs)
    async def execute(self, *args: Any, **kwargs: Any) -> Any:
            self.logger.debug("Trying primary function")
            result = await ensure_async(self.primary_func, *args, **kwargs)
            self.logger.debug("Primary function succeeded")
            self.logger.warning(f"Primary function failed: {e}")
                self.logger.error("No fallbacks available")
        for i, fallback in enumerate(self.fallbacks):
                self.logger.debug(f"Trying fallback {i + 1}/{len(self.fallbacks)}")
                result = await ensure_async(fallback, *args, **kwargs)
                self.logger.debug(f"Fallback {i + 1} succeeded")
                self.logger.warning(f"Fallback {i + 1} failed: {e}")
        self.logger.error("All fallbacks failed")
            raise AllFailedError(msg) from last_exception
        raise AllFailedError(msg)
class Orchestrator:
    def __init__(self, *, config: OrchestratorConfig | None = None):
        self.config = config or OrchestratorConfig()
    async def execute(self, func: Callable[..., R], *args: Any, **kwargs: Any) -> R:
                    if asyncio.iscoroutinefunction(fallback_func):
                        async def retry_wrapped_async_fallback(
                            return await retry_async(
                        retry_wrapped_fallbacks.append(retry_wrapped_async_fallback)
                        def retry_wrapped_sync_fallback(*a: Any, **kw: Any) -> Any:
                            return retry_sync(
                        retry_wrapped_fallbacks.append(retry_wrapped_sync_fallback)
                chain = FallbackChain(func, retry_wrapped_fallbacks)
                chain = FallbackChain(func, self.config.fallbacks)
                if asyncio.iscoroutinefunction(func):
                    result = await retry_async(
                    result = await ensure_async(
                result = await chain.execute(*args, **kwargs)
                result = retry_sync(
            result = await ensure_async(func, *args, **kwargs)
        return cast(R, result)
    async def process(
            result = await self.execute(func, *args, **kwargs)
            results.append(result)

================
File: src/opero/decorators.py
================
T = TypeVar("T")  # Input type
R = TypeVar("R")  # Return type
def orchestrate(
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        config = OrchestratorConfig(
        orchestrator = Orchestrator(config=config)
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            return await orchestrator.execute(func, *args, **kwargs)
        async def process_wrapper(*args: Any) -> Any:
            return await orchestrator.process([func], *args)

================
File: src/opero/exceptions.py
================
class OperoError(Exception):
class AllFailedError(OperoError):
    def __init__(self, message="All fallback operations failed."):
        super().__init__(self.message)
    def __str__(self) -> str:

================
File: src/opero/opero.py
================
logging.basicConfig(
logger = logging.getLogger(__name__)
class Config:
def process_data(
        logger.setLevel(logging.DEBUG)
        logger.debug("Debug mode enabled")
        raise ValueError(msg)
def main() -> None:
        config = Config(name="default", value="test", options={"key": "value"})
        result = process_data([], config=config)
        logger.info("Processing completed: %s", result)
        logger.error("An error occurred: %s", str(e))
    main()

================
File: src/opero/rate_limit.py
================
R = TypeVar("R")
logger = logging.getLogger(__name__)
class RateLimitConfig:
def get_rate_limiter(rate: float) -> Limiter:
    return Limiter(rate)
async def with_rate_limit(
        return await ensure_async(func, *args, **kwargs)
class RateLimiter:
    def __init__(self, rate: float):
        self.limiter = Limiter(rate)
    async def __aenter__(self) -> "RateLimiter":
        await self.limiter.acquire()
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    async def limit_async(self, func: Callable[..., R], *args: Any, **kwargs: Any) -> R:
            return await func(*args, **kwargs)
    async def limit_sync(self, func: Callable[..., R], *args: Any, **kwargs: Any) -> R:
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
    async def limit(self, func: Callable[..., R], *args: Any, **kwargs: Any) -> R:
        return await with_rate_limit(self.limiter, func, *args, **kwargs)

================
File: src/opero/retry.py
================
P = ParamSpec("P")
R = TypeVar("R")
logger = logging.getLogger(__name__)
class RetryConfig:
    def get_retry_arguments(self) -> dict[str, Any]:
            "stop": stop_after_attempt(self.max_attempts),
            "wait": wait_exponential(
            "retry": retry_if_exception_type(self.retry_exceptions),
            "before_sleep": lambda retry_state: logger.debug(
def with_retry(
    config = RetryConfig(
    return _with_retry_config(config)
def _with_retry_config(
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
                return await retry_async(func, *args, config=config, **kwargs)
            return cast(Callable[P, R], async_wrapper)
            def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
                return retry_sync(func, *args, config=config, **kwargs)
            return cast(Callable[P, R], sync_wrapper)
def retry_sync(
    retry_config = config or RetryConfig()
    retry_args = retry_config.get_retry_arguments()
    retrying = Retrying(**retry_args)
        return retrying(func, *args, **kwargs)
        logger.error(f"All {retry_config.max_attempts} retry attempts failed")
            if e.last_attempt.exception() is not None:
                raise e.last_attempt.exception() from e
async def retry_async(
    async def async_func(*a: Any, **kw: Any) -> Any:
        result = await ensure_async(func, *a, **kw)
            logger.debug(f"Attempt {attempt_number}/{retry_config.max_attempts}")
            result = await async_func(*args, **kwargs)
            logger.debug(f"Attempt {attempt_number} succeeded")
            return cast(R, result)
            logger.warning(f"Attempt {attempt_number} failed: {e!s}")
            wait_time = min(
            logger.debug(f"Waiting {wait_time} seconds before next attempt")
            await asyncio.sleep(wait_time)
    raise RetryError(msg)

================
File: src/opero/utils.py
================
T = TypeVar("T")
logger = logging.getLogger(__name__)
async def ensure_async(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
def get_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(level)
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
        handler.setFormatter(formatter)
        logger.addHandler(handler)

================
File: tests/test_core.py
================
async def async_success(value):
async def async_fail(value):
    raise ValueError(error_msg)
async def async_fallback(value):
async def async_process_success(*args):
    if len(args) == 1:
async def async_process_fallback(*args):
def sync_success(value):
def sync_fail(value):
def sync_fallback(value):
async def test_fallback_chain_success():
    chain = FallbackChain(async_success, async_fallback)
    result = await chain(1)
async def test_fallback_chain_fallback():
    chain = FallbackChain(async_fail, async_fallback)
async def test_fallback_chain_all_fail():
    chain = FallbackChain(async_fail, async_fail)
    with pytest.raises(AllFailedError):
        await chain(1)
async def test_fallback_chain_sync_function():
    chain = FallbackChain(sync_fail, sync_success)
async def test_orchestrator_execute_success():
    orchestrator = Orchestrator()
    result = await orchestrator.execute(async_success, 1)
async def test_orchestrator_execute_fallback():
    config = OrchestratorConfig(fallbacks=[async_fallback])
    orchestrator = Orchestrator(config=config)
    result = await orchestrator.execute(async_fail, 1)
async def test_orchestrator_process():
    config = OrchestratorConfig(fallbacks=[async_process_fallback])
    results = await orchestrator.process([async_process_success], 1, 2, 3)
async def test_orchestrator_process_with_concurrency():
    config = OrchestratorConfig(
        concurrency_config=ConcurrencyConfig(limit=2),
async def test_orchestrator_with_retry():
    mock_func = AsyncMock(side_effect=[ValueError("First attempt"), "Success"])
    config = OrchestratorConfig(retry_config=RetryConfig(max_attempts=2))
    result = await orchestrator.execute(mock_func, 1)

================
File: tests/test_decorators.py
================
async def async_success(value):
async def async_fail(value):
    raise ValueError(error_msg)
async def async_fallback(value):
def sync_success(value):
def sync_fail(value):
def sync_fallback(value):
async def test_orchestrate_decorator_basic():
    @orchestrate()
    async def decorated_func(value):
        return await async_success(value)
    result = await decorated_func(1)
async def test_orchestrate_decorator_fallback():
    @orchestrate(fallbacks=[async_fallback])
        return await async_fail(value)
async def test_orchestrate_decorator_retry():
    mock_func = AsyncMock(side_effect=[ValueError("First attempt"), "Success"])
    @orchestrate(retry_config=RetryConfig(max_attempts=2))
        return await mock_func(value)
async def test_orchestrate_decorator_process():
    with patch.object(
        decorated_func.process = lambda *args: Orchestrator(
            config=OrchestratorConfig(fallbacks=[async_fallback])
        ).process([decorated_func], *args)
        results = await decorated_func.process(1, 2, 3)
async def test_orchestrate_decorator_with_concurrency():
        @orchestrate(
            concurrency_config=ConcurrencyConfig(limit=2),
            config=OrchestratorConfig(
async def test_orchestrate_decorator_with_sync_function():
    @orchestrate(fallbacks=[sync_fallback])
    def decorated_func(value):
        return sync_success(value)

================
File: tests/test_package.py
================
def test_version():

================
File: .gitignore
================
*_autogen/
.DS_Store
__version__.py
__pycache__/
_Chutzpah*
_deps
_NCrunch_*
_pkginfo.txt
_Pvt_Extensions
_ReSharper*/
_TeamCity*
_UpgradeReport_Files/
!?*.[Cc]ache/
!.axoCover/settings.json
!.vscode/extensions.json
!.vscode/launch.json
!.vscode/settings.json
!.vscode/tasks.json
!**/[Pp]ackages/build/
!Directory.Build.rsp
.*crunch*.local.xml
.axoCover/*
.builds
.cr/personal
.fake/
.history/
.ionide/
.localhistory/
.mfractor/
.ntvs_analysis.dat
.paket/paket.exe
.sass-cache/
.vs/
.vscode
.vscode/*
.vshistory/
[Aa][Rr][Mm]/
[Aa][Rr][Mm]64/
[Bb]in/
[Bb]uild[Ll]og.*
[Dd]ebug/
[Dd]ebugPS/
[Dd]ebugPublic/
[Ee]xpress/
[Ll]og/
[Ll]ogs/
[Oo]bj/
[Rr]elease/
[Rr]eleasePS/
[Rr]eleases/
[Tt]est[Rr]esult*/
[Ww][Ii][Nn]32/
*_h.h
*_i.c
*_p.c
*_wpftmp.csproj
*- [Bb]ackup ([0-9]).rdl
*- [Bb]ackup ([0-9][0-9]).rdl
*- [Bb]ackup.rdl
*.[Cc]ache
*.[Pp]ublish.xml
*.[Rr]e[Ss]harper
*.a
*.app
*.appx
*.appxbundle
*.appxupload
*.aps
*.azurePubxml
*.bim_*.settings
*.bim.layout
*.binlog
*.btm.cs
*.btp.cs
*.build.csdef
*.cab
*.cachefile
*.code-workspace
*.coverage
*.coveragexml
*.d
*.dbmdl
*.dbproj.schemaview
*.dll
*.dotCover
*.DotSettings.user
*.dsp
*.dsw
*.dylib
*.e2e
*.exe
*.gch
*.GhostDoc.xml
*.gpState
*.ilk
*.iobj
*.ipdb
*.jfm
*.jmconfig
*.la
*.lai
*.ldf
*.lib
*.lo
*.log
*.mdf
*.meta
*.mm.*
*.mod
*.msi
*.msix
*.msm
*.msp
*.ncb
*.ndf
*.nuget.props
*.nuget.targets
*.nupkg
*.nvuser
*.o
*.obj
*.odx.cs
*.opendb
*.opensdf
*.opt
*.out
*.pch
*.pdb
*.pfx
*.pgc
*.pgd
*.pidb
*.plg
*.psess
*.publishproj
*.publishsettings
*.pubxml
*.pyc
*.rdl.data
*.rptproj.bak
*.rptproj.rsuser
*.rsp
*.rsuser
*.sap
*.sbr
*.scc
*.sdf
*.sln.docstates
*.sln.iml
*.slo
*.smod
*.snupkg
*.so
*.suo
*.svclog
*.tlb
*.tlh
*.tli
*.tlog
*.tmp
*.tmp_proj
*.tss
*.user
*.userosscache
*.userprefs
*.vbp
*.vbw
*.VC.db
*.VC.VC.opendb
*.VisualState.xml
*.vsp
*.vspscc
*.vspx
*.vssscc
*.xsd.cs
**/[Pp]ackages/*
**/*.DesktopClient/GeneratedArtifacts
**/*.DesktopClient/ModelManifest.xml
**/*.HTMLClient/GeneratedArtifacts
**/*.Server/GeneratedArtifacts
**/*.Server/ModelManifest.xml
*~
~$*
$tf/
AppPackages/
artifacts/
ASALocalRun/
AutoTest.Net/
Backup*/
BenchmarkDotNet.Artifacts/
bld/
BundleArtifacts/
ClientBin/
cmake_install.cmake
CMakeCache.txt
CMakeFiles
CMakeLists.txt.user
CMakeScripts
CMakeUserPresets.json
compile_commands.json
coverage*.info
coverage*.json
coverage*.xml
csx/
CTestTestfile.cmake
dlldata.c
DocProject/buildhelp/
DocProject/Help/*.hhc
DocProject/Help/*.hhk
DocProject/Help/*.hhp
DocProject/Help/*.HxC
DocProject/Help/*.HxT
DocProject/Help/html
DocProject/Help/Html2
ecf/
FakesAssemblies/
FodyWeavers.xsd
Generated_Code/
Generated\ Files/
healthchecksdb
install_manifest.txt
ipch/
Makefile
MigrationBackup/
mono_crash.*
nCrunchTemp_*
node_modules/
nunit-*.xml
OpenCover/
orleans.codegen.cs
Package.StoreAssociation.xml
paket-files/
project.fragment.lock.json
project.lock.json
publish/
PublishScripts/
rcf/
ScaffoldingReadMe.txt
ServiceFabricBackup/
StyleCopReport.xml
Testing
TestResult.xml
UpgradeLog*.htm
UpgradeLog*.XML
x64/
x86/
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# Distribution / packaging
!dist/.gitkeep

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
.ruff_cache/

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# IDE
.idea/
.vscode/
*.swp
*.swo
*~

# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

# Project specific
__version__.py
_private

================
File: .pre-commit-config.yaml
================
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.3.4
    hooks:
      - id: ruff
        args: [--fix]
      - id: ruff-format
        args: [--respect-gitignore]
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-toml
      - id: check-added-large-files
      - id: debug-statements
      - id: check-case-conflict
      - id: mixed-line-ending
        args: [--fix=lf]

================
File: CHANGELOG.md
================
---
this_file: CHANGELOG.md
---

# Changelog

All notable changes to the Opero project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.1.0] - 2024-03-04

### Added

- Initial project structure and core functionality
- Core orchestration classes:
  - `Orchestrator` class for managing resilient operations
  - `FallbackChain` class for sequential fallback execution
  - `OrchestratorConfig` for unified configuration
- Configuration classes:
  - `RetryConfig` for configuring retry behavior
  - `RateLimitConfig` for rate limiting operations
  - `MultiprocessConfig` for multiprocessing settings
  - `ConcurrencyConfig` for concurrency control
- Decorator interface:
  - `@orchestrate` decorator for applying orchestration to functions
- Retry mechanism:
  - Integration with `tenacity` for robust retry behavior
  - Support for both synchronous and asynchronous functions
  - Configurable retry policies (attempts, wait times, exceptions)
- Rate limiting:
  - Integration with `asynciolimiter` for consistent rate control
  - Support for both synchronous and asynchronous functions
- Concurrency support:
  - Abstraction over different concurrency backends
  - Support for multiprocessing, threading, and asyncio
  - Resource management via context managers
- Exception handling:
  - Custom exceptions for better error reporting
  - `OperoError` as base exception class
  - `AllFailedError` for when all fallback operations fail
- Utility functions:
  - Async/sync function adapters
  - Logging utilities
- Comprehensive test suite:
  - Unit tests for core components
  - Integration tests for common usage patterns
  - Tests for both synchronous and asynchronous execution
- Project infrastructure:
  - Hatch-based project management
  - Comprehensive type hints
  - Linting and formatting configuration
  - CI/CD setup

### Fixed

- Proper handling of coroutine objects in retry mechanism
- Correct error propagation in fallback chains
- Type compatibility issues in async/sync conversions
- Proper execution of individual functions in process method

### Changed

- Refactored `Orchestrator` to use unified `OrchestratorConfig`
- Improved error handling in `FallbackChain` to raise `AllFailedError`
- Enhanced logging throughout the codebase
- Optimized retry logic for better performance

================
File: cleanup.py
================
LOG_FILE = Path("CLEANUP.txt")
os.chdir(Path(__file__).parent)
def new() -> None:
    if LOG_FILE.exists():
        LOG_FILE.unlink()
def prefix() -> None:
    readme = Path(".cursor/rules/0project.mdc")
    if readme.exists():
        log_message("\n=== PROJECT STATEMENT ===")
        content = readme.read_text()
        log_message(content)
def suffix() -> None:
    todo = Path("TODO.md")
    if todo.exists():
        log_message("\n=== TODO.md ===")
        content = todo.read_text()
def log_message(message: str) -> None:
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with LOG_FILE.open("a") as f:
        f.write(log_line)
def run_command(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess:
        result = subprocess.run(
            log_message(result.stdout)
        log_message(f"Command failed: {' '.join(cmd)}")
        log_message(f"Error: {e.stderr}")
        return subprocess.CompletedProcess(cmd, 1, "", str(e))
def check_command_exists(cmd: str) -> bool:
        subprocess.run(["which", cmd], check=True, capture_output=True)
class Cleanup:
    def __init__(self) -> None:
        self.workspace = Path.cwd()
    def _check_required_files(self) -> bool:
            if not (self.workspace / file).exists():
                log_message(f"Error: {file} is missing")
    def _venv(self) -> None:
        log_message("Setting up virtual environment")
            run_command(["uv", "venv"])
            if venv_path.exists():
                os.environ["VIRTUAL_ENV"] = str(self.workspace / ".venv")
                log_message("Virtual environment created and activated")
                log_message(
            log_message(f"Failed to create virtual environment: {e}")
    def _install(self) -> None:
        log_message("Installing package with all extras")
            self._venv()
            run_command(["uv", "pip", "install", "-e", ".[test,dev]"])
            log_message("Package installed successfully")
            log_message(f"Failed to install package: {e}")
    def status(self) -> None:
        prefix()  # Add README.md content at start
        _print_header("Current Status")
        self._check_required_files()
        _generate_tree()
        result = run_command(["git", "status"], check=False)
        _print_header("Environment Status")
        self._install()
        _run_checks()
        suffix()  # Add TODO.md content at end
    def venv(self) -> None:
        _print_header("Virtual Environment Setup")
    def install(self) -> None:
        _print_header("Package Installation")
    def update(self) -> None:
        self.status()
        if _git_status():
            log_message("Changes detected in repository")
                run_command(
                run_command(["pre-commit", "run", "--all-files"])
                run_command(["git", "add", "-A", "."])
                run_command(["git", "commit", "-m", commit_msg])
                log_message("Changes committed successfully")
                log_message(f"Failed to commit changes: {e}")
            log_message("No changes to commit")
    def push(self) -> None:
        _print_header("Pushing Changes")
            run_command(["git", "push"])
            log_message("Changes pushed successfully")
            log_message(f"Failed to push changes: {e}")
def _generate_tree() -> None:
    if not check_command_exists("tree"):
        rules_dir = Path(".cursor/rules")
        rules_dir.mkdir(parents=True, exist_ok=True)
        tree_result = run_command(
        with open(rules_dir / "filetree.mdc", "w") as f:
            f.write("---\ndescription: File tree of the project\nglobs: \n---\n")
            f.write(tree_text)
        log_message("\nProject structure:")
        log_message(tree_text)
        log_message(f"Failed to generate tree: {e}")
def _git_status() -> bool:
    result = run_command(["git", "status", "--porcelain"], check=False)
    return bool(result.stdout.strip())
def _run_checks() -> None:
    log_message("Running code quality checks")
        log_message(">>> Running code fixes...")
        log_message(">>>Running type checks...")
        run_command(["python", "-m", "mypy", "src", "tests"], check=False)
        log_message(">>> Running tests...")
        run_command(["python", "-m", "pytest", "tests"], check=False)
        log_message("All checks completed")
        log_message(f"Failed during checks: {e}")
def _print_header(message: str) -> None:
    log_message(f"\n=== {message} ===")
def repomix(
            cmd.append("--compress")
            cmd.append("--remove-empty-lines")
            cmd.append("-i")
            cmd.append(ignore_patterns)
        cmd.extend(["-o", output_file])
        run_command(cmd)
        log_message(f"Repository content mixed into {output_file}")
        log_message(f"Failed to mix repository: {e}")
def print_usage() -> None:
    log_message("Usage:")
    log_message("  cleanup.py status   # Show current status and run all checks")
    log_message("  cleanup.py venv     # Create virtual environment")
    log_message("  cleanup.py install  # Install package with all extras")
    log_message("  cleanup.py update   # Update and commit changes")
    log_message("  cleanup.py push     # Push changes to remote")
def main() -> None:
    new()  # Clear log file
    if len(sys.argv) < 2:
        print_usage()
        sys.exit(1)
    cleanup = Cleanup()
        cleanup.status()
        cleanup.venv()
        cleanup.install()
        cleanup.update()
        cleanup.push()
    repomix()
    main()

================
File: LICENSE
================
MIT License

Copyright (c) 2025 Adam Twardoch

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

================
File: package.toml
================
# Package configuration
[package]
include_cli = true        # Include CLI boilerplate
include_logging = true    # Include logging setup
use_pydantic = true      # Use Pydantic for data validation
use_rich = true          # Use Rich for terminal output

[features]
mkdocs = false           # Enable MkDocs documentation
vcs = true              # Initialize Git repository
github_actions = true   # Add GitHub Actions workflows

================
File: pyproject.toml
================
# this_file: pyproject.toml
[project]
name = "opero"
dynamic = ["version"]
description = "Resilient, parallel task orchestration for Python"
readme = "README.md"
requires-python = ">=3.8"
license = { file = "LICENSE" }
keywords = ["orchestration", "resilience", "retry", "fallback", "parallel", "concurrency", "rate-limiting"]
classifiers = [
    "Development Status :: 4 - Beta",
    "Programming Language :: Python",
    "Programming Language :: Python :: 3.8",
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Programming Language :: Python :: Implementation :: CPython",
    "Programming Language :: Python :: Implementation :: PyPy",
]

dependencies = [
    "tenacity>=8.0.0",
    "asynciolimiter>=1.0.0",
]


[project.optional-dependencies]

dev = [
    "black>=23.1.0",
    "mypy>=1.0.0",
    "pytest>=7.0.0",
    "pytest-cov>=4.0.0",
    "pytest-asyncio>=0.21.0",
    "ruff>=0.0.243",
]

test = [
    "pytest>=7.0.0",
    "pytest-cov>=4.0.0",
    "pytest-asyncio>=0.21.0",
    "tenacity>=8.0.0",
    "asynciolimiter>=1.0.0",
    "pathos>=0.3.0",
    "aiomultiprocess>=0.9.0",
]

pathos = [
    "pathos>=0.3.0",
]

aiomultiprocess = [
    "aiomultiprocess>=0.9.0",
]

all = [
    "pathos>=0.3.0",
    "aiomultiprocess>=0.9.0",
]

[project.scripts]
# CLINAME = "opero.__main__:main"



[[project.authors]]
name = "Adam Twardoch"
email = "adam@twardoch.com"

[project.urls]
Documentation = "https://github.com/twardoch/opero#readme"
Issues = "https://github.com/twardoch/opero/issues"
Source = "https://github.com/twardoch/opero"


[build-system]
requires = [
    "hatchling>=1.27.0",     # Core build backend for Hatch
    "hatch-vcs>=0.4.0",      # Version Control System plugin for Hatch
]
build-backend = "hatchling.build"  # Use Hatchling as the build backend


[tool.coverage.paths]
opero = ["src/opero", "*/opero/src/opero"]
tests = ["tests", "*/opero/tests"]



[tool.coverage.report]
exclude_lines = [
    "no cov",
    "if __name__ == .__main__.:",
    "if TYPE_CHECKING:",
]

[tool.coverage.run]
source_pkgs = ["opero", "tests"]
branch = true
parallel = true
omit = [
    "src/opero/__about__.py",
]



[tool.hatch.build.hooks.vcs]
version-file = "src/opero/__version__.py"


[tool.hatch.build.targets.wheel]
packages = ["src/opero"]



[tool.hatch.envs.default]
dependencies = [
    "pytest",
    "pytest-cov",
    "pytest-asyncio",
    "tenacity",
    "asynciolimiter",
    "pathos",
    "aiomultiprocess",
]

[[tool.hatch.envs.all.matrix]]
python = ["3.8", "3.9", "3.10", "3.11", "3.12"]


[tool.hatch.envs.default.scripts]
test = "pytest {args:tests}"
test-cov = "pytest --cov-report=term-missing --cov-config=pyproject.toml --cov=src/opero --cov=tests {args:tests}"
type-check = "mypy src/opero tests"
lint = ["ruff check src/opero tests", "ruff format --respect-gitignore src/opero tests"]
fix = ["ruff check  --fix --unsafe-fixes src/opero tests", "ruff format --respect-gitignore src/opero tests"]



[tool.hatch.envs.lint]
detached = true
dependencies = [
    "tenacity>=8.0.0",
    "asynciolimiter>=1.0.0",
]


[tool.hatch.envs.lint.scripts]
typing = "mypy --install-types --non-interactive {args:src/opero tests}"
style = ["ruff check {args:.}", "ruff format --respect-gitignore {args:.}"]
fmt = ["ruff format --respect-gitignore {args:.}", "ruff check --fix {args:.}"]
all = ["style", "typing"]


[tool.hatch.envs.test]
dependencies = [
    "tenacity>=8.0.0",
    "asynciolimiter>=1.0.0",
    "pytest-asyncio>=0.21.0",
]

[tool.hatch.envs.test.scripts]
test = "python -m pytest -n auto -p no:briefcase {args:tests}"
test-cov = "python -m pytest -n auto -p no:briefcase --cov-report=term-missing --cov-config=pyproject.toml --cov=src/opero --cov=tests {args:tests}"
bench = "python -m pytest -v -p no:briefcase tests/test_benchmark.py --benchmark-only"
bench-save = "python -m pytest -v -p no:briefcase tests/test_benchmark.py --benchmark-only --benchmark-json=benchmark/results.json"

[tool.hatch.version]
source = "vcs"
path = 'src/opero/__version__.py'
pattern = "__version__\\s*=\\s*version\\s*=\\s*['\"](?P<version>[^'\"]+)['\"]"

[tool.hatch.version.raw-options]
version_scheme = "post-release"


[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true


[tool.ruff]
target-version = "py310"
line-length = 88

[tool.ruff.lint]
extend-select = [
    "A",
    "ARG",
    "B",
    "C",
    "DTZ",
    "E",
    "EM",
    "F",
    "FBT",
    "I",
    "ICN",
    "ISC",
    "N",
    "PLC",
    "PLE",
    "PLR",
    "PLW",
    "Q",
    "RUF",
    "S",
    "T",
    "TID",
    "UP",
    "W",
    "YTT",
]
ignore = ["ARG001", "E501", "I001", "RUF001", "PLR2004", "EXE003", "ISC001"]



[tool.ruff.per-file-ignores]
"tests/*" = ["S101"]





[tool.pytest.ini_options]
addopts = "-v --durations=10 -p no:briefcase"
asyncio_mode = "auto"
console_output_style = "progress"
filterwarnings = ["ignore::DeprecationWarning", "ignore::UserWarning"]
log_cli = true
log_cli_level = "INFO"
markers = [
  "benchmark: marks tests as benchmarks (select with '-m benchmark')",
  "unit: mark a test as a unit test",
  "integration: mark a test as an integration test",
  "permutation: tests for permutation functionality",
  "parameter: tests for parameter parsing",
  "prompt: tests for prompt parsing",
]
norecursedirs = [
  ".*",
  "build",
  "dist",
  "venv",
  "__pycache__",
  "*.egg-info",
  "_private",
]

python_classes = ["Test*"]
python_files = ["test_*.py"]
python_functions = ["test_*"]
testpaths = ["tests"]


[tool.pytest-benchmark]
min_rounds = 100
min_time = 0.1
histogram = true
storage = "file"
save-data = true
compare = [
    "min",    # Minimum time
    "max",    # Maximum time
    "mean",   # Mean time
    "stddev", # Standard deviation
    "median", # Median time
    "iqr",    # Inter-quartile range
    "ops",    # Operations per second
    "rounds", # Number of rounds
]

================
File: README.md
================
---
this_file: README.md
---

# Opero

[![PyPI version](https://img.shields.io/pypi/v/opero.svg)](https://pypi.org/project/opero/)
[![Python versions](https://img.shields.io/pypi/pyversions/opero.svg)](https://pypi.org/project/opero/)
[![License](https://img.shields.io/github/license/twardoch/opero.svg)](https://github.com/twardoch/opero/blob/main/LICENSE)

**Opero** is a Python package that provides a clean, Pythonic interface for orchestrating resilient, parallelized operations with fallback mechanisms, retry logic, rate limiting, and multiprocessing support. The name "Opero" comes from the Latin word for "to work" or "to operate."

## Key Features

- **Fallback Chains**: Automatically try alternative functions when primary operations fail
- **Automatic Retries**: Robust retry mechanism with configurable backoff strategies
- **Rate Limiting**: Control operation frequency to prevent resource exhaustion
- **Parallel Processing**: Execute operations concurrently with configurable limits
- **Async First**: Built for modern async workflows while fully supporting sync functions
- **Unified Interface**: Both class-based (`Orchestrator`) and decorator-based (`@orchestrate`) APIs
- **Composable**: Layer different resilience mechanisms as needed
- **Type Safety**: Comprehensive type hints for better IDE integration and error detection

## Installation

```bash
pip install opero
```

For optional dependencies:

```bash
# For enhanced multiprocessing support
pip install opero[pathos]

# For async multiprocessing
pip install opero[aiomultiprocess]

# Install all optional dependencies
pip install opero[all]
```

## Quick Start

### Basic Usage with Fallbacks

```python
import asyncio
from opero import Orchestrator, OrchestratorConfig

async def primary_function(value):
    if value % 2 == 0:
        raise ValueError(f"Failed for value: {value}")
    return f"Primary: {value}"

async def fallback_function(value):
    return f"Fallback: {value}"

async def main():
    # Create an orchestrator with a fallback
    orchestrator = Orchestrator(
        config=OrchestratorConfig(
            fallbacks=[fallback_function]
        )
    )

    # Execute a single operation
    result = await orchestrator.execute(primary_function, 2)
    print(result)  # "Fallback: 2"

    # Process multiple items
    results = await orchestrator.process([primary_function], 1, 2, 3, 4)
    print(results)  # ["Primary: 1", "Fallback: 2", "Primary: 3", "Fallback: 4"]

asyncio.run(main())
```

### Using the Decorator with Retry

```python
import asyncio
from opero import orchestrate, RetryConfig, OrchestratorConfig

@orchestrate(
    config=OrchestratorConfig(
        retry_config=RetryConfig(
            max_attempts=3,
            wait_min=1.0,
            wait_max=5.0,
            wait_multiplier=1.5
        )
    )
)
async def unreliable_function(value):
    # This function will be retried up to 3 times if it fails
    if value % 3 == 0:
        raise ValueError(f"Failed for value: {value}")
    return f"Success: {value}"

async def main():
    # The function will be retried automatically if it fails
    result = await unreliable_function(3)  # Will retry but eventually use fallback
    print(result)

asyncio.run(main())
```

### Rate Limiting and Concurrency

```python
import asyncio
from opero import Orchestrator, OrchestratorConfig, RateLimitConfig, ConcurrencyConfig

async def api_call(item):
    # Simulate an API call
    await asyncio.sleep(0.1)
    return f"Result: {item}"

async def main():
    # Create an orchestrator with rate limiting and concurrency control
    orchestrator = Orchestrator(
        config=OrchestratorConfig(
            rate_limit_config=RateLimitConfig(rate=5),  # 5 operations per second
            concurrency_config=ConcurrencyConfig(limit=3)  # Max 3 concurrent operations
        )
    )

    # Process multiple items with controlled concurrency and rate
    items = list(range(10))
    results = await orchestrator.process([api_call], *items)
    print(results)

asyncio.run(main())
```

## Core Components

### Orchestrator

The central class for managing resilient operations:

```python
from opero import Orchestrator, OrchestratorConfig, RetryConfig

# Create an orchestrator with various configurations
orchestrator = Orchestrator(
    config=OrchestratorConfig(
        retry_config=RetryConfig(max_attempts=3),
        fallbacks=[backup_function1, backup_function2],
        # Other configurations...
    )
)

# Execute a function with the configured resilience mechanisms
result = await orchestrator.execute(my_function, *args, **kwargs)

# Process multiple items with the same function
results = await orchestrator.process([my_function], *items)
```

### FallbackChain

Manages sequential execution of fallback functions:

```python
from opero import FallbackChain

# Create a fallback chain with a primary function and fallbacks
chain = FallbackChain(primary_function, [fallback1, fallback2])

# Execute the chain - will try each function in order until one succeeds
result = await chain.execute(*args, **kwargs)
```

### Configuration Classes

#### OrchestratorConfig

Unified configuration for the Orchestrator:

```python
from opero import OrchestratorConfig, RetryConfig, RateLimitConfig

config = OrchestratorConfig(
    retry_config=RetryConfig(max_attempts=3),
    rate_limit_config=RateLimitConfig(rate=10),
    fallbacks=[backup_function],
    # Other configurations...
)
```

#### RetryConfig

Configure retry behavior:

```python
from opero import RetryConfig

retry_config = RetryConfig(
    max_attempts=3,                          # Maximum number of retry attempts
    wait_min=1.0,                            # Minimum wait time between retries (seconds)
    wait_max=60.0,                           # Maximum wait time between retries (seconds)
    wait_multiplier=1.0,                     # Multiplier for exponential backoff
    retry_exceptions=(ValueError, KeyError), # Exception types that trigger a retry
    reraise=True                             # Whether to reraise the last exception
)
```

#### RateLimitConfig

Configure rate limiting:

```python
from opero import RateLimitConfig

# Limit operations to 10 per second
rate_limit_config = RateLimitConfig(rate=10.0)
```

#### ConcurrencyConfig

Configure concurrency limits:

```python
from opero import ConcurrencyConfig

# Limit to 5 concurrent operations
concurrency_config = ConcurrencyConfig(limit=5)
```

#### MultiprocessConfig

Configure multiprocessing:

```python
from opero import MultiprocessConfig

# Use 4 worker processes with the pathos backend
multiprocess_config = MultiprocessConfig(max_workers=4, backend="pathos")
```

### @orchestrate Decorator

Apply orchestration to functions:

```python
from opero import orchestrate, OrchestratorConfig, RetryConfig

@orchestrate(
    config=OrchestratorConfig(
        retry_config=RetryConfig(max_attempts=3),
        fallbacks=[backup_function]
    )
)
async def my_function(arg):
    # Function implementation
    pass

# The function now has retry and fallback capabilities
result = await my_function(some_arg)

# For processing multiple items
results = await my_function.process(item1, item2, item3)
```

## Advanced Usage

### Mixing Sync and Async Functions

Opero seamlessly handles both synchronous and asynchronous functions:

```python
from opero import Orchestrator, OrchestratorConfig

# Synchronous function
def sync_function(value):
    if value % 2 == 0:
        raise ValueError("Sync function failed")
    return f"Sync: {value}"

# Asynchronous function
async def async_function(value):
    return f"Async: {value}"

# Mix them in a fallback chain
orchestrator = Orchestrator(
    config=OrchestratorConfig(
        fallbacks=[async_function]
    )
)

# Works with both sync and async primary functions
result1 = await orchestrator.execute(sync_function, 2)   # "Async: 2"
result2 = await orchestrator.execute(async_function, 1)  # "Async: 1"
```

### Custom Retry Logic

Fine-tune retry behavior for specific exception types:

```python
from opero import RetryConfig, orchestrate, OrchestratorConfig

# Only retry on specific exceptions
retry_config = RetryConfig(
    max_attempts=5,
    retry_exceptions=(ConnectionError, TimeoutError),
    wait_min=0.5,
    wait_max=10.0,
    wait_multiplier=2.0
)

@orchestrate(
    config=OrchestratorConfig(
        retry_config=retry_config
    )
)
async def network_operation(url):
    # Implementation...
    pass
```

### Multiprocessing for CPU-Bound Tasks

Use multiprocessing for CPU-intensive operations:

```python
from opero import Orchestrator, OrchestratorConfig, MultiprocessConfig

def cpu_intensive_task(data):
    # Heavy computation...
    return result

orchestrator = Orchestrator(
    config=OrchestratorConfig(
        multiprocess_config=MultiprocessConfig(
            max_workers=4,
            backend="pathos"  # More flexible serialization
        )
    )
)

# Process items in parallel using multiple processes
results = await orchestrator.process([cpu_intensive_task], *large_data_items)
```

## Development

This project uses [Hatch](https://hatch.pypa.io/) for development workflow management.

### Setup Development Environment

```bash
# Install hatch if you haven't already
pip install hatch

# Create and activate development environment
hatch shell

# Run tests
hatch run test

# Run tests with coverage
hatch run test-cov

# Run linting
hatch run lint

# Format code
hatch run format
```

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

================
File: TODO.md
================
---
this_file: TODO.md
---

# TODO

Tip: Periodically run `./cleanup.py status` to see results of lints and tests.

This document outlines the remaining tasks for the Opero project.

## Phase 1

- [ ] Improve error handling in `retry_async` function to properly handle coroutine objects
- [ ] Fix type compatibility issues in async/sync conversions
- [ ] Ensure proper execution of individual functions in the `process` method
- [ ] Implement proper handling of `AllFailedError` in all fallback scenarios
- [ ] Add comprehensive logging throughout the codebase
- [ ] Optimize retry logic for better performance

## Phase 2

- [ ] Add more unit tests for edge cases in retry mechanism
- [ ] Create integration tests for complex scenarios combining multiple features
- [ ] Add performance benchmarks for key operations
- [ ] Implement stress tests for concurrency and rate limiting
- [ ] Add tests for multiprocessing with different backends

## Phase 3

- [ ] Create comprehensive API documentation with Sphinx
- [ ] Add more usage examples for common patterns
- [ ] Document best practices for error handling
- [ ] Create tutorials for advanced use cases
- [ ] Add docstrings to all public functions and classes

## Features

- [ ] Implement middleware support for function transformation
- [ ] Add metrics collection for performance monitoring
- [ ] Create a CLI interface using `fire` and `rich`
- [ ] Add support for distributed task queues
- [ ] Implement streaming support with backpressure handling

## Infrastructure

- [ ] Set up CI/CD pipeline for automated testing and deployment
- [ ] Configure code coverage reporting
- [ ] Add pre-commit hooks for code quality
- [ ] Create GitHub Actions workflow for publishing to PyPI
- [ ] Set up automated dependency updates

## Optimization

- [ ] Profile and optimize critical paths
- [ ] Reduce memory usage for large-scale operations
- [ ] Optimize concurrency management for high-throughput scenarios
- [ ] Improve serialization for multiprocessing
- [ ] Minimize overhead in the orchestration layer

## Compatibility

- [ ] Ensure compatibility with Python 3.8+
- [ ] Test with different versions of dependencies
- [ ] Add explicit support for asyncio event loop policies
- [ ] Ensure thread safety for shared resources
- [ ] Test on different operating systems

## Next Release (v0.2.0) Priorities

1. [ ] Fix all linter errors in the codebase
2. [ ] Complete the test suite with 90%+ coverage
3. [ ] Improve error handling and type compatibility
4. [ ] Add comprehensive logging
5. [ ] Create proper API documentation
6. [ ] Optimize performance for high-throughput scenarios



================================================================
End of Codebase
================================================================
